packages.valory.skills.abstract_round_abci.base
This module contains the base classes for the model classes of the skill.
get_name
def get_name(prop: Any) -> str
Get the name of a property.
ABCIAppException Objects
class ABCIAppException(Exception)
A parent class for all exceptions related to the ABCIApp.
SignatureNotValidError Objects
class SignatureNotValidError(ABCIAppException)
Error raised when a signature is invalid.
AddBlockError Objects
class AddBlockError(ABCIAppException)
Exception raised when a block addition is not valid.
ABCIAppInternalError Objects
class ABCIAppInternalError(ABCIAppException)
Internal error due to a bad implementation of the ABCIApp.
__init__
def __init__(message: str, *args: Any) -> None
Initialize the error object.
TransactionTypeNotRecognizedError Objects
class TransactionTypeNotRecognizedError(ABCIAppException)
Error raised when a transaction type is not recognized.
TransactionNotValidError Objects
class TransactionNotValidError(ABCIAppException)
Error raised when a transaction is not valid.
LateArrivingTransaction Objects
class LateArrivingTransaction(ABCIAppException)
Error raised when the transaction belongs to previous round.
AbstractRoundInternalError Objects
class AbstractRoundInternalError(ABCIAppException)
Internal error due to a bad implementation of the AbstractRound.
__init__
def __init__(message: str, *args: Any) -> None
Initialize the error object.
_MetaPayload Objects
class _MetaPayload(ABCMeta)
Payload metaclass.
The purpose of this metaclass is to remember the association between the type of payload and the payload class to build it. This is necessary to recover the right payload class to instantiate at decoding time.
__new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Create a new class object.
BaseTxPayload Objects
@dataclass(frozen=True)
class BaseTxPayload(metaclass=_MetaPayload)
This class represents a base class for transaction payload classes.
data
@property
def data() -> Dict[str, Any]
Get the payload data.
values
@property
def values() -> Tuple[Any, ...]
Get the payload data values.
json
@property
def json() -> Dict[str, Any]
Get the payload as a JSON-serializable dictionary.
from_json
@classmethod
def from_json(cls, obj: Dict) -> "BaseTxPayload"
Decode the payload.
with_new_id
def with_new_id() -> "BaseTxPayload"
Create a new payload with the same content but new id.
encode
def encode() -> bytes
Encode the payload.
decode
@classmethod
def decode(cls, obj: bytes) -> "BaseTxPayload"
Decode the payload.
Transaction Objects
@dataclass(frozen=True)
class Transaction(ABC)
Class to represent a transaction for the ephemeral chain of a period.
encode
def encode() -> bytes
Encode the transaction.
decode
@classmethod
def decode(cls, obj: bytes) -> "Transaction"
Decode the transaction.
verify
def verify(ledger_id: str) -> None
Verify the signature is correct.
Arguments:
ledger_id: the ledger id of the address
Raises:
SignatureNotValidError: if the signature is not valid.
Block Objects
class Block()
Class to represent (a subset of) data of a Tendermint block.
__init__
def __init__(header: Header, transactions: Sequence[Transaction]) -> None
Initialize the block.
transactions
@property
def transactions() -> Tuple[Transaction, ...]
Get the transactions.
timestamp
@property
def timestamp() -> datetime.datetime
Get the block timestamp.
Blockchain Objects
class Blockchain()
Class to represent a (naive) Tendermint blockchain.
The consistency of the data in the blocks is guaranteed by Tendermint.
__init__
def __init__(height_offset: int = 0, is_init: bool = True) -> None
Initialize the blockchain.
is_init
@property
def is_init() -> bool
Returns true if the blockchain is initialized.
add_block
def add_block(block: Block) -> None
Add a block to the list.
height
@property
def height() -> int
Get the height.
Tendermint's height starts from 1. A return value equal to 0 means empty blockchain.
Returns:
the height.
length
@property
def length() -> int
Get the blockchain length.
blocks
@property
def blocks() -> Tuple[Block, ...]
Get the blocks.
last_block
@property
def last_block() -> Block
Returns the last stored block.
BlockBuilder Objects
class BlockBuilder()
Helper class to build a block.
__init__
def __init__() -> None
Initialize the block builder.
reset
def reset() -> None
Reset the temporary data structures.
header
@property
def header() -> Header
Get the block header.
Returns:
the block header
header
@header.setter
def header(header: Header) -> None
Set the header.
transactions
@property
def transactions() -> Tuple[Transaction, ...]
Get the sequence of transactions.
add_transaction
def add_transaction(transaction: Transaction) -> None
Add a transaction.
get_block
def get_block() -> Block
Get the block.
AbciAppDB Objects
class AbciAppDB()
Class to represent all data replicated across agents.
This class stores all the data in self._data. Every entry in this dict represents an optional "period" within your app execution. The concept of period is user-defined, so it might be something like a sequence of rounds that together form a logical cycle of its execution, or it might make no sense at all (hence its optionality), in which case only period 0 will be used.
Every "period" entry stores a dict where every key is a saved parameter and its corresponding value a list containing the history of the parameter values. For instance, for period 0:
0: {"parameter_name": [parameter_history]}
A complete database could look like this:
data = {
    0: {
        "participants": [
            {"participant_a", "participant_b", "participant_c", "participant_d"},
            {"participant_a", "participant_b", "participant_c"},
            {"participant_a", "participant_b", "participant_c", "participant_d"},
        ],
        "other_parameter": [0, 2, 8],
    },
    1: {
        "participants": [
            {"participant_a", "participant_c", "participant_d"},
            {"participant_a", "participant_b", "participant_c", "participant_d"},
            {"participant_a", "participant_b", "participant_c"},
            {"participant_a", "participant_b", "participant_d"},
            {"participant_a", "participant_b", "participant_c", "participant_d"},
        ],
        "other_parameter": [3, 19, 10, 32, 6],
    },
    2: ...
}
Adding and removing data from the current period
To update the current period entry, just call update() on the class. The new values will be appended to the current list for each updated parameter.
To clean up old data from the current period entry, call cleanup_current_histories(cleanup_history_depth_current), where cleanup_history_depth_current is the amount of data that you want to keep after the cleanup. The newest cleanup_history_depth_current values will be kept for each parameter in the DB.
Creating and removing old periods
To create a new period entry, call create() on the class. The new values will be stored in a new list for each updated parameter.
To remove old periods, call cleanup(cleanup_history_depth, [cleanup_history_depth_current]), where cleanup_history_depth is the number of periods that you want to keep after the cleanup. The newest cleanup_history_depth periods will be kept. If you also specify cleanup_history_depth_current, cleanup_current_histories will also be called (see the previous point).
The parameters cleanup_history_depth and cleanup_history_depth_current can also be configured in skill.yaml so they are used automatically when the cleanup method is called from AbciApp.cleanup().
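The period and history layout can be sketched with a small stand-in class. `ToyPeriodDB` is hypothetical and only mimics the semantics described above (append on update, new entry on create, keep the newest items on cleanup); it is not the AbciAppDB implementation and it skips validation, copying and cross-period keys. Depths are assumed to be at least 1.

```python
from typing import Any, Dict, List


class ToyPeriodDB:
    """Hypothetical stand-in that mimics the period/history layout only."""

    def __init__(self, **kwargs: Any) -> None:
        # Period 0 starts with one-element histories.
        self.data: Dict[int, Dict[str, List[Any]]] = {
            0: {key: [value] for key, value in kwargs.items()}
        }

    @property
    def current(self) -> int:
        """Index of the newest period."""
        return max(self.data)

    def update(self, **kwargs: Any) -> None:
        """Append new values to the histories of the current period."""
        for key, value in kwargs.items():
            self.data[self.current].setdefault(key, []).append(value)

    def create(self, **kwargs: Any) -> None:
        """Open a new period whose histories start from the given values."""
        self.data[self.current + 1] = {key: [value] for key, value in kwargs.items()}

    def cleanup(self, depth: int) -> None:
        """Keep only the newest `depth` periods (assumes depth >= 1)."""
        for period in sorted(self.data)[:-depth]:
            del self.data[period]

    def cleanup_current_histories(self, depth: int) -> None:
        """Keep only the newest `depth` values per parameter (assumes depth >= 1)."""
        current = self.data[self.current]
        for key in current:
            current[key] = current[key][-depth:]


db = ToyPeriodDB(other_parameter=0)
db.update(other_parameter=2)
db.update(other_parameter=8)   # period 0 history: [0, 2, 8]
db.create(other_parameter=3)   # period 1 begins
db.update(other_parameter=19)  # period 1 history: [3, 19]
```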
Memory warning
The database is implemented in such a way as to avoid indirect modification of its contents. It copies all the mutable data structures*, which means that it consumes more memory than expected. This is necessary because otherwise the contents could be modified from the behaviour side, which is a safety concern.
The effect of this on the memory usage should not be a big concern, because:
1. The synchronized data of the agents are not intended to store large amounts of data.
IPFS should be used in such cases, and only the hash should be synchronized in the db.
2. The data are automatically wiped after a predefined `cleanup_history` depth as described above.
3. The retrieved data are only meant to be used for a short amount of time,
e.g., to perform a decision on a behaviour, which means that the gc will collect them before they are noticed.
* The built-in copy module is used, which automatically detects if an item is immutable and skips copying it. For more information, take a look at the _deepcopy_atomic method and its usage: https://github.com/python/cpython/blob/3.10/Lib/copy.py#L182-L183
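The copying behaviour referred to in the memory warning can be observed directly with the standard library. The snippet below is only an illustration of `copy.deepcopy` in CPython, using made-up values; it does not show how the database itself invokes the copy.

```python
import copy

immutable = ("participant_a", 1, 2.5)
mutable = [{"participant_a", "participant_b"}, [0, 2, 8]]

immutable_copy = copy.deepcopy(immutable)
mutable_copy = copy.deepcopy(mutable)

# A tuple holding only atomic values is returned as-is, so no extra memory is used.
same_object = immutable_copy is immutable

# Mutable containers are duplicated, so changing the copy leaves the original intact.
mutable_copy[1].append(99)
```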
__init__
def __init__(setup_data: Dict[str, List[Any]],
cross_period_persisted_keys: Optional[FrozenSet[str]] = None,
logger: Optional[logging.Logger] = None) -> None
Initialize the AbciApp database.
setup_data must be passed as a Dict[str, List[Any]] (the database internal format). The staticmethod 'data_to_lists' can be used to convert from Dict[str, Any] to Dict[str, List[Any]] before instantiating this class.
Arguments:
setup_data: the setup data
cross_period_persisted_keys: data keys that will be kept after a new period starts
logger: the logger of the abci app
normalize
@staticmethod
def normalize(value: Any) -> str
Attempt to normalize a non-primitive type to insert it into the db.
setup_data
@property
def setup_data() -> Dict[str, Any]
Get the setup_data without entries which have empty values.
Returns:
the setup_data
reset_index
@property
def reset_index() -> int
Get the current reset index.
round_count
@property
def round_count() -> int
Get the round count.
round_count
@round_count.setter
def round_count(round_count: int) -> None
Set the round count.
cross_period_persisted_keys
@property
def cross_period_persisted_keys() -> FrozenSet[str]
Keys in the database which are persistent across periods.
get
def get(key: str, default: Any = VALUE_NOT_PROVIDED) -> Optional[Any]
Given a key, get its last value for the current reset index.
get_strict
def get_strict(key: str) -> Any
Get a value from the data dictionary and raise if it is None.
validate
@staticmethod
def validate(data: Any) -> None
Validate if the given data are json serializable and therefore can be accepted into the database.
Arguments:
data: the data to check.
Raises:
ABCIAppInternalError: If the data are not serializable.
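A JSON-serializability check of this kind can be sketched with the standard `json` module. The helper name `is_json_serializable` is hypothetical, and it returns a boolean instead of raising ABCIAppInternalError; the real method may perform the check differently.

```python
import json
from typing import Any


def is_json_serializable(data: Any) -> bool:
    """Hypothetical helper: report whether `json.dumps` accepts the data."""
    try:
        json.dumps(data)
    except (TypeError, ValueError, OverflowError):
        return False
    return True
```

For instance, a dict of lists passes, while a set or a bytes object does not, which is why such values need to be converted before being stored.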
update
def update(**kwargs: Any) -> None
Update the current data.
create
def create(**kwargs: Any) -> None
Add a new entry to the data.
Passes automatically the values of the cross_period_persisted_keys to the next period.
Arguments:
kwargs: keyword arguments
get_latest_from_reset_index
def get_latest_from_reset_index(reset_index: int) -> Dict[str, Any]
Get the latest key-value pairs from the data dictionary for the specified period.
get_latest
def get_latest() -> Dict[str, Any]
Get the latest key-value pairs from the data dictionary for the current period.
increment_round_count
def increment_round_count() -> None
Increment the round count.
__repr__
def __repr__() -> str
Return a string representation of the data.
cleanup
def cleanup(cleanup_history_depth: int,
cleanup_history_depth_current: Optional[int] = None) -> None
Reset the db, keeping only the latest entries (periods).
If cleanup_history_depth_current has also been set, the oldest historic values in the current entry are cleared as well.
Arguments:
cleanup_history_depth: the number of newest periods to keep
cleanup_history_depth_current: the number of newest values to keep for each parameter in the current entry; if not set, the current entry is not cleaned up.
cleanup_current_histories
def cleanup_current_histories(cleanup_history_depth_current: int) -> None
Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.
serialize
def serialize() -> str
Serialize the data of the database to a string.
sync
def sync(serialized_data: str) -> None
Synchronize the data using a serialized object.
Arguments:
serialized_data: the serialized data to use in order to sync the db.
Raises:
ABCIAppInternalError: if the given data cannot be deserialized.
hash
def hash() -> bytes
Create a hash of the data.
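One way to obtain a deterministic hash of such data is to serialize it canonically and hash the result. The sketch below assumes JSON with sorted keys and SHA-256; both choices, and the name `hash_data`, are assumptions for illustration and may differ from what this method does.

```python
import hashlib
import json
from typing import Any, Dict


def hash_data(data: Dict[str, Any]) -> bytes:
    """Hypothetical helper: hash a canonical JSON serialization of the data."""
    # Sorting the keys makes the serialization independent of insertion order.
    serialized = json.dumps(data, sort_keys=True)
    return hashlib.sha256(serialized.encode("utf-8")).digest()


first = hash_data({"a": [1, 2], "b": [3]})
second = hash_data({"b": [3], "a": [1, 2]})
```

Determinism matters here because every agent must derive the same hash from the same replicated data.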
data_to_lists
@staticmethod
def data_to_lists(data: Dict[str, Any]) -> Dict[str, List[Any]]
Convert Dict[str, Any] to Dict[str, List[Any]].
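The conversion can be pictured as wrapping every value in a one-element history list. The function `to_lists` below is a hypothetical sketch of that idea and is not guaranteed to match the static method exactly.

```python
from typing import Any, Dict, List


def to_lists(data: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Hypothetical helper: wrap each value in a one-element history list."""
    return {key: [value] for key, value in data.items()}


setup_data = to_lists({"other_parameter": 0, "participants": ["a", "b"]})
```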
BaseSynchronizedData Objects
class BaseSynchronizedData()
Class to represent the synchronized data.
This is the relevant data constructed and replicated by the agents.
__init__
def __init__(db: AbciAppDB) -> None
Initialize the synchronized data.
db
@property
def db() -> AbciAppDB
Get DB.
round_count
@property
def round_count() -> int
Get the round count.
period_count
@property
def period_count() -> int
Get the period count.
Periods are executions between calls to AbciAppDB.create(), so as soon as it is called, a new period begins. It is useful to have a logical subdivision of the FSM execution. For example, if AbciAppDB.create() is called during reset, then a period will be the execution between resets.
Returns:
the period count
participants
@property
def participants() -> FrozenSet[str]
Get the currently active participants.
all_participants
@property
def all_participants() -> FrozenSet[str]
Get all registered participants.
max_participants
@property
def max_participants() -> int
Get the number of all the participants.
consensus_threshold
@property
def consensus_threshold() -> int
Get the consensus threshold.
sorted_participants
@property
def sorted_participants() -> Sequence[str]
Get the sorted participants' addresses.
The addresses are sorted according to their hexadecimal value; this is the reason we use key=str.lower as comparator.
This property is useful when interacting with the Safe contract.
Returns:
the sorted participants' addresses
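The effect of key=str.lower can be seen on a few made-up, shortened addresses (the values below are illustrative only). Without the key, uppercase hexadecimal digits sort before lowercase ones.

```python
participants = frozenset({"0xb1", "0xA2", "0xa0"})

# Case-insensitive ordering, i.e. ordering by hexadecimal value.
by_hex_value = tuple(sorted(participants, key=str.lower))

# Default ordering compares code points, so "A" sorts before "a".
by_code_point = tuple(sorted(participants))
```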
nb_participants
@property
def nb_participants() -> int
Get the number of participants.
slashing_config
@property
def slashing_config() -> str
Get the slashing configuration.
slashing_config
@slashing_config.setter
def slashing_config(config: str) -> None
Set the slashing configuration.
update
def update(synchronized_data_class: Optional[Type] = None,
**kwargs: Any) -> "BaseSynchronizedData"
Copy and update the current data.
create
def create(
synchronized_data_class: Optional[Type] = None
) -> "BaseSynchronizedData"
Copy and update with new data. Set values are stored as sorted tuples to the db for determinism.
__repr__
def __repr__() -> str
Return a string representation of the data.
keeper_randomness
@property
def keeper_randomness() -> float
Get the keeper's random number [0-1].
most_voted_randomness
@property
def most_voted_randomness() -> str
Get the most_voted_randomness.
most_voted_keeper_address
@property
def most_voted_keeper_address() -> str
Get the most_voted_keeper_address.
is_keeper_set
@property
def is_keeper_set() -> bool
Check whether keeper is set.
blacklisted_keepers
@property
def blacklisted_keepers() -> Set[str]
Get the current cycle's blacklisted keepers who cannot submit a transaction.
participant_to_selection
@property
def participant_to_selection() -> DeserializedCollection
Get the participant_to_selection collection.
participant_to_randomness
@property
def participant_to_randomness() -> DeserializedCollection
Get the participant_to_randomness collection.
participant_to_votes
@property
def participant_to_votes() -> DeserializedCollection
Get the participant_to_votes collection.
safe_contract_address
@property
def safe_contract_address() -> str
Get the safe contract address.
_MetaAbstractRound Objects
class _MetaAbstractRound(ABCMeta)
A metaclass that validates AbstractRound's attributes.
__new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Initialize the class.
AbstractRound Objects
class AbstractRound(Generic[EventType], ABC, metaclass=_MetaAbstractRound)
This class represents an abstract round.
A round is a state of the FSM App execution. It usually involves interactions between participants in the FSM App, although this is not enforced at this level of abstraction.
Concrete classes must set:
- synchronized_data_class: the data class associated with this round;
- payload_class: the payload type that is allowed for this round.
Optionally, round_id can be defined, although it is recommended to use the autogenerated id.
__init__
def __init__(
synchronized_data: BaseSynchronizedData,
context: SkillContext,
previous_round_payload_class: Optional[Type[BaseTxPayload]] = None
) -> None
Initialize the round.
auto_round_id
@classmethod
def auto_round_id(cls) -> str
Get round id automatically.
This method returns the auto generated id from the class name if the class variable round_id is not set on the child class. Otherwise, it returns the class variable round_id.
round_id
@property
def round_id() -> str
Get round id.
synchronized_data
@property
def synchronized_data() -> BaseSynchronizedData
Get the synchronized data.
check_transaction
def check_transaction(transaction: Transaction) -> None
Check transaction against the current state.
Arguments:
transaction: the transaction
process_transaction
def process_transaction(transaction: Transaction) -> None
Process a transaction.
By convention, the payload handler should be a method of the class that is named '{payload_name}'.
Arguments:
transaction: the transaction.
end_block
@abstractmethod
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
The role of this method is to check whether the round is considered ended.
If the round is ended, the return value is:
- the final result of the round;
- the event that triggers a transition. If None, the period in which the round was executed is considered ended.
This is done after each block because we consider the consensus engine's block, and not the transaction, as the smallest unit on which the consensus is reached; in other words, each read operation on the state should be done only after each block, and not after each transaction.
check_payload_type
def check_payload_type(transaction: Transaction) -> None
Check the transaction is of the allowed transaction type.
Arguments:
transaction: the transaction
Raises:
TransactionTypeNotRecognizedError: if the transaction cannot be applied to the current state.
check_majority_possible_with_new_voter
def check_majority_possible_with_new_voter(
votes_by_participant: Dict[str, BaseTxPayload],
new_voter: str,
new_vote: BaseTxPayload,
nb_participants: int,
exception_cls: Type[ABCIAppException] = ABCIAppException) -> None
Check that a Byzantine majority is achievable, once a new vote is added.
Arguments:
votes_by_participant: a mapping from a participant to its vote, before the new vote is added
new_voter: the new voter
new_vote: the new vote
nb_participants: the total number of participants
exception_cls: the class of the exception to raise in case the check fails.
Raises:
exception_cls: in case the check does not pass.
check_majority_possible
def check_majority_possible(
votes_by_participant: Dict[str, BaseTxPayload],
nb_participants: int,
exception_cls: Type[ABCIAppException] = ABCIAppException) -> None
Check that a Byzantine majority is still achievable.
The idea is that, even if all the votes have not been delivered yet, it can be deduced whether a quorum cannot be reached due to divergent preferences among the voters and due to a too small number of other participants whose vote has not been delivered yet.
The check fails iff:
nb_remaining_votes + largest_nb_votes < quorum
That is, if the remaining votes are not enough for the most voted item so far to reach the quorum.
Preconditions on the input:
- the size of votes_by_participant should not be greater than "nb_participants - 1" voters
- new voter must not be in the current votes_by_participant
Arguments:
votes_by_participant: a mapping from a participant to its vote
nb_participants: the total number of participants
exception_cls: the class of the exception to raise in case the check fails.
Raises:
exception_cls: in case the check does not pass.
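The failure condition above can be sketched as a standalone function. The function name is hypothetical, votes are simplified to hashable values rather than payload objects, and the quorum is assumed to be a two-thirds Byzantine majority, ceil((2n + 1) / 3); the real check may compute the quorum differently.

```python
from collections import Counter
from math import ceil
from typing import Dict, Hashable


def is_majority_possible_sketch(
    votes_by_participant: Dict[str, Hashable], nb_participants: int
) -> bool:
    """Hypothetical check: can the most voted item still reach the quorum?"""
    # Assumption: the quorum is a two-thirds Byzantine majority.
    quorum = ceil((2 * nb_participants + 1) / 3)
    largest_nb_votes = max(Counter(votes_by_participant.values()).values(), default=0)
    nb_remaining_votes = nb_participants - len(votes_by_participant)
    # The check fails iff nb_remaining_votes + largest_nb_votes < quorum.
    return nb_remaining_votes + largest_nb_votes >= quorum


# With 4 participants the assumed quorum is 3.
still_open = is_majority_possible_sketch({"a": "x", "b": "y"}, 4)
deadlocked = is_majority_possible_sketch({"a": "x", "b": "y", "c": "z"}, 4)
```

In the second call three different votes have been cast, so the one remaining vote can bring any item to at most 2 votes, which is below the quorum.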
is_majority_possible
def is_majority_possible(votes_by_participant: Dict[str, BaseTxPayload],
nb_participants: int) -> bool
Return true if a Byzantine majority is achievable, false otherwise.
Arguments:
votes_by_participant: a mapping from a participant to its vote
nb_participants: the total number of participants
Returns:
True if the majority is still possible, false otherwise.
check_payload
@abstractmethod
def check_payload(payload: BaseTxPayload) -> None
Check payload.
process_payload
@abstractmethod
def process_payload(payload: BaseTxPayload) -> None
Process payload.
DegenerateRound Objects
class DegenerateRound(AbstractRound, ABC)
This class represents the finished round during operation.
It is a sink round.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check payload.
process_payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
End block.
CollectionRound Objects
class CollectionRound(AbstractRound, ABC)
CollectionRound.
This class represents abstract logic for collection based rounds where the round object needs to collect data from different agents. The data might for example be from a voting round or estimation round.
_allow_rejoin_payloads is used to allow agents not currently active to
deliver a payload.
__init__
def __init__(*args: Any, **kwargs: Any)
Initialize the collection round.
serialize_collection
@staticmethod
def serialize_collection(
collection: DeserializedCollection) -> SerializedCollection
Serialize a deserialized collection.
deserialize_collection
@staticmethod
def deserialize_collection(
serialized: SerializedCollection) -> DeserializedCollection
Deserialize a serialized collection.
serialized_collection
@property
def serialized_collection() -> SerializedCollection
A collection with the addresses mapped to serialized payloads.
accepting_payloads_from
@property
def accepting_payloads_from() -> FrozenSet[str]
Get the addresses from which payloads are accepted: the active set, or also (re)joiners.
payloads
@property
def payloads() -> List[BaseTxPayload]
Get all agent payloads
payload_values_count
@property
def payload_values_count() -> Counter
Get count of payload values.
process_payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
_CollectUntilAllRound Objects
class _CollectUntilAllRound(CollectionRound, ABC)
_CollectUntilAllRound
This class represents abstract logic for when rounds need to collect payloads from all agents.
This round should only be used when non-BFT behaviour is acceptable.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
process_payload
def process_payload(payload: BaseTxPayload) -> None
Process payload.
collection_threshold_reached
@property
def collection_threshold_reached() -> bool
Check that the collection threshold has been reached.
CollectDifferentUntilAllRound Objects
class CollectDifferentUntilAllRound(_CollectUntilAllRound, ABC)
CollectDifferentUntilAllRound
This class represents logic for rounds where a round needs to collect different payloads from each agent.
This round should only be used for registration of new agents when there is synchronization of the db.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
CollectSameUntilAllRound Objects
class CollectSameUntilAllRound(_CollectUntilAllRound, ABC)
This class represents logic for when a round needs to collect the same payload from all the agents.
This round should only be used for registration of new agents when there is no synchronization of the db.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check Payload
common_payload
@property
def common_payload() -> Any
Get the common payload among the agents.
common_payload_values
@property
def common_payload_values() -> Tuple[Any, ...]
Get the common payload values among the agents.
CollectSameUntilThresholdRound Objects
class CollectSameUntilThresholdRound(CollectionRound, ABC)
CollectSameUntilThresholdRound
This class represents logic for rounds where a round needs to collect same payload from k of n agents.
done_event is emitted when a) the collection threshold (k of n) is reached,
and b) the most voted payload has non-empty attributes. In this case all
payloads are saved under collection_key and the most voted payload attributes
are saved under selection_key.
none_event is emitted when a) the collection threshold (k of n) is reached,
and b) the most voted payload has only empty attributes.
no_majority_event is emitted when it is impossible to reach a k of n majority.
threshold_reached
@property
def threshold_reached() -> bool
Check if the threshold has been reached.
most_voted_payload
@property
def most_voted_payload() -> Any
Get the most voted payload value.
Kept for backward compatibility.
most_voted_payload_values
@property
def most_voted_payload_values() -> Tuple[Any, ...]
Get the most voted payload values.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
OnlyKeeperSendsRound Objects
class OnlyKeeperSendsRound(AbstractRound, ABC)
OnlyKeeperSendsRound
This class represents logic for rounds where only one agent sends a payload.
done_event is emitted when a) the keeper payload has been received and b)
the keeper payload has non-empty attributes. In this case all attributes are saved
under payload_key.
fail_event is emitted when a) the keeper payload has been received and b)
the keeper payload has only empty attributes
process_payload
def process_payload(payload: BaseTxPayload) -> None
Handle a deploy safe payload.
check_payload
def check_payload(payload: BaseTxPayload) -> None
Check a deploy safe payload can be applied to the current state.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
VotingRound Objects
class VotingRound(CollectionRound, ABC)
VotingRound
This class represents logic for rounds where a round needs votes from
agents. Votes are in the form of True (positive), False (negative)
and None (abstain). The round ends when k of n agents make the same vote.
done_event is emitted when a) the collection threshold (k of n) is reached
with k positive votes. In this case all payloads are saved under collection_key.
negative_event is emitted when a) the collection threshold (k of n) is reached
with k negative votes.
none_event is emitted when a) the collection threshold (k of n) is reached
with k abstain votes.
no_majority_event is emitted when it is impossible to reach a k of n majority for
either of the options.
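The event selection described above can be sketched as a pure function over the collected votes. The function name and the string labels are hypothetical placeholders for the round's configured events, and the threshold is passed in explicitly rather than derived from the synchronized data.

```python
from collections import Counter
from typing import Dict, Optional


def voting_outcome(
    votes: Dict[str, Optional[bool]], nb_participants: int, threshold: int
) -> Optional[str]:
    """Hypothetical sketch: return an event label, or None if the round goes on."""
    vote_count = Counter(votes.values())
    if vote_count[True] >= threshold:
        return "done_event"
    if vote_count[False] >= threshold:
        return "negative_event"
    if vote_count[None] >= threshold:
        return "none_event"
    # No option can reach the threshold any more, even with all remaining votes.
    nb_remaining_votes = nb_participants - len(votes)
    if nb_remaining_votes + max(vote_count.values(), default=0) < threshold:
        return "no_majority_event"
    return None


positive = voting_outcome({"a": True, "b": True, "c": True}, 4, 3)
split = voting_outcome({"a": True, "b": False, "c": None}, 4, 3)
pending = voting_outcome({"a": True}, 4, 3)
```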
vote_count
@property
def vote_count() -> Counter
Get agent payload vote count
positive_vote_threshold_reached
@property
def positive_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
negative_vote_threshold_reached
@property
def negative_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
none_vote_threshold_reached
@property
def none_vote_threshold_reached() -> bool
Check that the vote threshold has been reached.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
CollectDifferentUntilThresholdRound Objects
class CollectDifferentUntilThresholdRound(CollectionRound, ABC)
CollectDifferentUntilThresholdRound
This class represents logic for rounds where a round needs to collect different payloads from k of n agents.
done_event is emitted when a) the required block confirmations
have been met, and b) the collection threshold (k of n) is reached. In
this case all payloads are saved under collection_key.
Extended required_block_confirmations to allow for arrival of more
payloads.
collection_threshold_reached
@property
def collection_threshold_reached() -> bool
Check if the threshold has been reached.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
CollectNonEmptyUntilThresholdRound Objects
class CollectNonEmptyUntilThresholdRound(CollectDifferentUntilThresholdRound,
ABC)
CollectNonEmptyUntilThresholdRound
This class represents logic for rounds where a round needs to collect optionally different payloads from k of n agents, where we only keep the non-empty attributes.
done_event is emitted when a) the required block confirmations
have been met, b) the collection threshold (k of n) is reached, and
c) some non-empty attribute values have been collected. In this case
all payloads are saved under collection_key. Under selection_key
the non-empty attribute values are stored.
none_event is emitted when a) the required block confirmations
have been met, b) the collection threshold (k of n) is reached, and
c) no non-empty attribute values have been collected.
Attention: A none_event might be triggered even though some of the
remaining n-k agents might send non-empty attributes! Extended
required_block_confirmations can alleviate this somewhat.
end_block
def end_block() -> Optional[Tuple[BaseSynchronizedData, Enum]]
Process the end of the block.
TimeoutEvent Objects
@dataclass(order=True)
class TimeoutEvent(Generic[EventType])
Timeout event.
Timeouts Objects
class Timeouts(Generic[EventType])
Class to keep track of pending timeouts.
__init__
def __init__() -> None
Initialize.
size
@property
def size() -> int
Get the size of the timeout queue.
add_timeout
def add_timeout(deadline: datetime.datetime, event: EventType) -> int
Add a timeout.
cancel_timeout
def cancel_timeout(entry_count: int) -> None
Remove a timeout.
Arguments:
entry_count: the entry id to remove.
Raises:
KeyError: if the entry count is not found.
pop_earliest_cancelled_timeouts
def pop_earliest_cancelled_timeouts() -> None
Pop earliest cancelled timeouts.
get_earliest_timeout
def get_earliest_timeout() -> Tuple[datetime.datetime, Any]
Get the earliest timeout-event pair.
pop_timeout
def pop_timeout() -> Tuple[datetime.datetime, Any]
Remove and return the earliest timeout-event pair.
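A queue with this interface is commonly built on a heap with lazy cancellation: cancelled entries stay in the heap and are discarded only when they reach the front. The class `ToyTimeouts` below is a hypothetical sketch of that technique using `heapq`; it is not the implementation of Timeouts and its internals are assumptions.

```python
import datetime
import heapq
import itertools
from typing import Any, List, Set, Tuple


class ToyTimeouts:
    """Hypothetical timeout queue with lazy cancellation."""

    def __init__(self) -> None:
        self._heap: List[Tuple[datetime.datetime, int, Any]] = []
        self._live: Set[int] = set()
        self._counter = itertools.count()

    def add_timeout(self, deadline: datetime.datetime, event: Any) -> int:
        """Schedule an event and return its entry id."""
        entry_count = next(self._counter)
        # The unique entry id breaks ties, so events are never compared.
        heapq.heappush(self._heap, (deadline, entry_count, event))
        self._live.add(entry_count)
        return entry_count

    def cancel_timeout(self, entry_count: int) -> None:
        """Mark an entry as cancelled; raises KeyError if it is unknown."""
        self._live.remove(entry_count)

    def pop_earliest_cancelled_timeouts(self) -> None:
        """Drop cancelled entries sitting at the front of the heap."""
        while self._heap and self._heap[0][1] not in self._live:
            heapq.heappop(self._heap)

    def pop_timeout(self) -> Tuple[datetime.datetime, Any]:
        """Remove and return the earliest live timeout-event pair."""
        self.pop_earliest_cancelled_timeouts()
        deadline, entry_count, event = heapq.heappop(self._heap)
        self._live.remove(entry_count)
        return deadline, event


start = datetime.datetime(2024, 1, 1)
timeouts = ToyTimeouts()
late_id = timeouts.add_timeout(start + datetime.timedelta(seconds=5), "late")
early_id = timeouts.add_timeout(start + datetime.timedelta(seconds=1), "early")
timeouts.cancel_timeout(early_id)
earliest = timeouts.pop_timeout()
```

Cancelling is cheap because nothing is removed from the middle of the heap; the cost is paid later, when cancelled entries are skipped at the front.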
_MetaAbciApp Objects
class _MetaAbciApp(ABCMeta)
A metaclass that validates AbciApp's attributes.
__new__
def __new__(mcs, name: str, bases: Tuple, namespace: Dict,
**kwargs: Any) -> Type
Initialize the class.
BackgroundAppType Objects
class BackgroundAppType(Enum)
The type of a background app.
Please note that the values correspond to the priority in which the background apps should be processed when updating rounds.
correct_types
@staticmethod
def correct_types() -> Set[str]
Return the correct types only.
BackgroundAppConfig Objects
@dataclass(frozen=True)
class BackgroundAppConfig(Generic[EventType])
Necessary configuration for a background app.
For a deeper understanding of the various types of background apps and how the config influences
the generated background app's type, please refer to the BackgroundApp class.
The specify_type method provides further insight on the subject matter.
BackgroundApp Objects
class BackgroundApp(Generic[EventType])
A background app.
__init__
def __init__(config: BackgroundAppConfig) -> None
Initialize the BackgroundApp.
__eq__
def __eq__(other: Any) -> bool
Custom equality comparing operator.
__hash__
def __hash__() -> int
Custom hashing operator
specify_type
def specify_type() -> BackgroundAppType
Specify the type of the background app.
setup
def setup(initial_synchronized_data: BaseSynchronizedData,
context: SkillContext) -> None
Set up the background round.
background_round
@property
def background_round() -> AbstractRound
Get the background round.
process_transaction
def process_transaction(transaction: Transaction, dry: bool = False) -> bool
Process a transaction.
TransitionBackup Objects
@dataclass
class TransitionBackup()
Holds transition related information as a backup in case we want to transition back from a background app.
AbciApp Objects
class AbciApp(Generic[EventType], ABC, metaclass=_MetaAbciApp)
Base class for ABCI apps.
Concrete classes of this class implement the ABCI App.
__init__
def __init__(synchronized_data: BaseSynchronizedData, logger: logging.Logger,
context: SkillContext)
Initialize the AbciApp.
is_abstract
@classmethod
def is_abstract(cls) -> bool
Return if the abci app is abstract.
add_background_app
@classmethod
def add_background_app(cls, config: BackgroundAppConfig) -> Type["AbciApp"]
Sets the background related class variables.
For a deeper understanding of the various types of background apps and how the inputs of this method influence
the generated background app's type, please refer to the BackgroundApp class.
The specify_type method provides further insight on the subject matter.
Arguments:
config: the background app's configuration.
Returns:
the AbciApp with the new background app contained in the background_apps set.
synchronized_data
@property
def synchronized_data() -> BaseSynchronizedData
Return the current synchronized data.
get_all_rounds
@classmethod
def get_all_rounds(cls) -> Set[AppState]
Get all the round states.
get_all_events
@classmethod
def get_all_events(cls) -> Set[EventType]
Get all the events.
get_all_round_classes
@classmethod
def get_all_round_classes(
cls,
bg_round_cls: Set[Type[AbstractRound]],
include_background_rounds: bool = False) -> Set[AppState]
Get all round classes.
bg_apps_prioritized
@property
def bg_apps_prioritized() -> Tuple[List[BackgroundApp], ...]
Get the background apps grouped and prioritized by their types.
last_timestamp
@property
def last_timestamp() -> datetime.datetime
Get last timestamp.
setup
def setup() -> None
Set up the behaviour.
schedule_round
def schedule_round(round_cls: AppState) -> None
Schedule a round class.
This means:
- cancel timeout events belonging to the current round;
- instantiate the new round class and set it as current round;
- create new timeout events and schedule them according to the latest timestamp.
Arguments:
round_cls: the class of the new round.
current_round
@property
def current_round() -> AbstractRound
Get the current round.
current_round_id
@property
def current_round_id() -> Optional[str]
Get the current round id.
current_round_height
@property
def current_round_height() -> int
Get the current round height.
last_round_id
@property
def last_round_id() -> Optional[str]
Get the last round id.
is_finished
@property
def is_finished() -> bool
Check whether the AbciApp execution has finished.
latest_result
@property
def latest_result() -> Optional[BaseSynchronizedData]
Get the latest result of the round.
cleanup_timeouts
def cleanup_timeouts() -> None
Remove all timeouts.
Note that this method is meant to be used only when performing recovery. Calling it in normal execution will result in unexpected behaviour.
check_transaction
def check_transaction(transaction: Transaction) -> None
Check a transaction.
process_transaction
def process_transaction(transaction: Transaction, dry: bool = False) -> None
Process a transaction.
The background rounds run concurrently with other (normal) rounds. First, we check whether the transaction is meant for a background round. If it is not, we forward it to the current round object.
Arguments:
transaction: the transaction.
dry: whether the transaction should only be checked and not processed.
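The dispatch order described for process_transaction can be sketched roughly as below. This is only an illustration: RoundSketch, the dictionary-shaped transaction, and matching on a "type" key are all hypothetical, and the real criterion for deciding that a transaction belongs to a background round is not stated in this section.

```python
from typing import Any, Dict, List


class RoundSketch:
    """Hypothetical stand-in for a round; not the library's AbstractRound."""

    def __init__(self, accepted_type: str) -> None:
        self.accepted_type = accepted_type
        self.processed: List[Dict[str, Any]] = []

    def process(self, transaction: Dict[str, Any], dry: bool = False) -> None:
        # With dry=True the transaction is only checked, never stored.
        if not dry:
            self.processed.append(transaction)


def process_transaction(
    transaction: Dict[str, Any],
    background_rounds: List[RoundSketch],
    current_round: RoundSketch,
    dry: bool = False,
) -> None:
    """Try the background rounds first, then fall back to the current round."""
    for background_round in background_rounds:
        # Assumption: a transaction is matched to a round by its "type" key.
        if transaction["type"] == background_round.accepted_type:
            background_round.process(transaction, dry)
            return
    current_round.process(transaction, dry)
```

The early return keeps a transaction from being handled twice, once by a background round and once by the current round.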
process_event
def process_event(event: EventType,
result: Optional[BaseSynchronizedData] = None) -> None
Process a round event.
update_time
def update_time(timestamp: datetime.datetime) -> None
Observe timestamp from last block.
Arguments:
timestamp: the latest block's timestamp.
cleanup
def cleanup(cleanup_history_depth: int,
cleanup_history_depth_current: Optional[int] = None) -> None
Clear data.
cleanup_current_histories
def cleanup_current_histories(cleanup_history_depth_current: int) -> None
Reset the parameter histories for the current entry (period), keeping only the latest values for each parameter.
OffenseType Objects
class OffenseType(Enum)
The types of offenses.
The values of the enum represent the seriousness of the offence.
Offense types with values >1000 are considered serious.
See also is_light_offence and is_serious_offence functions.
is_light_offence
def is_light_offence(offence_type: OffenseType) -> bool
Check if an offence type is light.
is_serious_offence
def is_serious_offence(offence_type: OffenseType) -> bool
Check if an offence type is serious.
light_offences
def light_offences() -> Iterator[OffenseType]
Get the light offences.
serious_offences
def serious_offences() -> Iterator[OffenseType]
Get the serious offences.
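The threshold convention described for OffenseType could look roughly like the sketch below. The member names and values are hypothetical, chosen only to illustrate the "values above 1000 are serious" rule; treating every other value as light is a simplifying assumption, and the real enum may reserve some values for other purposes.

```python
from enum import Enum
from typing import Iterator


class OffenseType(Enum):
    """Illustrative members only; names and values are assumptions."""

    LIGHT_EXAMPLE = 0
    SERIOUS_EXAMPLE = 1001


# Cut-off taken from the "values >1000 are considered serious" note.
SERIOUS_THRESHOLD = 1000


def is_serious_offence(offence_type: OffenseType) -> bool:
    """Check if an offence type is serious."""
    return offence_type.value > SERIOUS_THRESHOLD


def is_light_offence(offence_type: OffenseType) -> bool:
    """Check if an offence type is light (assumed: anything not serious)."""
    return offence_type.value <= SERIOUS_THRESHOLD


def light_offences() -> Iterator[OffenseType]:
    """Lazily iterate over the light members of the enum."""
    return filter(is_light_offence, OffenseType)


def serious_offences() -> Iterator[OffenseType]:
    """Lazily iterate over the serious members of the enum."""
    return filter(is_serious_offence, OffenseType)
```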
AvailabilityWindow Objects
class AvailabilityWindow()
A cyclic array with a maximum length that holds boolean values.
When an element is added to the array and the maximum length has been reached,
the oldest element is removed. Two attributes, num_positive and num_negative,
reflect the number of positive and negative elements in the AvailabilityWindow.
They are updated every time a new element is added.
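A minimal sketch of such a window, assuming a collections.deque as the backing store, is shown below. WindowSketch is a hypothetical name, and the actual class may store its elements differently; the point is only to show how the two counters stay in step with evictions.

```python
from collections import deque
from typing import Deque


class WindowSketch:
    """Fixed-length window of booleans with running counts (illustrative)."""

    def __init__(self, max_length: int) -> None:
        if max_length < 1:
            raise ValueError("max_length must be at least 1")
        self._max_length = max_length
        self._window: Deque[bool] = deque()
        self.num_positive = 0
        self.num_negative = 0

    def add(self, value: bool) -> None:
        """Add a value, evicting the oldest one when the window is full."""
        if len(self._window) == self._max_length:
            oldest = self._window.popleft()
            # Undo the evicted element's contribution to the counters.
            if oldest:
                self.num_positive -= 1
            else:
                self.num_negative -= 1
        self._window.append(value)
        if value:
            self.num_positive += 1
        else:
            self.num_negative += 1
```

Keeping the counters up to date on every add avoids rescanning the window whenever the counts are read.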
__init__
def __init__(max_length: int) -> None
Initializes the AvailabilityWindow instance.
Arguments:
max_length: the maximum length of the cyclic array.
__eq__
def __eq__(other: Any) -> bool
Compare AvailabilityWindow objects.
has_bad_availability_rate
def has_bad_availability_rate(threshold: float = 0.95) -> bool
Whether the agent to which the window belongs has a bad availability rate.
add
def add(value: bool) -> None
Adds a new boolean value to the cyclic array.
If the maximum length has been reached, the oldest element is removed.
Arguments:
value: The boolean value to add to the cyclic array.
to_dict
def to_dict() -> Dict[str, int]
Returns a dictionary representation of the AvailabilityWindow instance.
from_dict
@classmethod
def from_dict(cls, data: Dict[str, int]) -> "AvailabilityWindow"
Initializes an AvailabilityWindow instance from a dictionary.
OffenceStatus Objects
@dataclass
class OffenceStatus()
A class that holds information about offence status for an agent.
slash_amount
def slash_amount(light_unit_amount: int, serious_unit_amount: int) -> int
Get the slash amount of the current status.
OffenseStatusEncoder Objects
class OffenseStatusEncoder(json.JSONEncoder)
A custom JSON encoder for the offence status dictionary.
default
def default(o: Any) -> Any
The default JSON encoder.
OffenseStatusDecoder Objects
class OffenseStatusDecoder(json.JSONDecoder)
A custom JSON decoder for the offence status dictionary.
__init__
def __init__(*args: Any, **kwargs: Any) -> None
Initialize the custom JSON decoder.
hook
@staticmethod
def hook(
data: Dict[str, Any]
) -> Union[AvailabilityWindow, OffenceStatus, Dict[str, OffenceStatus]]
Perform the custom decoding.
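The encoder and decoder pair follows the usual json extension pattern: override default on the encoder and pass an object hook to the decoder. The sketch below shows that pattern with a hypothetical StatusSketch dataclass whose fields are assumptions; the real OffenceStatus fields and the way the real hook recognises each type are not shown in this section.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict


@dataclass
class StatusSketch:
    """Hypothetical stand-in for an offence status record."""

    num_light: int = 0
    num_serious: int = 0


class StatusEncoder(json.JSONEncoder):
    """Serialize dataclass instances as plain dictionaries."""

    def default(self, o: Any) -> Any:
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        return super().default(o)


class StatusDecoder(json.JSONDecoder):
    """Rebuild StatusSketch objects while decoding."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, object_hook=self.hook, **kwargs)

    @staticmethod
    def hook(data: Dict[str, Any]) -> Any:
        # Assumption: a record is recognised by its exact set of keys.
        if set(data) == {"num_light", "num_serious"}:
            return StatusSketch(**data)
        return data
```

A round trip would then use `json.dumps(statuses, cls=StatusEncoder)` followed by `json.loads(encoded, cls=StatusDecoder)`.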
PendingOffense Objects
@dataclass(frozen=True, eq=True)
class PendingOffense()
A dataclass to represent offences that need to be addressed.
__post_init__
def __post_init__() -> None
Post initialization for offence type conversion in case it is given as an int.
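Because the dataclass is frozen, a conversion in __post_init__ cannot use a plain attribute assignment. One common way to do it, sketched here under the assumption that the real implementation does something similar, is to go through object.__setattr__. KindSketch, PendingSketch, and the field names are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class KindSketch(Enum):
    """Hypothetical offence kinds; names and values are assumptions."""

    LIGHT_EXAMPLE = 0
    SERIOUS_EXAMPLE = 1001


@dataclass(frozen=True, eq=True)
class PendingSketch:
    """Hypothetical pending offence record."""

    accused_agent_address: str
    offense_type: KindSketch

    def __post_init__(self) -> None:
        # Frozen dataclasses block normal assignment, so bypass it explicitly.
        if isinstance(self.offense_type, int):
            object.__setattr__(
                self, "offense_type", KindSketch(self.offense_type)
            )
```

With frozen=True and eq=True the instances are hashable, so they can be kept in a set, and an instance built from an int compares equal to one built from the enum member.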
SlashingNotConfiguredError Objects
class SlashingNotConfiguredError(Exception)
Custom exception raised when slashing configuration is requested but is not available.
DEFAULT_PENDING_OFFENCE_TTL
1 hour
RoundSequence Objects
class RoundSequence()
This class represents a sequence of rounds.
It is a generic class that keeps track of the current round of the consensus period. It receives 'deliver_tx' requests from the ABCI handlers and forwards them to the current active round instance, which implements the ABCI app logic. It also schedules the next round (if any) whenever a round terminates.
__init__
def __init__(context: SkillContext, abci_app_cls: Type[AbciApp])
Initialize the round.
enable_slashing
def enable_slashing() -> None
Enable slashing.
validator_to_agent
@property
def validator_to_agent() -> Dict[str, str]
Get the mapping of the validators' addresses to their agent addresses.
validator_to_agent
@validator_to_agent.setter
def validator_to_agent(validator_to_agent: Dict[str, str]) -> None
Set the mapping of the validators' addresses to their agent addresses.
offence_status
@property
def offence_status() -> Dict[str, OffenceStatus]
Get the mapping of the agents' addresses to their offence status.
offence_status
@offence_status.setter
def offence_status(offence_status: Dict[str, OffenceStatus]) -> None
Set the mapping of the agents' addresses to their offence status.
add_pending_offence
def add_pending_offence(pending_offence: PendingOffense) -> None
Add a pending offence to the set of pending offences.
Pending offences are offences that have been detected, but not yet agreed upon by the consensus. A pending offence is removed from the set of pending offences and added to the OffenceStatus of a validator when the majority of the agents agree on it.
Arguments:
pending_offence: the pending offence to add
Returns:
None
sync_db_and_slashing
def sync_db_and_slashing(serialized_db_state: str) -> None
Sync the database and the slashing configuration.
serialized_offence_status
def serialized_offence_status() -> str
Serialize the offence status.
store_offence_status
def store_offence_status() -> None
Store the serialized offence status.
get_agent_address
def get_agent_address(validator: Validator) -> str
Get corresponding agent address from a Validator instance.
setup
def setup(*args: Any, **kwargs: Any) -> None
Set up the round sequence.
Arguments:
args: the arguments to pass to the round constructor.
kwargs: the keyword-arguments to pass to the round constructor.
start_sync
def start_sync() -> None
Set _syncing_up flag to true.
If the _syncing_up flag is set to true, the async_act method won't be executed. For more details refer to
https://github.com/valory-xyz/open-autonomy/issues/247#issuecomment-1012268656
end_sync
def end_sync() -> None
Set _syncing_up flag to false.
syncing_up
@property
def syncing_up() -> bool
Return whether the app is in sync mode.
abci_app
@property
def abci_app() -> AbciApp
Get the AbciApp.
blockchain
@property
def blockchain() -> Blockchain
Get the Blockchain instance.
blockchain
@blockchain.setter
def blockchain(_blockchain: Blockchain) -> None
Set the Blockchain instance.
height
@property
def height() -> int
Get the height.
is_finished
@property
def is_finished() -> bool
Check if a round sequence has finished.
check_is_finished
def check_is_finished() -> None
Check if a round sequence has finished.
current_round
@property
def current_round() -> AbstractRound
Get current round.
current_round_id
@property
def current_round_id() -> Optional[str]
Get the current round id.
current_round_height
@property
def current_round_height() -> int
Get the current round height.
last_round_id
@property
def last_round_id() -> Optional[str]
Get the last round id.
last_timestamp
@property
def last_timestamp() -> datetime.datetime
Get the last timestamp.
last_round_transition_timestamp
@property
def last_round_transition_timestamp() -> datetime.datetime
Returns the timestamp for last round transition.
last_round_transition_height
@property
def last_round_transition_height() -> int
Returns the height for last round transition.
last_round_transition_root_hash
@property
def last_round_transition_root_hash() -> bytes
Returns the root hash for last round transition.
last_round_transition_tm_height
@property
def last_round_transition_tm_height() -> int
Returns the Tendermint height for last round transition.
latest_synchronized_data
@property
def latest_synchronized_data() -> BaseSynchronizedData
Get the latest synchronized_data.
root_hash
@property
def root_hash() -> bytes
Get the Merkle root hash of the application state.
This is going to be the database's hash. In this way, the app hash will be reflecting our application's state, and will guarantee that all the agents on the chain apply the changes of the arriving blocks in the same way.
Returns:
the root hash to be included as the Header.AppHash in the next block.
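The idea of deriving the app hash from the database can be illustrated with the sketch below. It assumes the state is a JSON-serializable dictionary and that SHA-256 is the hash function; neither is confirmed by this section, and root_hash_sketch is a hypothetical helper.

```python
import hashlib
import json
from typing import Any, Dict


def root_hash_sketch(db_state: Dict[str, Any]) -> bytes:
    """Hash a deterministic serialization of the state (illustrative only)."""
    # sort_keys makes the serialization independent of key insertion order,
    # so agents holding the same state produce the same bytes.
    serialized = json.dumps(db_state, sort_keys=True).encode("utf-8")
    return hashlib.sha256(serialized).digest()
```

Two agents that applied the same blocks end up with the same state and therefore the same hash, while any divergence in the state changes the hash.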
tm_height
@property
def tm_height() -> int
Get Tendermint's current height.
tm_height
@tm_height.setter
def tm_height(_tm_height: int) -> None
Set Tendermint's current height.
block_stall_deadline_expired
@property
def block_stall_deadline_expired() -> bool
Check whether the deadline for receiving a begin block request from the Tendermint node has expired.
set_block_stall_deadline
def set_block_stall_deadline() -> None
Use the local time of the agent and a predefined tolerance, to specify the expiration of the deadline.
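The relation between set_block_stall_deadline and block_stall_deadline_expired can be sketched as follows. StallDeadlineSketch, its tolerance_seconds parameter, and the optional now argument are hypothetical; where the real tolerance comes from is not stated in this section.

```python
import datetime
from typing import Optional


class StallDeadlineSketch:
    """Track a local-time deadline for the next expected block (illustrative)."""

    def __init__(self, tolerance_seconds: float) -> None:
        self._tolerance = datetime.timedelta(seconds=tolerance_seconds)
        # Until a deadline is set, it can never be considered expired.
        self._deadline = datetime.datetime.max

    def set_block_stall_deadline(
        self, now: Optional[datetime.datetime] = None
    ) -> None:
        """Move the deadline to local time plus the tolerance."""
        now = now or datetime.datetime.now()
        self._deadline = now + self._tolerance

    def block_stall_deadline_expired(
        self, now: Optional[datetime.datetime] = None
    ) -> bool:
        """Return True once local time has passed the deadline."""
        now = now or datetime.datetime.now()
        return now > self._deadline
```

The now parameter exists only to make the sketch easy to test with fixed timestamps.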
init_chain
def init_chain(initial_height: int) -> None
Init chain.
begin_block
def begin_block(header: Header, evidences: Evidences,
last_commit_info: LastCommitInfo) -> None
Begin block.
deliver_tx
def deliver_tx(transaction: Transaction) -> None
Deliver a transaction.
Appends the transaction to build the block on 'end_block' later.
Arguments:
transaction: the transaction.
Raises:
None: an Error otherwise.
end_block
def end_block() -> None
Process the 'end_block' request.
commit
def commit() -> None
Process the 'commit' request.
reset_blockchain
def reset_blockchain(is_replay: bool = False, is_init: bool = False) -> None
Reset blockchain after tendermint reset.
Arguments:
is_replay: whether we are resetting the blockchain while replaying blocks.
is_init: whether to process blocks before receiving an init_chain req from tendermint.
reset_state
def reset_state(restart_from_round: str,
round_count: int,
serialized_db_state: Optional[str] = None) -> None
This method resets the state of RoundSequence to the beginning of the period.
Note: This is intended to be used for agent <-> tendermint communication recovery only!
Arguments:
restart_from_round: from which round to restart the abci. This round should be the first round in the last period.
round_count: the round count at the beginning of the period -1.
serialized_db_state: the state of the database at the beginning of the period. If provided, the database will be reset to this state.
PendingOffencesPayload Objects
@dataclass(frozen=True)
class PendingOffencesPayload(BaseTxPayload)
Represent a transaction payload for pending offences.
PendingOffencesRound Objects
class PendingOffencesRound(CollectSameUntilThresholdRound)
Defines the pending offences background round, which runs concurrently with other rounds to sync the offences.
__init__
def __init__(*args: Any, **kwargs: Any) -> None
Initialize the PendingOffencesRound.
offence_status
@property
def offence_status() -> Dict[str, OffenceStatus]
Get the offence status from the round sequence.
end_block
def end_block() -> None
Process the end of the block for the pending offences background round.
It is important to note that this is a non-standard type of round, meaning it does not emit any events. Instead, it continuously runs in the background. The objective of this round is to consistently monitor the received pending offences and achieve a consensus among the agents.