aea.manager.manager
This module contains the implementation of AEA agents manager.
ProjectNotFoundError Objects
class ProjectNotFoundError(ValueError)
Project not found exception.
ProjectCheckError Objects
class ProjectCheckError(ValueError)
Project check error exception.
__init__
def __init__(msg: str, source_exception: Exception)
Init exception.
ProjectPackageConsistencyCheckError Objects
class ProjectPackageConsistencyCheckError(ValueError)
Check consistency of package versions against already added project.
__init__
def __init__(agent_project_id: PublicId,
conflicting_packages: List[Tuple[PackageIdPrefix, str, str,
Set[PublicId]]])
Initialize the exception.
Arguments:
agent_project_id: the agent project id whose addition has failed.conflicting_packages: the conflicting packages.
BaseAgentRunTask Objects
class BaseAgentRunTask(ABC)
Base abstract class for agent run tasks.
start
@abstractmethod
def start() -> None
Start task.
wait
@abstractmethod
def wait() -> asyncio.Future
Return future to wait task completed.
stop
@abstractmethod
def stop() -> None
Stop task.
is_running
@property
@abstractmethod
def is_running() -> bool
Return is task running.
AgentRunAsyncTask Objects
class AgentRunAsyncTask(BaseAgentRunTask)
Async task wrapper for agent.
__init__
def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
create_run_loop
def create_run_loop() -> None
Create run loop.
start
def start() -> None
Start task.
wait
def wait() -> asyncio.Future
Return future to wait task completed.
stop
def stop() -> None
Stop task.
run
async def run() -> None
Run task body.
is_running
@property
def is_running() -> bool
Return is task running.
AgentRunThreadTask Objects
class AgentRunThreadTask(AgentRunAsyncTask)
Threaded wrapper to run agent.
__init__
def __init__(agent: AEA, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
create_run_loop
def create_run_loop() -> None
Create run loop.
start
def start() -> None
Run task in a dedicated thread.
stop
def stop() -> None
Stop the task.
AgentRunProcessTask Objects
class AgentRunProcessTask(BaseAgentRunTask)
Subprocess wrapper to run agent.
PROCESS_JOIN_TIMEOUT
in seconds
PROCESS_ALIVE_SLEEP_TIME
in seconds
__init__
def __init__(agent_alias: AgentAlias, loop: asyncio.AbstractEventLoop) -> None
Init task with agent alias and loop.
start
def start() -> None
Run task in a dedicated process.
wait
def wait() -> asyncio.Future
Return future to wait task completed.
stop
def stop() -> None
Stop the task.
is_running
@property
def is_running() -> bool
Is agent running.
MultiAgentManager Objects
class MultiAgentManager()
Multi agents manager.
__init__
def __init__(working_dir: str,
mode: str = "async",
registry_path: str = DEFAULT_REGISTRY_NAME,
auto_add_remove_project: bool = False,
password: Optional[str] = None) -> None
Initialize manager.
Arguments:
working_dir: directory to store base agents.mode: str. async or threadedregistry_path: str. path to the local packages registryauto_add_remove_project: bool. add/remove project on the first agent add/last agent removepassword: the password to encrypt/decrypt the private key.
data_dir
@property
def data_dir() -> str
Get the certs directory.
get_data_dir_of_agent
def get_data_dir_of_agent(agent_name: str) -> str
Get the data directory of a specific agent.
is_running
@property
def is_running() -> bool
Is manager running.
dict_state
@property
def dict_state() -> Dict[str, Any]
Create MultiAgentManager dist state.
projects
@property
def projects() -> Dict[PublicId, Project]
Get all projects.
add_error_callback
def add_error_callback(
error_callback: Callable[[str, BaseException],
None]) -> "MultiAgentManager"
Add error callback to call on error raised.
start_manager
def start_manager(local: bool = False,
remote: bool = False) -> "MultiAgentManager"
Start manager.
If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).
Arguments:
local: whether or not to fetch from local registry.remote: whether or not to fetch from remote registry.
Returns:
the MultiAgentManager instance.
last_start_status
@property
def last_start_status() -> Tuple[
bool,
Dict[PublicId, List[Dict]],
List[Tuple[PublicId, List[Dict], Exception]],
]
Get status of the last agents start loading state.
stop_manager
def stop_manager(cleanup: bool = True,
save: bool = False) -> "MultiAgentManager"
Stop manager.
Stops all running agents and stop agent.
Arguments:
cleanup: bool is cleanup on stop.save: bool is save state to file on stop.
Returns:
None
add_project
def add_project(public_id: PublicId,
local: bool = False,
remote: bool = False,
restore: bool = False) -> "MultiAgentManager"
Fetch agent project and all dependencies to working_dir.
If local = False and remote = False, then the packages are fetched in mixed mode (i.e. first try from local registry, and then from remote registry in case of failure).
Arguments:
public_id: the public if of the agent project.local: whether or not to fetch from local registry.remote: whether or not to fetch from remote registry.restore: bool flag for restoring already fetched agent.
Returns:
self
remove_project
def remove_project(public_id: PublicId,
keep_files: bool = False) -> "MultiAgentManager"
Remove agent project.
list_projects
def list_projects() -> List[PublicId]
List all agents projects added.
Returns:
list of public ids of projects
add_agent
def add_agent(public_id: PublicId,
agent_name: Optional[str] = None,
agent_overrides: Optional[dict] = None,
component_overrides: Optional[List[dict]] = None,
local: bool = False,
remote: bool = False,
restore: bool = False) -> "MultiAgentManager"
Create new agent configuration based on project with config overrides applied.
Alias is stored in memory only!
Arguments:
public_id: base agent project public idagent_name: unique name for the agentagent_overrides: overrides for agent config.component_overrides: overrides for component section.local: whether or not to fetch from local registry.remote: whether or not to fetch from remote registry.restore: bool flag for restoring already fetched agent.
Returns:
self
add_agent_with_config
def add_agent_with_config(
public_id: PublicId,
config: List[dict],
agent_name: Optional[str] = None) -> "MultiAgentManager"
Create new agent configuration based on project with config provided.
Alias is stored in memory only!
Arguments:
public_id: base agent project public idagent_name: unique name for the agentconfig: agent config (used for agent re-creation).
Returns:
manager
get_agent_overridables
def get_agent_overridables(agent_name: str) -> Tuple[Dict, List[Dict]]
Get agent config overridables.
Arguments:
agent_name: str
Returns:
Tuple of agent overridables dict and and list of component overridables dict.
set_agent_overrides
def set_agent_overrides(
agent_name: str, agent_overides: Optional[Dict],
components_overrides: Optional[List[Dict]]) -> "MultiAgentManager"
Set agent overrides.
Arguments:
agent_name: stragent_overides: optional dict of agent config overridescomponents_overrides: optional list of dict of components overrides
Returns:
self
list_agents_info
def list_agents_info() -> List[Dict[str, Any]]
List agents detailed info.
Returns:
list of dicts that represents agent info: public_id, name, is_running.
list_agents
def list_agents(running_only: bool = False) -> List[str]
List all agents.
Arguments:
running_only: returns only running if set to True
Returns:
list of agents names
remove_agent
def remove_agent(
agent_name: str,
skip_project_auto_remove: bool = False) -> "MultiAgentManager"
Remove agent alias definition from registry.
Arguments:
agent_name: agent name to removeskip_project_auto_remove: disable auto project remove on last agent removed.
Returns:
None
start_agent
def start_agent(agent_name: str) -> "MultiAgentManager"
Start selected agent.
Arguments:
agent_name: agent name to start
Returns:
None
start_all_agents
def start_all_agents() -> "MultiAgentManager"
Start all not started agents.
Returns:
None
stop_agent
def stop_agent(agent_name: str) -> "MultiAgentManager"
Stop running agent.
Arguments:
agent_name: agent name to stop
Returns:
self
stop_all_agents
def stop_all_agents() -> "MultiAgentManager"
Stop all agents running.
Returns:
self
stop_agents
def stop_agents(agent_names: List[str]) -> "MultiAgentManager"
Stop specified agents.
Arguments:
agent_names: names of agents
Returns:
self
start_agents
def start_agents(agent_names: List[str]) -> "MultiAgentManager"
Stop specified agents.
Arguments:
agent_names: names of agents
Returns:
self
get_agent_alias
def get_agent_alias(agent_name: str) -> AgentAlias
Return details about agent alias definition.
Arguments:
agent_name: name of agent
Returns:
AgentAlias