Custom runner
The AWorld framework provides a flexible runtime infrastructure that enables developers to implement arbitrarily complex execution logic without modifying the core framework code. As the heart of the runtime, the Runner is also fully extensible—developers can create entirely custom task executors and engines to meet specific business requirements.
When to Implement a Custom Runner
- ✅ You need a specialized execution flow (e.g., parallel execution, conditional branching, loops)
- ✅ You require custom error handling and recovery mechanisms
- ✅ You must integrate with a specific external system
- ✅ You need performance optimization or fine-grained resource control
- ✅ You want to implement a custom scheduling policy
- ✅ You require fully customized output formatting or monitoring logic
When Not to Implement a Custom Runner
- ❌ To modify post-processing logic → Use a Handler
- ❌ To adjust parameters or inputs → Use Hooks or Callbacks
- ❌ To change output format → Use an OutputProcessHook
- ❌ To customize trajectory recording → Extend the TrajectoryStrategy
Creating a Custom Runner
For Agent Tasks
It is recommended to base your implementation on **TaskRunner**.
Inherit from TaskRunner
Override the necessary methods to define your custom execution behavior.
from aworld.runners.task_runner import TaskRunner
from aworld.core.task import Task, TaskResponse
from aworld.core.context.base import Context
class MyCustomRunner(TaskRunner):
"""Custom Runner"""
def __init__(self, task: Task, *args, **kwargs):
super().__init__(task, *args, **kwargs)
# add custom init
self.custom_state = {}
async def pre_run(self):
"""prepare"""
await super().pre_run()
# custom logic
logger.info("Custom runner preparing...")
async def do_run(self, context: Context = None) -> TaskResponse:
"""core logic"""
pass
async def post_run(self):
"""clean"""
await super().post_run()
# custom logic
logger.info("Custom runner cleaning up...")
Customize the core execution process
async def do_run(self, context: Context = None) -> TaskResponse:
"""Custom response,return TaskResponse."""
observation = self.observation
try:
# 1. execution core logic
result = await self._execute_custom_logic(observation)
# 2. gen response
response = TaskResponse(
task_id=self.task.id,
content=result,
status="success"
)
return response
except Exception as e:
logger.error(f"Execution failed: {e}")
return TaskResponse(
task_id=self.task.id,
content=str(e),
status="failed"
)
async def _execute_custom_logic(self, observation):
"""Custom logic"""
pass
Other types of tasks
Inheriting Runner
class TreeSearchRunner(Runner):
"""Customization based on tree search runner."""
def __init__(self, task: Task, *args, **kwargs):
super().__init__(task, *args, **kwargs)
self.max_depth = self.conf.get('max_depth', 3)
self.max_breadth = self.conf.get('max_breadth', 3)
self.search_tree = {}
Execution Core
async def do_run(self, context: Context = None) -> TaskResponse:
"""Perform tree search."""
observation = self.observation
# depth search first
best_result = await self._dfs(observation, depth=0)
return TaskResponse(
task_id=self.task.id,
content=best_result,
status="success"
)
async def _dfs(self, state, depth):
"""depth search first"""
if depth >= self.max_depth:
return state
agent = self.swarm.ordered_agents[0]
# get action
actions = await self._get_possible_actions(agent, state)
# Exploring the max-breadth actions before exploration
best_result = state
best_score = self._evaluate(state)
for action in actions[:self.max_breadth]:
# execute action
next_state = await self._execute_action(action, state)
# Recursively searching
result = await self._dfs(next_state, depth + 1)
# Evaluate result
score = self._evaluate(result)
if score > best_score:
best_score = score
best_result = result
return best_result
async def _get_possible_actions(self, agent, state):
"""Obtain possible actions."""
obs = Observation(content=str(state))
output = await agent.run(obs)
return output.get('actions', [])
async def _execute_action(self, action, state):
"""Execute action."""
tool = self.tools.get(action['tool'])
result = await tool.step(action['args'])
return result.get('next_state', state)
def _evaluate(self, state):
"""Evaluate the status."""
# custom evaluate process
return len(str(state))
Best Practices
✅ Suggestion: Clear division of responsibilities
modular design
class WellDesignedRunner(TaskRunner):
"""Clear processes and modules"""
async def do_run(self, context):
# Only handle the main process
return await self._main_execution(context)
async def _main_execution(self, context):
"""main logic"""
observation = ...
# 1. pre process
processed_obs = await self._preprocess(observation)
# 2. execute main logic
for step in range(self.max_steps):
output = await self._execute_step(processed_obs, step)
results.append(output)
if self._should_stop(output):
break
# 3. update observation
processed_obs = self._update_observation(processed_obs, output)
# 4. post process
final_result = await self._postprocess(results)
...
async def _execute_step(self, obs, step):
pass
def _should_stop(self, output):
pass
def _update_observation(self, ob, output):
pass
async def _preprocess(self):
pass
async def _postprocess(self):
pass
❌ Not recommended: a single method is too large
Severe coupling, difficult to measure individually
class PoorlyDesignedRunner(TaskRunner):
async def do_run(self, context):
# All logic is executed inside
# ... Hundreds of lines of code ...
pass