aea.skills.tasks
This module contains the classes for tasks.
Task Objects
class Task(WithLogger)
This class implements an abstract task.
__init__
def __init__(**kwargs: Any) -> None
Initialize a task.
__call__
def __call__(*args: Any, **kwargs: Any) -> Any
Execute the task.
Arguments:
- args: positional arguments forwarded to the 'execute' method.
- kwargs: keyword arguments forwarded to the 'execute' method.
Raises:
- ValueError: if the task has already been executed.
Returns:
the task instance
is_executed
@property
def is_executed() -> bool
Check if the task has already been executed.
result
@property
def result() -> Any
Get the result.
Raises:
- ValueError: if the task has not been executed yet.
Returns:
the result from the execute method.
setup
def setup() -> None
Implement the task setup.
execute
@abstractmethod
def execute(*args: Any, **kwargs: Any) -> Any
Run the task logic.
Arguments:
- args: the positional arguments
- kwargs: the keyword arguments
Returns:
any
teardown
def teardown() -> None
Implement the task teardown.
init_worker
def init_worker() -> None
Initialize a worker.
Disable the SIGINT handler of process pool is using. Related to a well-known bug: https://bugs.python.org/issue8296
TaskManager Objects
class TaskManager(WithLogger)
A Task manager.
__init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None,
             pool_mode: str = THREAD_POOL_MODE) -> None
Initialize the task manager.
Arguments:
- nb_workers: the number of worker processes.
- is_lazy_pool_start: option to postpone pool creation till the first enqueue_task called.
- logger: the logger.
- pool_mode: str. multithread or multiprocess
is_started
@property
def is_started() -> bool
Get started status of TaskManager.
Returns:
bool
nb_workers
@property
def nb_workers() -> int
Get the number of workers.
Returns:
int
enqueue_task
def enqueue_task(func: Callable,
                 args: Sequence = (),
                 kwargs: Optional[Dict[str, Any]] = None) -> int
Enqueue a task with the executor.
Arguments:
- func: the callable instance to be enqueued
- args: the positional arguments to be passed to the function.
- kwargs: the keyword arguments to be passed to the function.
Raises:
- ValueError: if the task manager is not running.
Returns:
the task id to get the the result.
get_task_result
def get_task_result(task_id: int) -> AsyncResult
Get the result from a task.
Arguments:
- task_id: the task id
Returns:
async result for task_id
start
def start() -> None
Start the task manager.
stop
def stop() -> None
Stop the task manager.
ThreadedTaskManager Objects
class ThreadedTaskManager(TaskManager)
A threaded task manager.
__init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None) -> None
Initialize the task manager.
Arguments:
- nb_workers: the number of worker processes.
- is_lazy_pool_start: option to postpone pool creation till the first enqueue_task called.
- logger: the logger.
ProcessTaskManager Objects
class ProcessTaskManager(TaskManager)
A multiprocess task manager.
__init__
def __init__(nb_workers: int = DEFAULT_WORKERS_AMOUNT,
             is_lazy_pool_start: bool = True,
             logger: Optional[logging.Logger] = None) -> None
Initialize the task manager.
Arguments:
- nb_workers: the number of worker processes.
- is_lazy_pool_start: option to postpone pool creation till the first enqueue_task called.
- logger: the logger.