Writing Agent Workflows#

This guide covers best practices for implementing efficient and robust RolloutWorkflow classes and agent workflows in AReaL.

For the difference between RolloutWorkflow and agent workflows, see the Agentic RL Guide.

Best Practices#

Use Async/Await Throughout#

All workflow methods should be async and use await for I/O-bound operations. This enables concurrent execution across multiple rollouts.

# Or the `run` method in agent workflows
async def arun_episode(self, engine, data):
    # Correct: await the engine call
    resp = await engine.agenerate(req)

    # Correct: await other LLM calls
    async with AsyncOpenAI() as client:
        resp = await client.chat.completions.create(...)

    # Incorrect: blocking calls stall other rollouts
    # resp = engine.generate(req)  # Don't do this
    # resp = OpenAI().chat.completions.create(...)  # Don't do this

    # Await HTTP requests with reused client
    session = await workflow_context.get_aiohttp_session()
    async with session.get(url) as response:
        result = await response.json()

    # Await file operations (use aiofiles)
    async with aiofiles.open(path, "r") as f:
        content = await f.read()

Wrap Expensive Reward Functions#

Use AsyncRewardWrapper for reward functions involving CPU-intensive computation, external API calls, or any blocking operation. AsyncRewardWrapper dispatches reward computation to a dedicated process pool.

from areal.api.reward_api import AsyncRewardWrapper

class MyWorkflow(RolloutWorkflow):
    def __init__(self, reward_fn, ...):
        # Wrap the reward function once during initialization
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def arun_episode(self, engine, data):
        resp = await engine.agenerate(req)
        # Await the wrapped reward function
        reward = await self.async_reward_fn(
            prompt_str,
            completion_str,
            **data,
        )

Avoid Heavy Initialization#

Place expensive setup logic in __init__, not in arun_episode. The arun_episode method runs for every rollout, so repeated initialization wastes resources.

Reuse HTTP Clients via Workflow Context#

Reuse HTTP clients across requests instead of creating new ones. AReaL provides shared clients through workflow_context with automatic lifecycle management.

When using OpenAI, Anthropic, or other SDK clients, pass the shared HTTP client:

from openai import AsyncOpenAI
from areal.infra import workflow_context

class MyAgentWorkflow:
    async def run(self, data, **extra_kwargs):
        # Get pre-configured client from extra_kwargs
        http_client = extra_kwargs.get("http_client")
        base_url = extra_kwargs.get("base_url")

        # Pass to SDK constructor
        client = AsyncOpenAI(
            base_url=base_url,
            http_client=http_client,
            max_retries=0,
        )

        response = await client.chat.completions.create(...)

See Also#