from agenticai_core.designtime.models.agent import AgentMeta
from agenticai_core.runtime.agents.abstract_orchestrator import AbstractOrchestrator
from agenticai_core.runtime.message_item import MessageItem, ToolCall, ErrorMessage
from typing import List, Optional
class CustomOrchestrator(AbstractOrchestrator):
"""Custom orchestrator with keyword-based routing."""
def __init__(
self,
agents: List[AgentMeta],
name: str = "custom",
description: str = "Custom keyword-based orchestrator"
):
super().__init__(name=name, agents=agents, description=description)
async def _handle_message(self, conversation: List[MessageItem]) -> MessageItem:
"""
Override this method to implement your orchestration logic.
Protocol:
- Incoming: MessageItem with role='user' or role='tool'
- Outgoing: ToolCall (next agent) or ErrorMessage (failure)
"""
last_message = conversation[-1]
try:
# Handle user query
if last_message.role == 'user':
selected_agent = self._select_agent(last_message)
if selected_agent:
return ToolCall(
tool_name=selected_agent.name,
message=last_message.content,
thought=f"Routing to {selected_agent.name}",
reason="Query matches agent capabilities"
)
else:
return ToolCall(
tool_name="route_to_user",
message="I'm not sure how to help. Could you rephrase?",
thought="No suitable agent found"
)
# Handle agent response
elif last_message.role == 'tool':
if self._is_complete(last_message):
return ToolCall(
tool_name="route_to_user",
message=f"Here are the results: {last_message.content}",
thought="Task completed successfully"
)
else:
# Route to another agent for follow-up
next_agent = self._get_next_agent(last_message)
return ToolCall(
tool_name=next_agent.name,
message=f"Continue with: {last_message.content}"
)
return ErrorMessage(error=RuntimeError(f"Unsupported role: {last_message.role}"))
except Exception as e:
return ErrorMessage(error=e)
def _select_agent(self, message: MessageItem) -> Optional[AgentMeta]:
"""Select agent based on message content."""
# Your selection logic here
return self._agents[0] if self._agents else None
def _is_complete(self, message: MessageItem) -> bool:
"""Check if task is complete."""
content = message.content.lower()
return any(indicator in content for indicator in [
"completed", "finished", "done"
])