Module src.flow

DagaFlow module for orchestrating DAG-based workflow execution.

This module provides the main orchestrator class for executing workflows defined as directed acyclic graphs (DAGs) with support for rollback mechanisms.

Classes

class DagaFlow (dag: networkx.classes.digraph.DiGraph)
class DagaFlow[I, O]:
    """
    Orchestrator for executing DAG-based workflows with rollback support.
    
    DagaFlow takes a directed acyclic graph (DAG) of actions and executes them
    in dependency order, handling failures and performing rollbacks when necessary.
    Actions within the same batch are awaited concurrently with asyncio.gather.
    
    Generic Parameters:
        I: Input type for the workflow
        O: Output type of the workflow
    
    Attributes:
        dag: The NetworkX DiGraph representing the workflow
        action_matrix: List of batches containing ActionDescriptor objects
    """
    
    def __init__(self, dag: nx.DiGraph):
        """
        Initialize a DagaFlow instance with a DAG.
        
        Args:
            dag: A NetworkX DiGraph where nodes are DagaAction instances and
                 edges represent dependencies between actions
        
        Raises:
            AssertionError: If the provided graph is not a directed acyclic graph
        """
        self.dag = dag
        assert nx.is_directed_acyclic_graph(self.dag), "DAG must be a directed acyclic graph"
        DagaFlowUtils.initialize_dag_as_flow(self.dag)
        self.action_matrix = DagaFlowUtils.get_flow_batches(self.dag)

    def __repr__(self):
        """Return a string representation of this DagaFlow instance."""
        return f"DagaFlow({self.dag})"

    def __str__(self):
        """Return a string representation of this DagaFlow instance."""
        return self.__repr__()
    
    async def run(self, input: I) -> list[O]:
        """
        Execute the workflow with the given input.
        
        Batches run in dependency order, and the actions inside a batch run
        concurrently. If any action in a batch fails, the failed actions are
        rolled back first, then every action in the failing batch and in all
        later batches is rolled back, and the errors are raised together.
        
        Args:
            input: The initial input value for the workflow
        
        Returns:
            List of results from the final batch of actions
        
        Raises:
            ExceptionGroup: If any action in a batch fails; it contains the
                           errors recorded on the failed actions of that batch
        """
        # Seed the single root node of the first batch with the workflow input.
        self.action_matrix[0][0].result = input
        for batch_index, batch in enumerate(self.action_matrix[1:], start=1):
            # Run every action of the batch concurrently; failures are read back
            # from each action's `error` attribute below.
            tasks = [DagaFlowUtils.wrap_action(action, self.action_matrix) for action in batch]
            await asyncio.gather(*tasks, return_exceptions=True)
            failed_actions = [action for action in batch if action.error]
            if failed_actions:
                for failed_action in failed_actions:
                    await DagaFlowUtils.rollback_action(failed_action, self.action_matrix)
                # Roll back the failing batch and every later batch, one batch at a time.
                # A separate loop variable avoids shadowing the outer `batch`.
                for pending_batch in self.action_matrix[batch_index:]:
                    rollback_tasks = [DagaFlowUtils.rollback_action(action, self.action_matrix) for action in pending_batch]
                    await asyncio.gather(*rollback_tasks)
                raise ExceptionGroup("DagaFlowError", [failed_action.error for failed_action in failed_actions])
        return [action.result for action in self.action_matrix[-1]]

Orchestrator for executing DAG-based workflows with rollback support.

DagaFlow takes a directed acyclic graph (DAG) of actions and executes them in dependency order, handling failures and performing rollbacks when necessary. Actions within the same batch are awaited concurrently with asyncio.gather.

Generic Parameters

I
Input type for the workflow
O
Output type of the workflow

Attributes

dag
The NetworkX DiGraph representing the workflow
action_matrix
List of batches containing ActionDescriptor objects
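
The idea behind a list of batches can be illustrated without the library itself. The sketch below is not the implementation of DagaFlowUtils.get_flow_batches, which is not shown here; it assumes a plain dictionary that maps each node to the set of nodes it depends on, and uses the standard-library graphlib module to group nodes so that every batch depends only on earlier batches. NetworkX offers nx.topological_generations for the same kind of grouping on a DiGraph. The function name flow_batches and the node names are hypothetical.

```python
from graphlib import TopologicalSorter


def flow_batches(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group nodes into batches; each batch depends only on earlier batches."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)  # unlocks the nodes that waited on this batch
    return batches


# Hypothetical workflow: "start" feeds two independent steps that join in "merge".
example = {
    "start": set(),
    "fetch": {"start"},
    "parse": {"start"},
    "merge": {"fetch", "parse"},
}
batches = flow_batches(example)
print(batches)
```

In this example "fetch" and "parse" land in the same batch because neither depends on the other, which is the property that makes it safe to run them concurrently.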

Initialize a DagaFlow instance with a DAG.

Args

dag
A NetworkX DiGraph where nodes are DagaAction instances and edges represent dependencies between actions

Raises

AssertionError
If the provided graph is not a directed acyclic graph
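
Because the constructor validates with an assert statement, the check is skipped when Python runs with the -O flag. A caller who needs the check to hold in every mode could validate the graph before constructing the flow. The sketch below shows the pattern with the standard-library graphlib module and a plain dependency dictionary; the function name ensure_acyclic is hypothetical, and for a NetworkX DiGraph the equivalent test is nx.is_directed_acyclic_graph, as used in the constructor above.

```python
from graphlib import CycleError, TopologicalSorter


def ensure_acyclic(dependencies: dict[str, set[str]]) -> None:
    """Raise ValueError if the dependency mapping contains a cycle."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError as exc:
        # An explicit exception is raised even when assertions are disabled.
        raise ValueError("workflow graph must be acyclic") from exc


ensure_acyclic({"a": set(), "b": {"a"}})  # acyclic: returns without error

try:
    ensure_acyclic({"a": {"b"}, "b": {"a"}})  # a <-> b forms a cycle
    cycle_detected = False
except ValueError:
    cycle_detected = True
```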

Ancestors

  • typing.Generic

Methods

async def run(self, input: I) -> list[O]

Execute the workflow with the given input.

Batches run in dependency order, and the actions inside a batch run concurrently. If any action in a batch fails, the failed actions are rolled back first, then every action in the failing batch and in all later batches is rolled back, and the errors are raised together.

Args

input
The initial input value for the workflow

Returns

List of results from the final batch of actions

Raises

ExceptionGroup
If any action in a batch fails; it contains the errors recorded on the failed actions of that batch