Injecting Python Code¶
Certain Zuar Runner inputters, transforms, and steps allow the injection of Python code into a running job:
inputter - ExampleInput
transform - PythonTransform
step - PythonStep
TODO: add links to relevant docs
General¶
The value of python_code can be any of the following:
A string containing the name of a file located in /var/mitto/data containing valid Python code.
A string containing the fully-qualified path to a file containing valid Python code.
A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.
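The three accepted forms can be pictured with a minimal sketch. This is not Zuar Runner's actual implementation: the function name load_python_code, the resolution order, and the use of a newline as the join separator are assumptions made for illustration only.

```python
import os

# Default directory taken from the text above.
DATA_DIR = "/var/mitto/data"


def load_python_code(python_code, data_dir=DATA_DIR):
    """Return the Python source described by a python_code value.

    Hypothetical helper: the name and behavior are illustrative assumptions.
    """
    if isinstance(python_code, list):
        # A list of lines is joined into one source string
        # (assumption: the separator is a newline).
        return "\n".join(python_code)
    if os.path.isabs(python_code):
        # A fully-qualified path is used as given.
        path = python_code
    else:
        # A bare file name is looked up in the data directory.
        path = os.path.join(data_dir, python_code)
    with open(path, encoding="utf-8") as handle:
        return handle.read()


# List form: the joined string is what gets handed to exec().
source = load_python_code(["x = 1", "y = x + 1"])
namespace = {}
exec(source, namespace)
```

After this runs, namespace holds the names defined by the injected code, which is how a list of lines ends up behaving like a single script.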
Depending upon where python_code is used, additional constraints may be placed on the code.
Formatting the List of Strings¶
When python_code is a list of strings, a non-standard formatting convention is used due to inconsistent handling of indentation by HJSON. This is best explained by example:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        . logging.info("start")
        . from mitto.iov2.input import StoreInput
        . from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        . from mitto.io.db.redshift import StreamIter
        . streamer = StreamIter(
        . to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        . for record in self.environ[STORE].list()
        . )
        . data = streamer.read()
        . logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}
Things to note:
The first non-space character on the line is considered to be “column 1”.
If the first non-space character is a . (period), it is converted to a space.
Python comments can be used.
The variables available for use depend upon the context of execution.
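The column rules above can be sketched in a few lines of Python. This is an illustration of the convention as described, not the code Zuar Runner actually runs; the helper name normalize_lines and the newline separator are assumptions.

```python
def normalize_lines(lines):
    """Apply the leading-dot convention and join the lines into one string.

    Hypothetical helper written for illustration only.
    """
    normalized = []
    for line in lines:
        # Leading spaces are dropped, so the first non-space
        # character becomes "column 1".
        stripped = line.lstrip(" ")
        if stripped.startswith("."):
            # A leading dot is converted to a space, which restores
            # the indentation that Python needs.
            stripped = " " + stripped[1:]
        normalized.append(stripped)
    return "\n".join(normalized)


lines = [
    "    def double(x):",
    "    .   return x * 2",
    "    result = double(21)",
]
source = normalize_lines(lines)
namespace = {}
exec(source, namespace)
```

Here the second line becomes a four-space-indented function body, so the joined source is valid Python even though every list entry started at the same visual column.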
Execution Context and Other Requirements¶
PythonStep¶
When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:
Accept a single argument: self
Expect to be called once during the execution of the job
Not return a value
Be assigned to the step attribute of the class instance
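The requirements for PythonStep code can be tried outside of a job with a small stand-in. Everything about the harness below is an assumption: FakeStep is a hypothetical substitute for the real PythonStep class, and both the names placed in scope (self, logging) and the way the function is finally called are guesses about how the framework might behave.

```python
import logging


class FakeStep:
    """Hypothetical stand-in for PythonStep; not the real class."""

    def __init__(self, environ):
        self.environ = environ
        self.step = None


# Injected code in the shape the requirements describe.
source = "\n".join([
    "def _dynamic_step(self):",
    "    logging.info('keys=%s', sorted(self.environ))",
    "    self.environ['visited'] = True",
    "self.step = _dynamic_step",
])

instance = FakeStep({"job": "demo"})
# Assumption: the code executes with `self` and `logging` already in scope.
exec(source, {"self": instance, "logging": logging})
# Assumption: the instance is passed explicitly when the step is called.
result = instance.step(instance)
```

The function takes only self, runs once, returns nothing, and is reachable through the step attribute, which matches the four requirements listed above.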
PythonTransform¶
When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:
Accept two arguments: self and record
Expect to be called once for each row of data
Return record or a modified version of record
Be assigned to the transform_ attribute of the class instance
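A per-row transform can be exercised the same way with a stand-in. FakeTransform is a hypothetical substitute for the real PythonTransform class, the record layout (a dict with a name key) is invented for the example, and the calling convention is an assumption about the framework.

```python
class FakeTransform:
    """Hypothetical stand-in for PythonTransform; not the real class."""

    def __init__(self):
        self.transform_ = None


# Injected code: tidy one field and hand the record back.
source = "\n".join([
    "def transform_(self, record):",
    "    record['name'] = record['name'].strip().title()",
    "    return record",
    "self.transform_ = transform_",
])

instance = FakeTransform()
# Assumption: the code executes with `self` already in scope.
exec(source, {"self": instance})

# Assumption: the function is called once per row with the instance
# and the record as its two arguments.
rows = [{"name": "  ada lovelace "}, {"name": "alan turing"}]
cleaned = [instance.transform_(instance, row) for row in rows]
```

Returning the record matters: a transform that forgets the return statement would hand None to whatever comes next in the chain.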
Tips and Tricks¶
If you are running the job manually using the CLI via job_io.py config.json, you can invoke the Python debugger via, e.g.:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        import pdb; pdb.set_trace()
    ]
}
Note: this is not possible when the job is being run from the UI, the scheduler, a sequence, or via mitto run.
You can easily add logging statements.
To log every row at a certain point in a set of transforms:
{
    use: mitto.iov2.transform.builtin#PythonTransform
    python_code: [
        def transform_(self, record):
        . logging.info("record=%s", record)
        . return record
        self.transform_ = transform_
    ]
}
To log the job execution environment at a certain point in the steps:
{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        logging.info("environ=%s", self.environ)
    ]
}