Injecting Python Code

Certain Zuar Runner inputters, transforms, and steps allow the injection of Python code into a running job:

  • inputter - ExampleInput

  • transform - PythonTransform

  • step - PythonStep

General

The value of python_code can be any of the following:

  1. A string naming a file in /var/mitto/data that contains valid Python code.

  2. A string giving the fully-qualified path to a file that contains valid Python code.

  3. A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.

Depending upon where the python_code is used, additional constraints may be placed on the code.
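
The three accepted forms can be pictured with a short sketch. This is not Runner's implementation: the function name load_python_code, the newline join, and the use of os.path.isabs to tell a bare filename from a full path are all assumptions made for illustration. It also ignores the leading-dot convention for list entries, which is covered in the next section.

```python
import os

# Directory named in the text above; everything else here is illustrative.
DATA_DIR = "/var/mitto/data"


def load_python_code(python_code, data_dir=DATA_DIR):
    """Hypothetical helper: return Python source for a python_code value."""
    if isinstance(python_code, list):
        # Form 3: one string per line of Python (newline join is an assumption).
        return "\n".join(python_code)
    if os.path.isabs(python_code):
        # Form 2: fully-qualified path to a file.
        path = python_code
    else:
        # Form 1: bare filename, looked up in the data directory.
        path = os.path.join(data_dir, python_code)
    with open(path, encoding="utf-8") as handle:
        return handle.read()


# The list form needs no file on disk, so it is easy to try out.
source = load_python_code(["x = 1", "y = x + 1"])
namespace = {}
exec(source, namespace)
print(namespace["y"])  # 2
```

Whichever form is used, the result is a single string of Python source that is handed to exec.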

Formatting the List of Strings

When python_code is a list of strings, a non-standard formatting convention is used because HJSON handles indentation inconsistently. This is best explained by example:

{
    use: mitto.iov2.steps.builtin#PythonStep
    python_code: [
        # Executed in the context of an instance of the PythonStep class
        # Because this uses the store as input, the job must be configured
        # with a store.
        def _dynamic_step(self):
        .    logging.info("start")
        .    from mitto.iov2.input import StoreInput
        .    from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
        .    from mitto.io.db.redshift import StreamIter
        .    streamer = StreamIter(
        .        to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
        .        for record in self.environ[STORE].list()
        .    )
        .    data = streamer.read()
        .    logging.info("stop")
        # Function must be assigned to `step`
        self.step = _dynamic_step
    ]
}

Things to note:

  • The first non-space character on the line is considered to be “column 1”.

  • If the first non-space character is a ., it is converted to a space. This is how the indentation of a function body is preserved.

  • Python comments can be used.

  • The variables available for use depend upon the context of execution.
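
The rules above can be sketched in a few lines of Python. The helper name normalize_lines is hypothetical, and the actual Runner code may differ in detail (for example, in how it joins the lines); the sketch only shows the "column 1" and leading-dot rules as they are described here.

```python
def normalize_lines(lines):
    """Hypothetical helper applying the leading-dot convention to each line."""
    normalized = []
    for line in lines:
        # Whatever leading whitespace survived HJSON parsing, start at "column 1".
        text = line.lstrip()
        if text.startswith("."):
            # A leading dot is converted to one space of indentation.
            text = " " + text[1:]
        normalized.append(text)
    return "\n".join(normalized)


lines = [
    "    # comments are allowed",
    "    def double(value):",
    "    .    return value * 2",
    "    result = double(21)",
]
source = normalize_lines(lines)
namespace = {}
exec(source, namespace)
print(namespace["result"])  # 42
```

Because the dot itself becomes a space, a line written as a dot followed by four spaces ends up indented by five spaces. Python accepts this as long as every line in the same block uses the same pattern.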

Execution Context and Other Requirements

PythonStep

When using the PythonStep step, python_code must define a function that will be valid as a method of the PythonStep class. The function must:

  • Accept a single argument: self

  • Expect to be called once during the execution of the job

  • Not return a value

  • Be assigned to the step attribute of the class instance
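
A way to try these requirements outside of Runner is to exec the code against a stand-in object. In the sketch below, StandInStep is a hypothetical class that models only the environ attribute, and the exec namespace containing self and logging is an assumption inferred from the examples in this document. It does not show how Runner itself calls the function.

```python
import logging


class StandInStep:
    """Hypothetical stand-in for PythonStep; only `environ` is modeled."""

    def __init__(self, environ):
        self.environ = environ


python_code = [
    "def _dynamic_step(self):",
    "    logging.info('start')",
    "    self.environ['row_count'] = len(self.environ['rows'])",
    "    logging.info('stop')",
    "self.step = _dynamic_step",
]

step = StandInStep({"rows": [1, 2, 3]})
# Assumption: `self` and `logging` are visible to the exec'd code.
exec("\n".join(python_code), {"self": step, "logging": logging})

# The function is stored as a plain instance attribute, so this sketch
# passes the instance explicitly when calling it.
result = step.step(step)
print(step.environ["row_count"], result)  # 3 None
```

The function takes only self, is called once, and returns nothing; any results it produces are left on the instance (here, in environ).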

PythonTransform

When using the PythonTransform transform, python_code must define a function that will be valid as a method of the PythonTransform class. The function must:

  • Accept two arguments: self and record

  • Expect to be called once for each row of data

  • Return record or a modified version of record

  • Be assigned to the transform_ attribute of the class instance
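
The same kind of stand-in can be used to check a transform function before placing it in a job. StandInTransform is a hypothetical class, the record is assumed to be a dict, and the exec namespace containing self and logging is an assumption inferred from the examples in this document.

```python
import logging


class StandInTransform:
    """Hypothetical stand-in for PythonTransform; it models nothing but `self`."""


python_code = [
    "def transform_(self, record):",
    "    record['name'] = record['name'].upper()",
    "    return record",
    "self.transform_ = transform_",
]

transform = StandInTransform()
# Assumption: `self` and `logging` are visible to the exec'd code.
exec("\n".join(python_code), {"self": transform, "logging": logging})

rows = [{"name": "alpha"}, {"name": "beta"}]
# Called once per row; the instance is passed explicitly because the
# function is stored as a plain instance attribute in this sketch.
output = [transform.transform_(transform, dict(row)) for row in rows]
print(output)  # [{'name': 'ALPHA'}, {'name': 'BETA'}]
```

Returning the record, modified or not, is what lets the next transform in the chain receive it.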

Tips and Tricks

  1. If you are running the job manually from the CLI via job_io.py config.json, you can invoke the Python debugger, for example:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            import pdb; pdb.set_trace()
        ]
    }
    

    Note: This is not possible when the job is run from the UI, the scheduler, a sequence, or via mitto run.

  2. You can easily add logging statements.

    To log every row at a certain point in a set of transforms:

    {
        use: mitto.iov2.transform.builtin#PythonTransform
        python_code: [
            def transform_(self, record):
            .   logging.info("record=%s", record)
            .   return record
            self.transform_ = transform_
        ]
    }
    

    To log the job execution environment at a certain point in the steps:

    {
        use: mitto.iov2.steps.builtin#PythonStep
        python_code: [
            logging.info("environ=%s", self.environ)
        ]
    }