# OpenAI-Compatible Workflows
This guide introduces AReaL’s experimental OpenAI-compatible API for writing rollout workflows. This feature allows you to use familiar OpenAI client patterns while leveraging AReaL’s inference engine and reward system for RL training data collection.
## Overview
The OpenAI-compatible workflow feature provides:
- **Familiar API**: Use OpenAI's chat completions interface with AReaL's backend
- **Reward tracking**: Automatic reward assignment and backpropagation through conversation trees
- **Tool call support**: Compatible with OpenAI's function calling features
- **Multi-turn conversations**: Built-in support for conversation flows with reward discounting
- **Caching system**: Efficient storage and retrieval of completions with token-level data
## Key Components
### ArealOpenAI Client

The `ArealOpenAI` client is a drop-in replacement for OpenAI's `AsyncOpenAI` client that routes requests to AReaL's inference engine:
```python
from areal.experimental.openai import ArealOpenAI

# Create a client with an AReaL engine and tokenizer
client = ArealOpenAI(engine=engine, tokenizer=tokenizer)

# Use it like a standard OpenAI client
completion = await client.chat.completions.create(
    messages=[{"role": "user", "content": "Hello!"}],
    temperature=0.7,
    max_completion_tokens=100,
)
```
### Reward Management
The client automatically tracks completions and supports reward assignment:
```python
# Set the reward for a completion
client.set_reward(completion.id, reward_value)

# Export all completions with propagated rewards
completions_with_rewards = client.export_completions(turn_discount=0.9)
```
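For example, a single-turn flow could grade one completion and export it as follows (a minimal sketch; the question and the grading rule are illustrative, not part of AReaL):

```python
completion = await client.chat.completions.create(
    messages=[{"role": "user", "content": "What is 2 + 2?"}],
    max_completion_tokens=16,
)
answer = completion.choices[0].message.content

# Assign a scalar reward to this specific completion
client.set_reward(completion.id, 1.0 if "4" in answer else 0.0)

# Export every stored completion together with its (propagated) reward
completions_with_rewards = client.export_completions(turn_discount=0.9)
```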
## Creating OpenAI-Compatible Workflows

### Basic Workflow Structure
Here’s the structure of an OpenAI-compatible workflow:
```python
from areal.api.workflow_api import RolloutWorkflow
from areal.experimental.openai import ArealOpenAI


class MyOpenAIWorkflow(RolloutWorkflow):
    def __init__(self, reward_fn, gconfig, tokenizer, **kwargs):
        self.reward_fn = reward_fn
        self.gconfig = gconfig
        self.tokenizer = tokenizer
        # Wrap the reward function so it can be awaited during rollout
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def _run_one_episode(self, engine, data, rid):
        # Create an OpenAI-compatible client backed by AReaL's engine
        client = ArealOpenAI(engine=engine, tokenizer=self.tokenizer)

        # Run the conversation and collect rewards
        # ... workflow logic here ...

        # Export completions with rewards
        return client.export_completions()
```
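AReaL's `RolloutWorkflow` interface is driven through an episode entry point; the sketch below shows how `_run_one_episode` might be fanned out per prompt. The `arun_episode` signature and the `n_samples` field are assumptions modeled on AReaL's other workflows, and merging the results into a training batch is omitted:

```python
import asyncio
import uuid


class MyOpenAIWorkflow(RolloutWorkflow):
    # ... __init__ and _run_one_episode as above ...

    async def arun_episode(self, engine, data):
        # Sample several trajectories for the same prompt, each as an
        # independent episode
        results = await asyncio.gather(
            *[
                self._run_one_episode(engine, data, rid=uuid.uuid4().hex)
                for _ in range(self.gconfig.n_samples)
            ]
        )
        # Post-processing (e.g. converting completions to tensors) is omitted
        return results
```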
### Multi-Turn Conversation Example
Let’s examine the multi-turn workflow implementation:
```python
from copy import deepcopy


async def _run_one_episode(self, engine, data, rid):
    client = ArealOpenAI(engine=engine, tokenizer=self.tokenizer)
    messages = deepcopy(data["messages"])

    t = reward = 0
    discount = 1
    while reward == 0 and t < self.max_turns:
        # Generate a response for the current conversation state
        completion = await client.chat.completions.create(
            messages=messages,
            temperature=self.gconfig.temperature,
            max_completion_tokens=self.gconfig.max_new_tokens,
            store=True,  # Important: enables caching
        )
        # Get the cached completion together with its token-level data
        comp_data = client.get_completions(completion.id)

        # Evaluate the reward (the prompt/response strings and token IDs
        # passed here are derived from the conversation and `comp_data`;
        # that bookkeeping is omitted for brevity)
        reward = await self.async_reward_fn(
            prompt, response, input_tokens, output_tokens, **data
        )
        t += 1

        # Continue the conversation if the answer is not yet correct
        if reward == 0 and t < self.max_turns:
            messages.append({
                "role": "assistant",
                "content": completion.choices[0].message.content,
            })
            messages.append(self.reflection_msg)
            discount *= self.turn_discount

    # Set the final (discounted) reward on the last completion
    client.set_reward(completion.id, reward * discount)
    return client.export_completions(), comp_data
```
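Here `self.reflection_msg` is simply an extra user turn that asks the model to try again; its exact wording is up to the workflow author. For example (illustrative content):

```python
self.reflection_msg = {
    "role": "user",
    "content": "Your previous answer was incorrect. Reflect on the mistake and try again.",
}
```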
## Important Parameters and Options

### Chat Completion Parameters
The client supports standard OpenAI parameters:
- `messages`: List of conversation messages
- `temperature`: Sampling temperature (0.0 to 2.0)
- `max_completion_tokens`: Maximum number of tokens in the response
- `top_p`: Nucleus sampling parameter
- `frequency_penalty`: Frequency penalty for repetition
- `stop`: Stop sequences
- `tools`: Function calling tools (experimental)
- `store`: Whether to cache the completion for training (default: `True`)
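For instance, a request combining several of these parameters might look like the following (the concrete values are illustrative):

```python
completion = await client.chat.completions.create(
    messages=[{"role": "user", "content": "List three prime numbers."}],
    temperature=0.7,
    top_p=0.95,
    max_completion_tokens=256,
    frequency_penalty=0.1,
    stop=["\n\n"],
    store=True,  # keep this completion (and its tokens) for training export
)
```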
### Tool Call Support
Enable function calling with tools:
```python
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather information",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string"}
            },
        },
    },
}]

completion = await client.chat.completions.create(
    messages=messages,
    tools=tools,
    tool_choice="auto",  # or "none" to disable tool calls
)
```
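Because responses follow OpenAI's schema, a returned tool call can be inspected in the usual way. The sketch below assumes the experimental tool support populates `tool_calls` the same way the OpenAI API does:

```python
import json

message = completion.choices[0].message
if message.tool_calls:
    call = message.tool_calls[0]
    print(call.function.name)                   # e.g. "get_weather"
    print(json.loads(call.function.arguments))  # e.g. {"location": "Paris"}
```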
## Reward Backpropagation
The system automatically builds conversation trees and backpropagates rewards:
### Tree Structure Based on Role Sequences
The system constructs conversation trees based on message role sequences. If one completion’s role structure is a prefix of another completion’s role structure, it becomes the parent node.
How it works:

1. Each completion's messages are converted to a role sequence, e.g. `("system", "user", "assistant", "user", ...)`.
2. Parent-child relationships are determined by prefix matching of these role sequences.
3. The completion with the longest matching prefix becomes the parent.
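Conceptually, each completion is keyed only by the roles of its messages, not by their content. The helper below is hypothetical and exists only to illustrate the idea:

```python
def role_sequence(messages):
    # Reduce a message list to the tuple of roles used for prefix matching
    return tuple(m["role"] for m in messages)

role_sequence([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello!"},
])
# -> ("system", "user")
```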
Examples:
```python
# Example 1: Simple linear conversation
# Completion 1: [system, user]                                   -> ("system", "user")
# Completion 2: [system, user, assistant, user]                  -> ("system", "user", "assistant", "user")
# Completion 3: [system, user, assistant, user, assistant, user] -> ("system", "user", "assistant", "user", "assistant", "user")
# Tree: C1 -> C2 -> C3 (linear chain)

# Example 2: Branching conversation
# Completion A: [system, user]                                   -> ("system", "user")
# Completion B: [system, user, assistant, user]                  -> ("system", "user", "assistant", "user")
# Completion C: [system, user, assistant, user]                  -> ("system", "user", "assistant", "user")
# Completion D: [system, user, assistant, user, assistant, user] -> ("system", "user", "assistant", "user", "assistant", "user")
# Tree: A -> B -> D
#        \-> C ->/

# Example 3: Tool-calling conversation
# Completion X: [system, user]                                   -> ("system", "user")
# Completion Y: [system, user, assistant, tool, user]            -> ("system", "user", "assistant", "tool", "user")
# Completion Z: [system, user, assistant, tool, user, assistant] -> ("system", "user", "assistant", "tool", "user", "assistant")
# Tree: X -> Y -> Z (tool messages are part of the role sequence)
```
Reward backpropagation process (see the worked sketch after this list):

1. Every node keeps any reward that was explicitly set on it.
2. Every node also receives the discounted average of its children's rewards.
3. Processing occurs in topological order (leaves first, then parents).
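Reading the two rules together, a parent's exported reward is its own reward plus the discounted average of its children's rewards. A small worked example (the exact combination rule here is an interpretation of the description above, not lifted from AReaL's source):

```python
turn_discount = 0.9
children_rewards = [1.0, 0.0]   # rewards already propagated to the two children
explicit_reward = 0.0           # reward set directly on the parent, if any

parent_reward = explicit_reward + turn_discount * (
    sum(children_rewards) / len(children_rewards)
)
# -> 0.0 + 0.9 * 0.5 = 0.45
```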
### Export with Discounting
```python
# Export with turn-level discounting
completions = client.export_completions(turn_discount=0.9)

# Access reward data for each completion
for completion_id, comp_data in completions.items():
    reward = comp_data.reward
    tensor_dict = comp_data.to_tensor_dict()  # Convert to training format
```
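Before handing the exported data to the trainer, it can be useful to sanity-check the rewards. A minimal sketch using only the fields shown above:

```python
rewards = [comp_data.reward for comp_data in completions.values()]
print(f"{len(rewards)} completions, mean reward = {sum(rewards) / len(rewards):.3f}")
```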
## Additional Examples

For more concrete examples and test cases, see the test suite at `areal/experimental/tests/test_openai.py`.