Middleware

The middleware wraps a node to intercept the input/output of a node. This mechanism allows you to automatically track input/output info or even directly modify input/output.

Middleware is declared in project-wide project settings. This way, all pipelines in a project can share the same set of middlewares.

In project-wide setting, the declared middlewares is a list of dot-separated function.

settings.py
MIDDLEWARE = {
    "default": [
        "theflow.middleware.TrackProgressMiddleware",
        "theflow.middleware.SkipComponentMiddleware",
        "theflow.middleware.CachingMiddleware",
    ]
}

In Function’s config, they can be set on/off. By default, they are set on.

flows.py
class Pipeline(Function):
    ...
    class Config:
        middleware_section = "default"
        middleware_switches: Dict[str, bool] = {
            "theflow.middleware.TrackProgressMiddleware": True,
            "theflow.middleware.SkipComponentMiddleware": True,
            "theflow.middleware.CachingMiddleware": True,
        }

When a Function is instantiated, the middleware will be constructed as follow:

  1. Get the middleware section from config, and retrieve the middleware list from project-wide settings.
  2. Get the middleware switches from the config. Default to {}.
  3. For each of the middleware, check if it’s on or off. If it’s on, add it to the middleware list. If the middleware doesn’t exist in the switches, it will be considered on by default.

If a middleware is declared in the project-wide setting but not in the middleware switches, it will be considered on by default.

If a middleware exists in the middleware switches but not in the project-wide setting, it will be ignored.

Middleware initialization and execution

A middleware object is initiated with a Function object and the callback to next function call.

Each middleware execution __call__ looks something like this:

class Middleware:
    def __init__(self, obj: Function, next_call: Callable):
        self._obj = obj
        self._next_call = next_call

    def __call__(self, *args, **kwargs):
        # do something before the function call

        result = self._next_call(*args, **kwargs)

        # do something after the function call
        return result

Order of execution

Order of execution matters. Due to wrapped nature of middleware, there are forward and backward passes.

During forward pass, the code will execute from the 1st middleware to the last middleware, and then the main run function. Then the code will execute from the last middleware to the 1st middleware.

Managging multiple sets of middlewares

You can declare multiple sets of middlewares in the project setting. For example:

settings.py
MIDDLEWARE = {
    "default": [
        "theflow.middleware.TrackProgressMiddleware",
        "theflow.middleware.SkipComponentMiddleware",
        "theflow.middleware.CachingMiddleware",
    ],
    "nodebug": [
        "theflow.middleware.CachingMiddleware",
    ]
}

Then in the Function’s config, you set the middleware_section to the set of middlewares that you want to use.

flows.py
class Pipeline(Function):
    ...
    class Config:
        middleware_section = "nodebug"

Inherit and override middleware config for child Function

A Function subclasses from another Function can override its parent’s middleware config.

For middleware_section, the latest declared value will be used.

For middleware_switches, the parent’s switches will be updated with newer values from the child switches.

flows.py
class PipelineA(Function):
    ...
    class Config:
        middleware_section = "default"
        middleware_switches: Dict[str, bool] = {
            "theflow.middleware.SkipComponentMiddleware": False,
            "theflow.middleware.CachingMiddleware": True,
        }


class PipelineB(PipelineA):
    ...
    class Config:
        middleware_switches: Dict[str, bool] = {
            "theflow.middleware.CachingMiddleware": False,
        }

Then the PipelineB will have the set of middleware switches:

{
    "theflow.middleware.SkipComponentMiddleware": False,
    "theflow.middleware.CachingMiddleware": False,
}

Default middleware config

The default middleware config is:

settings.py
MIDDLEWARE = {
    "default": [
        "theflow.middleware.TrackProgressMiddleware",
        "theflow.middleware.CachingMiddleware",
        "theflow.middleware.SkipComponentMiddleware",
    ]
}
configs.py
class Config:
    middleware_section = "default"
    middleware_switches = {
        "theflow.middleware.TrackProgressMiddleware": True,
        "theflow.middleware.SkipComponentMiddleware": True,
        "theflow.middleware.CachingMiddleware": True,
    }