Quick Start Guide
Installation
Install formix-pubsub using pip:
pip install formix-pubsub
Requires Python 3.11 or later.
Basic Usage
Publish & Subscribe Across Processes
The core use case is communication between separate processes. Create two files:
subscriber.py
from pubsub import Channel, subscribe
channel = Channel(topic="greetings")
with channel:
def on_message(msg):
print(f"Received: {msg.content.decode()}")
# Blocks and listens until terminated with Ctrl+C or SIGTERM
subscribe(channel, on_message)
publisher.py
from pubsub import publish
count = publish("greetings", b"Hello from another process!")
print(f"Published to {count} subscriber(s)")
Run the subscriber first in one terminal, then the publisher in a second:
# Terminal 1
python subscriber.py
# Terminal 2
python publisher.py
The subscriber prints Received: Hello from another process! and keeps listening.
Press Ctrl+C to stop it gracefully.
Non-Blocking Fetch
Use fetch() to poll for messages without blocking. It returns None immediately
when the queue is empty.
from pubsub import Channel, publish, fetch
channel = Channel(topic="tasks")
with channel:
# Publish a few messages
publish("tasks", b"task-1")
publish("tasks", b"task-2")
publish("tasks", b"task-3")
# Poll for messages
msg = fetch(channel)
while msg is not None:
print(f"Processing: {msg.content.decode()}")
msg = fetch(channel)
print("Queue empty, moving on.")
Wildcard Topics
Use wildcards to subscribe to multiple topics:
=matches a single word+matches one or more words
from pubsub import Channel, publish, subscribe
# Channels can use wildcards to subscribe to multiple topics
channel = Channel(topic="news.=") # Matches: news.sports, news.tech, news.world
with channel:
def handle_message(msg):
print(f"[{msg.topic}] {msg.content.decode()}")
# Listens indefinitely; terminate with SIGTERM/SIGINT
subscribe(channel, handle_message)
# Publish to concrete topics
publish("news.sports", b"Game results")
publish("news.tech", b"New release")
Multiple Subscribers
Multiple subscribers can listen independently to the same topic:
from pubsub import Channel, publish, subscribe
import threading
topic = "broadcast"
# Create two independent channels for the same topic
channel1 = Channel(topic=topic)
channel2 = Channel(topic=topic)
def subscriber1(channel):
with channel:
def handle(msg):
print(f"Subscriber 1: {msg.content.decode()}")
# Use timeout for demo; in production, terminate via signal
subscribe(channel, handle, timeout_seconds=5.0)
def subscriber2(channel):
with channel:
def handle(msg):
print(f"Subscriber 2: {msg.content.decode()}")
# Use timeout for demo; in production, terminate via signal
subscribe(channel, handle, timeout_seconds=5.0)
# Start subscribers in separate threads (use processes for true parallelism)
t1 = threading.Thread(target=subscriber1, args=(channel1,))
t2 = threading.Thread(target=subscriber2, args=(channel2,))
t1.start()
t2.start()
# Publish a message - both subscribers receive it
publish(topic, b"Hello everyone!")
t1.join()
t2.join()
Context Managers
Always use context managers to ensure proper cleanup:
from pubsub import Channel
# Good - automatic cleanup
with Channel(topic="example") as channel:
# Use the channel
pass
# Alternative - manual cleanup
channel = Channel(topic="example")
try:
# Use the channel
pass
finally:
channel.close()
Best Practices
Use separate processes for true parallelism - The GIL won’t limit your concurrency
Always use context managers - Ensures proper resource cleanup
Publish to concrete topics - Wildcards are only for subscribing
Use meaningful topic hierarchies - e.g.,
app.service.eventTerminate subscribers gracefully - Use SIGTERM/SIGINT for clean shutdown; timeouts are optional for demos or time-bound operations