Agent Workflow#

This document describes AReaL’s agent workflow system, which enables training language models using agent frameworks while capturing token-level data for reinforcement learning.

Notes:

  1. This page targets developers seeking a deep understanding of the codebase. For a practical guide, see the Agentic RL Guide.

  2. Read the RolloutWorkflow reference first, as agent workflows are built on top of RolloutWorkflow.

  3. Scheduler compatibility: Agent workflows are supported on local and slurm schedulers only. The ray scheduler is incompatible with the HTTP proxy architecture.

Overview#

Agent workflows allow training models using popular agent frameworks (OpenAI Agents SDK, CAMEL-AI, LangChain, etc.) without modifying their core logic. AReaL automatically captures token-level information needed for RL training while preserving the agent’s original behavior.

Key benefits:

  • Flexibility: Supports any framework using OpenAI/Anthropic messaging protocols

  • Unified development: Same code for benchmarking, evaluation, and RL training

  • Algorithmic correctness: Token-level tracking avoids training-inference mismatch

The challenge is that agent frameworks interact with LLMs through high-level APIs that don’t expose token IDs and log probabilities. AReaL solves this by:

  1. Intercepting LLM calls via a proxy server or direct client

  2. Tracking token-level data in an InteractionCache

  3. Building conversation trees for multi-turn reward propagation

  4. Exporting training-ready tensors with proper reward attribution

Relationship with RolloutWorkflow#

Agent workflows are not a separate abstraction—they are automatically wrapped into RolloutWorkflow through OpenAIProxyWorkflow:

User's Agent Code (async def run())
           ↓
   OpenAIProxyWorkflow (wrapper)
           ↓
   RolloutWorkflow.arun_episode()
           ↓
   dict[str, InteractionWithTokenLogpReward]
           ↓
   Tensor dictionary for training

Any class with an async def run(data, **extra_kwargs) method is recognized as an agent workflow and wrapped automatically when passed to the trainer.
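
In other words, the trainer duck-types on the method signature. A minimal sketch of the shape it looks for (not the literal detection code; the return type follows the reward conventions described later):

class MyAgent:
    async def run(self, data: dict, **extra_kwargs) -> float | dict[str, float]:
        ...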

Two Integration Paradigms#

AReaL offers two approaches for integrating agent frameworks:

| Aspect | Proxy Approach | Direct Approach |
|---|---|---|
| Code modification | None (just change base_url) | Must accept ArealOpenAI client |
| Communication | HTTP via proxy server | Direct engine calls |
| Framework support | Any OpenAI-compatible framework | Frameworks accepting custom clients |
| Performance | HTTP overhead (minimal) | No HTTP overhead |
| Engine state access | Limited | Full access |
| Recommended for | Existing agents, third-party frameworks | Legacy code only; not for new projects |

See the Agentic RL Guide for concrete examples.

Proxy Approach#

The proxy approach keeps agent code independent from AReaL. Your agent uses the standard OpenAI/Anthropic messaging protocol with a customized base_url pointing to AReaL’s proxy server.

AReaL’s trainer automatically provides base_url and http_client during RL training.

from openai import AsyncOpenAI

class MyAgent:
    async def run(self, data, **extra_kwargs):
        # AReaL injects these kwargs
        http_client = extra_kwargs.get("http_client")
        base_url = extra_kwargs.get("base_url")

        # Standard OpenAI SDK usage
        client = AsyncOpenAI(
            base_url=base_url,
            http_client=http_client,
            max_retries=0,
        )

        response = await client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        # Return reward (float) or reward dict;
        # compute_reward is your own scoring function
        return compute_reward(response, data["answer"])

Direct Approach#

Legacy Pattern: The direct approach using ArealOpenAI with RolloutWorkflow is considered legacy and should not be used for new projects. Prefer the proxy approach above, which keeps agent code independent from AReaL internals.

The direct approach uses ArealOpenAI, which extends AsyncOpenAI and binds directly to the inference engine. This approach requires the workflow to inherit RolloutWorkflow and use the engine from arun_episode.

from areal.experimental.openai import ArealOpenAI

class MyWorkflow(RolloutWorkflow):
    async def arun_episode(self, engine, data):
        # Create client bound to engine
        client = ArealOpenAI(engine=engine, tokenizer=self.tokenizer)

        # Use like standard OpenAI client
        response = await client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        # Set reward and export
        reward = compute_reward(response, data["answer"])
        client.set_last_reward(reward)
        client.apply_reward_discount(turn_discount=0.9)

        return client.export_interactions(style="individual")

Execution Modes#

The proxy approach supports two execution modes, configured via rollout.openai.mode:

Inline Mode (Default)#

The agent runs in the same process as the rollout worker. AReaL calls the agent’s run method directly as an async coroutine, passing base_url and http_client via extra_kwargs.

rollout:
  openai:
    mode: inline

Characteristics:

  • No serialization overhead

  • Direct access to shared HTTP client

  • Lower latency

  • Requires async code

Subprocess Mode#

The agent runs in a separate process pool (ProcessPoolExecutor). AReaL serializes the agent and data, executes in a subprocess, and deserializes the result.

rollout:
  openai:
    mode: subproc
    subproc_max_workers: 4  # Process pool size

Characteristics:

  • Agent must be picklable (serializable)

  • OPENAI_BASE_URL and OPENAI_API_KEY are set as environment variables

  • Agent reads base_url from os.environ["OPENAI_BASE_URL"] instead of extra_kwargs

  • Synchronous code allowed inside run() (AReaL wraps with asyncio.run())

  • Pickling overhead for agent and data

  • Useful for non-async libraries or process isolation

Subprocess example:

import os
from openai import OpenAI  # Sync client is OK

class MySyncAgent:
    async def run(self, data, **extra_kwargs):
        # In subproc mode, base_url comes from environment
        client = OpenAI(
            base_url=os.environ.get("OPENAI_BASE_URL"),
            api_key="DUMMY",
        )

        response = client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        return compute_reward(response, data["answer"])

Architecture#

Proxy Server#

When an agent workflow is detected, AReaL spawns proxy workers running FastAPI servers that implement OpenAI-compatible endpoints.

┌─────────────────────────────────────────────────────────────────┐
│                         PPOTrainer                              │
│         (Detects agent workflow, initializes proxies)           │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                    RolloutController                            │
│                                                                 │
│  ┌──────────────┐     ┌──────────────┐                          │
│  │   Rollout    │     │    Proxy     │  FastAPI server          │
│  │   Worker     │◄────│    Worker    │  /v1/chat/completions    │
│  │              │     │              │  /v1/responses           │
│  │ SGLang/vLLM  │     │              │  /v1/messages            │
│  └──────────────┘     └──────────────┘                          │
└─────────────────────────────────────────────────────────────────┘

Key file: areal/experimental/openai/proxy/proxy_rollout_server.py

Four-Process Architecture (Proxy)#

The proxy mode introduces a proxy server between the agent and the inference engine:

│ Controller Process │  │ Rollout Worker (RPC) │  │ Proxy Worker │  │ GPU Process │
│                    │  │                      │  │              │  │             │
│ RolloutController  │  │  Flask HTTP Server   │  │ FastAPI HTTP │  │ SGLang/vLLM │
│        │           │  │        │             │  │    Server    │  │      │      │
│        ▼           │  │   /call endpoint     │  │ OpenAI API   │  │ Inference   │
│ BatchTaskDispatcher│  │        │             │  │ compatible   │  │   Engine    │
│   (bg thread)      │  │        ▼             │  │      │       │  │      │      │
│        │           │  │   Engine Thread      │  │      │       │  │      │      │
│        │           │  │        │             │  │      │       │  │      │      │
│        │    HTTP   │  │        ▼             │  │      │       │  │      │      │
│ submit ├────POST───┼─>│   RemoteInfEngine    │  │      │       │  │      │      │
│ task 1 │           │  │        │             │  │      │       │  │      │      │
│        │           │  │        ▼             │  │      │       │  │      │      │
│ submit │           │  │ OpenAIProxyWorkflow  │  │      │       │  │      │      │
│ task 2 │           │  │        │             │  │      │       │  │      │      │
│        │           │  │  OpenAIProxyClient ──┼──┼──────┤       │  │      │      │
│ submit │           │  │        │             │  │      │       │  │      │      │
│ task 3 │           │  │   agent.run()        │  │      │       │  │      │      │
│        │           │  │        │             │  │      │       │  │      │      │
│        │           │  │        ▼             │  │      │       │  │      │      │
│        │           │  │   OpenAI API call ───┼──┼─>  /chat/ ───┼──┼─> generate  │
│        │           │  │        │             │  │ completions  │  │    tokens   │
│        │           │  │        │             │  │      │       │  │      │      │
│        │           │  │  ChatCompletion <────┼──┼──────<───────┼──┼──────┘      │
│        │           │  │        │             │  │   (cached)   │  │             │
│        │           │  │        │             │  │      │       │  │             │
│        │           │  │        ▼             │  │      │       │  │             │
│        │           │  │     reward           │  │      │       │  │             │
│        │           │  │        │             │  │      │       │  │             │
│        │           │  │   set_reward() ──────┼──┼─>  /rl/      │  │             │
│        │           │  │        │             │  │ set_reward   │  │             │
│        │           │  │        ▼             │  │      │       │  │             │
│        │           │  │     ...              │  │      │       │  │             │
│        │           │  │        │             │  │      │       │  │             │
│        │           │  │        ▼             │  │      │       │  │             │
│        │           │  │    trajectory        │  │      │       │  │             │
│        │           │  │        │             │  │      │       │  │             │
│    collect<────────┼──┼────────┘             │  │      │       │  │             │
│                    │  │                      │  │              │  │             │
└────────────────────┴──┴──────────────────────┴──┴──────────────┴──┴─────────────┘

The OpenAIProxyWorkflow contains an OpenAIProxyClient that manages the session lifecycle with the proxy server. Key interactions include:

  • chat/completions: Routes agent’s OpenAI API calls to inference engine, caches token-level data

  • set_reward: Assigns rewards to completions for RL training

Data Flow Detail#

┌───────────────────────────────────────────────────────────────────────────────────────────┐
│                               Rollout Worker + Proxy Worker                                │
│                                                                                            │
│  ┌─────────────────────┐      ┌──────────────────────────────────────────────────────────┐ │
│  │ OpenAIProxyWorkflow │      │               ProxyRolloutServer (FastAPI)               │ │
│  │                     │      │                                                          │ │
│  │ 1. grant_capacity()─┼─────>│                                                          │ │
│  │                     │      │                                                          │ │
│  │ 2. start_session() ─┼─────>│ → SessionData created                                    │ │
│  │    → session_id    <┼──────┤                                                          │ │
│  │                     │      │                                                          │ │
│  │ 3. agent.run()      │      │   ┌──────────────────────────────────────────────────┐   │ │
│  │    │                │      │   │                   ArealOpenAI                    │   │ │
│  │    └─> OpenAI call ─┼─────>│   │                                                  │   │ │
│  │                     │      │   │  /chat/completions                               │   │ │
│  │                     │      │   │    → tokenize, engine.agenerate() ───────────────┼───┼─┼──┐
│  │                     │      │   │    → cache in InteractionCache    <──────────────┼───┼─┼──┤
│  │    ChatCompletion  <┼──────┤   │    → return ChatCompletion                       │   │ │  │
│  │                     │      │   │                                                  │   │ │  │
│  │                     │      │   └──────────────────────────────────────────────────┘   │ │  │
│  │                     │      │                                                          │ │  │
│  │ 4. set_reward()    ─┼─────>│ → reward stored in InteractionCache                      │ │  │
│  │                     │      │                                                          │ │  │
│  │ 5. end_session()   ─┼─────>│ → session marked complete                                │ │  │
│  │                     │      │                                                          │ │  │
│  │ 6. export_          │      │                                                          │ │  │
│  │    trajectories()  ─┼─────>│ → apply discount, to_tensor_dict()                       │ │  │
│  │    → tensors       <┼──────┤                                                          │ │  │
│  └─────────────────────┘      └──────────────────────────────────────────────────────────┘ │  │
│                                                                                            │  │
└────────────────────────────────────────────────────────────────────────────────────────────┘  │
                                                                                                │
                                             ┌──────────────────────────────────────────────────┘
                                             │
                                             ▼
                           ┌─────────────────────────────────────────────────────────┐
                           │                  GPU Process (SGLang/vLLM)              │
                           │                                                         │
                           │   Continuous batching, KV cache, tensor parallelism     │
                           └─────────────────────────────────────────────────────────┘

Proxy Endpoints#

| Endpoint | Purpose |
|---|---|
| POST /grant_capacity | Reserve slot (staleness control) |
| POST /rl/start_session | Create unique session ID |
| POST /{session_id}/v1/chat/completions | OpenAI chat completions API |
| POST /{session_id}/v1/responses | OpenAI responses API |
| POST /{session_id}/v1/messages | Anthropic Messages API |
| POST /{session_id}/rl/set_reward | Assign reward to interaction |
| POST /{session_id}/rl/end_session | Mark session complete |
| POST /export_trajectories | Export with reward discounting |

Session Lifecycle#

Each agent execution follows this lifecycle:

1. Reserve capacity
   POST /grant_capacity → Staleness control

2. Start session
   POST /rl/start_session → Returns session_id (e.g., "task-0-0")

3. Agent execution (multiple LLM calls)
   POST /{session_id}/v1/chat/completions
     → Proxy tokenizes messages
     → Engine generates tokens with logprobs
     → Response stored in InteractionCache
     → ChatCompletion returned to agent

4. Assign rewards
   POST /{session_id}/rl/set_reward
     Body: {"reward": 1.0}                           → Last completion
     Body: {"interaction_id": "...", "reward": 0.5}  → Specific completion

5. End session
   POST /{session_id}/rl/end_session

6. Export trajectories
   POST /export_trajectories
     → Apply reward backpropagation
     → Return InteractionWithTokenLogpReward objects
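
For illustration, the same lifecycle can be driven by hand with any HTTP client. Below is a minimal sketch using aiohttp; the endpoint paths follow the table above, but the payload shapes shown here are assumptions, not verbatim AReaL schemas:

import aiohttp

async def run_session(proxy_addr: str, messages: list[dict]):
    # Illustrative walkthrough of the proxy session lifecycle;
    # proxy_addr is a full URL, e.g. "http://127.0.0.1:8000".
    async with aiohttp.ClientSession(base_url=proxy_addr) as http:
        # 1. Reserve capacity (staleness control)
        await http.post("/grant_capacity")

        # 2. Start a session and obtain its ID
        resp = await http.post("/rl/start_session")
        session_id = (await resp.json())["session_id"]  # assumed field name

        # 3. Issue an OpenAI-compatible completion through the session
        resp = await http.post(
            f"/{session_id}/v1/chat/completions",
            json={"model": "default", "messages": messages},
        )
        completion = await resp.json()

        # 4. Reward the last completion, then close the session
        await http.post(f"/{session_id}/rl/set_reward", json={"reward": 1.0})
        await http.post(f"/{session_id}/rl/end_session")

        # 5. Export trajectories with reward discounting
        resp = await http.post(
            "/export_trajectories",
            json={"session_id": session_id, "discount": 0.9, "style": "individual"},
        )
        return await resp.json()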

Token-Level Tracking#

InteractionCache#

The InteractionCache (extends OrderedDict) stores InteractionWithTokenLogpReward objects keyed by completion ID.

Key file: areal/experimental/openai/cache.py

Parent-child resolution: When a new interaction is added, the cache finds its parent by checking if any existing interaction’s messages are a prefix of the new one:

# Parent: [system, user]
# Child:  [system, user, assistant, user]
# → Child's parent is set to Parent
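
A minimal sketch of that prefix check, assuming each cached interaction exposes its messages list (the real cache also tracks token sequences and completion IDs; this shows only the matching idea):

def find_parent(cache, new_messages):
    # Choose the longest existing interaction whose messages are a
    # strict prefix of the new interaction's messages.
    parent, parent_len = None, 0
    for interaction in cache.values():
        msgs = interaction.messages
        if len(msgs) < len(new_messages) and new_messages[: len(msgs)] == msgs:
            if len(msgs) > parent_len:
                parent, parent_len = interaction, len(msgs)
    return parent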

InteractionWithTokenLogpReward#

This dataclass stores completion data with token-level information:

@dataclass
class InteractionWithTokenLogpReward:
    model_response: ModelResponse | None  # Token IDs, logprobs from engine
    reward: float | None
    parent: InteractionWithTokenLogpReward | None
    messages: list[dict]                  # Input messages
    output_message_list: list[dict] | None
    completion: ChatCompletion | None     # OpenAI response object

Key file: areal/experimental/openai/types.py

The to_tensor_dict() method converts to training format:

{
    "input_ids": torch.tensor([...], dtype=torch.int32),
    "loss_mask": torch.tensor([0]*input_len + [1]*output_len, dtype=torch.int32),
    "logprobs": torch.tensor([0]*input_len + output_logprobs, dtype=torch.float32),
    "versions": torch.tensor([...], dtype=torch.int32),
    "attention_mask": torch.ones(..., dtype=torch.bool),
    "rewards": torch.tensor([reward], dtype=torch.float32),
}
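
A rough sketch of how those fields could be assembled from a cached interaction; the ModelResponse field names used here (input_tokens, output_tokens, output_logprobs, output_versions) are assumptions for illustration:

import torch

def to_tensor_dict_sketch(interaction):
    resp = interaction.model_response
    input_len, output_len = len(resp.input_tokens), len(resp.output_tokens)
    return {
        "input_ids": torch.tensor(
            resp.input_tokens + resp.output_tokens, dtype=torch.int32
        ),
        # Gradients flow only through generated tokens
        "loss_mask": torch.tensor(
            [0] * input_len + [1] * output_len, dtype=torch.int32
        ),
        "logprobs": torch.tensor(
            [0.0] * input_len + resp.output_logprobs, dtype=torch.float32
        ),
        # Model version that produced each output token (staleness control);
        # padding input positions with -1 is an assumption here
        "versions": torch.tensor(
            [-1] * input_len + resp.output_versions, dtype=torch.int32
        ),
        "attention_mask": torch.ones(input_len + output_len, dtype=torch.bool),
        "rewards": torch.tensor([interaction.reward or 0.0], dtype=torch.float32),
    }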

Reward System#

Assignment#

Rewards can be assigned in two ways:

  1. Return from run() method:

    • float: Applied to last completion

    • dict[str, float]: Maps completion IDs to rewards (see the sketch after this list)

  2. Explicit API calls (direct approach):

    client.set_last_reward(1.0)
    client.set_reward(completion_id, 0.5)
    

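For example, a proxy-approach agent that scores each turn separately can return a dict keyed by the completion IDs from the responses. This is a sketch; it assumes the dict keys are the response.id values assigned by the proxy, and score_turn stands in for your own per-turn scorer:

from openai import AsyncOpenAI

class MultiTurnAgent:
    async def run(self, data, **extra_kwargs):
        client = AsyncOpenAI(
            base_url=extra_kwargs["base_url"],
            http_client=extra_kwargs["http_client"],
            max_retries=0,
        )
        rewards: dict[str, float] = {}
        messages = list(data["messages"])
        for _ in range(2):  # two turns, for illustration
            response = await client.chat.completions.create(
                model="default", messages=messages
            )
            content = response.choices[0].message.content
            messages.append({"role": "assistant", "content": content})
            messages.append({"role": "user", "content": "Continue."})
            # Key the reward by this completion's ID
            rewards[response.id] = score_turn(content, data["answer"])
        return rewards
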
Backpropagation#

For multi-turn conversations, rewards propagate backward through the conversation tree with geometric discounting:

# Conversation tree:
A → B → C (leaf, reward=1.0)

# With discount=0.9:
C.reward = 1.0
B.reward = 0 + 1.0 × 0.9 = 0.9
A.reward = 0 + 0.9 × 0.9 = 0.81

Processing occurs in reverse topological order (leaves first), ensuring children’s rewards are finalized before propagating to parents.
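
A condensed sketch of that rule, assuming each interaction stores reward and parent, and that topological_order is a hypothetical helper returning parents before children:

def propagate_rewards(interactions, turn_discount=0.9):
    # Visit leaves first so each child's reward is final
    # before it is discounted into its parent.
    for node in reversed(topological_order(interactions)):
        if node.parent is not None:
            node.parent.reward = (node.parent.reward or 0.0) + turn_discount * (
                node.reward or 0.0
            )

On the A → B → C chain above, this yields exactly B.reward = 0.9 and A.reward = 0.81.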

Configuration#

# Direct approach
client.apply_reward_discount(turn_discount=0.9)
interactions = client.export_interactions(style="individual")

# Proxy approach (via export endpoint)
POST /export_trajectories
Body: {"session_id": "...", "discount": 0.9, "style": "individual"}

Workflow Resolution#

When a workflow is passed to the trainer, AReaL resolves it as follows:

Key file: areal/infra/remote_inf_engine.py (_resolve_workflow method)

def _resolve_workflow(workflow, workflow_kwargs, group_size, proxy_addr):
    # 1. RolloutWorkflow instance → use directly
    # 2. RolloutWorkflow class → instantiate with kwargs
    # 3. String path → import and resolve recursively
    # 4. Has run() method → wrap with OpenAIProxyWorkflow
    resolved = ...  # outcome of steps 1-3

    if not isinstance(resolved, RolloutWorkflow):
        resolved = OpenAIProxyWorkflow(
            agent=resolved,
            proxy_addr=proxy_addr,
            ...
        )

    # Apply grouping if needed
    if group_size > 1:
        resolved = GroupedRolloutWorkflow(resolved, group_size)

    return resolved

OpenAIProxyWorkflow#

The OpenAIProxyWorkflow wraps user agents into RolloutWorkflow:

Key file: areal/experimental/openai/proxy/workflow.py

class OpenAIProxyWorkflow(RolloutWorkflow):
    async def arun_episode(self, engine, data):
        # 1. Grant capacity
        await self._grant_capacity(http_session)

        # 2. Create proxy client (manages session)
        proxy_client = OpenAIProxyClient(...)

        async with proxy_client:
            # 3. Run agent with session URL
            rewards = await self._run_agent(proxy_client.session_url, data)

            # 4. Assign rewards
            if isinstance(rewards, float):
                await proxy_client.set_last_reward(rewards)
            elif isinstance(rewards, dict):
                for interaction_id, reward in rewards.items():
                    await proxy_client.set_reward(interaction_id, reward)

        # 5. Export interactions
        return await proxy_client.export_interactions(
            discount=self.discount,
            style=self.export_style,
        )

The _run_agent method handles both execution modes (see the sketch after this list):

  • Inline: Calls agent.run() directly as a coroutine

  • Subprocess: Submits to ProcessPoolExecutor, sets OPENAI_BASE_URL environment variable, wraps with asyncio.run()
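
A simplified sketch of that dispatch; _subproc_run is a hypothetical module-level helper (the real method also forwards the shared HTTP client and API key, and handles errors):

import asyncio
import functools
import os

def _subproc_run(agent, session_url, data):
    # Runs inside the worker process: expose the proxy via env vars,
    # then drive the coroutine (which may contain blocking calls).
    os.environ["OPENAI_BASE_URL"] = session_url
    os.environ["OPENAI_API_KEY"] = "DUMMY"
    return asyncio.run(agent.run(data))

async def _run_agent_sketch(agent, session_url, data, mode, executor):
    if mode == "inline":
        # Same process: pass the session URL through extra_kwargs
        return await agent.run(data, base_url=session_url)
    # Subprocess: agent and data must be picklable
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor, functools.partial(_subproc_run, agent, session_url, data)
    )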

ArealOpenAI Client#

The ArealOpenAI class extends AsyncOpenAI for direct engine integration:

Key file: areal/experimental/openai/client.py

Key Methods#

| Method | Description |
|---|---|
| chat.completions.create(...) | OpenAI-compatible chat API |
| responses.create(...) | OpenAI responses API |
| set_reward(id, reward) | Set reward for specific interaction |
| set_last_reward(reward) | Set reward for last interaction |
| apply_reward_discount(turn_discount) | Apply backward reward discounting |
| export_interactions(style) | Export for training |

Export Styles#

| Style | Description |
|---|---|
| individual | Returns all interactions as separate entries. Trajectories may share prefixes. |
| concat | Builds conversation tree, returns only leaf nodes. Only valid for linear conversations with matched token sequences. |
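
With the direct-approach client, the style is chosen at export time:

# One training sample per completion; prefixes may repeat across samples
interactions = client.export_interactions(style="individual")

# Linear chats only: each root-to-leaf path becomes a single sequence
interactions = client.export_interactions(style="concat")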

Public API#

from areal.experimental.openai import (
    ArealOpenAI,                     # Direct approach client
    InteractionWithTokenLogpReward,  # Token-level data structure
    OpenAIProxyClient,               # HTTP client for proxy sessions
    OpenAIProxyWorkflow,             # Workflow wrapper
)

Training with Agent Trajectories#

A complete agentic episode may contain multiple LLM interactions (turns). For training, these are treated as independent input-output-reward tuples:

Turn 1: [system, user]                         → output_1 → reward_1 (discounted)
Turn 2: [system, user, asst, user]             → output_2 → reward_2 (discounted)
Turn 3: [system, user, asst, user, asst, user] → output_3 → reward_3 (final)

Each tuple includes full token-level data for policy gradient computation: input token IDs, output token IDs, and log probabilities. The discounted rewards ensure the RL objective correctly credits earlier actions for final outcomes.
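
Concretely, with turn_discount = 0.9 and only a final reward of 1.0 on turn 3 (assuming intermediate turns earn no reward of their own):

turn_discount = 0.9
reward_3 = 1.0                         # final outcome
reward_2 = reward_3 * turn_discount    # 0.9
reward_1 = reward_2 * turn_discount    # 0.81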

Token Consistency Guarantee#

Because AReaL stores the actual tokens used during inference (not re-tokenized text), there is no risk of tokenization mismatch between rollout and training. The tokens sent to the inference engine are exactly the tokens used for gradient computation.

Efficient Training with Tree Attention#

Multi-turn trajectories often share long token prefixes, which can slow down training due to redundant computation. AReaL addresses this with prefix-shared tree attention, which computes attention over shared prefixes only once.
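
For intuition, take the three turns above with some hypothetical token counts; encoding each turn independently re-processes the shared prefix, while tree attention encodes every token once:

# (prompt_len, output_len) per turn; each prompt extends the previous one
turns = [(100, 50), (170, 60), (250, 40)]
independent = sum(p + o for p, o in turns)   # 150 + 230 + 290 = 670 tokens
tree = turns[-1][0] + turns[-1][1]           # 290: each token encoded once
print(independent, tree)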

See Also#