API Reference
- class async_graph_data_flow.AsyncGraph(halt_on_exception: bool = False)
- Attributes:
edges: The set of edges, each with the names of the source and destination nodes.
nodes: The list of nodes, each with its function and configurations.
nodes_to_edges: The mapping between source nodes and their destination nodes.
Methods
add_edge(src_node, dst_node): Add an edge.
add_node(func, *[, name, halt_on_exception, ...]): Add a node by providing its function and optional configurations.
- __init__(halt_on_exception: bool = False) -> None
Initialize a graph.
- Parameters:
- halt_on_exception: bool, optional
To halt graph execution when any node has an unhandled exception, set this argument to True. Defaults to False.
- add_edge(src_node: str | Callable, dst_node: str | Callable) -> None
Add an edge.
- Parameters:
- src_node: str | Callable
The source node, either the function name or the function itself.
- dst_node: str | Callable
The destination node, either the function name or the function itself.
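Because a node can be referenced either by its name or by its function, an edge has to be normalized to node names before it is stored. The sketch below models that under stated assumptions: node_name and MiniGraph are hypothetical stand-ins written for illustration, not part of async_graph_data_flow, and the library's internal storage of edges and nodes_to_edges may differ.

```python
from collections import defaultdict
from typing import Callable, Dict, Set, Tuple, Union

# Hypothetical alias: a node may be referred to by name or by its function.
NodeRef = Union[str, Callable]


def node_name(node: NodeRef) -> str:
    """Resolve a node reference to its name (a function maps to its __name__)."""
    return node if isinstance(node, str) else node.__name__


class MiniGraph:
    """Hypothetical stand-in that only records edges; not the library's AsyncGraph."""

    def __init__(self) -> None:
        self.edges: Set[Tuple[str, str]] = set()
        self.nodes_to_edges: Dict[str, Set[str]] = defaultdict(set)

    def add_edge(self, src_node: NodeRef, dst_node: NodeRef) -> None:
        src, dst = node_name(src_node), node_name(dst_node)
        self.edges.add((src, dst))
        self.nodes_to_edges[src].add(dst)


async def read_lines():
    yield "hello world"


graph = MiniGraph()
graph.add_edge(read_lines, "split_words")     # function object as the source
graph.add_edge("read_lines", "count_chars")  # same source, referenced by name
print(sorted(graph.nodes_to_edges["read_lines"]))  # ['count_chars', 'split_words']
```

Both calls resolve to the same source name, so the two destinations end up under a single key.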
- add_node(func: Callable, *, name: str | None = None, halt_on_exception: bool = False, unpack_input: bool = True, max_tasks: int = 1, queue_size: int = 10000, check_async_gen: bool = True) -> None
Add a node by providing its function and optional configurations.
- Parameters:
- func: Callable
The function that this node runs. See the notes below for the function’s requirements.
- name: str, optional
The name of this node. If not provided, the __name__ attribute of func is used.
- halt_on_exception: bool, optional
To halt graph execution when this node has an unhandled exception, set this argument to True. Defaults to False.
- unpack_input: bool, optional
By default (i.e., unpack_input is True), when a source node yields a tuple args or a dict kwargs, func is called as func(*args) or func(**kwargs), respectively. To call func(arg) with no unpacking, set unpack_input to False. See the notes below for more details.
- max_tasks: int, optional
The number of tasks that this node runs concurrently. Defaults to 1.
- queue_size: int, optional
The maximum number of data items allowed in the Queue object between this node (as a source) and its destination node(s). Defaults to 10000.
- check_async_gen: bool, optional
If True (the default), func is verified to be an async generator function by inspect.isasyncgenfunction(). Pass in False to disable this check if func would fail it even though the underlying callable is still an async generator function (e.g., when your function is wrapped by a decorator).
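The queue_size and max_tasks parameters describe a bounded queue between nodes and a pool of concurrent tasks. The sketch below shows that general asyncio pattern using only the standard library: the producer waits once the queue already holds queue_size items (backpressure), and max_tasks workers drain it concurrently. It illustrates the technique under that assumption and is not the library's actual scheduling code; produce, consume, and run_pipeline are hypothetical names.

```python
import asyncio
from typing import List


async def produce(queue: asyncio.Queue, n: int, sizes: List[int]) -> None:
    for i in range(n):
        await queue.put(i)  # waits while the queue already holds maxsize items
        sizes.append(queue.qsize())  # record the queue length after each put


async def consume(queue: asyncio.Queue, results: List[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # stand-in for I/O-bound work
            results.append(item * 2)
        finally:
            queue.task_done()


async def run_pipeline(n: int, queue_size: int, max_tasks: int):
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    results: List[int] = []
    sizes: List[int] = []
    # max_tasks concurrent consumers share the same bounded queue.
    workers = [asyncio.create_task(consume(queue, results)) for _ in range(max_tasks)]
    await produce(queue, n, sizes)
    await queue.join()  # returns once every queued item has been marked done
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results), max(sizes)


results, peak = asyncio.run(run_pipeline(n=20, queue_size=5, max_tasks=4))
print(results[:5], "peak queue size:", peak)
```

A small queue_size keeps memory bounded when a downstream node is slower than its source; a larger max_tasks helps when the node's work is I/O-bound.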
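To see why check_async_gen exists, consider a decorator whose wrapper is a plain def function. The standard-library check inspect.isasyncgenfunction() inspects the wrapper itself, so it reports False even though calling the decorated function still returns an async generator. The logged decorator below is a hypothetical example; in such a case you would pass check_async_gen=False to add_node().

```python
import asyncio
import functools
import inspect


def logged(func):
    """Hypothetical decorator: a plain-function wrapper around an async generator function."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)  # still returns an async generator object

    return wrapper


@logged
async def numbers(n):
    for i in range(n):
        yield i


async def collect(agen):
    """Drain an async generator into a list."""
    return [x async for x in agen]


print(inspect.isasyncgenfunction(numbers))  # False: the wrapper is a plain function
print(inspect.isasyncgen(numbers(3)))       # True: the call still returns an async generator
print(asyncio.run(collect(numbers(3))))     # [0, 1, 2]
```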
Notes
These details concern the requirements of the node’s function and the keyword argument unpack_input.
- Each function in the graph must be an asynchronous generator function, i.e., it is defined by async def and it yields.
- Each function can have any signature, with no arguments or with any valid argument types. However, because the functions are connected as a graph, for a given edge the destination function must be able to correctly handle whatever the source function yields. Specifically:
  - If the destination function takes no args (e.g., async def dest_func():), it is called as dest_func() with no args, regardless of what the source function yields.
  - If the destination function can take args and the source function yields a tuple args, the tuple is unpacked and dest_func(*args) is called. If the source function yields a dict args instead, dest_func(**args) is called.
  - If the destination function requires more input values than the unpacked args provide (from either a tuple or a dict), a TypeError is raised.
  - If the source function yields a tuple or dict that, for whatever reason, you do not want unpacked, set unpack_input to False when adding the destination node with add_node().
  - If the destination function can take args and the source function yields an object obj that is neither a tuple nor a dict, the destination function is simply called as dest_func(obj). You can take advantage of this behavior: for instance, if the destination function has parameters with default values (e.g., async def dest_func(a, b=..., c=...):), then dest_func(a) is a valid call with a yielded from the source function, while b and c take their default values.
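The calling rules above can be modeled in a few lines. call_node below is a hypothetical helper written only to illustrate the documented behavior (no-arg destinations, tuple and dict unpacking, unpack_input=False, and pass-through of other objects); it is not the library's implementation, which may differ in edge cases.

```python
import asyncio
import inspect


def call_node(func, item, unpack_input=True):
    """Hypothetical model of how a destination node is called with a yielded item."""
    if not inspect.signature(func).parameters:
        return func()  # takes no args: the yielded item is ignored
    if unpack_input and isinstance(item, tuple):
        return func(*item)  # tuple -> positional args
    if unpack_input and isinstance(item, dict):
        return func(**item)  # dict -> keyword args
    return func(item)  # any other object (or unpack_input=False) is passed whole


async def collect(agen):
    """Drain an async generator into a list."""
    return [x async for x in agen]


async def no_args():
    yield "called with no args"


async def add(a, b):
    yield a + b


async def with_defaults(a, b=10, c=100):
    yield a + b + c


async def keep_whole(obj):
    yield obj


print(asyncio.run(collect(call_node(no_args, (1, 2)))))        # ['called with no args']
print(asyncio.run(collect(call_node(add, (1, 2)))))            # [3]
print(asyncio.run(collect(call_node(add, {"a": 1, "b": 2}))))  # [3]
print(asyncio.run(collect(call_node(with_defaults, 1))))       # [111]
print(asyncio.run(collect(call_node(keep_whole, (1, 2), unpack_input=False))))  # [(1, 2)]

try:
    call_node(add, (1,))  # too few values for add(a, b): raised when the function is called
except TypeError as exc:
    print("TypeError:", exc)
```

Note that the TypeError surfaces at call time, before the async generator is iterated, because Python binds arguments when the generator function is called.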
- class async_graph_data_flow.AsyncExecutor(graph: AsyncGraph, *, logger: Logger | None = None, max_exceptions: int = 1000)
- Attributes:
data_flow_stats: Data flow statistics.
exceptions: Exceptions from the graph execution.
graph: The graph to execute.
start_nodes: Start nodes and their arguments.
Methods
execute([start_nodes]): Start executing the functions along the graph.
Turn off data flow logging.
turn_on_data_flow_logging([node_format, ...]): Turn on and configure data flow logging.
- __init__(graph: AsyncGraph, *, logger: Logger | None = None, max_exceptions: int = 1000)
Initialize an executor.
- Parameters:
- graph: AsyncGraph
The graph to execute.
- logger: logging.Logger, optional
Provide a logger for any customization. If not provided, a generic logging.getLogger(__name__) is used.
- max_exceptions: int, optional
The maximum number of unhandled exceptions to keep track of at each node. Defaults to 1000. If the number of exceptions at a node exceeds this threshold, only the most recent exceptions are kept. See also exceptions.
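Keeping only the most recent exceptions per node is exactly what a bounded collections.deque does. The sketch below assumes that model; record_exception is a hypothetical helper, and the final dict comprehension converts the result to the dict[str, list[Exception]] shape documented for the exceptions property.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List

MAX_EXCEPTIONS = 3  # small threshold for illustration; the executor's default is 1000

# One bounded deque per node name: appending beyond maxlen drops the oldest entry.
_recent: Dict[str, Deque[Exception]] = defaultdict(lambda: deque(maxlen=MAX_EXCEPTIONS))


def record_exception(node: str, exc: Exception) -> None:
    """Hypothetical helper: remember an unhandled exception raised at `node`."""
    _recent[node].append(exc)


for i in range(5):
    record_exception("parse", ValueError(f"bad item {i}"))

exceptions: Dict[str, List[Exception]] = {node: list(q) for node, q in _recent.items()}
print([str(e) for e in exceptions["parse"]])  # ['bad item 2', 'bad item 3', 'bad item 4']
```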
- property data_flow_stats: dict[str, dict[str, int]] | None
Data flow statistics.
These statistics keep track of (i) the number of times data has passed into each node, (ii) the number of times data has come out of each node, and (iii) the number of errors each node has had. The key is a node by name (str), and the value is a dict with three keys (str), "in", "out", and "err", each mapped to its count (int).
- property exceptions: dict[str, list[Exception]] | None
Exceptions from the graph execution.
The key is a node by name (str), and the value is the list of exceptions raised from the node.
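One way to picture the three data_flow_stats counters described above is to wrap each call of a node function and count inputs, yielded outputs, and unhandled exceptions. run_counted and the two node functions below are hypothetical; the sketch assumes one call counts as one "in" and each yielded item as one "out", which is how the description reads but is not confirmed library internals.

```python
import asyncio
from collections import defaultdict

# Per-node counters in the documented {"in": ..., "out": ..., "err": ...} shape.
stats = defaultdict(lambda: {"in": 0, "out": 0, "err": 0})


async def run_counted(name, func, *args):
    """Hypothetical wrapper: call an async generator node and update its counters."""
    stats[name]["in"] += 1
    outputs = []
    try:
        async for item in func(*args):
            stats[name]["out"] += 1
            outputs.append(item)
    except Exception:
        stats[name]["err"] += 1  # an unhandled exception inside the node
    return outputs


async def split_words(line):
    for word in line.split():
        yield word


async def parse_int(text):
    yield int(text)  # raises ValueError for non-numeric text


async def main():
    words = await run_counted("split_words", split_words, "1 2 x")
    for word in words:
        await run_counted("parse_int", parse_int, word)


asyncio.run(main())
print(dict(stats))
# {'split_words': {'in': 1, 'out': 3, 'err': 0}, 'parse_int': {'in': 3, 'out': 2, 'err': 1}}
```

Because one input can produce several outputs, "in" and "out" need not match, as split_words shows.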
- execute(start_nodes: dict[str, tuple] | None = None) -> None
Start executing the functions along the graph.
- Parameters:
- start_nodes: dict[str, tuple], optional
Specify the start node(s) and, optionally, their arguments. Each key in this dictionary is the name (str) of a node function, and its value is the args (tuple) with which the node function is called as func(*args); provide None as the value if you want func() with no args. If start_nodes is None or isn’t provided, nodes that have no incoming edges are treated as start nodes.
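The start-node rule can be sketched as follows. resolve_start_nodes is a hypothetical helper that takes node names and edges as (source, destination) name pairs; mapping None to an empty tuple is an assumption made so that func(*args) becomes func().

```python
from typing import Dict, Iterable, Optional, Tuple


def resolve_start_nodes(
    node_names: Iterable[str],
    edges: Iterable[Tuple[str, str]],
    start_nodes: Optional[Dict[str, Optional[tuple]]] = None,
) -> Dict[str, tuple]:
    """Hypothetical model of execute(start_nodes=...): map each start node to its args."""
    if start_nodes is not None:
        # None means "call with no args", i.e., an empty args tuple.
        return {name: (args if args is not None else ()) for name, args in start_nodes.items()}
    # Default: every node without an incoming edge is a start node, called with no args.
    has_incoming = {dst for _, dst in edges}
    return {name: () for name in node_names if name not in has_incoming}


nodes = ["read", "parse", "write"]
edges = [("read", "parse"), ("parse", "write")]
print(resolve_start_nodes(nodes, edges))  # {'read': ()}
print(resolve_start_nodes(nodes, edges, {"read": ("data.txt",), "parse": None}))
# {'read': ('data.txt',), 'parse': ()}
```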
- property graph: AsyncGraph
The graph to execute.
- property start_nodes: dict[str, tuple]
Start nodes and their arguments.
This is a dictionary that maps each start node (str) to its arguments to be passed in when the graph execution begins.
- turn_on_data_flow_logging(node_format: str | None = None, node_filter: Iterable[str] | None = None, time_interval: int | None = None, logger: Logger | None = None) -> None
Turn on and configure data flow logging.
For a long-running graph execution, it is helpful to log these data flow statistics at a regular time interval.
- Parameters:
- node_format: str, optional
Logging format for each node’s statistics: (i) the number of times data has passed into the node (in), (ii) the number of times data has come out of the node (out), and (iii) the number of errors the node has had (err). If not provided, the default is " {node} - in={in}, out={out}, err={err}".
- node_filter: Iterable[str], optional
Filter to see logs from only the specified nodes by name. If not provided, all nodes’ statistics are logged.
- time_interval: int, optional
Time interval in seconds between data flow logs. If not provided, the default is 60 seconds.
- logger: logging.Logger, optional
Provide a logger for any customization. If not provided, a generic logging.getLogger(__name__) is used.
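The sketch below shows how node_format and node_filter could combine to produce one log line per node. format_data_flow and the sample counts are hypothetical; the only details taken from this reference are the default format string and the in, out, and err keys. Because in is a Python keyword, the counts reach str.format() through dictionary unpacking rather than as literal keyword arguments.

```python
import logging
from typing import Dict, Iterable, List, Optional

DEFAULT_NODE_FORMAT = " {node} - in={in}, out={out}, err={err}"


def format_data_flow(
    stats: Dict[str, Dict[str, int]],
    node_format: Optional[str] = None,
    node_filter: Optional[Iterable[str]] = None,
) -> List[str]:
    """Hypothetical formatter: one line per node, optionally restricted by name."""
    fmt = node_format or DEFAULT_NODE_FORMAT
    wanted = set(node_filter) if node_filter is not None else None
    return [
        fmt.format(node=node, **counts)  # counts supplies the "in", "out", "err" fields
        for node, counts in stats.items()
        if wanted is None or node in wanted
    ]


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

sample_stats = {  # made-up counts for illustration only
    "fetch": {"in": 1, "out": 3, "err": 0},
    "parse": {"in": 3, "out": 2, "err": 1},
}
for line in format_data_flow(sample_stats, node_filter=["parse"]):
    logger.info(line)
```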
- class async_graph_data_flow.graph.InvalidAsyncGraphError
- Attributes:
- args
Methods
add_note: Exception.add_note(note) -- add a note to the exception
with_traceback: Exception.with_traceback(tb) -- set self.__traceback__ to tb and return self.