Custom Orchestration
The orchestrator routes each conversation turn to the right agent and controls the flow of information between agents and users. Implement AbstractOrchestrator to define custom routing logic.
Prerequisites
- AgenticAI Core SDK installed and configured.
- At least one agent configured and converted to AgentMeta. See Creating Agents.
Message handling protocol
Incoming messages
The orchestrator’s _handle_message method receives a List[MessageItem]. The last item is either:
- A user message (role='user'), when the user sends a query.
- An agent response (role='tool'), when an agent completes its task.
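The branch on the last item's role can be sketched without the SDK. In this minimal example, `Msg` is a hypothetical stand-in for MessageItem (only `role` and `content` are modeled) and `classify_turn` is an illustrative helper, not part of AgenticAI Core.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Msg:
    """Hypothetical stand-in for MessageItem; only role and content are modeled."""
    role: str
    content: str


def classify_turn(conversation: List[Msg]) -> str:
    """Return which branch an orchestrator would take for this turn."""
    if not conversation:
        raise ValueError("conversation must contain at least one message")
    last = conversation[-1]
    if last.role == "user":
        return "user_query"
    if last.role == "tool":
        return "agent_response"
    # Any other role is outside the protocol described above.
    return "unsupported"


history = [Msg("user", "What is 2 + 2?"), Msg("tool", "4")]
print(classify_turn(history[:1]))  # user_query
print(classify_turn(history))      # agent_response
```

Only the last item decides the branch; earlier items remain available as context for the routing decision.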
Outgoing messages
The method must return one of:
| Return type | When to use |
|---|---|
| ToolCall | Route to an agent or back to the user. |
| ErrorMessage | When routing fails or an exception occurs. |
ToolCall fields:
- tool_name: the agent to invoke, or "route_to_user" to respond to the user.
- message: the content sent to the agent or shown to the user.
- input, thought, reason: optional context for debugging and tracing.
Special agent: route_to_user
route_to_user is a built-in proxy for the user. Use it to deliver final answers or request clarification. The message field in the ToolCall is exactly what the user sees.
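As a rough illustration of how a clarification request might be built, the sketch below uses `ToolCallSketch`, a hypothetical dataclass that mirrors only the ToolCall fields listed above. It is not the SDK class, and `reply_to_user` is an illustrative helper name.

```python
from dataclasses import dataclass
from typing import Optional

ROUTE_TO_USER = "route_to_user"  # built-in proxy name described above


@dataclass
class ToolCallSketch:
    """Hypothetical stand-in mirroring the documented ToolCall fields."""
    tool_name: str
    message: str
    thought: Optional[str] = None
    reason: Optional[str] = None


def reply_to_user(text: str, thought: Optional[str] = None) -> ToolCallSketch:
    """Build a call whose message is what the user will see."""
    return ToolCallSketch(tool_name=ROUTE_TO_USER, message=text, thought=thought)


call = reply_to_user(
    "Which city do you want the forecast for?",
    thought="Missing location",
)
print(call.tool_name, "->", call.message)
```

Because the message is shown as-is, any internal notes belong in `thought` or `reason` rather than in `message`.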
Create a custom orchestrator
Subclass AbstractOrchestrator and override _handle_message:
from typing import List, Optional

from agenticai_core.designtime.models.agent import AgentMeta
from agenticai_core.runtime.agents.abstract_orchestrator import AbstractOrchestrator
from agenticai_core.runtime.message_item import MessageItem, ToolCall, ErrorMessage


class CustomOrchestrator(AbstractOrchestrator):
    """Custom orchestrator with keyword-based routing."""

    def __init__(
        self,
        agents: List[AgentMeta],
        name: str = "custom",
        description: str = "Custom keyword-based orchestrator"
    ):
        super().__init__(name=name, agents=agents, description=description)

    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """
        Override this method to implement your orchestration logic.

        Protocol:
        - Incoming: MessageItem with role='user' or role='tool'
        - Outgoing: ToolCall (next agent) or ErrorMessage (failure)
        """
        last_message = conversation[-1]
        try:
            # Handle user query
            if last_message.role == 'user':
                selected_agent = self._select_agent(last_message)
                if selected_agent:
                    return ToolCall(
                        tool_name=selected_agent.name,
                        message=last_message.content,
                        thought=f"Routing to {selected_agent.name}",
                        reason="Query matches agent capabilities"
                    )
                else:
                    return ToolCall(
                        tool_name="route_to_user",
                        message="I'm not sure how to help. Could you rephrase?",
                        thought="No suitable agent found"
                    )
            # Handle agent response
            elif last_message.role == 'tool':
                if self._is_complete(last_message):
                    return ToolCall(
                        tool_name="route_to_user",
                        message=f"Here are the results: {last_message.content}",
                        thought="Task completed successfully"
                    )
                else:
                    # Route to another agent for follow-up
                    next_agent = self._get_next_agent(last_message)
                    return ToolCall(
                        tool_name=next_agent.name,
                        message=f"Continue with: {last_message.content}"
                    )
            return ErrorMessage(error=RuntimeError(f"Unsupported role: {last_message.role}"))
        except Exception as e:
            # Any failure, including a missing follow-up agent, becomes an ErrorMessage
            return ErrorMessage(error=e)

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        """Select agent based on message content."""
        # Your selection logic here
        return self._agents[0] if self._agents else None

    def _get_next_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        """Pick the agent for a follow-up step (placeholder)."""
        # Your follow-up selection logic here
        return self._agents[0] if self._agents else None

    def _is_complete(self, message: MessageItem) -> bool:
        """Check if task is complete."""
        content = message.content.lower()
        return any(indicator in content for indicator in [
            "completed", "finished", "done"
        ])
Routing strategies
Keyword-based routing
class KeywordOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        # Maps an agent type to the keywords that trigger it
        self.keywords = {
            "calculator": ["calculate", "math", "add", "multiply"],
            "weather": ["weather", "temperature", "forecast"],
            "translator": ["translate", "language"]
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        content = message.content.lower()
        for agent_type, keywords in self.keywords.items():
            if any(kw in content for kw in keywords):
                # Match the agent type against the registered agent names
                for agent in self._agents:
                    if agent_type in agent.name.lower():
                        return agent
        # Fall back to the first agent when nothing matches
        return self._agents[0] if self._agents else None
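Substring checks such as `kw in content` also fire inside longer words: "add" matches "address". One possible variant, shown here as a standalone sketch rather than the SDK's behavior, compares whole words instead. `match_agent_type` is a hypothetical helper name, and the keyword table is copied from the example above.

```python
import re
from typing import Dict, List, Optional

KEYWORDS: Dict[str, List[str]] = {
    "calculator": ["calculate", "math", "add", "multiply"],
    "weather": ["weather", "temperature", "forecast"],
    "translator": ["translate", "language"],
}


def match_agent_type(text: str,
                     keywords: Dict[str, List[str]] = KEYWORDS) -> Optional[str]:
    """Return the first agent type whose keyword appears as a whole word."""
    words = set(re.findall(r"[a-z]+", text.lower()))
    for agent_type, kws in keywords.items():
        if words.intersection(kws):
            return agent_type
    return None


print(match_agent_type("Please add 2 and 3"))         # calculator
print(match_agent_type("What is my address?"))        # None
print("add" in "what is my address?")                 # True (substring check)
```

The trade-off is that whole-word matching misses inflected forms such as "adding", so the keyword lists may need to grow.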
Round-robin routing
class RoundRobinOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        self.current_index = 0

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        if not self._agents:
            return None
        agent = self._agents[self.current_index]
        # Advance and wrap around to the first agent
        self.current_index = (self.current_index + 1) % len(self._agents)
        return agent
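The rotation itself can be checked in isolation. This sketch reuses the same modulo arithmetic on plain strings; `RoundRobinPicker` is a hypothetical class for illustration and does not depend on the SDK.

```python
from typing import List, Optional


class RoundRobinPicker:
    """Cycles through a fixed list of names, wrapping at the end."""

    def __init__(self, names: List[str]):
        self.names = list(names)
        self.index = 0

    def next(self) -> Optional[str]:
        if not self.names:
            return None
        name = self.names[self.index]
        # Same wrap-around step as in the orchestrator above
        self.index = (self.index + 1) % len(self.names)
        return name


picker = RoundRobinPicker(["A", "B", "C"])
order = [picker.next() for _ in range(5)]
print(order)  # ['A', 'B', 'C', 'A', 'B']
```

The index lives on the object, so the rotation is shared by every conversation that object handles rather than tracked per user.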
Task-based routing
class TaskOrchestrator(AbstractOrchestrator):
    def __init__(self, agents, **kwargs):
        super().__init__(agents=agents, **kwargs)
        # Maps a task type to the name of the agent that handles it
        self.task_map = {
            "data_analysis": "AnalystAgent",
            "customer_support": "SupportAgent",
            "billing": "BillingAgent"
        }

    def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
        # Analyze message to determine task type.
        # _identify_task is not defined here; implement it to return a task_map key.
        task_type = self._identify_task(message)
        agent_name = self.task_map.get(task_type)
        if agent_name:
            for agent in self._agents:
                if agent.name == agent_name:
                    return agent
        return None
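The example above leaves `_identify_task` undefined. One way it might look is a small hint table, sketched here as a standalone function. The hint words and the names `TASK_HINTS`, `identify_task`, and `resolve_agent_name` are assumptions for illustration; in practice a classifier or an LLM call could take this role.

```python
from typing import Dict, Optional, Tuple

# Task-to-agent mapping copied from the example above
TASK_MAP: Dict[str, str] = {
    "data_analysis": "AnalystAgent",
    "customer_support": "SupportAgent",
    "billing": "BillingAgent",
}

# Hypothetical hint words per task type
TASK_HINTS: Dict[str, Tuple[str, ...]] = {
    "billing": ("invoice", "refund", "charge", "payment"),
    "data_analysis": ("report", "trend", "dataset", "chart"),
    "customer_support": ("help", "issue", "problem", "broken"),
}


def identify_task(text: str) -> Optional[str]:
    """Return the first task type whose hint appears in the text."""
    lowered = text.lower()
    for task, hints in TASK_HINTS.items():
        if any(hint in lowered for hint in hints):
            return task
    return None


def resolve_agent_name(text: str) -> Optional[str]:
    """Map free text to an agent name, or None when no task matches."""
    return TASK_MAP.get(identify_task(text))


print(resolve_agent_name("I need a refund for last month's charge"))  # BillingAgent
print(resolve_agent_name("hello there"))                              # None
```

Returning None for unknown tasks lets the caller fall back to route_to_user instead of guessing an agent.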
Use memory in orchestrators
Use RequestContext to persist and retrieve orchestration state across turns:
from agenticai_core.runtime.sessions.request_context import RequestContext


class StatefulOrchestrator(AbstractOrchestrator):
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        context = RequestContext()
        memory = context.get_memory()

        # Get orchestration state, with defaults for the first turn
        state = await memory.get_content('orchestrator_state', {
            'last_agent': None,
            'task_count': 0
        })

        # Your routing logic
        selected_agent = self._select_agent(conversation[-1])

        # Update state
        if state.success and state.data:
            await memory.set_content('orchestrator_state', {
                'last_agent': selected_agent.name,
                'task_count': state.data.get('task_count', 0) + 1
            })

        return ToolCall(
            tool_name=selected_agent.name,
            message=conversation[-1].content
        )
Add distributed tracing
Decorate _handle_message and _select_agent with @tracer.observe to capture routing spans:
from agenticai_core.runtime.trace._langfuse_tracer import Tracer

tracer = Tracer()


class TracedOrchestrator(AbstractOrchestrator):
    @tracer.observe(span_name="Orchestrator._handle_message", kind="Orchestrator")
    async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
        """Handle message with distributed tracing."""
        last_message = conversation[-1]
        if last_message.role == 'user':
            selected_agent = self._select_agent(last_message)
            return ToolCall(
                tool_name=selected_agent.name,
                message=last_message.content,
                thought=f"Routing to {selected_agent.name}"
            )
        # Handle other cases...

    @tracer.observe(metadata={"operation": "agent_selection"})
    def _select_agent(self, message: MessageItem):
        """Agent selection with tracing."""
        # Selection logic
        return self._agents[0]
Common orchestration flows
User → agent → user (single-agent turn):
# User query → select agent → agent processes → return to user
role='user' → route to agent → role='tool' → route_to_user
User → agent → agent → user (chained agents):
# Chain multiple agents before returning
role='user' → Agent1 → Agent2 → route_to_user
User → user (direct response, no agent):
# Direct clarification request
role='user' → route_to_user (no suitable agent)
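The three flows can be traced with a toy driver loop. Everything in this sketch is a stand-in: `route` is a simplified routing rule on `(role, content)` tuples, `AGENTS` holds stub agents, and `run_turn` only imitates what the runtime is assumed to do between orchestrator calls.

```python
from typing import Callable, Dict, List, Tuple

Message = Tuple[str, str]  # (role, content)


def route(conversation: List[Message]) -> Tuple[str, str]:
    """Toy routing rule returning (tool_name, message)."""
    role, content = conversation[-1]
    if role == "user":
        if "sum" in content:
            return ("Agent1", content)
        return ("route_to_user", "Could you rephrase?")
    # role == "tool": chain to Agent2 while the result is partial
    if content.startswith("partial:"):
        return ("Agent2", content)
    return ("route_to_user", content)


# Stub agents: each takes a task string and returns its response
AGENTS: Dict[str, Callable[[str], str]] = {
    "Agent1": lambda task: "partial: 1+2",
    "Agent2": lambda task: "done: 3",
}


def run_turn(user_text: str, max_hops: int = 5) -> List[str]:
    """Drive the routing rule until it hands control back to the user."""
    conversation: List[Message] = [("user", user_text)]
    hops: List[str] = []
    for _ in range(max_hops):
        tool_name, message = route(conversation)
        hops.append(tool_name)
        if tool_name == "route_to_user":
            break
        # The agent's reply arrives as a role='tool' message
        conversation.append(("tool", AGENTS[tool_name](message)))
    return hops


print(run_turn("sum 1 and 2"))  # ['Agent1', 'Agent2', 'route_to_user']
print(run_turn("hello"))        # ['route_to_user']
```

The `max_hops` cap is a design choice of this sketch: it stops a routing rule that never reaches route_to_user from looping indefinitely.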
Register the orchestrator
Pass the orchestrator class to app.start:
from src.orchestrator.custom_orchestrator import CustomOrchestrator
app.start(
    orchestrator_cls=CustomOrchestrator,
    host="0.0.0.0",
    port=8080
)
Best practices
- message field: This is the most important field in ToolCall. For agents, it contains the task or question to process. For route_to_user, it is the exact response the user sees.
- Error handling: Handle both success and error cases. Return ErrorMessage for routing failures and always implement a route_to_user fallback.
- Logging: Use the thought and reason fields for debugging routing decisions. Log agent selection outcomes and track agent performance.
- State management: Use memory stores to track conversation context and agent selection history across turns.
- Performance: Keep routing logic lightweight. Avoid blocking calls in _handle_message. Use tracing to identify latency bottlenecks.
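For the performance point, one common way to keep a synchronous dependency from stalling an async handler is to move it to a worker thread with the standard library's `asyncio.to_thread` (Python 3.9+). The sketch below is generic asyncio, not SDK code; `slow_lookup` and `pick_agent_type` are hypothetical names.

```python
import asyncio
import time
from typing import List


def slow_lookup(query: str) -> str:
    """Hypothetical blocking call, e.g. a synchronous HTTP client or DB driver."""
    time.sleep(0.05)
    return "weather" if "forecast" in query else "general"


async def pick_agent_type(query: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free
    return await asyncio.to_thread(slow_lookup, query)


async def main() -> List[str]:
    # Both lookups overlap instead of running back to back
    return await asyncio.gather(
        pick_agent_type("forecast for Oslo"),
        pick_agent_type("tell me a joke"),
    )


results = asyncio.run(main())
print(results)  # ['weather', 'general']
```

Inside an orchestrator, the same `await asyncio.to_thread(...)` call could wrap whatever blocking step the routing logic depends on.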