Communication Package

Communication module. Provides a generic abstraction for communication and commonly used concrete implementations.

class tmtccmd.com.ComInterface

Bases: ABC

Generic form of a communication interface to separate communication logic from the underlying interface.

abstract close(args: Any = 0)

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

abstract data_available(timeout: float, parameters: Any = 0) int

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

abstract property id: str
abstract initialize(args: Any = 0) Any

Perform initialization steps which cannot be done in the constructor or which require return values.

abstract is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

abstract open(args: Any = 0)

Opens the communication interface to allow communication.

Returns:

abstract receive(parameters: Any = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

abstract send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.
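As a rough illustration of how a concrete interface might satisfy this contract, the sketch below defines a local stand-in for the abstract class (so that it runs without tmtccmd installed) and an in-memory loopback implementation that stores packets in a deque. ComInterfaceSketch and LoopbackComIF are hypothetical names which are not part of tmtccmd; with the library installed, one would presumably subclass tmtccmd.com.ComInterface instead.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Deque, List


class ComInterfaceSketch(ABC):
    """Local stand-in mirroring the documented abstract methods (hypothetical)."""

    @property
    @abstractmethod
    def id(self) -> str: ...

    @abstractmethod
    def initialize(self, args: Any = 0) -> Any: ...

    @abstractmethod
    def open(self, args: Any = 0): ...

    @abstractmethod
    def is_open(self) -> bool: ...

    @abstractmethod
    def close(self, args: Any = 0): ...

    @abstractmethod
    def send(self, data: bytes): ...

    @abstractmethod
    def data_available(self, timeout: float, parameters: Any = 0) -> int: ...

    @abstractmethod
    def receive(self, parameters: Any = 0) -> List[bytes]: ...


class LoopbackComIF(ComInterfaceSketch):
    """Hypothetical implementation: every sent packet is echoed back."""

    def __init__(self, com_if_id: str):
        self._id = com_if_id
        self._open = False
        self._queue: Deque[bytes] = deque()

    @property
    def id(self) -> str:
        return self._id

    def initialize(self, args: Any = 0) -> Any:
        return None  # nothing to set up for an in-memory queue

    def open(self, args: Any = 0):
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def close(self, args: Any = 0):
        self._open = False
        self._queue.clear()

    def send(self, data: bytes):
        if not self._open:
            raise RuntimeError("interface is not open")
        self._queue.append(bytes(data))

    def data_available(self, timeout: float, parameters: Any = 0) -> int:
        # The timeout is ignored here because the queue never blocks.
        return len(self._queue)

    def receive(self, parameters: Any = 0) -> List[bytes]:
        packets = list(self._queue)
        self._queue.clear()
        return packets


com_if = LoopbackComIF("loopback")
com_if.open()
com_if.send(b"\x01\x02")
com_if.send(bytearray(b"\x03"))
available = com_if.data_available(timeout=0.0)
packets = com_if.receive()
com_if.close()
```

A real implementation would typically fill the deque from a receiver thread and raise SendError instead of RuntimeError; the sketch keeps everything single-threaded for brevity.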

exception tmtccmd.com.ReceptionDecodeError(msg: str, custom_exception: Exception | None)

Bases: Exception

Generic decode error which can also wrap the exception thrown by other libraries.

exception tmtccmd.com.SendError(msg: str, custom_exception: Exception | None)

Bases: Exception

Generic send error which can also wrap the exception thrown by other libraries.
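Both exception types take a message and an optional wrapped exception. The sketch below shows one way such wrapping might look, using a local stand-in class with the documented constructor parameters. SendErrorSketch, its attribute names msg and custom_exception, and send_over_closed_link are assumptions made for illustration; the attributes of the real tmtccmd exceptions are not confirmed by this page.

```python
from typing import Optional


class SendErrorSketch(Exception):
    """Local stand-in with the documented constructor parameters (hypothetical)."""

    def __init__(self, msg: str, custom_exception: Optional[Exception]):
        super().__init__(msg)
        self.msg = msg
        self.custom_exception = custom_exception


def send_over_closed_link(data: bytes) -> None:
    """Hypothetical sender which always fails at the transport level."""
    try:
        raise ConnectionResetError("peer closed the connection")
    except OSError as e:
        # Wrap the low-level error so callers only need to handle one type.
        raise SendErrorSketch("sending failed", e) from e


try:
    send_over_closed_link(b"\x00")
    wrapped = None
except SendErrorSketch as e:
    wrapped = e.custom_exception
```

Callers can then catch a single exception type and still inspect the original cause when they need more detail.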

TCP Client Module

TCP communication interface

class tmtccmd.com.tcp.TcpCommunicationType(value)

Bases: Enum

Parse for space packets in the TCP stream, using the space packet header.

SPACE_PACKETS = 0
class tmtccmd.com.tcp.TcpSpacepacketsClient(com_if_id: str, space_packet_ids: Sequence[PacketId], inner_thread_delay: float, target_address: EthAddr, max_packets_stored: int | None = None)

Bases: ComInterface

Communication interface for TCP communication. This particular interface expects raw space packets to be sent via TCP and uses a list of passed packet IDs to parse for them.

Initialize a communication interface to send and receive TMTC via TCP.

Parameters:
  • com_if_id

  • space_packet_ids – Valid packet IDs for CCSDS space packets. Those will be used to parse for space packets inside the TCP stream.

  • inner_thread_delay – Delay between polling cycles of the TCP thread, in seconds.
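To make the idea of parsing a TCP stream by packet ID more concrete, the following sketch scans a byte buffer for CCSDS space packets. It relies on the standard 6-byte primary header, where the first two bytes form the packet ID and the last two bytes hold the data field length minus one. The function name parse_space_packets and the use of plain integers for the packet IDs are simplifications for this sketch; the client itself takes PacketId objects, and its internal parser may work differently.

```python
import struct
from typing import List, Sequence, Tuple

SP_HEADER_LEN = 6  # CCSDS space packet primary header length in bytes


def parse_space_packets(
    buf: bytes, packet_ids: Sequence[int]
) -> Tuple[List[bytes], bytes]:
    """Return (complete packets, unconsumed remainder) for a raw byte stream."""
    packets = []
    idx = 0
    while idx + SP_HEADER_LEN <= len(buf):
        packet_id, _seq_ctrl, data_len = struct.unpack_from("!HHH", buf, idx)
        if packet_id not in packet_ids:
            idx += 1  # not a known packet start, resynchronize byte by byte
            continue
        # The length field stores the data field length minus one.
        total_len = SP_HEADER_LEN + data_len + 1
        if idx + total_len > len(buf):
            break  # packet is incomplete, wait for more stream data
        packets.append(buf[idx : idx + total_len])
        idx += total_len
    return packets, buf[idx:]


# Example packet ID 0x0801: telemetry, secondary header flag set, APID 1.
packet = struct.pack("!HHH", 0x0801, 0xC000, 2) + b"\xaa\xbb\xcc"
stream = b"\xff" + packet + packet[:4]
found, rest = parse_space_packets(stream, [0x0801])
```

The remainder returned by the function would be prepended to the next chunk read from the socket, so packets split across TCP segments are not lost.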

close(args: Any | None = None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float = 0, parameters: Any = 0) int

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

property id: str
initialize(args: Any | None = None)

Perform initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: Any | None = None)

Opens the communication interface to allow communication.

Returns:

receive(parameters: float = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

class tmtccmd.com.tcpip_utils.EthAddr(ip_addr: 'str', port: 'int')

Bases: object

classmethod from_tuple(addr: Tuple[str, int]) EthAddr
ip_addr: str
port: int
property to_tuple: Tuple[str, int]
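The address type is a small value class whose tuple form matches what the Python socket API expects. A minimal sketch of a class with the same documented members could look as follows; EthAddrSketch is a local stand-in and the port number is an arbitrary example value.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class EthAddrSketch:
    """Local stand-in mirroring the documented fields and members."""

    ip_addr: str
    port: int

    @classmethod
    def from_tuple(cls, addr: Tuple[str, int]) -> "EthAddrSketch":
        return cls(addr[0], addr[1])

    @property
    def to_tuple(self) -> Tuple[str, int]:
        return self.ip_addr, self.port


# The port is an arbitrary example value.
addr = EthAddrSketch.from_tuple(("127.0.0.1", 7301))
as_tuple = addr.to_tuple  # a property, so no call parentheses
```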
class tmtccmd.com.tcpip_utils.TcpIpConfigIds(value)

Bases: Enum

An enumeration.

RECV_ADDRESS = 2
RECV_MAX_SIZE = 3
SEND_ADDRESS = 1
SPACE_PACKET_ID = 4
class tmtccmd.com.tcpip_utils.TcpIpType(value)

Bases: Enum

An enumeration.

TCP = 1
UDP = 2
UDP_RECV = 3
tmtccmd.com.tcpip_utils.determine_recv_buffer_len(json_cfg_path: str, tcpip_type: TcpIpType)
tmtccmd.com.tcpip_utils.determine_tcp_send_address(json_cfg_path: str) EthAddr
tmtccmd.com.tcpip_utils.determine_tcpip_address(tcpip_type: TcpIpType, json_cfg_path: str) EthAddr
tmtccmd.com.tcpip_utils.determine_udp_recv_address(json_cfg_path: str) EthAddr
tmtccmd.com.tcpip_utils.determine_udp_send_address(json_cfg_path: str) EthAddr
tmtccmd.com.tcpip_utils.prompt_ip_address(type_str: str) EthAddr

Prompt the user for a valid IP address.

tmtccmd.com.tcpip_utils.prompt_recv_buffer_len(tcpip_type: TcpIpType) int

UDP Client Module

UDP Communication Interface

class tmtccmd.com.udp.UdpClient(com_if_id: str, send_address: EthAddr, recv_addr: EthAddr | None = None)

Bases: ComInterface

Communication interface for UDP communication.

Initialize a communication interface to send and receive UDP datagrams.

Parameters:
  • send_address

  • recv_addr

close(args: any | None = None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float = 0, parameters: any = 0) bool

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

property id: str
initialize(args: any | None = None) any

Perform initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: any | None = None)

Opens the communication interface to allow communication.

Returns:

receive(poll_timeout: float = 0) List[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

Serial Communication with COBS Module

class tmtccmd.com.serial_cobs.SerialCobsComIF(ser_cfg: SerialCfg)

Bases: SerialComBase, ComInterface

Serial communication interface which uses the COBS protocol to encode and decode packets.

This class will spin up a receiver thread on the open() call to poll for COBS encoded packets. This means that the close() call might block until the receiver thread has shut down.
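For readers unfamiliar with COBS (Consistent Overhead Byte Stuffing), the sketch below is a self-contained rendition of the commonly described algorithm. It is not the encoder tmtccmd uses internally, and the function names cobs_encode and cobs_decode are chosen for this example only.

```python
def cobs_encode(data: bytes) -> bytes:
    """Encode data so that the result contains no zero bytes."""
    out = bytearray([0])  # placeholder for the first code byte
    code_idx = 0
    code = 1
    for byte in data:
        if byte == 0:
            # Close the current block; the code byte replaces the zero.
            out[code_idx] = code
            code_idx = len(out)
            out.append(0)
            code = 1
        else:
            out.append(byte)
            code += 1
            if code == 0xFF:
                # Maximum block length reached, start a new block.
                out[code_idx] = code
                code_idx = len(out)
                out.append(0)
                code = 1
    out[code_idx] = code
    return bytes(out)


def cobs_decode(encoded: bytes) -> bytes:
    """Reverse cobs_encode; raises ValueError on malformed input."""
    out = bytearray()
    idx = 0
    while idx < len(encoded):
        code = encoded[idx]
        if code == 0:
            raise ValueError("zero byte inside COBS encoded data")
        block = encoded[idx + 1 : idx + code]
        if len(block) != code - 1 or 0 in block:
            raise ValueError("malformed COBS block")
        out += block
        idx += code
        # A code below 0xFF implies a zero, except at the very end.
        if code != 0xFF and idx < len(encoded):
            out.append(0)
    return bytes(out)


encoded = cobs_encode(b"\x11\x22\x00\x33")
decoded = cobs_decode(encoded)
```

On a serial link, encoded frames are typically separated by a 0x00 byte, which the encoding guarantees never appears inside a frame. This is what allows a receiver thread to resynchronize after a corrupted packet.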

close(args: any | None = None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float, parameters: any = 0) int

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

property id: str
initialize(args: any | None = None) any

Perform initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: any | None = None) None

Spins up a receiver thread to permanently check for new COBS encoded packets.

receive(parameters: any = 0) List[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

Serial Communication with DLE Module

class tmtccmd.com.serial_dle.DleCfg(dle_queue_len: 'Optional[int]' = None, dle_max_frame: 'Optional[int]' = None, encode_cr: 'bool' = True)

Bases: object

dle_max_frame: int | None = None
dle_queue_len: int | None = None
encode_cr: bool = True
class tmtccmd.com.serial_dle.SerialDleComIF(ser_cfg: SerialCfg, dle_cfg: DleCfg | None)

Bases: SerialComBase, ComInterface

Serial communication interface which uses the DLE protocol to encode and decode packets.

This class will spin up a receiver thread on the open() call to poll for DLE encoded packets. This means that the close() call might block until the receiver thread has shut down.
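The sketch below illustrates the general idea of DLE framing with escape sequences. The concrete rules used here are an assumption about one common variant and are not confirmed by this page: frames start with STX (0x02) and end with ETX (0x03), the control bytes STX, ETX and optionally CR are sent as DLE (0x10) followed by the byte plus 0x40, and DLE itself is doubled. The function names are hypothetical, and the encoder used by tmtccmd may differ in detail.

```python
STX, ETX, DLE, CR = 0x02, 0x03, 0x10, 0x0D


def dle_encode(data: bytes, encode_cr: bool = True) -> bytes:
    """Frame data between STX and ETX, escaping control bytes (assumed rules)."""
    escaped = {STX, ETX} | ({CR} if encode_cr else set())
    out = bytearray([STX])
    for byte in data:
        if byte in escaped:
            out += bytes([DLE, byte + 0x40])
        elif byte == DLE:
            out += bytes([DLE, DLE])
        else:
            out.append(byte)
    out.append(ETX)
    return bytes(out)


def dle_decode(frame: bytes) -> bytes:
    """Reverse dle_encode; raises ValueError on malformed frames."""
    if len(frame) < 2 or frame[0] != STX or frame[-1] != ETX:
        raise ValueError("frame must start with STX and end with ETX")
    out = bytearray()
    body = iter(frame[1:-1])
    for byte in body:
        if byte != DLE:
            out.append(byte)
            continue
        nxt = next(body, None)  # consume the byte following the escape
        if nxt == DLE:
            out.append(DLE)
        elif nxt in (STX + 0x40, ETX + 0x40, CR + 0x40):
            out.append(nxt - 0x40)
        else:
            raise ValueError("invalid escape sequence")
    return bytes(out)


frame = dle_encode(b"\x01\x02\x10")
payload = dle_decode(frame)
```

Because STX and ETX never appear unescaped inside the body under these rules, a receiver can find frame boundaries by scanning for those two bytes.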

close(args: any | None = None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float, parameters: any = 0) int

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

property id: str
initialize(args: any | None = None) any

Perform initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: any | None = None) None

Spins up a receiver thread to permanently check for new DLE encoded packets.

receive(parameters: any = 0) List[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

Generic Serial Modules

class tmtccmd.com.serial_base.SerialCfg(com_if_id: str, serial_port: str, baud_rate: int, serial_timeout: float)

Bases: object

baud_rate: int
com_if_id: str
serial_port: str
serial_timeout: float
class tmtccmd.com.serial_base.SerialComBase(logger: Logger, ser_cfg: SerialCfg, ser_com_type: SerialCommunicationType)

Bases: object

close_port()
static data_available_from_queue(timeout: float, reception_buffer: deque)
is_port_open() bool
open_port()
class tmtccmd.com.serial_base.SerialCommunicationType(value)

Bases: Enum

Right now, two serial communication methods are supported. One uses frames with a fixed size containing PUS packets and the other uses a simple ASCII based transport layer called DLE. If DLE is used, it is expected that the sender side encoded the packets with the DLE protocol. Any packets sent will also be encoded.

COBS = 0
DLE_ENCODING = 2
class tmtccmd.com.serial_base.SerialConfigIds(value)

Bases: Enum

An enumeration.

SERIAL_BAUD_RATE = 2
SERIAL_COMM_TYPE = 4
SERIAL_DLE_MAX_FRAME_SIZE = 7
SERIAL_DLE_QUEUE_LEN = 6
SERIAL_FRAME_SIZE = 5
SERIAL_PORT = 1
SERIAL_TIMEOUT = 3
tmtccmd.com.ser_utils.check_port_validity(com_port_to_check: str) bool
tmtccmd.com.ser_utils.determine_baud_rate(json_cfg_path: str) int

Determine the baud rate. Tries to read it from the JSON file first. If the baud rate is not contained in the config JSON, the user is prompted for it instead, with the option to store the value in the JSON file.

Returns:

Determined baud rate
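The read-from-JSON-then-prompt pattern can be sketched as shown below. This is an illustration under stated assumptions, not the library's implementation: the key name serial_baudrate is hypothetical, the prompt is injected as a callable so that the example runs without user input, and the value is always stored, whereas the documented function offers storing as an option.

```python
import json
import os
import tempfile
from typing import Callable

BAUD_KEY = "serial_baudrate"  # hypothetical key name, chosen for this sketch


def determine_baud_rate_sketch(json_cfg_path: str, prompt: Callable[[], int]) -> int:
    """Return the baud rate from the JSON file, prompting only if it is missing."""
    cfg = {}
    if os.path.exists(json_cfg_path):
        with open(json_cfg_path) as f:
            cfg = json.load(f)
    if BAUD_KEY in cfg:
        return int(cfg[BAUD_KEY])
    baud_rate = prompt()
    # Persist the value so the next call does not need to prompt again.
    cfg[BAUD_KEY] = baud_rate
    with open(json_cfg_path, "w") as f:
        json.dump(cfg, f, indent=4)
    return baud_rate


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "tmtc_conf.json")
    first = determine_baud_rate_sketch(path, prompt=lambda: 115200)
    # The second prompt is never used because the value is now stored.
    second = determine_baud_rate_sketch(path, prompt=lambda: 9600)
```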

tmtccmd.com.ser_utils.determine_com_port(json_cfg_path: str) str

Determine the serial port. Tries to read it from the JSON file first. If the serial port is not contained in the config JSON, the user is prompted for it instead, with the option to store the value in the JSON file.

Returns:

Determined serial port

tmtccmd.com.ser_utils.find_com_port_from_hint(hint: str) Tuple[bool, str]

Find a COM port based on a hint string

tmtccmd.com.ser_utils.prompt_com_port() str

Dummy Module

Dummy virtual communication interface. It currently allows using the TMTC program without external hardware or an extra socket.

class tmtccmd.com.dummy.DummyComIF

Bases: ComInterface

close(args: any | None = None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float = 0, parameters: any = 0)

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

property id: str
initialize(args: any | None = None) any

Perform initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: any | None = None) None

Opens the communication interface to allow communication.

Returns:

receive(parameters: any = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytes | bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

class tmtccmd.com.dummy.DummyHandler

Bases: object

generate_reply_package()

Generate a reply package. Currently, this only generates a reply for a ping telecommand.

insert_telecommand(data: bytes)
pass_telecommand(data: bytearray)

Deprecated since version 6.0.0: Use insert_telecommand instead

receive_reply_package() List[bytes]

Utilities Module

tmtccmd.com.utils.determine_com_if(com_if_dict: Mapping[str, Tuple[str, Any]], json_cfg_path: str, use_prompts: bool) str
tmtccmd.com.utils.prompt_com_if(com_if_dict: Mapping[str, Tuple[str, Any]]) str
tmtccmd.com.utils.store_com_if_json(com_if_string: str, json_cfg_path: str)

QEMU Module

QEMU_SERIAL Communication Interface to communicate with emulated QEMU_SERIAL hardware via the UART interface.

It utilizes the asyncio library.

Requirements:

Python >= 3.7 (asyncio support)

Instructions:

Run QEMU_SERIAL (modified for OBSW) via

qemu-system-arm -M isis-obc -monitor stdio -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin -qmp unix:/tmp/qemu,server -S

Then run the telecommand script with -c 2

class tmtccmd.com.qemu.DataFrame(seq, cat, frame_id, data=None)

Bases: object

Basic protocol unit for communication via the IOX API introduced for external device emulation

bytes()

Convert this protocol unit to raw bytes

class tmtccmd.com.qemu.QEMUComIF(serial_cfg: SerialCfg, ser_com_type: SerialCommunicationType = SerialCommunicationType.DLE_ENCODING)

Bases: ComInterface

Specific Communication Interface implementation of the QEMU_SERIAL USART protocol for the TMTC software

close(_=None) None

Closes the ComIF and releases any held resources (for example a Communication Port).

Returns:

data_available(timeout: float = 0, _=0) int

Check whether TM packets are available.

Parameters:
  • timeout – Can be used to block on available data if supported by the specific communication interface.

  • parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

0 if no data is available, number of packets otherwise.

data_available_dle(timeout: float = 0) int
data_available_fixed_frame(timeout: float = 0) int
property id: str
initialize(_=None)

Needs to be called by application code once for DLE mode!

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(_=None) None

Opens the communication interface to allow communication.

Returns:

async poll_dle_packets()
receive(_) List[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.

Returns:

send(data: bytearray)

Send raw data.

Raises:

SendError – Sending failed for some reason.

send_data(data: bytearray)
async send_data_async(data)
set_dle_settings(dle_queue_len: int, dle_max_frame: int, dle_timeout: float)
set_fixed_frame_settings(serial_frame_size: int)
async start_dle_polling()
class tmtccmd.com.qemu.QmpConnection(addr='/tmp/qemu')

Bases: object

A connection to a QEMU_SERIAL machine via QMP

close()

Close this connection

async cont()

Continue machine execution if it has been paused

async open()

Open this connection. Connects to the machine and ensures that the connection is ready to use after this call.

async quit()

Quit the emulation. This causes the emulator to (non-gracefully) shut down and close.

async stop()

Stop/pause machine execution

exception tmtccmd.com.qemu.QmpException(ret, *args, **kwargs)

Bases: Exception

An exception raised by QMP/QEMU_SERIAL in response to a failed command

class tmtccmd.com.qemu.QmpProtocol(conn)

Bases: Protocol

The QMP transport protocol implementation

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)

Called when some data is received.

The argument is a bytes object.
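The three callbacks above follow the standard asyncio.Protocol contract. As a generic illustration of how they interact, the sketch below buffers incoming data and splits it into newline-delimited messages. LineBufferProtocol and the newline framing are assumptions for this example and do not describe how QmpProtocol processes its data; the callbacks are driven by hand here instead of by an event loop.

```python
import asyncio


class LineBufferProtocol(asyncio.Protocol):
    """Hypothetical protocol which collects newline-delimited messages."""

    def __init__(self):
        self.transport = None
        self.buffer = bytearray()
        self.lines = []
        self.closed = False

    def connection_made(self, transport):
        # Keep the transport so that replies could be written later.
        self.transport = transport

    def data_received(self, data):
        # Data arrives in arbitrary chunks, so buffer until a full line exists.
        self.buffer += data
        while b"\n" in self.buffer:
            line, _, rest = bytes(self.buffer).partition(b"\n")
            self.lines.append(line)
            self.buffer = bytearray(rest)

    def connection_lost(self, exc):
        # exc is None for a regular close, otherwise the causing exception.
        self.closed = True
        self.transport = None


proto = LineBufferProtocol()
proto.connection_made(transport=None)  # no real transport in this sketch
proto.data_received(b"first me")
proto.data_received(b"ssage\nsecond")
proto.connection_lost(None)
```

In a real application the event loop creates the protocol instance through a factory, for example via loop.create_connection, and invokes the callbacks itself.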

class tmtccmd.com.qemu.Usart(addr)

Bases: object

close()

Close this connection

async static create_async(addr)
flush()
get_data_in_waiting() int
inject_frame_error()

Inject a frame error (set CSR_FRAME)

inject_overrun_error()

Inject an overrun error (set CSR_OVRE)

inject_parity_error()

Inject a parity error (set CSR_PARE)

inject_timeout_error()

Inject a timeout (set CSR_TIMEOUT)

new_data_available() bool
async open()

Open this connection

read(n)

Wait for ‘n’ bytes to be received from the USART.

async read_async(n, timeout=None)

Wait for ‘n’ bytes to be received from the USART.

This function will return early if the specified timeout (in seconds) is exceeded. In this case, only the data received up to that point will be returned. If timeout is None, no timeout will be set.
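The early-return behaviour described above can be sketched with plain asyncio primitives. The example below is not the Usart implementation: it uses an asyncio.StreamReader as a stand-in data source, and read_n_with_timeout is a hypothetical helper name.

```python
import asyncio


async def read_n_with_timeout(reader, n, timeout=None):
    """Read up to n bytes, returning early with partial data on timeout."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    buf = bytearray()
    while len(buf) < n:
        remaining = None if deadline is None else deadline - loop.time()
        if remaining is not None and remaining <= 0:
            break
        try:
            chunk = await asyncio.wait_for(reader.read(n - len(buf)), remaining)
        except asyncio.TimeoutError:
            break  # timeout exceeded, return what has been received so far
        if not chunk:
            break  # end of stream
        buf += chunk
    return bytes(buf)


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b"abc")
    # Only three of the five requested bytes arrive before the timeout.
    partial = await read_n_with_timeout(reader, 5, timeout=0.05)
    reader.feed_data(b"defgh")
    full = await read_n_with_timeout(reader, 5, timeout=0.05)
    return partial, full


partial, full = asyncio.run(demo())
```

Tracking a single deadline, rather than applying the full timeout to every read call, keeps the total waiting time bounded even when data trickles in one byte at a time.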

async read_until_async(expected, size=None, timeout=None)

Read data until either the expected byte sequence has been found, the specified number of bytes has been received, or the timeout has occurred.

This function will return whatever data has been received up until the first termination condition has been met. If size is None, there will be no size limit. If timeout is None, there will be no timeout.

async write(data)

Write data (bytes) to the USART device

class tmtccmd.com.qemu.UsartProtocol(conn)

Bases: Protocol

The USART transport protocol implementation

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made(transport)

Called when a connection is made.

The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.

data_received(data)

Called when some data is received.

The argument is a bytes object.

exception tmtccmd.com.qemu.UsartStatusException(errn, *args, **kwargs)

Bases: Exception

An exception returned by the USART send command

tmtccmd.com.qemu.parse_dataframes(buf)

Parse a variable number of DataFrames from the given byte buffer

tmtccmd.com.qemu.start_background_loop(loop: AbstractEventLoop) None