Workflow

AWorld describes workflows with a classic graph syntax: a workflow is given as a list of agent pairs, each pair being a directed edge from one agent to the next. The basic scenarios for constructing agent workflows follow.

Agent Native Workflow

Sequential

"""
Sequential Agent Pipeline: agent1 → agent2 → agent3

Executes agents in sequence where each agent's output becomes 
the next agent's input, enabling multi-step collaborative processing.
"""

swarm = Swarm([(agent1, agent2), (agent2, agent3)], root_agent=[agent1])
result: TaskResponse = Runners.run(input=question, swarm=swarm)
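
To make the data flow concrete, here is a minimal plain-Python sketch of sequential chaining. It does not use AWorld: the three `agent` functions and `run_sequential` are hypothetical stand-ins, and the sketch only illustrates the idea that each step's output becomes the next step's input.

```python
from functools import reduce
from typing import Callable, List


# Hypothetical stand-ins for agents: plain functions from str to str.
def agent1(text: str) -> str:
    return text + " -> a1"


def agent2(text: str) -> str:
    return text + " -> a2"


def agent3(text: str) -> str:
    return text + " -> a3"


def run_sequential(steps: List[Callable[[str], str]], question: str) -> str:
    """Feed the question to the first step, then each output to the next step."""
    return reduce(lambda acc, step: step(acc), steps, question)


answer = run_sequential([agent1, agent2, agent3], "q")
print(answer)  # q -> a1 -> a2 -> a3
```

The edge list `[(agent1, agent2), (agent2, agent3)]` in the snippet above expresses the same chain as the ordered list used here.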

Parallel

"""
Parallel Agent Execution with Barrier Synchronization

    Input ──┬─→ agent1 ──┐
            │            ├──→ agent3 (barrier wait)
            └─→ agent2 ──┘

- agent1 and agent2 execute in parallel
- agent3 acts as a barrier, waiting for both agents
- agent3 processes combined outputs from agent1 and agent2
"""

swarm = Swarm([(agent1, agent3), (agent2, agent3)], root_agent=[agent1, agent2])
result: TaskResponse = Runners.run(input=question, swarm=swarm)
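
The barrier pattern can be sketched with `asyncio` alone. This is an illustration under stated assumptions, not AWorld's implementation: `worker`, `combiner`, and `run_parallel` are hypothetical names, and the sleeps merely simulate work of different durations.

```python
import asyncio
from typing import List


async def worker(name: str, question: str, delay: float) -> str:
    """Hypothetical agent: simulate some work, then return a labelled answer."""
    await asyncio.sleep(delay)
    return f"{name}({question})"


async def combiner(parts: List[str]) -> str:
    """Hypothetical downstream agent that merges the upstream outputs."""
    return " + ".join(parts)


async def run_parallel(question: str) -> str:
    # gather() acts as the barrier: it resumes only after both workers finish,
    # and returns results in argument order regardless of completion order.
    parts = await asyncio.gather(
        worker("agent1", question, 0.02),
        worker("agent2", question, 0.01),
    )
    return await combiner(list(parts))


combined = asyncio.run(run_parallel("q"))
print(combined)  # agent1(q) + agent2(q)
```

Even though the second worker finishes first here, the combiner sees both outputs together and in a stable order, which is the behavior the diagram above describes for agent3.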

Parallel Multi-Path

"""
Parallel Multi-Path Agent Execution

    Input ──→ agent1 ──┬──→ agent2 ──┐
                       │             │
                       └──→ agent3 ←─┘ (barrier wait for agent1 & agent2)

- Single input enters only through agent1
- agent1 distributes to both agent2 and agent3
- agent2 processes and feeds agent3
- agent3 waits for both agent1 and agent2 completion
- agent3 synthesizes outputs from both agent1 and agent2
"""

swarm = Swarm([(agent1, agent2), (agent1, agent3), (agent2, agent3)], root_agent=[agent1])
result: TaskResponse = Runners.run(input=question, swarm=swarm)
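
One way to reason about such an edge list is to derive its execution order with the standard library's `graphlib` (Python 3.9+). The sketch below uses agent names as plain strings and is only a model of the graph above; how AWorld schedules agents internally is not covered by this page.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set, Tuple

# The same edges as the Swarm definition above, with names instead of agents.
edges: List[Tuple[str, str]] = [
    ("agent1", "agent2"),
    ("agent1", "agent3"),
    ("agent2", "agent3"),
]

# Map every node to the set of nodes it must wait for.
predecessors: Dict[str, Set[str]] = {}
for src, dst in edges:
    predecessors.setdefault(src, set())
    predecessors.setdefault(dst, set()).add(src)

# Nodes without predecessors are the entry points of the graph.
roots = sorted(node for node, deps in predecessors.items() if not deps)

# Collect the nodes in "waves": each wave can start once the previous one is done.
sorter = TopologicalSorter(predecessors)
sorter.prepare()
waves: List[List[str]] = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

print(roots)  # ['agent1']
print(waves)  # [['agent1'], ['agent2'], ['agent3']]
```

Because agent3 depends on both agent1 and agent2, it only becomes ready in the last wave, matching the barrier wait shown in the diagram.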

Task Native Workflow

Task native workflow isolates agent runtimes and environments. It is particularly useful in distributed settings and in other scenarios where environments could overlap and tool isolation is required.

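# Only the first task is given an explicit input.
task1 = Task(input="my question", agent=agent1)
task2 = Task(agent=agent2)
task3 = Task(agent=agent3)
tasks = [task1, task2, task3]

# Run the tasks with sequence_dependent=True; the result is a dict of TaskResponse objects.
result: Dict[str, TaskResponse] = Runners.run_task(tasks, RunConfig(sequence_dependent=True))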
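
The sketch below models one plausible reading of this snippet, assuming that `sequence_dependent=True` means a task without its own input receives the previous task's answer. That reading is an assumption and is not confirmed by this page. `SketchTask` and `run_in_sequence` are hypothetical and do not use AWorld.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task: an id, a handler, and an optional input."""
    task_id: str
    handler: Callable[[str], str]
    input: Optional[str] = None


def run_in_sequence(tasks: List[SketchTask]) -> Dict[str, str]:
    """Run tasks in order; a task with no input reuses the previous answer (assumed)."""
    results: Dict[str, str] = {}
    previous: Optional[str] = None
    for task in tasks:
        payload = task.input if task.input is not None else previous
        if payload is None:
            raise ValueError(f"{task.task_id} has no input and no predecessor")
        previous = task.handler(payload)
        results[task.task_id] = previous
    return results


results = run_in_sequence([
    SketchTask("t1", str.upper, input="my question"),
    SketchTask("t2", lambda s: s + "!"),
    SketchTask("t3", lambda s: f"[{s}]"),
])
print(results["t3"])  # [MY QUESTION!]
```

Keying the results by task id mirrors the `Dict[str, TaskResponse]` annotation in the snippet above, although what AWorld actually uses as keys is not stated here.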