Function
Node and Param related functionalities
Declare node and param
Nodes and params are Function attributes.
They can be declared implicitly through type annotations, or explicitly by using Node and Param.
You can implicitly declare a node by annotating the attribute with the Function type or any of its subclasses. Attributes annotated with other types are treated as params.
You can explicitly declare a node or a param by assigning the attribute to the Node() or Param() descriptor, respectively.
Node and param names cannot start with an underscore (_).
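The implicit rule above (Function-typed annotations become nodes, everything else becomes a param) can be pictured with a small plain-Python sketch. It does not use theflow: `Base` is a hypothetical stand-in for Function, `split_annotations` is an illustrative helper, and silently skipping underscore-prefixed names is an assumption made for this sketch, not documented theflow behavior.

```python
from typing import get_type_hints


class Base:
    """Hypothetical stand-in for theflow's Function class."""


class Child(Base):
    """A subclass, so attributes annotated with it count as nodes."""


def split_annotations(cls):
    """Split annotated attribute names into (nodes, params)."""
    nodes, params = [], []
    for name, annotation in get_type_hints(cls).items():
        if name.startswith("_"):
            # Assumption for this sketch: underscore names are ignored
            continue
        if isinstance(annotation, type) and issubclass(annotation, Base):
            nodes.append(name)
        else:
            params.append(name)
    return nodes, params


class Pipeline(Base):
    step: Child
    size: int
    names: list


nodes, params = split_annotations(Pipeline)
```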
Declare node and param with default value
The Node() and Param() descriptors accept default and default_callback arguments. If a node or param is accessed without having been set, the value in default is used, or default_callback is called to obtain the default value. Only one of default and default_callback can be set.
Note that default and default_callback are evaluated lazily, only when the attribute is accessed, not when the Function object is instantiated.
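To make the lazy evaluation concrete, here is a minimal plain-Python sketch of a descriptor whose default is computed on access rather than at construction time. It is not theflow's implementation: `LazyDefault`, `Holder` and `make_default` are hypothetical names, and storing the computed default on the instance is a simplification chosen for this sketch.

```python
class LazyDefault:
    """Hypothetical descriptor with a lazily evaluated default."""

    def __init__(self, default_callback):
        self._default_callback = default_callback

    def __set_name__(self, owner, name):
        self._name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        if self._name not in obj.__dict__:
            # Nothing was set: compute the default now, on access
            obj.__dict__[self._name] = self._default_callback(obj)
        return obj.__dict__[self._name]

    def __set__(self, obj, value):
        obj.__dict__[self._name] = value


calls = []


def make_default(obj):
    calls.append("called")
    return [1, 2, 3]


class Holder:
    value = LazyDefault(default_callback=make_default)


holder = Holder()
calls_after_init = list(calls)   # callback has not run yet
first_value = holder.value       # callback runs here
calls_after_access = list(calls)
```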
With default
When assigning a Node default, one can supply:
- A Function class or any of its subclasses. The class will be called to create the actual instance once the node is accessed.
- If one wants the function class (say, ComponentA) to be instantiated with a specific set of arguments, one can declare it with withx: Node(default=ComponentA.withx(arg1=val1, arg2=val2)), and the component will be instantiated as ComponentA(arg1=val1, arg2=val2).
When assigning a Param default, one can supply any value. If one wishes the default object to be initialized with specific arguments, one can use theflow.utils.modules.ObjectInitDeclaration.
from theflow import Function, Node, Param
from theflow.utils.modules import ObjectInitDeclaration

# Pipeline2 (a Function subclass) and Obj1 (an arbitrary class) are assumed
# to be defined elsewhere
class Pipeline(Function):
    # default value of x1 is Pipeline2()
    x1: Function = Pipeline2
    # has the same effect as x1
    x2 = Node(default=Pipeline2)

    # default value of y1 is Pipeline2(param1=1, param2=2)
    y1: Function = Pipeline2.withx(param1=1, param2=2)
    # has the same effect as y1
    y2 = Node(default=Pipeline2.withx(param1=1, param2=2))

    # default value of a1 is 1
    a1: int = 1
    # has the same effect as a1
    a2 = Param(default=1)

    # default value of b1 is Obj1(); without `ObjectInitDeclaration`,
    # the default value of b1 would be just the class Obj1
    b1: Obj1 = ObjectInitDeclaration(Obj1)
    # has the same effect as b1
    b2 = Param(default=ObjectInitDeclaration(Obj1))

    # default value of c1 is Obj1(param1=1, param2=2)
    c1: Obj1 = ObjectInitDeclaration(Obj1).withx(param1=1, param2=2)
    # has the same effect as c1
    c2 = Param(default=ObjectInitDeclaration(Obj1).withx(param1=1, param2=2))
With default_callback
default_callback takes a function with signature func(obj) -> val, where obj is the parent Function object and val is the default node or param value.
To keep declarations clean, theflow provides the @Node.default and @Param.default decorators, which turn a method into the default callback for the node/param of the same name.
from theflow import Function, Node, Param

class Pipeline(Function):
    # default value of node x1 is Pipeline2()
    x1 = Node(default_callback=lambda obj: Pipeline2())

    @Node.default()
    def x2(self):
        """This is the same as x2 = Node(default_callback=lambda obj: obj.x1)"""
        return self.x1

    # return list of 10 as default
    a1 = Param(default_callback=lambda obj: list(range(10)))

    @Param.default()
    def a2(self):
        """This is the same as
        a2 = Param(default_callback=lambda obj: obj.a1 + list(range(11, 20)))
        """
        return self.a1 + list(range(11, 20))
Declare node and param that depends on other nodes and params
These nodes and params cannot be set explicitly. Instead, their values are computed from other values in the Function object. One supplies an auto_callback function to the Node() or Param() descriptor to turn a node/param into an auto version. This function is called to calculate the value when the node or param is accessed.
auto_callback differs from default_callback in that default_callback only applies while the node or param has not been set, whereas auto_callback keeps applying afterwards. Also, a param/node with auto_callback cannot be set manually, while a param/node with default_callback can.
theflow also provides the @Node.auto() and @Param.auto() decorators to turn a method into an auto callback function.
When using @Node.auto()/@Param.auto() or auto_callback, one can set the cache and depends_on parameters to control the caching behavior.
With cache=True, the auto callback function is executed again only if any of the nodes and params in the Function changes. cache defaults to True with @Node.auto()/@Param.auto(), and to False with Node()/Param().
With depends_on=["name1", "name2"], the auto callback function is executed again only if any of the listed nodes and params changes. This option requires cache=True. Essentially, it narrows the set of nodes and params that are checked for changes when caching is enabled.
from theflow import Function, Node, Param

class Pipeline(Function):
    x1 = Node(default_callback=lambda obj: Pipeline2())

    @Node.auto(depends_on=["x1"])
    def x2(self):
        """This is the same as
        x2 = Node(
            auto_callback=lambda obj: Pipeline3(x=obj.x1),
            cache=True,
            depends_on=["x1"]
        )
        """
        return Pipeline3(x=self.x1)

    a1: int
    a2: int

    @Param.auto(depends_on=["a1", "a2"])
    def a3(self):
        """This is the same as
        a3 = Param(
            auto_callback=lambda obj: obj.a1 + obj.a2,
            cache=True,
            depends_on=["a1", "a2"]
        )
        """
        return self.a1 + self.a2
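The interplay of cache and depends_on can be illustrated with a plain-Python sketch that recomputes a value only when one of its listed dependencies changes. This is not how theflow implements it: `CachedAuto` and `Holder` are hypothetical names, and comparing a snapshot of dependency values on each access is just one simple way to detect a change.

```python
class CachedAuto:
    """Hypothetical descriptor: an auto value cached against its dependencies."""

    def __init__(self, auto_callback, depends_on):
        self._auto_callback = auto_callback
        self._depends_on = depends_on

    def __set_name__(self, owner, name):
        self._cache_key = "_cache_" + name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        # Snapshot of the dependency values at access time
        snapshot = tuple(getattr(obj, dep) for dep in self._depends_on)
        cached = obj.__dict__.get(self._cache_key)
        if cached is None or cached[0] != snapshot:
            # First access, or a dependency changed: recompute
            cached = (snapshot, self._auto_callback(obj))
            obj.__dict__[self._cache_key] = cached
        return cached[1]

    def __set__(self, obj, value):
        raise AttributeError("auto values cannot be set manually")


runs = []


def add(obj):
    runs.append(1)
    return obj.a1 + obj.a2


class Holder:
    a3 = CachedAuto(auto_callback=add, depends_on=["a1", "a2"])

    def __init__(self, a1, a2, other):
        self.a1 = a1
        self.a2 = a2
        self.other = other


holder = Holder(a1=1, a2=2, other=0)
```

Changing `holder.other` leaves the cached value in place, while changing `holder.a1` or `holder.a2` triggers a recomputation on the next access.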
Access node and param
Nodes and params can be accessed from the parent with normal Python dot notation. This also applies to nested nodes and params of child nodes (e.g. obj.node1.node2.param1).
One can also use .get_from_path(path: str), where path is a dot-delimited (.) path. This method is convenient when one has a string representation of a param or node path.
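As a rough picture of what resolving a dot-delimited path involves, the following plain-Python sketch walks the attributes one name at a time. `resolve_path` is a hypothetical helper, not theflow's `.get_from_path`, and it does not reproduce any of the registration behavior that theflow applies during run.

```python
from functools import reduce
from types import SimpleNamespace


def resolve_path(root, path):
    """Resolve a dot-delimited path such as "node1.node2.param1" from root."""
    # Drop empty segments so that a leading "." is tolerated
    names = [name for name in path.split(".") if name]
    return reduce(getattr, names, root)


# Nested objects standing in for a function with child nodes
obj = SimpleNamespace(
    node1=SimpleNamespace(node2=SimpleNamespace(param1=42))
)
value = resolve_path(obj, "node1.node2.param1")
```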
During function execution (while run is executing), if a node is accessed with .get_from_path(path), the relationship between the parent function and the node is not established.
Example:
from theflow import Function

class Inner(Function):
    x: int = 1

    def run(self, y):
        return self.x + y

class Outer(Function):
    inner: Inner = Inner.withx(x=1)

    def run(self, y):
        if isinstance(self.inner, Inner):
            return self.inner(y)
        return 0

flow = Outer()
flow(1)
print(flow.last_run.logs().keys())
You will see that .inner[1] shows up rather than .inner. This is because the .inner name is already registered when the node is accessed in the isinstance(...) call. As a result, self.inner(y) is registered as .inner[1]. To avoid this, use isinstance(self.get_from_path("inner"), Inner) instead.
Manually establish the connection between parent and child node
Sometimes during run execution, the relationship between the parent function and a child node cannot be established automatically. This can happen if the node is not explicitly declared or annotated as a node, but is instead a value inside a dict or list.
Example:
from theflow import Function

class Inner(Function):
    x: int = 1

    def run(self, y):
        return self.x + y

class Outer(Function):
    funcs: list[Function]

    def run(self, y):
        return self.funcs[0](y)

flow = Outer(funcs=[Inner(x=1)])
flow.run(1)
print(flow.last_run.logs().keys())
You won't see the logs of the inner functions, because those functions aren't officially registered as nodes in the parent function, only as values of the param funcs. To explicitly establish the relationship between the parent function and a child node during run execution, you can call self._prepare_child(node, name) on the child. This links the node with the parent function under the name name.
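In the above example, one can, for instance, call self._prepare_child(self.funcs[0], "inner") inside Outer.run before invoking self.funcs[0](y); the name "inner" is only illustrative.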
Get missing nodes and params
To get the list of nodes and params that are required but missing, one can use the .missing() method. It returns the nodes and params that have neither a default value nor a default callback, and that have not been set by the user. The method returns a dictionary:
{
    "params": [],  # names of missing params (list[str])
    "nodes": [],  # names of missing nodes (list[str])
}
Get specification about a param or a node
One can get the Node or Param definition of a Function object with .spec(path: str). It can also access definitions within nested nodes, using a dot-delimited (.) path.
For example, obj.spec(".node1.node2.param1") returns the Param spec of param1 in node2 of node1.
Customize Node and Param descriptor behavior
Depending on how much you want to customize, there are 3 ways:
- Only extra data, no behavior changes: just supply extra data to Node and Param, and it can be accessed with ._extras as a dictionary. Node.to_dict() and Param.to_dict() will also include the extra data.
- Pre-defined behavior changes: the default Node and Param can access certain pre-defined callbacks from the parent Function.
- Arbitrary behavior changes: subclass Node and Param, and override methods as needed. Node.to_dict() and Param.to_dict() will also show the extra data.
Callback way
_prepare_child(self, child: "Function", name: str) is called when a child node is being accessed. child is the node object, and name is the node's name in the parent function.
Subclass way
After subclassing Node and Param, you can use the subclasses directly in your Function definition. You can also register them in the project-wide settings NODE_CLASS and PARAM_CLASS, respectively, and they will take effect as the default node and param definitions.
Creating Function
Syntax shortcut to create sequential and concurrent functions
theflow supports the >> operator to create a sequential function, and the // operator to create a concurrent function.
func1 >> func2 creates a sequential function that runs func1 first, then func2.
func1 // func2 creates a concurrent function that runs func1 and func2 at the same time and returns a tuple of results.
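The operator shortcuts rely on ordinary Python operator overloading. The plain-Python sketch below shows the general idea with a hypothetical `Step` wrapper; it is not theflow's implementation. Two points are assumptions of this sketch rather than documented behavior: the sequential form feeds the output of the first callable into the second, and the concurrent form passes the same input to both callables using a thread pool.

```python
from concurrent.futures import ThreadPoolExecutor


class Step:
    """Hypothetical callable wrapper supporting >> and // composition."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

    def __rshift__(self, other):
        # self >> other: run self first, then pass its output to other
        return Step(lambda *args, **kwargs: other(self(*args, **kwargs)))

    def __floordiv__(self, other):
        # self // other: run both on the same input, return a tuple
        def run_both(*args, **kwargs):
            with ThreadPoolExecutor(max_workers=2) as pool:
                left = pool.submit(self, *args, **kwargs)
                right = pool.submit(other, *args, **kwargs)
                return (left.result(), right.result())

        return Step(run_both)


add_one = Step(lambda x: x + 1)
double = Step(lambda x: x * 2)

sequential = add_one >> double
concurrent = add_one // double
```

Note that in Python `//` binds more tightly than `>>`, so mixed expressions may need parentheses to express the intended grouping.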
Function internals
Declare and list protected keywords
Some names are not allowed as node or param names, because they might conflict with important attributes and methods of Function, such as config, set…
When you create your own Function and want to prevent its subclasses from introducing naming conflicts, you can reserve names in _keywords. An error is raised when a future subclass uses one or more of those keywords as a node or param name.
You can list a Function's keywords with cls._protected_keywords(). This returns a dictionary of {keyword-name: class-that-defines-that-keyword}. It traverses the MRO and collects _keywords from each class, so a child class's _keywords does not replace its parent class's _keywords.
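The MRO traversal described above can be sketched in plain Python as follows. `protected_keywords`, `Base` and `Child` are hypothetical names used only for illustration; this is not theflow's `_protected_keywords` implementation, only one way to merge a per-class `_keywords` list across the inheritance chain.

```python
def protected_keywords(cls):
    """Map each reserved name to the class that declares it."""
    keywords = {}
    # Walk from the most basic class to the most derived one
    for klass in reversed(cls.__mro__):
        # Read only the names declared directly on this class
        for name in klass.__dict__.get("_keywords", []):
            keywords.setdefault(name, klass)
    return keywords


class Base:
    _keywords = ["config", "set"]


class Child(Base):
    # Does not replace Base._keywords in the merged result
    _keywords = ["extra"]


merged = protected_keywords(Child)
```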