# Agent Workflow
This document describes AReaL’s agent workflow system, which enables training language models using agent frameworks while capturing token-level data for reinforcement learning.
Notes:

- This page targets developers seeking a deep understanding of the codebase. For a practical guide, see the Agentic RL Guide.
- Read the `RolloutWorkflow` reference first, as agent workflows are built on top of `RolloutWorkflow`.
- Scheduler compatibility: agent workflows are supported on the `local` and `slurm` schedulers only. The `ray` scheduler is incompatible with the HTTP proxy architecture.
## Overview
Agent workflows allow training models using popular agent frameworks (OpenAI Agents SDK, CAMEL-AI, LangChain, etc.) without modifying their core logic. AReaL automatically captures token-level information needed for RL training while preserving the agent’s original behavior.
Key benefits:

- **Flexibility**: Supports any framework using OpenAI/Anthropic messaging protocols
- **Unified development**: Same code for benchmarking, evaluation, and RL training
- **Algorithmic correctness**: Token-level tracking avoids training-inference mismatch
The challenge is that agent frameworks interact with LLMs through high-level APIs that don’t expose token IDs and log probabilities. AReaL solves this by:

1. Intercepting LLM calls via a proxy server or direct client
2. Tracking token-level data in an `InteractionCache`
3. Building conversation trees for multi-turn reward propagation
4. Exporting training-ready tensors with proper reward attribution
## Relationship with RolloutWorkflow
Agent workflows are not a separate abstraction; they are automatically wrapped into `RolloutWorkflow` through `OpenAIProxyWorkflow`:
```text
User's Agent Code (async def run())
        ↓
OpenAIProxyWorkflow (wrapper)
        ↓
RolloutWorkflow.arun_episode()
        ↓
dict[str, InteractionWithTokenLogpReward]
        ↓
Tensor dictionary for training
```
Any class with an `async def run(data, **extra_kwargs)` method is recognized as an agent workflow and wrapped automatically when passed to the trainer.
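For instance, a minimal agent could look like this (a sketch with a hypothetical class name; the reward logic is omitted):

```python
class EchoAgent:
    # Recognized as an agent workflow because it defines
    # `async def run(data, **extra_kwargs)`.
    async def run(self, data, **extra_kwargs):
        # Call the LLM via the injected base_url/http_client (see below),
        # then score the result.
        return 0.0  # episode reward
```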
## Two Integration Paradigms
AReaL offers two approaches for integrating agent frameworks:
| Aspect | Proxy Approach | Direct Approach |
|---|---|---|
| Code modification | None (just change `base_url`) | Must accept an `ArealOpenAI` client |
| Communication | HTTP via proxy server | Direct engine calls |
| Framework support | Any OpenAI-compatible framework | Frameworks accepting custom clients |
| Performance | HTTP overhead (minimal) | No HTTP overhead |
| Engine state access | Limited | Full access |
| Recommended for | Existing agents, third-party frameworks | Legacy code. Don’t use it proactively. |
See the Agentic RL Guide for concrete examples.
### Proxy Approach
The proxy approach keeps agent code independent from AReaL. Your agent uses the standard OpenAI/Anthropic messaging protocol with a customized `base_url` pointing to AReaL’s proxy server. AReaL’s trainer automatically provides `base_url` and `http_client` during RL training:
```python
from openai import AsyncOpenAI


class MyAgent:
    async def run(self, data, **extra_kwargs):
        # AReaL injects these kwargs
        http_client = extra_kwargs.get("http_client")
        base_url = extra_kwargs.get("base_url")

        # Standard OpenAI SDK usage
        client = AsyncOpenAI(
            base_url=base_url,
            http_client=http_client,
            max_retries=0,
        )
        response = await client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        # Return reward (float) or reward dict
        return compute_reward(response, data["answer"])
```
### Direct Approach
**Legacy pattern:** The direct approach using `ArealOpenAI` with `RolloutWorkflow` is considered legacy and should not be used for new projects. Prefer the proxy approach above, which keeps agent code independent from AReaL internals.
The direct approach uses `ArealOpenAI`, which extends `AsyncOpenAI` and binds directly to the inference engine. This approach requires the workflow to inherit `RolloutWorkflow` and use the engine passed to `arun_episode`:
```python
from areal.experimental.openai import ArealOpenAI


class MyWorkflow(RolloutWorkflow):
    async def arun_episode(self, engine, data):
        # Create client bound to the engine
        client = ArealOpenAI(engine=engine, tokenizer=self.tokenizer)

        # Use like a standard OpenAI client
        response = await client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        # Set reward and export
        reward = compute_reward(response, data["answer"])
        client.set_last_reward(reward)
        client.apply_reward_discount(turn_discount=0.9)
        return client.export_interactions(style="individual")
```
## Execution Modes
The proxy approach supports two execution modes, configured via `rollout.openai.mode`:
### Inline Mode (Default)
The agent runs in the same process as the rollout worker. AReaL calls the agent’s `run` method directly as an async coroutine, passing `base_url` and `http_client` via `extra_kwargs`.
```yaml
rollout:
  openai:
    mode: inline
```
Characteristics:

- No serialization overhead
- Direct access to the shared HTTP client
- Lower latency
- Requires async code
### Subprocess Mode
The agent runs in a separate process pool (`ProcessPoolExecutor`). AReaL serializes the agent and data, executes in a subprocess, and deserializes the result.
```yaml
rollout:
  openai:
    mode: subproc
    subproc_max_workers: 4  # Process pool size
```
Characteristics:

- Agent must be picklable (serializable)
- `OPENAI_BASE_URL` and `OPENAI_API_KEY` are set as environment variables
- The agent reads `base_url` from `os.environ["OPENAI_BASE_URL"]` instead of `extra_kwargs`
- Synchronous code is allowed inside `run()` (AReaL wraps it with `asyncio.run()`)
- Pickling overhead for agent and data
- Useful for non-async libraries or process isolation
Subprocess example:

```python
import os

from openai import OpenAI  # Sync client is OK


class MySyncAgent:
    async def run(self, data, **extra_kwargs):
        # In subproc mode, base_url comes from the environment
        client = OpenAI(
            base_url=os.environ.get("OPENAI_BASE_URL"),
            api_key="DUMMY",
        )
        response = client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )
        return compute_reward(response, data["answer"])
```
## Architecture
### Proxy Server
When an agent workflow is detected, AReaL spawns proxy workers running FastAPI servers that implement OpenAI-compatible endpoints.
```text
┌─────────────────────────────────────────────────────────────────┐
│ PPOTrainer │
│ (Detects agent workflow, initializes proxies) │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RolloutController │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Rollout │ │ Proxy │ FastAPI server │
│ │ Worker │◄────│ Worker │ /v1/chat/completions │
│ │ │ │ │ /v1/responses │
│ │ SGLang/vLLM │ │ │ /v1/messages │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
Key file: `areal/experimental/openai/proxy/proxy_rollout_server.py`
### Four-Process Architecture (Proxy)
The proxy mode introduces a proxy server between the agent and the inference engine:
```text
│ Controller Process │ │ Rollout Worker (RPC) │ │ Proxy Worker │ │ GPU Process │
│ │ │ │ │ │ │ │
│ RolloutController │ │ Flask HTTP Server │ │ FastAPI HTTP │ │ SGLang/vLLM │
│ │ │ │ │ │ │ Server │ │ │ │
│ ▼ │ │ /call endpoint │ │ OpenAI API │ │ Inference │
│ BatchTaskDispatcher│ │ │ │ │ compatible │ │ Engine │
│ (bg thread) │ │ ▼ │ │ │ │ │ │ │
│ │ │ │ Engine Thread │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ HTTP │ │ ▼ │ │ │ │ │ │ │
│ submit ├────POST───┼─>│ RemoteInfEngine │ │ │ │ │ │ │
│ task 1 │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ▼ │ │ │ │ │ │ │
│ submit │ │ │ OpenAIProxyWorkflow │ │ │ │ │ │ │
│ task 2 │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ OpenAIProxyClient ──┼──┼──────┤ │ │ │ │
│ submit │ │ │ │ │ │ │ │ │ │ │
│ task 3 │ │ │ agent.run() │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ▼ │ │ │ │ │ │ │
│ │ │ │ OpenAI API call ───┼──┼─> /chat/ ───┼──┼─> generate │
│ │ │ │ │ │ │ completions │ │ tokens │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ChatCompletion <────┼──┼──────<───────┼──┼──────┘ │
│ │ │ │ │ │ │ (cached) │ │ │
│ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ▼ │ │ │ │ │ │
│ │ │ │ reward │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │
│ │ │ │ set_reward() ──────┼──┼─> /rl/ │ │ │
│ │ │ │ │ │ │ set_reward │ │ │
│ │ │ │ ▼ │ │ │ │ │ │
│ │ │ │ ... │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ▼ │ │ │ │ │ │
│ │ │ │ trajectory │ │ │ │ │ │
│ │ │ │ │ │ │ │ │ │ │
│ collect<────────┼──┼────────┘ │ │ │ │ │ │
│ │ │ │ │ │ │ │
└────────────────────┴──┴──────────────────────┴──┴──────────────┴──┴─────────────┘
```
The `OpenAIProxyWorkflow` contains an `OpenAIProxyClient` that manages the session lifecycle with the proxy server. Key interactions include:

- `chat/completions`: Routes the agent’s OpenAI API calls to the inference engine and caches token-level data
- `set_reward`: Assigns rewards to completions for RL training
### Data Flow Detail
```text
┌───────────────────────────────────────────────────────────────────────────────────────────┐
│ Rollout Worker + Proxy Worker │
│ │
│ ┌─────────────────────┐ ┌──────────────────────────────────────────────────────────┐ │
│ │ OpenAIProxyWorkflow │ │ ProxyRolloutServer (FastAPI) │ │
│ │ │ │ │ │
│ │ 1. grant_capacity()─┼─────>│ │ │
│ │ │ │ │ │
│ │ 2. start_session() ─┼─────>│ → SessionData created │ │
│ │ → session_id <┼──────┤ │ │
│ │ │ │ │ │
│ │ 3. agent.run() │ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │ ArealOpenAI │ │ │
│ │ └─> OpenAI call ─┼─────>│ │ │ │ │
│ │ │ │ │ /chat/completions │ │ │
│ │ │ │ │ → tokenize, engine.agenerate() ───────────────┼───┼─┼──┐
│ │ │ │ │ → cache in InteractionCache <──────────────┼───┼─┼──┤
│ │ ChatCompletion <┼──────┤ │ → return ChatCompletion │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ └──────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ 4. set_reward() ─┼─────>│ → reward stored in InteractionCache │ │ │
│ │ │ │ │ │ │
│ │ 5. end_session() ─┼─────>│ → session marked complete │ │ │
│ │ │ │ │ │ │
│ │ 6. export_ │ │ │ │ │
│ │ trajectories() ─┼─────>│ → apply discount, to_tensor_dict() │ │ │
│ │ → tensors <┼──────┤ │ │ │
│ └─────────────────────┘ └──────────────────────────────────────────────────────────┘ │ │
│ │ │
└────────────────────────────────────────────────────────────────────────────────────────────┘ │
│
┌──────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ GPU Process (SGLang/vLLM) │
│ │
│ Continuous batching, KV cache, tensor parallelism │
└─────────────────────────────────────────────────────────┘
```
### Proxy Endpoints
| Endpoint | Purpose |
|---|---|
| `/grant_capacity` | Reserve slot (staleness control) |
| `/rl/start_session` | Create unique session ID |
| `/{session_id}/v1/chat/completions` | OpenAI chat completions API |
| `/{session_id}/v1/responses` | OpenAI responses API |
| `/{session_id}/v1/messages` | Anthropic Messages API |
| `/{session_id}/rl/set_reward` | Assign reward to interaction |
| `/{session_id}/rl/end_session` | Mark session complete |
| `/export_trajectories` | Export with reward discounting |
### Session Lifecycle
Each agent execution follows this lifecycle:
```text
1. Reserve capacity
   POST /grant_capacity → Staleness control

2. Start session
   POST /rl/start_session → Returns session_id (e.g., "task-0-0")

3. Agent execution (multiple LLM calls)
   POST /{session_id}/v1/chat/completions
   → Proxy tokenizes messages
   → Engine generates tokens with logprobs
   → Response stored in InteractionCache
   → ChatCompletion returned to agent

4. Assign rewards
   POST /{session_id}/rl/set_reward
   Body: {"reward": 1.0}                          → Last completion
   Body: {"interaction_id": "...", "reward": 0.5} → Specific completion

5. End session
   POST /{session_id}/rl/end_session

6. Export trajectories
   POST /export_trajectories
   → Apply reward backpropagation
   → Return InteractionWithTokenLogpReward objects
```
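For illustration, the same lifecycle can be driven manually over HTTP. This is a minimal sketch using `httpx`: the endpoints follow the listing above, but the proxy address and the exact JSON shapes of requests and responses are assumptions, and in practice `OpenAIProxyClient` performs these calls for you.

```python
import asyncio

import httpx


async def run_session(messages: list[dict]) -> None:
    # Hypothetical proxy address; AReaL provides the real one at runtime.
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8000") as http:
        await http.post("/grant_capacity")                # 1. reserve slot
        resp = await http.post("/rl/start_session")       # 2. open session
        session_id = resp.json()["session_id"]            # response shape assumed

        await http.post(                                  # 3. LLM call(s)
            f"/{session_id}/v1/chat/completions",
            json={"model": "default", "messages": messages},
        )
        await http.post(                                  # 4. reward last completion
            f"/{session_id}/rl/set_reward", json={"reward": 1.0}
        )
        await http.post(f"/{session_id}/rl/end_session")  # 5. close session
        await http.post(                                  # 6. export tensors
            "/export_trajectories",
            json={"session_id": session_id, "discount": 0.9, "style": "individual"},
        )


asyncio.run(run_session([{"role": "user", "content": "Hi"}]))
```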
## Token-Level Tracking
### InteractionCache
The `InteractionCache` (extends `OrderedDict`) stores `InteractionWithTokenLogpReward` objects keyed by completion ID.

Key file: `areal/experimental/openai/cache.py`
Parent-child resolution: when a new interaction is added, the cache finds its parent by checking whether any existing interaction’s messages are a prefix of the new one:

```python
# Parent: [system, user]
# Child:  [system, user, assistant, user]
# → Child's parent is set to Parent
```
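A minimal sketch of how such prefix matching can be done (illustrative only, not the actual cache code; interactions are assumed to expose their `messages` list):

```python
def is_strict_prefix(prefix: list[dict], messages: list[dict]) -> bool:
    """True if `prefix` matches the start of `messages` and is shorter."""
    return len(prefix) < len(messages) and messages[: len(prefix)] == prefix


def find_parent(cache: dict, new_messages: list[dict]):
    """Return the cached interaction whose messages form the longest
    strict prefix of `new_messages`, or None if no interaction qualifies."""
    best = None
    for interaction in cache.values():
        if is_strict_prefix(interaction.messages, new_messages):
            if best is None or len(interaction.messages) > len(best.messages):
                best = interaction
    return best
```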
### InteractionWithTokenLogpReward
This dataclass stores completion data with token-level information:
```python
@dataclass
class InteractionWithTokenLogpReward:
    model_response: ModelResponse | None  # Token IDs, logprobs from engine
    reward: float | None
    parent: InteractionWithTokenLogpReward | None
    messages: list[dict]  # Input messages
    output_message_list: list[dict] | None
    completion: ChatCompletion | None  # OpenAI response object
```
Key file: `areal/experimental/openai/types.py`
The `to_tensor_dict()` method converts an interaction to training format:
```python
{
    "input_ids": torch.tensor([...], dtype=torch.int32),
    "loss_mask": torch.tensor([0]*input_len + [1]*output_len, dtype=torch.int32),
    "logprobs": torch.tensor([0]*input_len + output_logprobs, dtype=torch.float32),
    "versions": torch.tensor([...], dtype=torch.int32),
    "attention_mask": torch.ones(..., dtype=torch.bool),
    "rewards": torch.tensor([reward], dtype=torch.float32),
}
```
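A toy example of the masking convention (illustrative values):

```python
import torch

input_len, output_len = 3, 2  # 3 prompt tokens, 2 generated tokens
loss_mask = torch.tensor([0] * input_len + [1] * output_len, dtype=torch.int32)
print(loss_mask)  # tensor([0, 0, 0, 1, 1], dtype=torch.int32)
# Only the two generated tokens contribute to the policy-gradient loss.
```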
## Reward System
### Assignment
Rewards can be assigned in two ways:

1. Return a value from the `run()` method:
   - `float`: applied to the last completion
   - `dict[str, float]`: maps completion IDs to rewards (see the sketch below)
2. Explicit API calls (direct approach):

   ```python
   client.set_last_reward(1.0)
   client.set_reward(completion_id, 0.5)
   ```
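For the dict form under the proxy approach, a sketch might look like this (class name, follow-up prompt, and reward values are illustrative):

```python
from openai import AsyncOpenAI


class MyMultiTurnAgent:
    async def run(self, data, **extra_kwargs):
        client = AsyncOpenAI(
            base_url=extra_kwargs.get("base_url"),
            http_client=extra_kwargs.get("http_client"),
            max_retries=0,
        )
        first = await client.chat.completions.create(
            model="default", messages=data["messages"]
        )
        followup = data["messages"] + [
            {"role": "assistant", "content": first.choices[0].message.content},
            {"role": "user", "content": "Please double-check your answer."},
        ]
        second = await client.chat.completions.create(
            model="default", messages=followup
        )
        # Map completion IDs to rewards: partial credit for the first turn,
        # full reward for the final turn.
        return {first.id: 0.2, second.id: 1.0}
```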
### Backpropagation
For multi-turn conversations, rewards propagate backward through the conversation tree with geometric discounting:
```text
Conversation tree:  A → B → C  (leaf, reward=1.0)

With discount=0.9:
  C.reward = 1.0
  B.reward = 0 + 1.0 × 0.9 = 0.9
  A.reward = 0 + 0.9 × 0.9 = 0.81
```
Processing occurs in reverse topological order (leaves first), ensuring children’s rewards are finalized before propagating to parents.
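A self-contained sketch of this leaves-first pass (illustrative, not AReaL’s implementation; nodes are assumed to be ordered parents-before-children):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    reward: float = 0.0
    parent: Optional["Node"] = None


def apply_reward_discount(nodes: list[Node], turn_discount: float = 0.9) -> None:
    # Iterating in reverse visits leaves first, so each child's reward is
    # final before it is added (discounted) to its parent.
    for node in reversed(nodes):
        if node.parent is not None:
            node.parent.reward += node.reward * turn_discount


# A → B → C with only the leaf rewarded:
a = Node()
b = Node(parent=a)
c = Node(reward=1.0, parent=b)
apply_reward_discount([a, b, c])
print(round(a.reward, 2), b.reward, c.reward)  # 0.81 0.9 1.0
```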
### Configuration
```python
# Direct approach
client.apply_reward_discount(turn_discount=0.9)
interactions = client.export_interactions(style="individual")
```

```text
# Proxy approach (via export endpoint)
POST /export_trajectories
Body: {"session_id": "...", "discount": 0.9, "style": "individual"}
```
## Workflow Resolution
When a workflow is passed to the trainer, AReaL resolves it as follows.

Key file: `areal/infra/remote_inf_engine.py` (`_resolve_workflow` method)
```python
def _resolve_workflow(workflow, workflow_kwargs, group_size, proxy_addr):
    # 1. RolloutWorkflow instance → use directly
    # 2. RolloutWorkflow class → instantiate with kwargs
    # 3. String path → import and resolve recursively
    # 4. Has run() method → wrap with OpenAIProxyWorkflow
    if not isinstance(resolved, RolloutWorkflow):
        resolved = OpenAIProxyWorkflow(
            agent=resolved,
            proxy_addr=proxy_addr,
            ...
        )

    # Apply grouping if needed
    if group_size > 1:
        resolved = GroupedRolloutWorkflow(resolved, group_size)
    return resolved
```
## OpenAIProxyWorkflow
The `OpenAIProxyWorkflow` wraps user agents into `RolloutWorkflow`:

Key file: `areal/experimental/openai/proxy/workflow.py`
```python
class OpenAIProxyWorkflow(RolloutWorkflow):
    async def arun_episode(self, engine, data):
        # 1. Grant capacity
        await self._grant_capacity(http_session)

        # 2. Create proxy client (manages session)
        proxy_client = OpenAIProxyClient(...)
        async with proxy_client:
            # 3. Run agent with session URL
            rewards = await self._run_agent(proxy_client.session_url, data)

            # 4. Assign rewards
            if isinstance(rewards, float):
                await proxy_client.set_last_reward(rewards)
            elif isinstance(rewards, dict):
                for interaction_id, reward in rewards.items():
                    await proxy_client.set_reward(interaction_id, reward)

            # 5. Export interactions
            return await proxy_client.export_interactions(
                discount=self.discount,
                style=self.export_style,
            )
```
The `_run_agent` method handles both execution modes, as sketched below:

- Inline: calls `agent.run()` directly as a coroutine
- Subprocess: submits to a `ProcessPoolExecutor`, sets the `OPENAI_BASE_URL` environment variable, and wraps the call with `asyncio.run()`
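A hedged sketch of that dispatch (the helper names here are hypothetical; the real logic lives in `OpenAIProxyWorkflow`):

```python
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor


def _run_in_subproc(agent, data, base_url):
    # Runs in the worker process after agent/data are unpickled.
    os.environ["OPENAI_BASE_URL"] = base_url
    return asyncio.run(agent.run(data))


async def _run_agent_sketch(agent, data, base_url, http_client, mode, pool):
    if mode == "inline":
        # Same process: kwargs are injected directly.
        return await agent.run(data, base_url=base_url, http_client=http_client)
    # Subprocess: agent and data are pickled into the pool; the agent
    # reads the base URL from the environment instead of extra_kwargs.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, _run_in_subproc, agent, data, base_url)
```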
## ArealOpenAI Client
The `ArealOpenAI` class extends `AsyncOpenAI` for direct engine integration.

Key file: `areal/experimental/openai/client.py`
### Key Methods
| Method | Description |
|---|---|
| `chat.completions.create()` | OpenAI-compatible chat API |
| `responses.create()` | OpenAI responses API |
| `set_reward()` | Set reward for specific interaction |
| `set_last_reward()` | Set reward for last interaction |
| `apply_reward_discount()` | Apply backward reward discounting |
| `export_interactions()` | Export for training |
### Export Styles
| Style | Description |
|---|---|
| `individual` | Returns all interactions as separate entries. Trajectories may share prefixes. |
| `concat` | Builds a conversation tree, returns only leaf nodes. Only valid for linear conversations with matched token sequences. |
## Public API
```python
from areal.experimental.openai import (
    ArealOpenAI,                     # Direct approach client
    InteractionWithTokenLogpReward,  # Token-level data structure
    OpenAIProxyClient,               # HTTP client for proxy sessions
    OpenAIProxyWorkflow,             # Workflow wrapper
)
```
## Training with Agent Trajectories
A complete agentic episode may contain multiple LLM interactions (turns). For training, these are treated as independent input-output-reward tuples:
```text
Turn 1: [system, user]                         → output_1 → reward_1 (discounted)
Turn 2: [system, user, asst, user]             → output_2 → reward_2 (discounted)
Turn 3: [system, user, asst, user, asst, user] → output_3 → reward_3 (final)
```
Each tuple includes full token-level data for policy gradient computation: input token IDs, output token IDs, and log probabilities. The discounted rewards ensure the RL objective correctly credits earlier actions for final outcomes.
### Token Consistency Guarantee
Because AReaL stores the actual tokens used during inference (not re-tokenized text), there is no risk of tokenization mismatch between rollout and training. The tokens sent to the inference engine are exactly the tokens used for gradient computation.
### Efficient Training with Tree Attention
Multi-turn trajectories often share long token prefixes, which can slow down training due to redundant computation. AReaL addresses this with prefix-shared tree attention, which computes attention over shared prefixes only once.
## See Also
- RolloutWorkflow Reference - Core workflow abstraction
- Agentic RL Guide - Practical training guide
- Workflow Best Practices - Implementation tips