API Reference
This page documents all public API members available directly from the pubsub package.
Classes
- class pubsub.Channel(topic: str)
Bases: object
Represents a pubsub channel using a shared-memory filesystem and FIFO queues.
Creates a directory in the pubsub base directory, named with the format {topic}_{random_12_chars}_{process_id}. The directory contains a non-blocking FIFO named ‘queue’ for message passing.
Topic format supports wildcards:
- ‘=’ for a single-word wildcard
- ‘+’ for a multiple-word wildcard
- __init__(topic: str)
Initialize a new channel.
- Parameters:
topic – The topic string (dot-separated terms with optional wildcards)
- Raises:
ValueError – If topic contains invalid characters
- open() → None
Open the FIFO queue for reading in non-blocking mode.
- Raises:
OSError – If unable to open the FIFO for reading
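As a rough illustration of what opening a FIFO for non-blocking reads involves on a POSIX system, the sketch below uses only the standard library. The helper names and the temporary directory are hypothetical and not part of pubsub; the real open() may differ in its details.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def open_fifo_for_reading(fifo_path: Path) -> int:
    """Open a FIFO read-only; O_NONBLOCK avoids waiting for a writer."""
    return os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)


def read_available(fd: int, size: int) -> Optional[bytes]:
    """Return up to `size` bytes, or None if a writer is connected but idle."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None


def demo() -> Tuple[Optional[bytes], Optional[bytes], Optional[bytes]]:
    """Show the three read outcomes on a throwaway FIFO (hypothetical path)."""
    with tempfile.TemporaryDirectory() as tmp:
        fifo_path = Path(tmp) / "queue"
        os.mkfifo(fifo_path)
        reader = open_fifo_for_reading(fifo_path)
        try:
            no_writer = read_available(reader, 8)  # end of file: no writer yet
            writer = os.open(fifo_path, os.O_WRONLY | os.O_NONBLOCK)
            try:
                idle_writer = read_available(reader, 8)  # would block
                os.write(writer, b"12345678")
                with_data = read_available(reader, 8)
            finally:
                os.close(writer)
        finally:
            os.close(reader)
    return no_writer, idle_writer, with_data
```

A reader therefore has to treat both an empty read and a would-block error as "nothing available yet".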
- close() → None
Close and clean up the channel by removing unconsumed messages, the FIFO, and directory.
This ensures that any undelivered messages are properly deleted when the channel is disposed, preventing resource leaks.
- static cleanup_inactive() → None
Clean up all inactive channels in the pubsub base directory.
Inactive channels are those where the associated process is no longer running. This method removes their directories and any unconsumed messages to free up resources.
- static active_paths() → list[Path]
List all active channel paths in the pubsub base directory.
Only includes channels where the associated process is still running.
- Returns:
List of active channel paths
- static inactive_paths() → list[Path]
List all inactive channel paths in the pubsub base directory.
Includes channels where the associated process is no longer running.
- Returns:
List of inactive channel paths
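One way to tell active from inactive channel directories is to parse the process ID from the end of the directory name and probe it. The sketch below is an assumption about how such a check could work on POSIX, not the package's actual implementation; the function names and the example path are hypothetical.

```python
import os
from pathlib import Path
from typing import Iterable, List, Tuple


def pid_from_channel_dir(path: Path) -> int:
    """Extract the trailing process ID from '{topic}_{random_12_chars}_{process_id}'."""
    return int(path.name.rsplit("_", 1)[1])


def process_is_running(pid: int) -> bool:
    """Probe a PID with signal 0, which checks existence without delivering a signal."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def split_by_liveness(paths: Iterable[Path]) -> Tuple[List[Path], List[Path]]:
    """Partition channel directories into (active, inactive) by owner liveness."""
    active: List[Path] = []
    inactive: List[Path] = []
    for path in paths:
        if process_is_running(pid_from_channel_dir(path)):
            active.append(path)
        else:
            inactive.append(path)
    return active, inactive


# Hypothetical directory name owned by the current process.
own_dir = Path("/tmp/pubsub-example") / f"orders.created_ab12cd34ef56_{os.getpid()}"
active, inactive = split_by_liveness([own_dir])
```

Splitting on the last underscore is safe here because topics contain no underscores and the process ID is always the final field.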
- static matching_active_paths(topic: str) → list[Path]
Find all channel directories in the pubsub base directory that match the given topic using regex.
Converts wildcards to regex patterns:
- ‘=’ becomes ‘[a-zA-Z0-9-]’ (single word wildcard)
- ‘+’ becomes ‘[a-zA-Z0-9.-]*’ (multiple words wildcard)
- Parameters:
topic – The topic to match against
- Returns:
List of Path objects for matching channel directories
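The wildcard-to-regex conversion described for matching_active_paths can be sketched on plain topic strings as follows. This is an illustrative assumption rather than the package's code: topic_to_regex is a hypothetical helper, it ignores the random and process-ID suffix of real directory names, and it assumes the single-word class repeats one or more times (the `+` quantifier after the first character class is not stated in this reference).

```python
from __future__ import annotations

import re


def topic_to_regex(topic: str) -> re.Pattern[str]:
    """Translate '=' and '+' wildcards into a compiled regular expression."""
    parts = []
    for char in topic:
        if char == "=":
            # Single word: no dots allowed. The '+' quantifier is an assumption.
            parts.append("[a-zA-Z0-9-]+")
        elif char == "+":
            # Multiple words: dots allowed, as documented.
            parts.append("[a-zA-Z0-9.-]*")
        else:
            parts.append(re.escape(char))
    return re.compile("".join(parts))


single = topic_to_regex("orders.=.created")
multi = topic_to_regex("orders.+")

print(bool(single.fullmatch("orders.eu.created")))       # True
print(bool(single.fullmatch("orders.eu.west.created")))  # False
print(bool(multi.fullmatch("orders.eu.west.created")))   # True
```

Escaping every literal character keeps the dots in a topic from acting as regex wildcards.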
- class pubsub.Message(topic: str, content: bytes, headers: dict[str, str | int | float | bool | None] | None = None)
Bases: object
Represents a message in the pubsub system.
- __init__(topic: str, content: bytes, headers: dict[str, str | int | float | bool | None] | None = None)
Initialize a new message.
- Parameters:
topic – The topic this message belongs to
content – The message payload as bytes
headers – Optional dictionary with string keys and scalar values (str, int, float, bool, None) for metadata
- write(stream: BinaryIO) → None
Write the message to a binary stream.
Binary format:
- 4 bytes: magic number (0x504D5347 - “PMSG”)
- 1 byte: format version (uint8)
- 8 bytes: id (uint64)
- 8 bytes: timestamp (uint64, microseconds since epoch)
- 4 bytes: topic length (uint32)
- N bytes: topic (UTF-8 encoded)
- 4 bytes: headers JSON length (uint32)
- N bytes: headers as JSON string (UTF-8 encoded)
- 4 bytes: content length (uint32)
- N bytes: content
- Parameters:
stream – Binary stream to write to
- classmethod read(stream: BinaryIO) → Message
Read and deserialize a message from a binary stream.
- Parameters:
stream – Binary stream to read from
- Returns:
A new Message instance
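To make the binary layout concrete, here is a minimal round-trip sketch built on the standard struct module. It is not the package's implementation: write_message and read_message are hypothetical helpers, and both the big-endian byte order and the version value 1 are assumptions, since this reference specifies neither.

```python
import io
import json
import struct
import time
from typing import Any, BinaryIO, Dict, Optional

MAGIC = 0x504D5347  # "PMSG"
FORMAT_VERSION = 1  # assumed value; not stated in the reference
# magic (uint32), version (uint8), id (uint64), timestamp (uint64); big-endian assumed
FIXED_HEADER = struct.Struct(">IBQQ")


def write_message(stream: BinaryIO, message_id: int, topic: str, content: bytes,
                  headers: Optional[Dict[str, Any]] = None,
                  timestamp_us: Optional[int] = None) -> None:
    """Serialize one message using the documented field order."""
    if timestamp_us is None:
        timestamp_us = time.time_ns() // 1_000
    stream.write(FIXED_HEADER.pack(MAGIC, FORMAT_VERSION, message_id, timestamp_us))
    headers_json = json.dumps(headers or {}).encode("utf-8")
    # Topic, headers and content are each written as a uint32 length plus payload.
    for blob in (topic.encode("utf-8"), headers_json, content):
        stream.write(struct.pack(">I", len(blob)))
        stream.write(blob)


def read_message(stream: BinaryIO) -> Dict[str, Any]:
    """Deserialize one message and return its fields as a dictionary."""
    magic, version, message_id, timestamp_us = FIXED_HEADER.unpack(
        stream.read(FIXED_HEADER.size))
    if magic != MAGIC:
        raise ValueError(f"unexpected magic number: {magic:#x}")
    fields = []
    for _ in range(3):
        (length,) = struct.unpack(">I", stream.read(4))
        fields.append(stream.read(length))
    topic_bytes, headers_bytes, content = fields
    return {
        "version": version,
        "id": message_id,
        "timestamp_us": timestamp_us,
        "topic": topic_bytes.decode("utf-8"),
        "headers": json.loads(headers_bytes.decode("utf-8")),
        "content": content,
    }


buffer = io.BytesIO()
write_message(buffer, 7, "orders.created", b"payload", {"retries": 3, "urgent": True})
buffer.seek(0)
decoded = read_message(buffer)
```

With big-endian packing the magic number serializes to the ASCII bytes "PMSG", which makes serialized messages easy to spot in a hex dump.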
Functions
- pubsub.publish(topic: str, data: bytes, headers: dict[str, str | int | float | bool | None] | None = None) → int
Publish a message to a topic.
- Parameters:
topic – The topic to publish to (only alphanumeric, dots, and hyphens allowed)
data – The message payload as bytes
headers – Optional dictionary with string keys and scalar values (str, int, float, bool, None) for metadata
- Returns:
The number of channels the message was published to
- Raises:
ValueError – If topic contains invalid characters (must be [a-zA-Z0-9.-])
RuntimeError – If unable to publish to any matching channels
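The topic rule for publishing (no wildcards, only characters from [a-zA-Z0-9.-]) can be checked before calling publish. The helper below is a hypothetical sketch, not part of pubsub, and it additionally assumes that an empty topic is invalid.

```python
import re

# Character set taken from the ValueError description above.
_PUBLISH_TOPIC = re.compile(r"[a-zA-Z0-9.-]+")


def validate_publish_topic(topic: str) -> str:
    """Return the topic unchanged, or raise ValueError if it is not publishable."""
    if not _PUBLISH_TOPIC.fullmatch(topic):
        raise ValueError(f"invalid topic {topic!r}: only [a-zA-Z0-9.-] is allowed")
    return topic


validate_publish_topic("orders.eu-west.created")  # accepted

try:
    validate_publish_topic("orders.=.created")  # wildcards are for channels only
except ValueError as error:
    print(error)
```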
- pubsub.subscribe(channel: Channel, callback: Callable[[Message], None], timeout_seconds: float = 0) → int
Subscribe to a channel and call a function for each received message.
Listens for messages for the specified duration and calls the callback function for each message received.
- Signal Handling:
This function handles SIGTERM and SIGINT signals gracefully. When either signal is received:
- The subscription loop exits cleanly
- Any message being processed completes normally
- The function returns the count of processed messages
- The original signal handler (if any) is called before returning
This allows subscriber services to be terminated gracefully.
- Parameters:
channel – The channel to subscribe to
callback – Function to call with each received message
timeout_seconds – How long to listen for messages (0 = listen indefinitely)
- Returns:
Number of messages processed. Returned after normal timeout expiration.
Returns -1 if exited due to a termination signal (SIGTERM/SIGINT).
- Raises:
RuntimeError – If channel is not open for reading
ValueError – If timeout_seconds is negative
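The loop behaviour described for subscribe (timeout, graceful signal handling, restoring the original handlers) might look roughly like the sketch below. It is an assumption-laden illustration, not the package's code: run_until_signal and its poll parameter are hypothetical, the 10 ms idle sleep is arbitrary, the -1 return follows the Returns description above, and the function must run in the main thread because Python only allows signal handlers to be installed there.

```python
import signal
import time
from typing import Any, Callable, List, Optional


def run_until_signal(poll: Callable[[], Optional[Any]],
                     callback: Callable[[Any], None],
                     timeout_seconds: float = 0) -> int:
    """Call `callback` for each polled item until timeout or SIGTERM/SIGINT."""
    if timeout_seconds < 0:
        raise ValueError("timeout_seconds must not be negative")

    received: List[int] = []

    def on_signal(signum: int, frame: Any) -> None:
        received.append(signum)  # only record it; the loop exits at its next check

    watched = (signal.SIGTERM, signal.SIGINT)
    previous = {sig: signal.signal(sig, on_signal) for sig in watched}
    # 0 means "listen indefinitely", so no deadline is set in that case.
    deadline = time.monotonic() + timeout_seconds if timeout_seconds else None
    processed = 0
    try:
        while not received and (deadline is None or time.monotonic() < deadline):
            item = poll()
            if item is None:
                time.sleep(0.01)  # nothing pending; avoid a busy loop
                continue
            callback(item)  # an in-flight message always completes
            processed += 1
    finally:
        for sig, handler in previous.items():
            if handler is not None:
                signal.signal(sig, handler)  # restore the original handlers

    if received:
        original = previous[received[0]]
        if callable(original):
            original(received[0], None)  # chain to the original handler
        return -1
    return processed
```

Recording the signal in a list instead of raising inside the handler is what lets the message currently being processed finish before the loop exits.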
- pubsub.fetch(channel: Channel) → Message | None
Fetch a single message from a channel (non-blocking).
Reads an 8-byte message ID from the FIFO queue, then loads the actual message from the corresponding file.
- Parameters:
channel – The channel to fetch from
- Returns:
Message if one is available, None otherwise
- Raises:
ValueError – If message format is invalid
RuntimeError – If channel is not open for reading
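The first half of fetch, reading an 8-byte message ID without blocking, can be sketched as shown here. This is only an illustration under stated assumptions: read_message_id is a hypothetical helper, an anonymous pipe stands in for the channel's FIFO, and the ID is assumed to be a big-endian uint64. How the ID maps to a message file is not described in this reference, so that step is left out.

```python
import os
import struct
from typing import Optional

ID_SIZE = 8  # bytes per message ID, as documented for fetch


def read_message_id(fd: int) -> Optional[int]:
    """Return the next message ID, or None when nothing is queued."""
    try:
        raw = os.read(fd, ID_SIZE)
    except BlockingIOError:
        return None  # a writer is connected but has sent nothing
    if not raw:
        return None  # end of file: no writer is connected
    if len(raw) != ID_SIZE:
        raise ValueError(f"expected {ID_SIZE} bytes, got {len(raw)}")
    return struct.unpack(">Q", raw)[0]  # big-endian is an assumption


# A pipe has the same non-blocking read semantics as a FIFO.
read_end, write_end = os.pipe()
os.set_blocking(read_end, False)

empty = read_message_id(read_end)          # nothing written yet
os.write(write_end, struct.pack(">Q", 42))
first = read_message_id(read_end)          # the queued ID
os.close(write_end)
after_close = read_message_id(read_end)    # writer gone, queue drained
os.close(read_end)
```

Passing only fixed-size IDs through the queue keeps each write small, while the message bodies of arbitrary size live in separate files.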