Source code for pubsub.pubsub

"""PubSub module for publishing, fetching, and subscribing to messages."""

import logging
import os
import re
import signal
import struct
import time
from collections.abc import Callable

from .abstractions import get_base_dir
from .channel import Channel
from .message import Header, Message


def publish(topic: str, data: bytes, headers: Header | None = None) -> int:
    """
    Publish a message to a topic.

    The message is written once to a temporary file under ``<base_dir>/tmp``
    and hard-linked into every channel directory returned by
    ``Channel.matching_active_paths(topic)``. The 8-byte message ID is then
    written to that channel's ``queue`` file. Failures on one channel are
    logged and do not prevent delivery to the others.

    Args:
        topic: The topic to publish to (only alphanumeric, dots, and hyphens
            allowed)
        data: The message payload as bytes
        headers: Optional dictionary with string keys and scalar values
            (str, int, float, bool, None) for metadata

    Returns:
        The number of channels the message was published to. This is 0 when
        no channel matched or every delivery attempt failed.

    Raises:
        ValueError: If topic is empty or contains invalid characters
            (must be [a-zA-Z0-9.-])
    """
    # fullmatch (rather than match with "^...$") also rejects a trailing
    # newline, which "$" would otherwise silently accept.
    if not re.fullmatch(r"[a-zA-Z0-9.-]+", topic):
        raise ValueError(
            f"Topic '{topic}' can only contain alphanumeric characters, dots, and hyphens "
            "[a-zA-Z0-9.-] when publishing"
        )

    message = Message(topic=topic, content=data, headers=headers)

    tmp_dir = get_base_dir() / "tmp"
    tmp_dir.mkdir(exist_ok=True)  # Ensure temporary directory exists
    message_temp_file = tmp_dir / f"{message.id}"

    publication_count = 0
    try:
        with open(message_temp_file, "wb") as msg_file:
            message.write(msg_file)

        packed_id = struct.pack("!Q", message.id)

        for channel_dir in Channel.matching_active_paths(topic):
            queue_path = channel_dir / "queue"
            if not queue_path.exists():
                logging.warning(
                    "Channel directory %s does not contain a queue file. Skipping.",
                    channel_dir,
                )
                continue

            message_file_path = channel_dir / str(message.id)
            linked = False
            try:
                os.link(str(message_temp_file), str(message_file_path))
                linked = True
                # Non-blocking open: fails immediately instead of hanging
                # when the queue cannot currently be opened for writing.
                with os.fdopen(
                    os.open(str(queue_path), os.O_WRONLY | os.O_NONBLOCK), "wb"
                ) as queue_file:
                    queue_file.write(packed_id)
            except OSError as e:  # BrokenPipeError is a subclass of OSError
                logging.warning(
                    "Failed to publish message %s to %s: %s", message.id, queue_path, e
                )
                # The ID never reached the queue, so nobody will ever consume
                # the link we just created; remove it to avoid orphaned files.
                if linked:
                    message_file_path.unlink(missing_ok=True)
            else:
                # Count only once the queue file was closed (flushed) cleanly.
                publication_count += 1
    finally:
        # Always drop the temporary file, even if publishing raised midway.
        try:
            message_temp_file.unlink()
        except FileNotFoundError:
            logging.debug(f"Temporary message file {message_temp_file} already removed.")

    return publication_count


def fetch(channel: Channel) -> Message | None:
    """
    Fetch a single message from a channel (non-blocking).

    Reads an 8-byte message ID from the FIFO queue, then loads the actual
    message from the corresponding file in the channel directory. The message
    file is removed once it has been read successfully.

    Args:
        channel: The channel to fetch from

    Returns:
        Message if one is available, None otherwise (no data queued, an
        incomplete ID was read, or the message file no longer exists)

    Raises:
        ValueError: If message format is invalid
        RuntimeError: If channel is not open for reading
    """
    if not channel.is_open:
        raise RuntimeError("Channel must be open to fetch messages")

    # Read message ID from queue (non-blocking)
    try:
        id_bytes = os.read(channel._fp, 8)
    except BlockingIOError:
        # No data available (non-blocking read)
        return None

    if len(id_bytes) != 8:
        # No data available or incomplete ID
        return None

    (message_id,) = struct.unpack("!Q", id_bytes)
    message_file_path = channel.directory_path / str(message_id)

    # Open directly instead of checking exists() first: the file may vanish
    # between the check and the open, and a missing file simply means there
    # is nothing to deliver.
    try:
        with open(message_file_path, "rb") as msg_file:
            message = Message.read(msg_file)
    except FileNotFoundError:
        return None

    # The message is already in memory; a concurrent removal must not turn a
    # successful fetch into an error.
    message_file_path.unlink(missing_ok=True)
    return message


def subscribe(
    channel: Channel, callback: Callable[[Message], None], timeout_seconds: float = 0
) -> int:
    """
    Subscribe to a channel and call a function for each received message.

    Listens for messages for the specified duration and calls the callback
    function for each message received. Exceptions raised by the callback are
    logged and do not stop the subscription.

    Signal Handling:
        This function handles SIGTERM and SIGINT signals gracefully. When
        either signal is received:

        - The subscription loop exits cleanly
        - Any message being processed completes normally
        - The previously installed signal handlers are restored
        - The original signal handler (if any) is called before returning

        This allows subscriber services to be terminated gracefully.
        Handlers are installed with ``signal.signal``, which Python only
        permits from the main thread.

    Args:
        channel: The channel to subscribe to
        callback: Function to call with each received message
        timeout_seconds: How long to listen for messages
            (0 = listen indefinitely)

    Returns:
        - Number of messages processed. Returned after normal timeout
          expiration.
        - Returns -1 if exited due to a termination signal (SIGTERM/SIGINT).

    Raises:
        RuntimeError: If channel is not open for reading
        ValueError: If timeout_seconds is negative
    """
    if timeout_seconds < 0:
        raise ValueError("timeout_seconds must be non-negative")

    if not channel.is_open:
        raise RuntimeError("Channel must be open to subscribe")

    # Set up signal handler for graceful shutdown
    shutdown_requested = False
    signal_args: tuple | None = None

    def signal_handler(signum, frame):
        nonlocal shutdown_requested, signal_args
        shutdown_requested = True
        signal_args = (signum, frame)
        logging.info("Received termination signal, exiting subscribe loop...")

    # Register SIGTERM and SIGINT handlers, remembering what was there before
    original_sigterm = signal.signal(signal.SIGTERM, signal_handler)
    original_sigint = signal.signal(signal.SIGINT, signal_handler)

    try:
        message_count = 0
        # Monotonic clock: the timeout must not be affected by wall-clock
        # adjustments while we are listening.
        start_time = time.monotonic()
        listen_forever = timeout_seconds == 0

        while not shutdown_requested and (
            listen_forever or (time.monotonic() - start_time < timeout_seconds)
        ):
            message = fetch(channel)
            if message is not None:
                try:
                    callback(message)
                except Exception as e:
                    # A failing callback must not take the subscriber down.
                    logging.warning(f"Error processing message {message}: {e}")
                message_count += 1

            try:
                time.sleep(0.01)  # Sleep briefly to avoid busy waiting
            except KeyboardInterrupt:
                # time.sleep() can raise KeyboardInterrupt when SIGINT is
                # delivered during the sleep syscall, before the custom signal
                # handler has a chance to suppress it. Treat it identically.
                shutdown_requested = True

        if shutdown_requested:
            logging.info(
                "Exiting subscribe loop on '%s'. Handled %i messages.",
                channel.topic,
                message_count,
            )
            return -1  # Indicate shutdown by signal

        return message_count
    finally:
        # Restore the previous handlers first so our closure does not keep
        # swallowing signals after this function returns. signal.signal()
        # returns None for handlers that were not installed from Python;
        # those cannot be re-installed, so they are skipped.
        if original_sigterm is not None:
            signal.signal(signal.SIGTERM, original_sigterm)
        if original_sigint is not None:
            signal.signal(signal.SIGINT, original_sigint)

        # Call original handler if a signal was received
        if signal_args is not None:
            if signal_args[0] == signal.SIGTERM and callable(original_sigterm):
                original_sigterm(*signal_args)
            elif signal_args[0] == signal.SIGINT and callable(original_sigint):
                original_sigint(*signal_args)