aea.helpers.pipe
Portable pipe implementation for Linux, MacOS, and Windows.
IPCChannelClient Objects
class IPCChannelClient(ABC)
Multi-platform interprocess communication channel for the client side.
connect
@abstractmethod
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Connect to communication channel
Arguments:
- timeout: timeout for other end to connect
Returns:
connection status
write
@abstractmethod
async def write(data: bytes) -> None
Write data bytes to the other end of the channel
Will first write the size than the actual data
Arguments:
- data: bytes to write
read
@abstractmethod
async def read() -> Optional[bytes]
Read bytes from the other end of the channel
Will first read the size than the actual data
Returns:
read bytes
close
@abstractmethod
async def close() -> None
Close the communication channel.
IPCChannel Objects
class IPCChannel(IPCChannelClient)
Multi-platform interprocess communication channel.
in_path
@property
@abstractmethod
def in_path() -> str
Rendezvous point for incoming communication.
Returns:
path
out_path
@property
@abstractmethod
def out_path() -> str
Rendezvous point for outgoing communication.
Returns:
path
PosixNamedPipeProtocol Objects
class PosixNamedPipeProtocol()
Posix named pipes async wrapper communication protocol.
__init__
def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize a new posix named pipe.
Arguments:
- in_path: rendezvous point for incoming data
- out_path: rendezvous point for outgoing data
- logger: the logger
- loop: the event loop
connect
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Connect to the other end of the pipe
Arguments:
- timeout: timeout before failing
Returns:
connection success
write
async def write(data: bytes) -> None
Write to pipe.
Arguments:
- data: bytes to write to pipe
read
async def read() -> Optional[bytes]
Read from pipe.
Returns:
read bytes
close
async def close() -> None
Disconnect pipe.
TCPSocketProtocol Objects
class TCPSocketProtocol()
TCP socket communication protocol.
__init__
def __init__(reader: asyncio.StreamReader,
             writer: asyncio.StreamWriter,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize the tcp socket protocol.
Arguments:
- reader: established asyncio reader
- writer: established asyncio writer
- logger: the logger
- loop: the event loop
writer
@property
def writer() -> StreamWriter
Get a writer associated with protocol.
write
async def write(data: bytes) -> None
Write to socket.
Arguments:
- data: bytes to write
read
async def read() -> Optional[bytes]
Read from socket.
Returns:
read bytes
close
async def close() -> None
Disconnect socket.
TCPSocketChannel Objects
class TCPSocketChannel(IPCChannel)
Interprocess communication channel implementation using tcp sockets.
__init__
def __init__(logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize tcp socket interprocess communication channel.
connect
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Setup communication channel and wait for other end to connect.
Arguments:
- timeout: timeout for the connection to be established
Returns:
connection status
write
async def write(data: bytes) -> None
Write to channel.
Arguments:
- data: bytes to write
read
async def read() -> Optional[bytes]
Read from channel.
Returns:
read bytes
close
async def close() -> None
Disconnect from channel and clean it up.
in_path
@property
def in_path() -> str
Rendezvous point for incoming communication.
out_path
@property
def out_path() -> str
Rendezvous point for outgoing communication.
PosixNamedPipeChannel Objects
class PosixNamedPipeChannel(IPCChannel)
Interprocess communication channel implementation using Posix named pipes.
__init__
def __init__(logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize posix named pipe interprocess communication channel.
connect
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Setup communication channel and wait for other end to connect.
Arguments:
- timeout: timeout for connection to be established
Returns:
bool, indicating success
write
async def write(data: bytes) -> None
Write to the channel.
Arguments:
- data: data to write to channel
read
async def read() -> Optional[bytes]
Read from the channel.
Returns:
read bytes
close
async def close() -> None
Close the channel and clean it up.
in_path
@property
def in_path() -> str
Rendezvous point for incoming communication.
out_path
@property
def out_path() -> str
Rendezvous point for outgoing communication.
TCPSocketChannelClient Objects
class TCPSocketChannelClient(IPCChannelClient)
Interprocess communication channel client using tcp sockets.
__init__
def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize a tcp socket communication channel client.
Arguments:
- in_path: rendezvous point for incoming data
- out_path: rendezvous point for outgoing data
- logger: the logger
- loop: the event loop
connect
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Connect to the other end of the communication channel.
Arguments:
- timeout: timeout for connection to be established
Returns:
connection status
write
async def write(data: bytes) -> None
Write data to channel.
Arguments:
- data: bytes to write
read
async def read() -> Optional[bytes]
Read data from channel.
Returns:
read bytes
close
async def close() -> None
Disconnect from communication channel.
PosixNamedPipeChannelClient Objects
class PosixNamedPipeChannelClient(IPCChannelClient)
Interprocess communication channel client using Posix named pipes.
__init__
def __init__(in_path: str,
             out_path: str,
             logger: logging.Logger = _default_logger,
             loop: Optional[AbstractEventLoop] = None) -> None
Initialize a posix named pipe communication channel client.
Arguments:
- in_path: rendezvous point for incoming data
- out_path: rendezvous point for outgoing data
- logger: the logger
- loop: the event loop
connect
async def connect(timeout: float = PIPE_CONN_TIMEOUT) -> bool
Connect to the other end of the communication channel.
Arguments:
- timeout: timeout for connection to be established
Returns:
connection status
write
async def write(data: bytes) -> None
Write data to channel.
Arguments:
- data: bytes to write
read
async def read() -> Optional[bytes]
Read data from channel.
Returns:
read bytes
close
async def close() -> None
Disconnect from communication channel.
make_ipc_channel
def make_ipc_channel(logger: logging.Logger = _default_logger,
                     loop: Optional[AbstractEventLoop] = None) -> IPCChannel
Build a portable bidirectional InterProcess Communication channel
Arguments:
- logger: the logger
- loop: the loop
Returns:
IPCChannel
make_ipc_channel_client
def make_ipc_channel_client(
        in_path: str,
        out_path: str,
        logger: logging.Logger = _default_logger,
        loop: Optional[AbstractEventLoop] = None) -> IPCChannelClient
Build a portable bidirectional InterProcess Communication client channel
Arguments:
- in_path: rendezvous point for incoming communication
- out_path: rendezvous point for outgoing outgoing
- logger: the logger
- loop: the loop
Returns:
IPCChannel