Package src

Py-Daga: A Python library for Directed Acyclic Graph (DAG) based workflow orchestration.

This package provides a framework for creating and executing workflows defined as directed acyclic graphs, with support for rollback mechanisms and error handling.

Components

  • DagaAction: Base class for defining workflow actions
  • DagaFlow: Orchestrator for executing DAG-based workflows

Basic Usage Without Rollbacks

import asyncio

import networkx as nx
from py_daga import DagaFlow

async def root(predecessors_results: list[int]):
    return sum(predecessors_results) + 1
async def layer1_f1(predecessors_results: list[int]):
    return sum(predecessors_results) + 2
async def layer1_f2(predecessors_results: list[int]):
    return sum(predecessors_results) + 3
async def layer2_f1(predecessors_results: list[int]):
    return sum(predecessors_results) + 4
async def layer2_f2(predecessors_results: list[int]):
    return sum(predecessors_results) + 5

dag = nx.DiGraph()
dag.add_edges_from([
    (root, layer1_f1),
    (root, layer1_f2),
    (layer1_f1, layer2_f1),
    (layer1_f1, layer2_f2),
    (layer1_f2, layer2_f2),
])
flow = DagaFlow(dag)
# DagaFlow.run is a coroutine, so it has to be awaited (here via asyncio.run).
results = asyncio.run(flow.run(input=0))  # = [7, 12]

Basic Usage With Rollbacks

To use rollback functionality, we need to wrap our functions as DagaActions:

import asyncio

import networkx as nx
from py_daga import DagaAction, DagaFlow

@DagaAction[set[str], set[str]]
async def root(predecessors_results: list[set[str]]):
    # Your action implementation here
    pass

@root.register_function_as_rollback
@DagaAction[set[str], set[str]]
async def root_rollback(predecessors_results: list[set[str]]):
    # Your rollback implementation here
    pass

@DagaAction[set[str], set[str]]
async def layer1_f1(predecessors_results: list[set[str]]):
    # Your action implementation here
    pass

@layer1_f1.register_function_as_rollback
@DagaAction[set[str], set[str]]
async def layer1_f1_rollback(predecessors_results: list[set[str]]):
    # Your rollback implementation here
    pass

# add_edges_from mutates the graph in place and returns None, so build the
# graph first and hand the graph object itself to DagaFlow.
dag = nx.DiGraph()
dag.add_edges_from([(root, layer1_f1)])
flow = DagaFlow(dag)
# run() is a coroutine and requires the initial input value.
results = asyncio.run(flow.run(input=set()))

Sub-modules

src.action

DagaAction module providing the core action classes for workflow orchestration …

src.flow

DagaFlow module for orchestrating DAG-based workflow execution …

src.meta

DagaMeta metaclass for managing DagaAction registration and validation …

src.utils

Utility functions and classes for DAG flow orchestration …

Classes

class DagaAction (wrapped: Coroutine[I, Any, O] | None = None)
Expand source code
class DagaAction[I, O](metaclass=DagaMeta):
    """
    Base class for defining workflow actions in a DAG-based workflow system.
    
    DagaAction represents a single unit of work in a workflow. Each action can
    receive results from predecessor actions and produce output that can be
    consumed by successor actions. Actions can also have rollback functionality
    for error recovery.
    
    Generic Parameters:
        I: Input type for the action (when used as a decorator)
        O: Output type of the action
    
    Attributes:
        _wrapped_action_instance: The rollback action instance if registered
        log: Logger instance for this action
        is_decorator_action: Whether this action was created via decorator
        wrapped_func: The actual function to be executed
    """
    _wrapped_action_instance: "DagaAction[I, O]" = None
    log: Logger = getLogger(__name__)
    
    @final
    def __init__(self, wrapped: Coroutine[I, Any, O] | None = None) -> None:
        """
        Initialize a DagaAction instance.
        
        Args:
            wrapped: Optional coroutine function to wrap. If provided, this action
                    is created as a decorator action with the wrapped function.
        """
        self.is_decorator_action = False
        self.wrapped_func = self.__call__
        # wrapped is available means that the action is created by decorating a callable
        if wrapped:
            self.wrapped_func = wrapped
            self.__name__ = wrapped.__name__
            self.is_decorator_action = True
    
    def __call__(self, predecessors_results: list[Any] = None) -> Any:
        """
        Execute the action with results from predecessor actions.
        
        Args:
            predecessors_results: List of results from predecessor actions in the DAG.
                                Can be None if this is a root action.
        
        Returns:
            The result of executing this action.
        """
        return self.wrapped_func(predecessors_results)

    @final
    async def rollback(self, predecessors_results: list[Any]) -> O:
        """
        Execute the rollback action for this action.
        
        Rollback actions are used to undo the effects of a failed action or
        to clean up resources when a workflow fails.
        
        Args:
            predecessors_results: List of results from predecessor actions.
        
        Returns:
            The result of the rollback operation, or True if no rollback is defined.
        """
        if self._wrapped_action_instance:
            self.log.info(f"Rolling back {self} with {self._wrapped_action_instance}")
            return await self._wrapped_action_instance(predecessors_results) or True
        self.log.warning(f"No rollback action for {self}")
    
    @final
    @classmethod
    def register_class_as_rollback(cls, wrapped_rollback_class: type["DagaAction[I, O]"]) -> "DagaAction[I, O]":
        """
        Register a class as the rollback action for this action.
        
        Args:
            wrapped_rollback_class: The class to use as the rollback action.
                                   Must be a subclass of DagaAction.
        
        Returns:
            The registered rollback action instance.
        """
        cls._wrapped_action_instance = wrapped_rollback_class()
        return cls._wrapped_action_instance

    @final
    def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) -> "DagaAction[I, O]":
        """
        Register a function as the rollback action for this action.
        
        Args:
            wrapped_rollback_func: The function to use as the rollback action.
                                  Must be a coroutine function.
        
        Returns:
            The registered rollback action instance.
        """
        self._wrapped_action_instance = wrapped_rollback_func
        return self._wrapped_action_instance

    @final
    def __repr__(self):
        """Return a string representation of this action."""
        return f"DagaAction({self.__class__.__name__})" if not self.is_decorator_action else f"DagaAction({self.__name__})"
    
    @final
    def __str__(self):
        """Return a string representation of this action."""
        return self.__repr__()
    
    @final
    def __hash__(self) -> int:
        """Return a hash value for this action."""
        return hash(self.__repr__())

Base class for defining workflow actions in a DAG-based workflow system.

DagaAction represents a single unit of work in a workflow. Each action can receive results from predecessor actions and produce output that can be consumed by successor actions. Actions can also have rollback functionality for error recovery.

Generic parameters: `I` is the input type of the action (when used as a decorator); `O` is the output type of the action.

Attributes

_wrapped_action_instance
The rollback action instance if registered
log
Logger instance for this action
is_decorator_action
Whether this action was created via decorator
wrapped_func
The actual function to be executed

Initialize a DagaAction instance.

Args

wrapped
Optional coroutine function to wrap. If provided, this action is created as a decorator action with the wrapped function.

Ancestors

  • typing.Generic

Subclasses

Class variables

var log : logging.Logger

Logger used by actions to report rollback activity.

Static methods

def register_class_as_rollback(wrapped_rollback_class: type['DagaAction[I, O]'])

Register a class as the rollback action for this action.

Args

wrapped_rollback_class
The class to use as the rollback action. Must be a subclass of DagaAction.

Returns

The registered rollback action instance.

Methods

def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) ‑> DagaAction[I, O]
Expand source code
@final
def register_function_as_rollback(self, wrapped_rollback_func: Callable[[Any], Coroutine[O, Any, Any]]) -> "DagaAction[I, O]":
    """
    Register a callable as the rollback action for this action instance.

    Args:
        wrapped_rollback_func: The rollback to register. ``rollback`` calls it
            with the predecessor results and awaits the return value. It must
            therefore be a coroutine function or an awaitable-returning callable,
            such as a decorator-created DagaAction.

    Returns:
        The object passed in, unchanged. This lets the method be used as a
        decorator without rebinding the decorated name.
    """
    self._wrapped_action_instance = wrapped_rollback_func
    return self._wrapped_action_instance

Register a function as the rollback action for this action.

Args

wrapped_rollback_func
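The rollback to register. It is called with the predecessor results and its return value is awaited, so it must be a coroutine function or an awaitable-returning callable such as a decorator-created DagaAction.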

Returns

The object passed in, unchanged, so the method can be used as a decorator.

async def rollback(self, predecessors_results: list[typing.Any]) ‑> O
Expand source code
@final
async def rollback(self, predecessors_results: list[Any]) -> O:
    """
    Execute the rollback action registered for this action.

    Rollback actions are used to undo the effects of a failed action or
    to clean up resources when a workflow fails.

    Args:
        predecessors_results: List of results from predecessor actions,
            forwarded unchanged to the registered rollback.

    Returns:
        The awaited result of the registered rollback, or True when that result
        is falsy (e.g. the rollback returns None). If no rollback is registered,
        a warning is logged and None is returned.
    """
    if self._wrapped_action_instance is not None:
        self.log.info(f"Rolling back {self} with {self._wrapped_action_instance}")
        # `or True` turns a falsy rollback result into an explicit success marker.
        return await self._wrapped_action_instance(predecessors_results) or True
    self.log.warning(f"No rollback action for {self}")

Execute the rollback action for this action.

Rollback actions are used to undo the effects of a failed action or to clean up resources when a workflow fails.

Args

predecessors_results
List of results from predecessor actions.

Returns

The awaited result of the registered rollback, or True if that result is falsy. If no rollback is registered, a warning is logged and None is returned.

class DagaFlow (dag: networkx.classes.digraph.DiGraph)
Expand source code
class DagaFlow[I, O]:
    """
    Orchestrator for executing DAG-based workflows with rollback support.
    
    DagaFlow takes a directed acyclic graph (DAG) of actions and executes them
    in the correct order, handling failures and performing rollbacks when necessary.
    Actions within the same batch are executed in parallel for efficiency.
    
    Generic Parameters:
        I: Input type for the workflow
        O: Output type of the workflow
    
    Attributes:
        dag: The NetworkX DiGraph representing the workflow
        action_matrix: List of batches containing ActionDescriptor objects
    """
    
    def __init__(self, dag: nx.DiGraph):
        """
        Initialize a DagaFlow instance with a DAG.
        
        Args:
            dag: A NetworkX DiGraph where nodes are DagaAction instances and
                 edges represent dependencies between actions
        
        Raises:
            AssertionError: If the provided graph is not a directed acyclic graph
        """
        self.dag = dag
        assert nx.is_directed_acyclic_graph(self.dag), "DAG must be a directed acyclic graph"
        DagaFlowUtils.initialize_dag_as_flow(self.dag)
        self.action_matrix = DagaFlowUtils.get_flow_batches(self.dag)

    def __repr__(self):
        """Return a string representation of this DagaFlow instance."""
        return f"DagaFlow({self.dag})"

    def __str__(self):
        """Return a string representation of this DagaFlow instance."""
        return self.__repr__()
    
    async def run(self, input: I) -> list[O]:
        """
        Execute the workflow with the given input.
        
        This method executes all actions in the workflow in the correct order,
        with parallel execution within batches. If any action fails, the workflow
        performs rollback operations for all affected actions.
        
        Args:
            input: The initial input value for the workflow
        
        Returns:
            List of results from the final batch of actions
        
        Raises:
            ExceptionGroup: If any action fails, containing all the errors
                           that occurred during execution
        """
        self.action_matrix[0][0].result = input
        for batch_index, batch in enumerate(self.action_matrix[1:], start=1):
            tasks = [DagaFlowUtils.wrap_action(action, self.action_matrix) for action in batch]
            await asyncio.gather(*tasks, return_exceptions=True)
            failed_actions = list(filter(lambda x: x.error, batch))
            if failed_actions:
                for failed_action in failed_actions:
                    await DagaFlowUtils.rollback_action(failed_action, self.action_matrix)
                for batch in self.action_matrix[batch_index:]:
                    rollback_tasks = [DagaFlowUtils.rollback_action(action, self.action_matrix) for action in batch]
                    await asyncio.gather(*rollback_tasks)
                raise ExceptionGroup("DagaFlowError", [failed_action.error for failed_action in failed_actions])
        return [action.result for action in self.action_matrix[-1]]

Orchestrator for executing DAG-based workflows with rollback support.

DagaFlow takes a directed acyclic graph (DAG) of actions and executes them in the correct order, handling failures and performing rollbacks when necessary. Actions within the same batch are executed in parallel for efficiency.

Generic parameters: `I` is the input type of the workflow; `O` is the output type of the workflow.

Attributes

dag
The NetworkX DiGraph representing the workflow
action_matrix
List of batches containing ActionDescriptor objects

Initialize a DagaFlow instance with a DAG.

Args

dag
A NetworkX DiGraph where nodes are DagaAction instances and edges represent dependencies between actions

Raises

AssertionError
If the provided graph is not a directed acyclic graph

Ancestors

  • typing.Generic

Methods

async def run(self, input: I) ‑> list[O]
Expand source code
async def run(self, input: I) -> list[O]:
    """
    Execute the workflow with the given input.

    Batches are executed in order, with the actions inside a batch run
    concurrently. When any action in a batch fails, the failed actions are
    rolled back first. Then every action in that batch and in all later batches
    is rolled back. Each action is rolled back at most once, and a failing
    rollback does not prevent the remaining rollbacks from running.

    Args:
        input: The initial input value for the workflow

    Returns:
        List of results from the final batch of actions

    Raises:
        ExceptionGroup: If any action fails. It contains the failed actions'
                       errors, followed by any exceptions raised by rollbacks.
    """
    # Seed the first descriptor of batch 0 with the workflow input;
    # execution starts at batch 1.
    self.action_matrix[0][0].result = input
    for batch_index, batch in enumerate(self.action_matrix[1:], start=1):
        tasks = [DagaFlowUtils.wrap_action(action, self.action_matrix) for action in batch]
        # Failures are read back from each descriptor's `error` attribute below,
        # so gather must not propagate them.
        await asyncio.gather(*tasks, return_exceptions=True)
        failed_actions = [action for action in batch if action.error]
        if not failed_actions:
            continue

        rollback_errors: list[Exception] = []
        # Roll back the failed actions first, one at a time.
        for failed_action in failed_actions:
            try:
                await DagaFlowUtils.rollback_action(failed_action, self.action_matrix)
            except Exception as exc:  # keep rolling back; reported in the group below
                rollback_errors.append(exc)

        # Then sweep the failing batch and every later batch. Failed actions were
        # already rolled back above, so they are skipped here.
        # NOTE(review): run() itself sweeps batches that never executed and does not
        # roll back the already-completed batches 1..batch_index-1; confirm against
        # DagaFlowUtils.rollback_action that this is the intended scope.
        already_rolled_back = {id(action) for action in failed_actions}
        for pending_batch in self.action_matrix[batch_index:]:
            outcomes = await asyncio.gather(
                *(
                    DagaFlowUtils.rollback_action(action, self.action_matrix)
                    for action in pending_batch
                    if id(action) not in already_rolled_back
                ),
                return_exceptions=True,
            )
            for outcome in outcomes:
                if isinstance(outcome, Exception):
                    rollback_errors.append(outcome)
                elif isinstance(outcome, BaseException):
                    # Cancellation/interrupts are not Exceptions; never fold them in.
                    raise outcome

        raise ExceptionGroup(
            "DagaFlowError",
            [failed_action.error for failed_action in failed_actions] + rollback_errors,
        )
    return [action.result for action in self.action_matrix[-1]]

Execute the workflow with the given input.

This method executes all actions in the workflow in the correct order, with parallel execution within batches. If any action fails, the workflow performs rollback operations for all affected actions.

Args

input
The initial input value for the workflow

Returns

List of results from the final batch of actions

Raises

ExceptionGroup
If any action fails, containing all the errors that occurred during execution