API Reference

This documents all public API members available directly from the pubsub package.

Classes

class pubsub.Channel(topic: str)[source]

Bases: object

Represents a pubsub channel using a shared-memory filesystem and FIFO queues.

Creates a directory in the pubsub base directory named {topic}_{random_12_chars}_{process_id}. The directory contains a non-blocking FIFO named ‘queue’ for message passing.

Topic format supports wildcards:

  • ‘=’ for single word wildcard

  • ‘+’ for multiple words wildcard
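The directory layout described above can be sketched with the standard library alone. This is a minimal illustration rather than the package's implementation: the helper name `make_channel_dir`, the lowercase-alphanumeric token alphabet, and the temporary base directory are assumptions, and `os.mkfifo` requires a POSIX system.

```python
import os
import secrets
import stat
import string
import tempfile
from pathlib import Path


def make_channel_dir(base: Path, topic: str) -> Path:
    """Create {topic}_{random_12_chars}_{process_id}/queue under base (hypothetical helper)."""
    alphabet = string.ascii_lowercase + string.digits  # assumed token alphabet
    token = "".join(secrets.choice(alphabet) for _ in range(12))
    path = base / f"{topic}_{token}_{os.getpid()}"
    path.mkdir(parents=True)
    os.mkfifo(path / "queue")  # named pipe used for message passing
    return path


with tempfile.TemporaryDirectory() as tmp:
    channel_dir = make_channel_dir(Path(tmp), "sensors.=.temperature")
    # Topics cannot contain underscores, so the name splits cleanly from the right.
    topic, token, pid = channel_dir.name.rsplit("_", 2)
    is_fifo = stat.S_ISFIFO((channel_dir / "queue").stat().st_mode)
```

Because the process id is the last underscore-separated field, the owning process can be recovered from the directory name alone.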

__init__(topic: str)[source]

Initialize a new channel.

Parameters:

topic – The topic string (dot-separated terms with optional wildcards)

Raises:

ValueError – If topic contains invalid characters

property is_open: bool

Check if the channel’s FIFO is currently open for reading.

open() → None[source]

Open the FIFO queue for reading in non-blocking mode.

Raises:

OSError – If unable to open the FIFO for reading

close() → None[source]

Close and clean up the channel by removing unconsumed messages, the FIFO, and directory.

This ensures that any undelivered messages are properly deleted when the channel is disposed, preventing resource leaks.

static cleanup_inactive() → None[source]

Clean up all inactive channels in the pubsub base directory.

Inactive channels are those where the associated process is no longer running. This method removes their directories and any unconsumed messages to free up resources.
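One way to decide whether a channel is inactive is to read the process id from the directory name and probe that process with signal 0. The sketch below assumes this approach and a POSIX system; `pid_is_running`, `owner_pid`, and `remove_inactive` are hypothetical helpers, not part of the pubsub API.

```python
import os
import shutil
from pathlib import Path
from typing import Optional


def pid_is_running(pid: int) -> bool:
    """Probe a process with signal 0, which checks existence without delivering a signal."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def owner_pid(channel_dir: Path) -> Optional[int]:
    """Extract the trailing process id from {topic}_{random_12_chars}_{process_id}."""
    suffix = channel_dir.name.rsplit("_", 1)[-1]
    return int(suffix) if suffix.isdigit() else None


def remove_inactive(base: Path) -> list[Path]:
    """Delete channel directories whose owning process has exited; return what was removed."""
    removed = []
    for entry in sorted(base.iterdir()):
        pid = owner_pid(entry)
        if entry.is_dir() and pid is not None and not pid_is_running(pid):
            shutil.rmtree(entry)  # removes the FIFO and any leftover message files
            removed.append(entry)
    return removed
```

Directories whose names do not end in a numeric process id are left alone in this sketch, so unrelated files in the base directory are never deleted.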

static active_paths() → list[Path][source]

List all active channel paths in the pubsub base directory.

Only includes channels where the associated process is still running.

Returns:

List of active channel paths

static inactive_paths() → list[Path][source]

List all inactive channel paths in the pubsub base directory.

Includes channels where the associated process is no longer running.

Returns:

List of inactive channel paths

static matching_active_paths(topic: str) → list[Path][source]

Find all channel directories in the pubsub base directory that match the given topic using regex.

Converts wildcards to regex patterns:

  • ‘=’ becomes ‘[a-zA-Z0-9-]’ (single word wildcard)

  • ‘+’ becomes ‘[a-zA-Z0-9.-]*’ (multiple words wildcard)
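The wildcard conversion can be illustrated with a small standalone function. This sketch makes two assumptions that the text does not confirm: the single-word class is repeated one or more times (`[a-zA-Z0-9-]+`) so that it spans a whole word, and the pattern is matched against a bare topic string rather than a full directory name. The names `pattern_to_regex` and `topic_matches` are hypothetical.

```python
import re

SINGLE_WORD = "[a-zA-Z0-9-]+"  # assumption: quantified so '=' covers one whole word
MULTI_WORD = "[a-zA-Z0-9.-]*"  # as documented: may span dots, may be empty


def pattern_to_regex(pattern: str) -> re.Pattern:
    """Translate a wildcard topic pattern into a compiled regular expression."""
    parts = []
    for char in pattern:
        if char == "=":
            parts.append(SINGLE_WORD)
        elif char == "+":
            parts.append(MULTI_WORD)
        else:
            parts.append(re.escape(char))  # dots and hyphens are matched literally
    return re.compile("".join(parts))


def topic_matches(pattern: str, topic: str) -> bool:
    """Return True when the whole topic is covered by the pattern."""
    return pattern_to_regex(pattern).fullmatch(topic) is not None
```

Under these assumptions, `sensors.=.temperature` matches `sensors.kitchen.temperature` but not `sensors.floor1.kitchen.temperature`, because the single-word class excludes dots, while `sensors.+` matches both.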

Parameters:

topic – The topic to match against

Returns:

List of Path objects for matching channel directories

class pubsub.Message(topic: str, content: bytes, headers: dict[str, str | int | float | bool | None] | None = None)[source]

Bases: object

Represents a message in the pubsub system.

__init__(topic: str, content: bytes, headers: dict[str, str | int | float | bool | None] | None = None)[source]

Initialize a new message.

Parameters:
  • topic – The topic this message belongs to

  • content – The message payload as bytes

  • headers – Optional dictionary with string keys and scalar values (str, int, float, bool, None) for metadata

write(stream: BinaryIO) → None[source]

Write the message to a binary stream.

Binary format:

  • 4 bytes: magic number (0x504D5347 - “PMSG”)

  • 1 byte: format version (uint8)

  • 8 bytes: id (uint64)

  • 8 bytes: timestamp (uint64, microseconds since epoch)

  • 4 bytes: topic length (uint32)

  • N bytes: topic (UTF-8 encoded)

  • 4 bytes: headers JSON length (uint32)

  • N bytes: headers as JSON string (UTF-8 encoded)

  • 4 bytes: content length (uint32)

  • N bytes: content
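The field layout above maps naturally onto the `struct` module. The following is a sketch of that layout, not the library's own serializer: big-endian byte order, a version value of 1, and encoding missing headers as an empty JSON object are all assumptions, and `encode_message` and `decode_message` are hypothetical names.

```python
import io
import json
import struct

MAGIC = 0x504D5347  # "PMSG"
FIXED = struct.Struct(">IBQQ")  # magic, version, id, timestamp (assumed big-endian)
LENGTH = struct.Struct(">I")  # uint32 length prefix


def encode_message(msg_id, timestamp_us, topic, headers, content, version=1):
    """Serialize the fields in the documented order and return the bytes."""
    out = io.BytesIO()
    out.write(FIXED.pack(MAGIC, version, msg_id, timestamp_us))
    headers_json = json.dumps(headers or {})  # assumed encoding for missing headers
    for blob in (topic.encode("utf-8"), headers_json.encode("utf-8"), content):
        out.write(LENGTH.pack(len(blob)))  # each variable field is length-prefixed
        out.write(blob)
    return out.getvalue()


def decode_message(data):
    """Parse bytes produced by encode_message back into a dictionary of fields."""
    stream = io.BytesIO(data)
    magic, version, msg_id, timestamp_us = FIXED.unpack(stream.read(FIXED.size))
    if magic != MAGIC:
        raise ValueError("not a PMSG record")

    def read_blob():
        (size,) = LENGTH.unpack(stream.read(LENGTH.size))
        return stream.read(size)

    topic = read_blob().decode("utf-8")
    headers = json.loads(read_blob().decode("utf-8"))
    content = read_blob()
    return {
        "version": version,
        "id": msg_id,
        "timestamp": timestamp_us,
        "topic": topic,
        "headers": headers,
        "content": content,
    }
```

With big-endian packing, the first four bytes of every record spell `PMSG`, which makes serialized messages easy to recognize in a hex dump.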

Parameters:

stream – Binary stream to write to

classmethod read(stream: BinaryIO) → Message[source]

Read and deserialize a message from a binary stream.

Parameters:

stream – Binary stream to read from

Returns:

A new Message instance

to_bytes() → bytes[source]

Convenience method to serialize message to bytes.

Returns:

The serialized message as bytes

classmethod from_bytes(data: bytes) → Message[source]

Convenience method to deserialize message from bytes.

Parameters:

data – The serialized message bytes

Returns:

A new Message instance

Functions

pubsub.publish(topic: str, data: bytes, headers: dict[str, str | int | float | bool | None] | None = None) → int[source]

Publish a message to a topic.

Parameters:
  • topic – The topic to publish to (only alphanumeric, dots, and hyphens allowed)

  • data – The message payload as bytes

  • headers – Optional dictionary with string keys and scalar values (str, int, float, bool, None) for metadata

Returns:

The number of channels the message was published to

Raises:
  • ValueError – If topic contains invalid characters (must be [a-zA-Z0-9.-])

  • RuntimeError – If unable to publish to any matching channels
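The topic restriction for publishing can be checked up front with the character class given above. This is an illustrative sketch: `validate_publish_topic` is a hypothetical helper, and rejecting an empty topic is an assumption.

```python
import re

_PUBLISH_TOPIC = re.compile(r"[a-zA-Z0-9.-]+")  # no wildcards when publishing


def validate_publish_topic(topic: str) -> str:
    """Return the topic unchanged, or raise ValueError if it has invalid characters."""
    if _PUBLISH_TOPIC.fullmatch(topic) is None:
        raise ValueError(f"invalid topic {topic!r}: must match [a-zA-Z0-9.-]")
    return topic
```

A wildcard pattern such as `orders.=` is rejected by this check, since wildcards are part of the channel topic format and not of the publish character set.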

pubsub.subscribe(channel: Channel, callback: Callable[[Message], None], timeout_seconds: float = 0) → int[source]

Subscribe to a channel and call a function for each received message.

Listens for messages for the specified duration and calls the callback function for each message received.

Signal Handling:

This function handles SIGTERM and SIGINT signals gracefully. When either signal is received:

  • The subscription loop exits cleanly

  • Any message being processed completes normally

  • The function returns the count of processed messages

  • The original signal handler (if any) is called before returning

This allows subscriber services to be terminated gracefully.
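The signal behavior described above follows a common pattern: install temporary handlers that only set a flag, let the current callback finish, then restore and invoke the previous handler. The sketch below shows that pattern in isolation. It assumes a generic `poll` function in place of a channel, must run in the main thread, and returns the processed count in every case; `consume_until_signal` is a hypothetical name.

```python
import signal
import time


def consume_until_signal(poll, callback, timeout_seconds=0.0):
    """Call callback for each item from poll() until a timeout, SIGTERM, or SIGINT."""
    received = []  # signal numbers seen while the loop is running

    def remember(signum, frame):
        received.append(signum)  # only record it; the loop exits on its next check

    watched = (signal.SIGTERM, signal.SIGINT)
    previous = {sig: signal.signal(sig, remember) for sig in watched}
    deadline = time.monotonic() + timeout_seconds if timeout_seconds > 0 else None
    processed = 0
    try:
        while not received and (deadline is None or time.monotonic() < deadline):
            item = poll()
            if item is None:
                time.sleep(0.01)  # nothing available yet
                continue
            callback(item)  # always runs to completion before the flag is checked
            processed += 1
    finally:
        for sig, handler in previous.items():
            signal.signal(sig, handler)  # restore the original handlers
    if received and callable(previous[received[0]]):
        previous[received[0]](received[0], None)  # chain to the original handler
    return processed
```

Because the temporary handler only appends to a list, a signal that arrives in the middle of a callback cannot interrupt it; the loop notices the signal only after the callback returns.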

Parameters:
  • channel – The channel to subscribe to

  • callback – Function to call with each received message

  • timeout_seconds – How long to listen for messages (0 = listen indefinitely)

Returns:

  • Number of messages processed. Returned after normal timeout expiration.

  • Returns -1 if exited due to a termination signal (SIGTERM/SIGINT).

Raises:

pubsub.fetch(channel: Channel) → Message | None[source]

Fetch a single message from a channel (non-blocking).

Reads an 8-byte message ID from the FIFO queue, then loads the actual message from the corresponding file.
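The two-step read described above (an id from the FIFO, then the message from a file) can be sketched as follows. Several details are assumptions made for illustration: the id is packed big-endian, each message lives in a file named after its decimal id, and the file is deleted once read. `fetch_payload` is a hypothetical helper, and the example requires a POSIX system.

```python
import os
import struct
import tempfile
from pathlib import Path
from typing import Optional

MESSAGE_ID = struct.Struct(">Q")  # 8-byte unsigned id; byte order is an assumption


def fetch_payload(fifo_fd: int, message_dir: Path) -> Optional[bytes]:
    """Read one id from the FIFO and return the bytes stored under that id, or None."""
    try:
        raw = os.read(fifo_fd, MESSAGE_ID.size)
    except BlockingIOError:
        return None  # a writer is connected but nothing is queued
    if len(raw) < MESSAGE_ID.size:
        return None  # end of file (no writer connected) or an incomplete id
    (message_id,) = MESSAGE_ID.unpack(raw)
    path = message_dir / str(message_id)  # hypothetical file naming scheme
    payload = path.read_bytes()
    path.unlink()  # the message is consumed once fetched
    return payload


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    fifo = base / "queue"
    os.mkfifo(fifo)
    reader = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)  # returns without a writer
    writer = os.open(fifo, os.O_WRONLY | os.O_NONBLOCK)
    before = fetch_payload(reader, base)  # nothing has been queued yet
    (base / "7").write_bytes(b"hello")
    os.write(writer, MESSAGE_ID.pack(7))
    after = fetch_payload(reader, base)
    os.close(writer)
    os.close(reader)
```

Passing only fixed-size ids through the FIFO keeps each write small, while payloads of any size stay in ordinary files.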

Parameters:

channel – The channel to fetch from

Returns:

Message if one is available, None otherwise

Raises: