Quick Start Guide

Installation

Install formix-pubsub using pip:

pip install formix-pubsub

Requires Python 3.11 or later.

Basic Usage

Publish & Subscribe Across Processes

The core use case is communication between separate processes. Create two files:

subscriber.py

from pubsub import Channel, subscribe

channel = Channel(topic="greetings")

with channel:
    def on_message(msg):
        print(f"Received: {msg.content.decode()}")

    # Blocks and listens until terminated with Ctrl+C or SIGTERM
    subscribe(channel, on_message)

publisher.py

from pubsub import publish

count = publish("greetings", b"Hello from another process!")
print(f"Published to {count} subscriber(s)")

Run the subscriber first in one terminal, then the publisher in a second:

# Terminal 1
python subscriber.py

# Terminal 2
python publisher.py

The subscriber prints Received: Hello from another process! and keeps listening. Press Ctrl+C to stop it gracefully.

Non-Blocking Fetch

Use fetch() to poll for messages without blocking. It returns None immediately when the queue is empty.

from pubsub import Channel, publish, fetch

channel = Channel(topic="tasks")

with channel:
    # Publish a few messages
    publish("tasks", b"task-1")
    publish("tasks", b"task-2")
    publish("tasks", b"task-3")

    # Poll for messages
    msg = fetch(channel)
    while msg is not None:
        print(f"Processing: {msg.content.decode()}")
        msg = fetch(channel)

    print("Queue empty, moving on.")

Wildcard Topics

Use wildcards to subscribe to multiple topics:

  • = matches a single word

  • + matches one or more words

from pubsub import Channel, publish, subscribe

# Channels can use wildcards to subscribe to multiple topics
channel = Channel(topic="news.=")  # Matches: news.sports, news.tech, news.world

with channel:
    def handle_message(msg):
        print(f"[{msg.topic}] {msg.content.decode()}")

    # Listens indefinitely; terminate with SIGTERM/SIGINT
    subscribe(channel, handle_message)

# Publish to concrete topics
publish("news.sports", b"Game results")
publish("news.tech", b"New release")

Multiple Subscribers

Multiple subscribers can listen independently to the same topic:

from pubsub import Channel, publish, subscribe
import threading

topic = "broadcast"

# Create two independent channels for the same topic
channel1 = Channel(topic=topic)
channel2 = Channel(topic=topic)

def subscriber1(channel):
    with channel:
        def handle(msg):
            print(f"Subscriber 1: {msg.content.decode()}")
        # Use timeout for demo; in production, terminate via signal
        subscribe(channel, handle, timeout_seconds=5.0)

def subscriber2(channel):
    with channel:
        def handle(msg):
            print(f"Subscriber 2: {msg.content.decode()}")
        # Use timeout for demo; in production, terminate via signal
        subscribe(channel, handle, timeout_seconds=5.0)

# Start subscribers in separate threads (use processes for true parallelism)
t1 = threading.Thread(target=subscriber1, args=(channel1,))
t2 = threading.Thread(target=subscriber2, args=(channel2,))

t1.start()
t2.start()

# Publish a message - both subscribers receive it
publish(topic, b"Hello everyone!")

t1.join()
t2.join()

Context Managers

Always use context managers to ensure proper cleanup:

from pubsub import Channel

# Good - automatic cleanup
with Channel(topic="example") as channel:
    # Use the channel
    pass

# Alternative - manual cleanup
channel = Channel(topic="example")
try:
    # Use the channel
    pass
finally:
    channel.close()

Best Practices

  1. Use separate processes for true parallelism - The GIL won’t limit your concurrency

  2. Always use context managers - Ensures proper resource cleanup

  3. Publish to concrete topics - Wildcards are only for subscribing

  4. Use meaningful topic hierarchies - e.g., app.service.event

  5. Terminate subscribers gracefully - Use SIGTERM/SIGINT for clean shutdown; timeouts are optional for demos or time-bound operations