aea.helpers.multiple_executor
This module contains the helpers to run multiple stoppable tasks in different modes: async, threaded, multiprocess .
ExecutorExceptionPolicies Objects
class ExecutorExceptionPolicies(Enum)
Runner exception policy modes.
stop_all
stop all agents on one agent's failure, log exception
propagate
log exception and reraise it to upper level
log_only
log exception and skip it
AbstractExecutorTask Objects
class AbstractExecutorTask(ABC)
Abstract task class to create Task classes.
__init__
def __init__() -> None
Init task.
future
@property
def future() -> Optional[TaskAwaitable]
Return awaitable to get result of task execution.
future
@future.setter
def future(future: TaskAwaitable) -> None
Set awaitable to get result of task execution.
start
@abstractmethod
def start() -> Tuple[Callable, Sequence[Any]]
Implement start task function here.
stop
@abstractmethod
def stop() -> None
Implement stop task function here.
create_async_task
@abstractmethod
def create_async_task(loop: AbstractEventLoop) -> TaskAwaitable
Create asyncio task for task run in asyncio loop.
Arguments:
- loop: the event loop
Returns:
task to run in asyncio loop.
id
@property
def id() -> Any
Return task id.
failed
@property
def failed() -> bool
Return was exception failed or not.
If it's running it's not failed.
Returns:
bool
AbstractMultiprocessExecutorTask Objects
class AbstractMultiprocessExecutorTask(AbstractExecutorTask)
Task for multiprocess executor.
start
@abstractmethod
def start() -> Tuple[Callable, Sequence[Any]]
Return function and arguments to call within subprocess.
create_async_task
def create_async_task(loop: AbstractEventLoop) -> TaskAwaitable
Create asyncio task for task run in asyncio loop.
Raise error, cause async mode is not supported, cause this task for multiprocess executor only.
Arguments:
- loop: the event loop
Raises:
- ValueError: async task construction not possible
AbstractMultipleExecutor Objects
class AbstractMultipleExecutor(ABC)
Abstract class to create multiple executors classes.
__init__
def __init__(
    tasks: Sequence[AbstractExecutorTask],
    task_fail_policy: ExecutorExceptionPolicies = ExecutorExceptionPolicies.
    propagate
) -> None
Init executor.
Arguments:
- tasks: sequence of AbstractExecutorTask instances to run.
- task_fail_policy: the exception policy of all the tasks
is_running
@property
def is_running() -> bool
Return running state of the executor.
start
def start() -> None
Start tasks.
stop
def stop() -> None
Stop tasks.
num_failed
@property
def num_failed() -> int
Return number of failed tasks.
failed_tasks
@property
def failed_tasks() -> Sequence[AbstractExecutorTask]
Return sequence failed tasks.
not_failed_tasks
@property
def not_failed_tasks() -> Sequence[AbstractExecutorTask]
Return sequence successful tasks.
ThreadExecutor Objects
class ThreadExecutor(AbstractMultipleExecutor)
Thread based executor to run multiple agents in threads.
ProcessExecutor Objects
class ProcessExecutor(ThreadExecutor)
Subprocess based executor to run multiple agents in threads.
AsyncExecutor Objects
class AsyncExecutor(AbstractMultipleExecutor)
Thread based executor to run multiple agents in threads.
AbstractMultipleRunner Objects
class AbstractMultipleRunner()
Abstract multiple runner to create classes to launch tasks with selected mode.
__init__
def __init__(
    mode: str,
    fail_policy: ExecutorExceptionPolicies = ExecutorExceptionPolicies.
    propagate
) -> None
Init with selected executor mode.
Arguments:
- mode: one of supported executor modes
- fail_policy: one of ExecutorExceptionPolicies to be used with Executor
is_running
@property
def is_running() -> bool
Return state of the executor.
start
def start(threaded: bool = False) -> None
Run agents.
Arguments:
- threaded: run in dedicated thread without blocking current thread.
stop
def stop(timeout: Optional[float] = None) -> None
Stop agents.
Arguments:
- timeout: timeout in seconds to wait thread stopped, only if started in thread mode.
num_failed
@property
def num_failed() -> int
Return number of failed tasks.
failed
@property
def failed() -> Sequence[Task]
Return sequence failed tasks.
not_failed
@property
def not_failed() -> Sequence[Task]
Return sequence successful tasks.
try_join_thread
def try_join_thread() -> None
Try to join thread if running in thread mode.