Package src
Py-Daga: A Python library for Directed Acyclic Graph (DAG) based workflow orchestration.
This package provides a framework for creating and executing workflows defined as directed acyclic graphs, with support for rollback mechanisms and error handling.
Components
- DagaAction: Base class for defining workflow actions
- DagaFlow: Orchestrator for executing DAG-based workflows
Basic Usage Without Rollbacks
import networkx as nx
from py_daga import DagaFlow
async def root(predecessors_results: list[int]):
return sum(predecessors_results) + 1
async def layer1_f1(predecessors_results: list[int]):
return sum(predecessors_results) + 2
async def layer1_f2(predecessors_results: list[int]):
return sum(predecessors_results) + 3
async def layer2_f1(predecessors_results: list[int]):
return sum(predecessors_results) + 4
async def layer2_f2(predecessors_results: list[int]):
return sum(predecessors_results) + 5
dag = nx.DiGraph()
dag.add_edges_from([
(root, layer1_f1),
(root, layer1_f2),
(layer1_f1, layer2_f1),
(layer1_f1, layer2_f2),
(layer1_f2, layer2_f2),
])
flow = DagaFlow(dag)
results = flow.run(input=0) # = [7, 12]
Basic Usage With Rollbacks
To use rollback functionality, we need to wrap our functions as DagaActions:
import networkx as nx
from py_daga import DagaAction, DagaFlow
@DagaAction[set[str], set[str]]
async def root(predecessors_results: list[set[str]]):
# Your action implementation here
pass
@root.register_function_as_rollback
@DagaAction[set[str], set[str]]
async def root_rollback(predecessors_results: list[set[str]]):
# Your rollback implementation here
pass
@DagaAction[set[str], set[str]]
async def layer1_f1(predecessors_results: list[set[str]]):
# Your action implementation here
pass
@layer1_f1.register_function_as_rollback
@DagaAction[set[str], set[str]]
async def layer1_f1_rollback(predecessors_results: list[set[str]]):
# Your rollback implementation here
pass
flow = DagaFlow(
nx.DiGraph().add_edges_from([(root, layer1_f1)])
)
results = flow.run()
Sub-modules
src.action-
DagaAction module providing the core action classes for workflow orchestration …
src.flow-
DagaFlow module for orchestrating DAG-based workflow execution …
src.meta-
DagaMeta metaclass for managing DagaAction registration and validation …
src.utils-
Utility functions and classes for DAG flow orchestration …
Classes
class DagaAction (wrapped: Coroutine[I, Any, O] | None = None)-
Expand source code
class DagaAction[I, O](metaclass=DagaMeta): """ Base class for defining workflow actions in a DAG-based workflow system. DagaAction represents a single unit of work in a workflow. Each action can receive results from predecessor actions and produce output that can be consumed by successor actions. Actions can also have rollback functionality for error recovery. Generic Parameters: I: Input type for the action (when used as a decorator) O: Output type of the action Attributes: _wrapped_action_instance: The rollback action instance if registered log: Logger instance for this action is_decorator_action: Whether this action was created via decorator wrapped_func: The actual function to be executed """ _wrapped_action_instance: "DagaAction[I, O]" = None log: Logger = getLogger(__name__) @final def __init__(self, wrapped: Coroutine[I, Any, O] | None = None) -> None: """ Initialize a DagaAction instance. Args: wrapped: Optional coroutine function to wrap. If provided, this action is created as a decorator action with the wrapped function. """ self.is_decorator_action = False self.wrapped_func = self.__call__ # wrapped is available means that the action is created by decorating a callable if wrapped: self.wrapped_func = wrapped self.__name__ = wrapped.__name__ self.is_decorator_action = True def __call__(self, predecessors_results: list[Any] = None) -> Any: """ Execute the action with results from predecessor actions. Args: predecessors_results: List of results from predecessor actions in the DAG. Can be None if this is a root action. Returns: The result of executing this action. """ return self.wrapped_func(predecessors_results) @final async def rollback(self, predecessors_results: list[Any]) -> O: """ Execute the rollback action for this action. Rollback actions are used to undo the effects of a failed action or to clean up resources when a workflow fails. Args: predecessors_results: List of results from predecessor actions. Returns: The result of the rollback operation, or True if no rollback is defined. """ if self._wrapped_action_instance: self.log.info(f"Rolling back {self} with {self._wrapped_action_instance}") return await self._wrapped_action_instance(predecessors_results) or True self.log.warning(f"No rollback action for {self}") @final @classmethod def register_class_as_rollback(cls, wrapped_rollback_class: type["DagaAction[I, O]"]) -> "DagaAction[I, O]": """ Register a class as the rollback action for this action. Args: wrapped_rollback_class: The class to use as the rollback action. Must be a subclass of DagaAction. Returns: The registered rollback action instance. """ cls._wrapped_action_instance = wrapped_rollback_class() return cls._wrapped_action_instance @final def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) -> "DagaAction[I, O]": """ Register a function as the rollback action for this action. Args: wrapped_rollback_func: The function to use as the rollback action. Must be a coroutine function. Returns: The registered rollback action instance. """ self._wrapped_action_instance = wrapped_rollback_func return self._wrapped_action_instance @final def __repr__(self): """Return a string representation of this action.""" return f"DagaAction({self.__class__.__name__})" if not self.is_decorator_action else f"DagaAction({self.__name__})" @final def __str__(self): """Return a string representation of this action.""" return self.__repr__() @final def __hash__(self) -> int: """Return a hash value for this action.""" return hash(self.__repr__())Base class for defining workflow actions in a DAG-based workflow system.
DagaAction represents a single unit of work in a workflow. Each action can receive results from predecessor actions and produce output that can be consumed by successor actions. Actions can also have rollback functionality for error recovery.
Generic Parameters: I: Input type for the action (when used as a decorator) O: Output type of the action
Attributes
_wrapped_action_instance- The rollback action instance if registered
log- Logger instance for this action
is_decorator_action- Whether this action was created via decorator
wrapped_func- The actual function to be executed
Initialize a DagaAction instance.
Args
wrapped- Optional coroutine function to wrap. If provided, this action is created as a decorator action with the wrapped function.
Ancestors
- typing.Generic
Subclasses
Class variables
var log : logging.Logger-
The type of the None singleton.
Static methods
def register_class_as_rollback(wrapped_rollback_class: type['DagaAction[I, O]'])-
Register a class as the rollback action for this action.
Args
wrapped_rollback_class- The class to use as the rollback action. Must be a subclass of DagaAction.
Returns
The registered rollback action instance.
Methods
def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) ‑> DagaAction[I, O]-
Expand source code
@final def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) -> "DagaAction[I, O]": """ Register a function as the rollback action for this action. Args: wrapped_rollback_func: The function to use as the rollback action. Must be a coroutine function. Returns: The registered rollback action instance. """ self._wrapped_action_instance = wrapped_rollback_func return self._wrapped_action_instanceRegister a function as the rollback action for this action.
Args
wrapped_rollback_func- The function to use as the rollback action. Must be a coroutine function.
Returns
The registered rollback action instance.
async def rollback(self, predecessors_results: list[typing.Any]) ‑> O-
Expand source code
@final async def rollback(self, predecessors_results: list[Any]) -> O: """ Execute the rollback action for this action. Rollback actions are used to undo the effects of a failed action or to clean up resources when a workflow fails. Args: predecessors_results: List of results from predecessor actions. Returns: The result of the rollback operation, or True if no rollback is defined. """ if self._wrapped_action_instance: self.log.info(f"Rolling back {self} with {self._wrapped_action_instance}") return await self._wrapped_action_instance(predecessors_results) or True self.log.warning(f"No rollback action for {self}")Execute the rollback action for this action.
Rollback actions are used to undo the effects of a failed action or to clean up resources when a workflow fails.
Args
predecessors_results- List of results from predecessor actions.
Returns
The result of the rollback operation, or True if no rollback is defined.
class DagaFlow (dag: networkx.classes.digraph.DiGraph)-
Expand source code
class DagaFlow[I, O]: """ Orchestrator for executing DAG-based workflows with rollback support. DagaFlow takes a directed acyclic graph (DAG) of actions and executes them in the correct order, handling failures and performing rollbacks when necessary. Actions within the same batch are executed in parallel for efficiency. Generic Parameters: I: Input type for the workflow O: Output type of the workflow Attributes: dag: The NetworkX DiGraph representing the workflow action_matrix: List of batches containing ActionDescriptor objects """ def __init__(self, dag: nx.DiGraph): """ Initialize a DagaFlow instance with a DAG. Args: dag: A NetworkX DiGraph where nodes are DagaAction instances and edges represent dependencies between actions Raises: AssertionError: If the provided graph is not a directed acyclic graph """ self.dag = dag assert nx.is_directed_acyclic_graph(self.dag), "DAG must be a directed acyclic graph" DagaFlowUtils.initialize_dag_as_flow(self.dag) self.action_matrix = DagaFlowUtils.get_flow_batches(self.dag) def __repr__(self): """Return a string representation of this DagaFlow instance.""" return f"DagaFlow({self.dag})" def __str__(self): """Return a string representation of this DagaFlow instance.""" return self.__repr__() async def run(self, input: I) -> list[O]: """ Execute the workflow with the given input. This method executes all actions in the workflow in the correct order, with parallel execution within batches. If any action fails, the workflow performs rollback operations for all affected actions. Args: input: The initial input value for the workflow Returns: List of results from the final batch of actions Raises: ExceptionGroup: If any action fails, containing all the errors that occurred during execution """ self.action_matrix[0][0].result = input for batch_index, batch in enumerate(self.action_matrix[1:], start=1): tasks = [DagaFlowUtils.wrap_action(action, self.action_matrix) for action in batch] await asyncio.gather(*tasks, return_exceptions=True) failed_actions = list(filter(lambda x: x.error, batch)) if failed_actions: for failed_action in failed_actions: await DagaFlowUtils.rollback_action(failed_action, self.action_matrix) for batch in self.action_matrix[batch_index:]: rollback_tasks = [DagaFlowUtils.rollback_action(action, self.action_matrix) for action in batch] await asyncio.gather(*rollback_tasks) raise ExceptionGroup("DagaFlowError", [failed_action.error for failed_action in failed_actions]) return [action.result for action in self.action_matrix[-1]]Orchestrator for executing DAG-based workflows with rollback support.
DagaFlow takes a directed acyclic graph (DAG) of actions and executes them in the correct order, handling failures and performing rollbacks when necessary. Actions within the same batch are executed in parallel for efficiency.
Generic Parameters: I: Input type for the workflow O: Output type of the workflow
Attributes
dag- The NetworkX DiGraph representing the workflow
action_matrix- List of batches containing ActionDescriptor objects
Initialize a DagaFlow instance with a DAG.
Args
dag- A NetworkX DiGraph where nodes are DagaAction instances and edges represent dependencies between actions
Raises
AssertionError- If the provided graph is not a directed acyclic graph
Ancestors
- typing.Generic
Methods
async def run(self, input: I) ‑> list[O]-
Expand source code
async def run(self, input: I) -> list[O]: """ Execute the workflow with the given input. This method executes all actions in the workflow in the correct order, with parallel execution within batches. If any action fails, the workflow performs rollback operations for all affected actions. Args: input: The initial input value for the workflow Returns: List of results from the final batch of actions Raises: ExceptionGroup: If any action fails, containing all the errors that occurred during execution """ self.action_matrix[0][0].result = input for batch_index, batch in enumerate(self.action_matrix[1:], start=1): tasks = [DagaFlowUtils.wrap_action(action, self.action_matrix) for action in batch] await asyncio.gather(*tasks, return_exceptions=True) failed_actions = list(filter(lambda x: x.error, batch)) if failed_actions: for failed_action in failed_actions: await DagaFlowUtils.rollback_action(failed_action, self.action_matrix) for batch in self.action_matrix[batch_index:]: rollback_tasks = [DagaFlowUtils.rollback_action(action, self.action_matrix) for action in batch] await asyncio.gather(*rollback_tasks) raise ExceptionGroup("DagaFlowError", [failed_action.error for failed_action in failed_actions]) return [action.result for action in self.action_matrix[-1]]Execute the workflow with the given input.
This method executes all actions in the workflow in the correct order, with parallel execution within batches. If any action fails, the workflow performs rollback operations for all affected actions.
Args
input- The initial input value for the workflow
Returns
List of results from the final batch of actions
Raises
ExceptionGroup- If any action fails, containing all the errors that occurred during execution