Communication Package
Communication module. Provides generic abstraction for communication and commonly used concrete implementations.
- class tmtccmd.com.ComInterface
Bases: ABC
Generic form of a communication interface to separate communication logic from the underlying interface.
- abstract close(args: Any = 0)
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- abstract data_available(timeout: float, parameters: Any = 0) int
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- abstract initialize(args: Any = 0) Any
Performs initialization steps which cannot be done in the constructor or which require return values.
- abstract is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- abstract receive(parameters: Any = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
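The contract described above can be illustrated with a small, self-contained sketch. `ComInterfaceSketch`, `LoopbackComIF` and the `feed()` helper are hypothetical names invented for this example and are not part of tmtccmd; the stand-in base class only mirrors the abstract methods listed above, and the deque plays the role of the container mentioned in the `receive` description.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Deque, List


class ComInterfaceSketch(ABC):
    """Hypothetical stand-in mirroring the abstract methods listed above."""

    @abstractmethod
    def initialize(self, args: Any = 0) -> Any: ...

    @abstractmethod
    def is_open(self) -> bool: ...

    @abstractmethod
    def close(self, args: Any = 0) -> None: ...

    @abstractmethod
    def data_available(self, timeout: float, parameters: Any = 0) -> int: ...

    @abstractmethod
    def receive(self, parameters: Any = 0) -> List[bytes]: ...


class LoopbackComIF(ComInterfaceSketch):
    """Keeps packets in a deque instead of talking to real hardware."""

    def __init__(self) -> None:
        self._open = False
        self._packets: Deque[bytes] = deque()

    def initialize(self, args: Any = 0) -> Any:
        # Nothing to set up besides marking the interface as usable.
        self._open = True
        return None

    def is_open(self) -> bool:
        return self._open

    def close(self, args: Any = 0) -> None:
        self._open = False
        self._packets.clear()

    def feed(self, packet: bytes) -> None:
        """Hypothetical helper: queue a packet as if it had been received."""
        self._packets.append(packet)

    def data_available(self, timeout: float = 0, parameters: Any = 0) -> int:
        # 0 if nothing is queued, number of queued packets otherwise.
        return len(self._packets)

    def receive(self, parameters: Any = 0) -> List[bytes]:
        # Hand out all queued packets and empty the queue.
        packets = list(self._packets)
        self._packets.clear()
        return packets
```

A caller would typically check `data_available()` first and only then call `receive()`, which drains the queue in this sketch.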
- exception tmtccmd.com.ReceptionDecodeError(msg: str, custom_exception: Exception | None)
Bases: Exception
Generic decode error which can also wrap the exception thrown by other libraries.
- exception tmtccmd.com.SendError(msg: str, custom_exception: Exception | None)
Bases: Exception
Generic send error which can also wrap the exception thrown by other libraries.
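The wrapping behaviour can be sketched as follows. `ReceptionDecodeErrorSketch` is a local stand-in that only copies the documented constructor signature; the attribute names it stores and the `decode_ascii_frame` helper are assumptions made for this example, not tmtccmd code.

```python
from typing import Optional


class ReceptionDecodeErrorSketch(Exception):
    """Hypothetical stand-in using the constructor signature documented above."""

    def __init__(self, msg: str, custom_exception: Optional[Exception]):
        super().__init__(msg)
        self.msg = msg
        # Keep the lower-level exception so callers can inspect the root cause.
        self.custom_exception = custom_exception


def decode_ascii_frame(raw: bytes) -> str:
    """Hypothetical decoder that wraps the library error it runs into."""
    try:
        return raw.decode("ascii")
    except UnicodeDecodeError as exc:
        raise ReceptionDecodeErrorSketch("frame is not valid ASCII", exc) from exc
```

Application code can then catch the generic error type while still having access to the original exception for logging.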
TCP Client Module
TCP communication interface
- class tmtccmd.com.tcp.TcpCommunicationType(value)
Bases: Enum
Parse for space packets in the TCP stream, using the space packet header.
- SPACE_PACKETS = 0
- class tmtccmd.com.tcp.TcpSpacepacketsClient(com_if_id: str, space_packet_ids: Sequence[PacketId], inner_thread_delay: float, target_address: EthAddr, max_packets_stored: int | None = None)
Bases: ComInterface
Communication interface for TCP communication. This particular interface expects raw space packets to be sent via TCP and uses a list of passed packet IDs to parse for them.
Initialize a communication interface to send and receive TMTC via TCP.
- Parameters:
com_if_id
space_packet_ids – Valid packet IDs for CCSDS space packets. Those will be used to parse for space packets inside the TCP stream.
inner_thread_delay – Delay between two polling cycles of the TCP thread, in seconds.
- close(args: Any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float = 0, parameters: Any = 0) int
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(args: Any | None = None)
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- receive(parameters: float = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
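How a list of packet IDs can be used to find space packets inside a TCP byte stream is sketched below. This is not the tmtccmd implementation: `parse_space_packets` is a hypothetical helper, and plain integers stand in for the `PacketId` objects. It relies only on the CCSDS primary header layout, where the first two bytes carry the packet ID (with a version number of zero) and bytes 4 and 5 carry the data field length minus one.

```python
from typing import List, Sequence, Tuple

CCSDS_HEADER_LEN = 6


def parse_space_packets(
    buf: bytes, packet_ids: Sequence[int]
) -> Tuple[List[bytes], bytes]:
    """Extract complete space packets whose first two bytes match a known ID.

    Returns the packets found and the unconsumed tail of the buffer, which
    should be prepended to the next chunk read from the socket.
    """
    packets: List[bytes] = []
    idx = 0
    while idx + CCSDS_HEADER_LEN <= len(buf):
        packet_id = int.from_bytes(buf[idx:idx + 2], "big")
        if packet_id not in packet_ids:
            # Not a known packet start, skip one byte and keep searching.
            idx += 1
            continue
        data_len_field = int.from_bytes(buf[idx + 4:idx + 6], "big")
        total_len = CCSDS_HEADER_LEN + data_len_field + 1
        if idx + total_len > len(buf):
            # Packet is incomplete, wait for more data.
            break
        packets.append(buf[idx:idx + total_len])
        idx += total_len
    return packets, buf[idx:]
```

Returning the tail explicitly keeps the helper stateless, so a polling thread can own the buffer and call it after every read.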
- class tmtccmd.com.tcpip_utils.TcpIpConfigIds(value)
Bases: Enum
An enumeration.
- RECV_ADDRESS = 2
- RECV_MAX_SIZE = 3
- SEND_ADDRESS = 1
- SPACE_PACKET_ID = 4
- class tmtccmd.com.tcpip_utils.TcpIpType(value)
Bases: Enum
An enumeration.
- TCP = 1
- UDP = 2
- UDP_RECV = 3
UDP Client Module
UDP Communication Interface
- class tmtccmd.com.udp.UdpClient(com_if_id: str, send_address: EthAddr, recv_addr: EthAddr | None = None)
Bases: ComInterface
Communication interface for UDP communication.
Initialize a communication interface to send and receive UDP datagrams.
- Parameters:
send_address
recv_addr
- close(args: any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float = 0, parameters: any = 0) bool
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(args: any | None = None) any
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- receive(poll_timeout: float = 0) List[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
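The roles of a send address and a receive address can be illustrated with the standard library alone. The sketch below does not use `UdpClient`; `udp_exchange` is a hypothetical helper that sends datagrams to a local receiver socket and reads them back with a timeout, which is roughly the pattern a UDP communication interface wraps.

```python
import socket
from typing import List


def udp_exchange(payloads: List[bytes], timeout: float = 1.0) -> List[bytes]:
    """Send each payload to a loopback receiver and collect what arrives."""
    received: List[bytes] = []
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as rx, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tx:
        # Port 0 lets the operating system pick a free port.
        rx.bind(("127.0.0.1", 0))
        rx.settimeout(timeout)
        recv_addr = rx.getsockname()
        for payload in payloads:
            tx.sendto(payload, recv_addr)
            data, _sender = rx.recvfrom(4096)
            received.append(data)
    return received
```

Each datagram is delivered as one unit, so no stream parsing is needed on the receiving side.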
Serial Communication with COBS Module
- class tmtccmd.com.serial_cobs.SerialCobsComIF(ser_cfg: SerialCfg)
Bases: SerialComBase, ComInterface
Serial communication interface which uses the COBS protocol to encode and decode packets.
This class will spin up a receiver thread on the open() call to poll for COBS encoded packets. This means that the close() call might block until the receiver thread has shut down.
- close(args: any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float, parameters: any = 0) int
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(args: any | None = None) any
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: any | None = None) None
Spins up a receiver thread to permanently check for new COBS encoded packets.
- receive(parameters: any = 0) List[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
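For readers unfamiliar with COBS (Consistent Overhead Byte Stuffing), the sketch below shows the basic encoding and decoding steps. It is a plain illustration of the algorithm, not the encoder used by `SerialCobsComIF`; the function names are hypothetical and the frame delimiter handling is left out.

```python
def cobs_encode(data: bytes) -> bytes:
    """Encode data so that the result contains no zero bytes."""
    out = bytearray([0])  # placeholder for the first code byte
    code_idx = 0
    code = 1
    for byte in data:
        if byte == 0:
            # Close the current block; the code byte replaces the zero.
            out[code_idx] = code
            code_idx = len(out)
            out.append(0)
            code = 1
        else:
            out.append(byte)
            code += 1
            if code == 0xFF:
                # Block is full (254 data bytes), start a new one.
                out[code_idx] = code
                code_idx = len(out)
                out.append(0)
                code = 1
    out[code_idx] = code
    return bytes(out)


def cobs_decode(encoded: bytes) -> bytes:
    """Reverse cobs_encode; raises ValueError on malformed input."""
    out = bytearray()
    idx = 0
    while idx < len(encoded):
        code = encoded[idx]
        if code == 0:
            raise ValueError("zero byte inside COBS data")
        block = encoded[idx + 1:idx + code]
        if len(block) != code - 1:
            raise ValueError("truncated COBS data")
        out += block
        idx += code
        # A code below 0xFF stands for a zero, except at the very end.
        if code < 0xFF and idx < len(encoded):
            out.append(0)
    return bytes(out)
```

Because the encoded data never contains a zero byte, a zero can be used on the wire to mark frame boundaries, which lets a receiver thread resynchronize after corrupted data.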
Serial Communication with DLE Module
- class tmtccmd.com.serial_dle.DleCfg(dle_queue_len: 'Optional[int]' = None, dle_max_frame: 'Optional[int]' = None, encode_cr: 'bool' = True)
Bases: object
- class tmtccmd.com.serial_dle.SerialDleComIF(ser_cfg: SerialCfg, dle_cfg: DleCfg | None)
Bases: SerialComBase, ComInterface
Serial communication interface which uses the DLE protocol to encode and decode packets.
This class will spin up a receiver thread on the open() call to poll for DLE encoded packets. This means that the close() call might block until the receiver thread has shut down.
- close(args: any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float, parameters: any = 0) int
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(args: any | None = None) any
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: any | None = None) None
Spins up a receiver thread to permanently check for new DLE encoded packets.
- receive(parameters: any = 0) List[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
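The general idea of DLE framing can be sketched as below. The control byte values, the `+ 0x40` escape offset and the function names are assumptions of this example, modelled on a common escaped DLE scheme; the exact rules applied by `SerialDleComIF` are not stated on this page and may differ. The `encode_cr` flag is named after the `DleCfg` field of the same name, but its precise effect in the library is likewise an assumption.

```python
# Assumed control characters (ASCII STX, ETX, DLE and carriage return).
STX, ETX, DLE, CR = 0x02, 0x03, 0x10, 0x0D
ESCAPE_OFFSET = 0x40  # assumed offset added to escaped control bytes


def dle_encode(data: bytes, encode_cr: bool = True) -> bytes:
    """Wrap data in STX ... ETX and escape control bytes inside the frame."""
    escaped = {STX, ETX} | ({CR} if encode_cr else set())
    out = bytearray([STX])
    for byte in data:
        if byte in escaped:
            out += bytes([DLE, byte + ESCAPE_OFFSET])
        elif byte == DLE:
            out += bytes([DLE, DLE])
        else:
            out.append(byte)
    out.append(ETX)
    return bytes(out)


def dle_decode(frame: bytes, encode_cr: bool = True) -> bytes:
    """Reverse dle_encode; raises ValueError on malformed frames."""
    if len(frame) < 2 or frame[0] != STX or frame[-1] != ETX:
        raise ValueError("frame must start with STX and end with ETX")
    escaped = {STX, ETX} | ({CR} if encode_cr else set())
    body = frame[1:-1]
    out = bytearray()
    idx = 0
    while idx < len(body):
        byte = body[idx]
        if byte == DLE:
            if idx + 1 >= len(body):
                raise ValueError("dangling DLE at end of frame")
            nxt = body[idx + 1]
            if nxt == DLE:
                out.append(DLE)
            elif (nxt - ESCAPE_OFFSET) in escaped:
                out.append(nxt - ESCAPE_OFFSET)
            else:
                raise ValueError("invalid escape sequence")
            idx += 2
        elif byte in (STX, ETX):
            raise ValueError("unescaped control byte inside frame")
        else:
            out.append(byte)
            idx += 1
    return bytes(out)
```

With this scheme STX and ETX never appear inside a frame body, so a receiver can scan the serial stream for frame boundaries without knowing packet lengths in advance.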
Generic Serial Modules
- class tmtccmd.com.serial_base.SerialCfg(com_if_id: str, serial_port: str, baud_rate: int, serial_timeout: float)
Bases: object
- class tmtccmd.com.serial_base.SerialComBase(logger: Logger, ser_cfg: SerialCfg, ser_com_type: SerialCommunicationType)
Bases: object
- close_port()
- open_port()
- class tmtccmd.com.serial_base.SerialCommunicationType(value)
Bases: Enum
Right now, two serial communication methods are supported. One uses frames with a fixed size containing PUS packets and the other uses a simple ASCII based transport layer called DLE. If DLE is used, it is expected that the sender side encoded the packets with the DLE protocol. Any packets sent will also be encoded.
- COBS = 0
- DLE_ENCODING = 2
- class tmtccmd.com.serial_base.SerialConfigIds(value)
Bases: Enum
An enumeration.
- SERIAL_BAUD_RATE = 2
- SERIAL_COMM_TYPE = 4
- SERIAL_DLE_MAX_FRAME_SIZE = 7
- SERIAL_DLE_QUEUE_LEN = 6
- SERIAL_FRAME_SIZE = 5
- SERIAL_PORT = 1
- SERIAL_TIMEOUT = 3
- tmtccmd.com.ser_utils.determine_baud_rate(json_cfg_path: str) int
Determine the baud rate. Tries to read it from the JSON file first. If the baud rate is not contained in the config JSON, the user is prompted for it instead, with the option to store the value in the JSON file.
- Returns:
Determined baud rate
- tmtccmd.com.ser_utils.determine_com_port(json_cfg_path: str) str
Determine the serial port. Tries to read it from the JSON file first. If the serial port is not contained in the config JSON, the user is prompted for it instead, with the option to store the value in the JSON file.
- Returns:
Determined serial port
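The "JSON first, prompt as fallback" pattern described for both helpers can be sketched as follows. This is not the tmtccmd implementation: the function name, the `serial_baudrate` key and the injectable `prompt` callable are assumptions chosen to keep the example testable without user input.

```python
import json
from pathlib import Path
from typing import Callable


def baud_rate_from_json_or_prompt(
    json_cfg_path: str,
    prompt: Callable[[], int],
    store: bool = True,
    key: str = "serial_baudrate",  # hypothetical key name
) -> int:
    """Read the baud rate from a JSON file, asking the user only if needed."""
    path = Path(json_cfg_path)
    cfg = json.loads(path.read_text()) if path.exists() else {}
    if key in cfg:
        return int(cfg[key])
    # Value is missing: fall back to the prompt and optionally persist it.
    baud_rate = int(prompt())
    if store:
        cfg[key] = baud_rate
        path.write_text(json.dumps(cfg, indent=4))
    return baud_rate
```

In an interactive program the `prompt` argument could be something like `lambda: int(input("Baud rate: "))`; once the value has been stored, later calls no longer need to ask.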
Dummy Module
Dummy Virtual Communication Interface. Currently serves to use the TMTC program without needing external hardware or an extra socket.
- class tmtccmd.com.dummy.DummyComIF
Bases: ComInterface
- close(args: any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float = 0, parameters: any = 0)
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(args: any | None = None) any
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: any | None = None) None
Opens the communication interface to allow communication.
- Returns:
- receive(parameters: any = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
Utilities Module
QEMU Module
QEMU_SERIAL Communication Interface to communicate with emulated QEMU_SERIAL hardware via the UART interface.
It utilizes the asyncio library.
- Requirements:
Python >= 3.7 (asyncio support)
- Instructions:
Run QEMU_SERIAL (modified for OBSW) via
qemu-system-arm -M isis-obc -monitor stdio -bios path/to/sourceobsw-at91sam9g20_ek-sdram.bin -qmp unix:/tmp/qemu,server -S
Then run the telecommand script with -c 2
- class tmtccmd.com.qemu.DataFrame(seq, cat, frame_id, data=None)
Bases: object
Basic protocol unit for communication via the IOX API introduced for external device emulation
- bytes()
Convert this protocol unit to raw bytes
- class tmtccmd.com.qemu.QEMUComIF(serial_cfg: SerialCfg, ser_com_type: SerialCommunicationType = SerialCommunicationType.DLE_ENCODING)
Bases: ComInterface
Specific Communication Interface implementation of the QEMU_SERIAL USART protocol for the TMTC software
- close(_=None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- data_available(timeout: float = 0, _=0) int
Check whether TM packets are available.
- Parameters:
timeout – Can be used to block on available data if supported by the specific communication interface.
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
0 if no data is available, number of packets otherwise.
- initialize(_=None)
Needs to be called by application code once for DLE mode!
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- async poll_dle_packets()
- receive(_) List[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception will be raised.
- Returns:
- async send_data_async(data)
- async start_dle_polling()
- class tmtccmd.com.qemu.QmpConnection(addr='/tmp/qemu')
Bases: object
A connection to a QEMU_SERIAL machine via QMP
- close()
Close this connection
- async cont()
Continue machine execution if it has been paused
- async open()
Open this connection. Connects to the machine and ensures that the connection is ready to use after this call.
- async quit()
Quit the emulation. This causes the emulator to (non-gracefully) shut down and close.
- async stop()
Stop/pause machine execution
- exception tmtccmd.com.qemu.QmpException(ret, *args, **kwargs)
Bases: Exception
An exception caused by QMP/QEMU_SERIAL in response to a failed command
- class tmtccmd.com.qemu.QmpProtocol(conn)
Bases: Protocol
The QMP transport protocol implementation
- connection_lost(exc)
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- connection_made(transport)
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- data_received(data)
Called when some data is received.
The argument is a bytes object.
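The three callbacks above are the standard `asyncio.Protocol` hooks. As a rough illustration of how such a protocol class can buffer incoming bytes, the sketch below collects newline-terminated messages, which is how QMP delimits its JSON messages. `LineCollector` is a hypothetical class written for this example and does not reproduce the internals of `QmpProtocol`.

```python
import asyncio
from typing import List, Optional


class LineCollector(asyncio.Protocol):
    """Hypothetical protocol that splits the byte stream into lines."""

    def __init__(self) -> None:
        self.transport: Optional[asyncio.BaseTransport] = None
        self.buffer = bytearray()
        self.lines: List[bytes] = []
        self.closed = False

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        # Keep the transport so data can be written back later.
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        # Data may arrive in arbitrary chunks, so buffer until a newline.
        self.buffer += data
        while b"\n" in self.buffer:
            line, _, rest = bytes(self.buffer).partition(b"\n")
            self.lines.append(line.rstrip(b"\r"))
            self.buffer = bytearray(rest)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        # exc is None on a regular EOF or an orderly close.
        self.closed = True
```

An instance of such a class would normally be handed to an event loop method such as `loop.create_unix_connection()`, which then invokes the callbacks as data arrives.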
- class tmtccmd.com.qemu.Usart(addr)
Bases: object
- close()
Close this connection
- async static create_async(addr)
- flush()
- inject_frame_error()
Inject a frame error (set CSR_FRAME)
- inject_overrun_error()
Inject an overrun error (set CSR_OVRE)
- inject_parity_error()
Inject a parity error (set CSR_PARE)
- inject_timeout_error()
Inject a timeout (set CSR_TIMEOUT)
- async open()
Open this connection
- read(n)
Wait for ‘n’ bytes to be received from the USART.
- async read_async(n, timeout=None)
Wait for ‘n’ bytes to be received from the USART.
This function will return early if the specified timeout (in seconds) is exceeded. In this case, only the data received up to that point will be returned. If timeout is None, no timeout will be set.
- async read_until_async(expected, size=None, timeout=None)
Read data until either the expected byte sequence has been found, the specified number of bytes has been received, or the timeout has occurred.
This function will return whatever data has been received up until the first termination condition has been met. In case size is None, there will be no size limit. In case timeout is None, there will be no timeout.
- async write(data)
Write data (bytes) to the USART device
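The three termination conditions described for `read_until_async` (expected sequence found, size limit reached, timeout elapsed) can be sketched with a plain `asyncio.StreamReader`. This is a stand-alone illustration under that assumption; the `read_until` function below is hypothetical and does not show how the `Usart` class reads its data internally.

```python
import asyncio
from typing import Optional


async def read_until(
    reader: asyncio.StreamReader,
    expected: bytes,
    size: Optional[int] = None,
    timeout: Optional[float] = None,
) -> bytes:
    """Read until `expected` is seen, `size` bytes arrived, or time ran out."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    buf = bytearray()
    while expected not in buf and (size is None or len(buf) < size):
        remaining = None if deadline is None else deadline - loop.time()
        if remaining is not None and remaining <= 0:
            break
        try:
            # Read byte by byte so nothing past `expected` is consumed.
            chunk = await asyncio.wait_for(reader.read(1), remaining)
        except asyncio.TimeoutError:
            break
        if not chunk:
            break  # end of stream
        buf += chunk
    # Whatever was received before the first termination condition.
    return bytes(buf)
```

Reading one byte at a time is simple rather than fast; it keeps the sketch short and guarantees that data after the expected sequence stays in the reader.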
- class tmtccmd.com.qemu.UsartProtocol(conn)
Bases: Protocol
The USART transport protocol implementation
- connection_lost(exc)
Called when the connection is lost or closed.
The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).
- connection_made(transport)
Called when a connection is made.
The argument is the transport representing the pipe connection. To receive data, wait for data_received() calls. When the connection is closed, connection_lost() is called.
- data_received(data)
Called when some data is received.
The argument is a bytes object.
- exception tmtccmd.com.qemu.UsartStatusException(errn, *args, **kwargs)
Bases: Exception
An exception returned by the USART send command
- tmtccmd.com.qemu.parse_dataframes(buf)
Parse a variable number of DataFrames from the given byte buffer