Examples
This page contains practical examples of using formix-pubsub in real-world scenarios.
Event Broadcasting System
Create an event broadcasting system where multiple services listen for events:
# services/email_service.py
from pubsub import Channel, subscribe
import multiprocessing
def email_service():
channel = Channel(topic="events.user.+")
with channel:
def handle_user_event(msg):
event_type = msg.topic.split('.')[-1]
if event_type == "registered":
print(f"Sending welcome email for: {msg.content.decode()}")
elif event_type == "deleted":
print(f"Sending goodbye email for: {msg.content.decode()}")
print("Email service started")
# Listens indefinitely; terminate process with SIGTERM/SIGINT for graceful shutdown
subscribe(channel, handle_user_event)
if __name__ == "__main__":
email_service()
# services/analytics_service.py
from pubsub import Channel, subscribe
def analytics_service():
channel = Channel(topic="events.user.+")
with channel:
def handle_user_event(msg):
print(f"Logging analytics: {msg.topic} - {msg.content.decode()}")
print("Analytics service started")
# Listens indefinitely; terminate process with SIGTERM/SIGINT for graceful shutdown
subscribe(channel, handle_user_event)
if __name__ == "__main__":
analytics_service()
# main.py
from pubsub import publish
import multiprocessing
from services.email_service import email_service
from services.analytics_service import analytics_service
# Start services in separate processes
email_proc = multiprocessing.Process(target=email_service)
analytics_proc = multiprocessing.Process(target=analytics_service)
email_proc.start()
analytics_proc.start()
# Wait a moment for services to start
import time
time.sleep(0.5)
# Publish events
publish("events.user.registered", b"user123")
publish("events.user.deleted", b"user456")
# Wait for services to process
time.sleep(2)
# Gracefully terminate services (sends SIGTERM/SIGINT)
email_proc.terminate()
analytics_proc.terminate()
# Wait for processes to finish cleaning up
email_proc.join(timeout=5)
analytics_proc.join(timeout=5)
Logging Aggregator
Collect logs from multiple services:
# logger.py
from pubsub import Channel, subscribe
import datetime
def log_aggregator():
channel = Channel(topic="logs.+")
with open("application.log", "a") as log_file:
with channel:
def handle_log(msg):
timestamp = datetime.datetime.now().isoformat()
level = msg.topic.split('.')[-1]
log_line = f"[{timestamp}] [{level.upper()}] {msg.content.decode()}\n"
log_file.write(log_line)
log_file.flush()
print(log_line.strip())
print("Log aggregator started")
# Listens indefinitely; terminate process with SIGTERM/SIGINT for graceful shutdown
subscribe(channel, handle_log)
if __name__ == "__main__":
log_aggregator()
# In your application
from pubsub import publish
# Services can publish logs
publish("logs.info", b"Application started")
publish("logs.warning", b"Cache miss, rebuilding")
publish("logs.error", b"Failed to connect to database")
Request-Response (RPC) Pattern
For request-response workflows, use message headers to route replies back to the caller. This example implements a multiply service with correlation IDs and timeout handling.
rpc_server.py
# rpc_server.py
from pubsub import Channel, subscribe, publish
channel = Channel(topic="rpc.math.multiply")
with channel:
def handle_request(request):
response_topic = request.headers.get("response-topic")
correlation_id = request.headers.get("correlation-id")
if not response_topic or not correlation_id:
print("Malformed request (missing headers), skipping.")
return
# Parse operands from the payload
a, b = request.content.decode().split(",")
result = float(a) * float(b)
# Publish the response back to the caller's private channel
publish(
response_topic,
str(result).encode(),
headers={"correlation-id": correlation_id},
)
print(f"[{correlation_id}] {a} * {b} = {result}")
print("RPC server listening on 'rpc.math.multiply'...")
subscribe(channel, handle_request)
rpc_client.py
# rpc_client.py
import os
import uuid
from pubsub import Channel, publish, subscribe
def rpc_multiply(a: float, b: float, timeout: float = 2.0) -> float:
"""Send a multiply request and wait for the response."""
correlation_id = str(uuid.uuid4())
result = {correlation_id: None}
# Create a private response channel unique to this process
response_topic = f"rpc.response.{os.getpid()}"
response_channel = Channel(topic=response_topic)
with response_channel:
# Send the request with routing headers
publish(
"rpc.math.multiply",
f"{a},{b}".encode(),
headers={
"response-topic": response_topic,
"correlation-id": correlation_id,
},
)
# Wait for the response using subscribe with a timeout
def on_response(msg):
if msg.headers.get("correlation-id") == correlation_id:
result[correlation_id] = float(msg.content.decode())
subscribe(response_channel, on_response, timeout_seconds=timeout)
if result[correlation_id] is None:
raise TimeoutError(
f"No response received for correlation-id {correlation_id} "
f"within {timeout}s"
)
return result[correlation_id]
if __name__ == "__main__":
result = rpc_multiply(6, 7)
print(f"6 * 7 = {result}")
Run the server first, then the client:
# Terminal 1
python rpc_server.py
# Terminal 2
python rpc_client.py
# Output:
# 6 * 7 = 42.0
How it works:
The client generates a unique
correlation-idand creates a private response channel using its PID.It publishes a request to
rpc.math.multiplywithresponse-topicandcorrelation-idheaders.The server processes the request, computes the result, and publishes it back to the client’s
response-topic.The client calls
subscribe()with a timeout. The callback captures the response when thecorrelation-idmatches.If no response arrives before the timeout, a
TimeoutErroris raised.