[PY] Event Broker
by Nate Maxwell
Python Event Broker
“Engineers know the benefits of everything and the tradeoffs of nothing.” - Rich Hickey
I love a good pubsub-like system. I strive for as much decoupling as I can get within my larger systems and between the systems in my applications. Over the Christmas break I began working on a Python code editor, Solaire, and once again I needed various widget components to react to updates in the state of others, like you do when building a GUI application. Every time I use PySide6/Qt Signals I regret it, because components end up passing references around just to subscribe to each other’s signals. I find that Qt Signals scale very, very poorly when there is a large amount of state to react to.
To get around this I typically set up something like baby’s first pub-sub system.
It is a table that maps namespace keys to lists of callbacks, with a
register(namespace, callback) function and an emit() function. This works
fairly well, but I had grown tired of setting it up in every large application.
Additionally, I have been trying to convince others at work of the benefits of such a system over native Qt signals, but to no avail. When I explained the plan for the system, the team asked “Why not just use PyPubSub?”
<soapbox>
As engineers, we often think of ourselves as creating code, but we don’t create code, we create features, and those features are only useful if they’re actually used. As such, we should eschew dependencies for things that can be easily made and maintained. Sometimes a third-party dependency is correct, but it should be weighed carefully and has to justify its weight. Otherwise, we could likely make a lighter-weight first-party version with features specifically tailored to the project.
</soapbox>
So, I set out to make my own event broker and pub-sub-like system.
The Shape of the Thing
The broker is a singleton with five moving parts:
- The broker itself: the central table of namespaces, subscribers, and transformers.
- Namespaces: dot-notation strings like system.io.file_opened that act as event channels.
- Subscribers: callbacks registered against a namespace.
- Transformers: middleware that sits between an emission and its subscribers, modifying or rejecting the payload.
- Emitters: anything in your code that calls emit() or emit_async().
That’s it. There is no class to instantiate, no broker object to thread through
your application, no get_broker() factory to call. You import broker and
everything is already wired up.
import broker

@broker.subscribe("file.saved")
def on_file_saved(filename: str, size: int) -> None:
    print(f'Saved: {filename} ({size} bytes)')

broker.emit('file.saved', filename='document.txt', size=1024)
That’s the entire hello-world. The decorator registers the callback against
the file.saved namespace, and emit dispatches keyword arguments to anything
subscribed there. No Signal objects passed around, no connect() calls, no
parent widgets needing to know about child widgets needing to know about sibling
widgets.
Namespaces and Wildcards
Namespaces are just dot-separated strings. There is no enforced hierarchy — the
dots are a convention, not a tree structure — but the wildcard matcher treats
them as one. You can subscribe to a single concrete namespace, or you can use
* to fan out across a whole branch:
@broker.subscribe("file.*")
def on_any_file_event(**kwargs) -> None:
    print(f'File event: {kwargs}')

broker.emit('file.saved', filename='data.json', size=2048)
broker.emit('file.deleted', filename='temp.txt', size=512)
Wildcard subscribers have to accept **kwargs because they’re catching events
with potentially different shapes. The signature validator (more on that in a
moment) is smart enough to know this.
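To make the fan-out concrete, here is one way wildcard matching over dot-separated strings could work, using the standard library's `fnmatch`. This is only a sketch: the broker's real matcher is not shown in this post and may treat `*` differently (for example, `fnmatch` lets `*` span several dot-separated segments). The `matching_patterns` helper is a hypothetical name.

```python
from fnmatch import fnmatchcase


def matching_patterns(namespace: str, patterns: list[str]) -> list[str]:
    """Return every subscription pattern that matches an emitted namespace."""
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return [pattern for pattern in patterns if fnmatchcase(namespace, pattern)]


patterns = ['file.saved', 'file.*', 'render.*', '*']

saved_matches = matching_patterns('file.saved', patterns)
deleted_matches = matching_patterns('file.deleted', patterns)
```

An emission to file.saved reaches the concrete subscription, the file.* branch, and the catch-all; file.deleted reaches only the two wildcard patterns.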
Signature Validation
The broker enforces signature consistency per namespace. The first subscriber to register against a namespace sets the expected signature, and every subsequent subscriber and every emission gets checked against it.
@broker.subscribe('user.login')
def first_subscriber(username: str, user_id: int) -> None:
    pass

@broker.subscribe('user.login')
def wrong_signature(username: str, email: str) -> None:  # raises
    pass

broker.emit('user.login', username='alice', email='alice@example.com')  # also raises
This is the kind of bug that, in a naive pub-sub, you find at 11pm on a Friday
when someone in another module quietly renamed user_id to uid and now a
subscriber three layers away is silently getting None. The broker refuses to
let you start down that path. The signature is the contract, and the contract
is checked at registration and emission time, not when the subscriber blows up.
Priorities
Subscribers can be ordered with a priority value. Higher fires first.
@broker.subscribe('system.alert', priority=10)
def critical_handler(message: str) -> None:
    print('CRITICAL:', message)

@broker.subscribe('system.alert', priority=1)
def log_handler(message: str) -> None:
    print('Logged:', message)
I went back and forth on whether to include this. Priority-ordered subscribers are a footgun — it’s the kind of feature that, if leaned on too hard, turns your loosely-coupled pub-sub into an implicitly ordered procedure call with extra steps. But there are real cases where it matters (logging that should happen after all the real handlers ran, critical UI updates that need to land before anything else paints) and the alternative — manually sequencing emissions — is worse.
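Mechanically, "higher fires first" amounts to keeping each subscriber list sorted by priority. A minimal sketch, assuming ties keep registration order (the post does not say how the broker breaks ties); `_subs`, `subscribe`, and `emit` here are stand-ins, not the broker's API:

```python
from typing import Callable

# Hypothetical single-namespace subscriber list of (priority, callback) pairs.
_subs: list[tuple[int, Callable[[str], None]]] = []


def subscribe(callback: Callable[[str], None], priority: int = 0) -> None:
    _subs.append((priority, callback))
    # list.sort is stable, even with reverse=True, so equal priorities
    # stay in the order they were registered.
    _subs.sort(key=lambda entry: entry[0], reverse=True)


def emit(message: str) -> None:
    for _, callback in list(_subs):
        callback(message)


calls: list[str] = []
subscribe(lambda message: calls.append(f'log:{message}'), priority=1)
subscribe(lambda message: calls.append(f'critical:{message}'), priority=10)
emit('disk full')
```

The critical handler runs first even though it was registered second.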
One-Shot Subscribers
The once=True flag registers a subscriber that unregisters itself after firing
once:
@broker.subscribe('app.ready', once=True)
def on_first_ready(status: str) -> None:
    print(f'App came up: {status}')

broker.emit('app.ready', status='ok')  # fires
broker.emit('app.ready', status='ok')  # silent, already gone
This pattern shows up everywhere in GUI code — “wait for the project to load,
then do one thing” — and the manual version is always a cleanup ritual of
storing the handle, defining the callback to call unregister_subscriber on
itself, and hoping you got the closure right.
Transformers
Transformers sit between emit() and the subscribers. When an event is
emitted, every registered transformer for that namespace gets a crack at the
payload before any subscriber sees it. A transformer receives the namespace
and the kwargs dict, and returns either a dict (to forward) or None (to
block delivery entirely).
import datetime

def add_timestamp(namespace: str, kwargs: dict) -> dict:
    kwargs['timestamp'] = datetime.datetime.now().time().isoformat()[:-4]
    return kwargs

broker.register_transformer('system.*', add_timestamp, priority=10)
It’s tempting to call this middleware, but that framing is too narrow. Middleware in the Express.js sense is a pipeline of payload mutators. Broker transformers can do that, but they can also:
- Enrich payloads with metadata that subscribers may or may not consume — timestamps, request IDs, source tags, anything you want available but don’t want to require at the emit site.
- Block delivery entirely by returning None, acting as a filter or validator at the door.
- Observe without modifying, by returning the kwargs unchanged after doing some side effect. A * transformer that logs every event is a transformer, not a subscriber, precisely because you want it to run before dispatch and see things even when they’re about to be blocked.
The system.* example above is the enrichment case. Every emission under
system. picks up a timestamp field, and subscribers that declare it in
their signature receive it. Subscribers that don’t declare it simply don’t
see it — transformers do not alter the tracked signature for matching. The
signature is set by the first subscriber to register against a namespace and
checked against the emit call; transformers run alongside that contract, not
through it.
The blocking case is where transformers stop looking like middleware entirely:
def validate_user(namespace: str, kwargs: dict) -> dict | None:
    if 'user_id' not in kwargs or kwargs['user_id'] < 0:
        return None
    return kwargs

broker.register_transformer('user.*', validate_user)
broker.emit('user.login', user_id=-1)  # silently dropped
A transformer that returns None kills the event before any subscriber sees
it. You can register a * transformer that logs every event in the system,
a validator that drops malformed events at the door, a path normalizer, a
database lookup that enriches user payloads with permissions — all without
any subscriber having to know it’s happening.
Transformers have priority. Higher runs first. The canonical pattern is normalize-then-validate:
broker.register_transformer('input', normalize, priority=10)
broker.register_transformer('input', validate, priority=5)
Normalize cleans the input, validate decides whether to forward it. If you flipped the priorities, validate would see the raw input and reject things it shouldn’t.
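The chain itself is small. Here is a hedged sketch of a priority-ordered transformer pipeline where None short-circuits delivery; namespace matching is left out, and `apply_transformers` plus the `normalize` and `validate` bodies are illustrative assumptions rather than the broker's code:

```python
from typing import Callable, Optional

Transformer = Callable[[str, dict], Optional[dict]]

# Hypothetical registry of (priority, transformer) pairs.
_transformers: list[tuple[int, Transformer]] = []


def register_transformer(transformer: Transformer, priority: int = 0) -> None:
    _transformers.append((priority, transformer))


def apply_transformers(namespace: str, kwargs: dict) -> Optional[dict]:
    """Run transformers from highest to lowest priority; None blocks delivery."""
    payload: Optional[dict] = dict(kwargs)  # copy so the caller's dict is untouched
    for _, transformer in sorted(_transformers, key=lambda e: e[0], reverse=True):
        payload = transformer(namespace, payload)
        if payload is None:
            return None  # blocked: no later transformer or subscriber sees it
    return payload


def normalize(namespace: str, kwargs: dict) -> dict:
    kwargs['text'] = kwargs['text'].strip().lower()
    return kwargs


def validate(namespace: str, kwargs: dict) -> Optional[dict]:
    return kwargs if kwargs['text'] else None


register_transformer(validate, priority=5)
register_transformer(normalize, priority=10)  # runs first despite later registration

forwarded = apply_transformers('input', {'text': '  Hello '})
blocked = apply_transformers('input', {'text': '   '})
```

The whitespace-only payload is normalized to an empty string first and only then rejected, which is the ordering the priorities are there to guarantee.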
Sync, Async, and Why Both
The broker has two emission methods: emit() and emit_async(). The split
exists because async-in-Python is a viral coloring problem and pretending
otherwise causes more pain than it saves.
import asyncio

async def async_handler(data: str) -> None:
    await asyncio.sleep(0.1)
    print(f'Async: {data}')

def sync_handler(data: str) -> None:
    print(f'Sync: {data}')

broker.register_subscriber('process.data', async_handler)
broker.register_subscriber('process.data', sync_handler)

broker.emit('process.data', data='test')              # only sync_handler runs
await broker.emit_async('process.data', data='test')  # both run (await inside a coroutine)
emit() is synchronous and fires synchronous subscribers only. emit_async()
is a coroutine and fires both. This means you can call emit() from anywhere —
inside a Qt slot, inside a thread, inside a function that has no business being
async — and it does the right thing for the synchronous half of your subscribers.
Async subscribers are simply skipped, not errored on.
This was a deliberate trade. The alternative would be to either force every emit site to be awaited (poisoning everything upstream) or to schedule sync emits onto an event loop (which requires there to be a loop, which there isn’t in a Qt main thread, which is exactly where I needed this to work). Splitting the methods means the sync path stays trivial and the async path is there when you want it.
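The split is easy to picture with `inspect.iscoroutinefunction`. The sketch below is an assumption about the shape of the two paths, not the broker's implementation; in particular, awaiting async subscribers one at a time in registration order is a guess, and a real implementation might gather them instead.

```python
import asyncio
import inspect
from typing import Any, Callable

_subscribers: list[Callable[..., Any]] = []
calls: list[str] = []


def emit(**kwargs: Any) -> None:
    """Synchronous path: run sync subscribers, skip coroutine functions."""
    for callback in list(_subscribers):
        if inspect.iscoroutinefunction(callback):
            continue  # skipped, not an error
        callback(**kwargs)


async def emit_async(**kwargs: Any) -> None:
    """Async path: await coroutine subscribers, call sync ones directly."""
    for callback in list(_subscribers):
        if inspect.iscoroutinefunction(callback):
            await callback(**kwargs)
        else:
            callback(**kwargs)


async def async_handler(data: str) -> None:
    await asyncio.sleep(0)
    calls.append(f'async:{data}')


def sync_handler(data: str) -> None:
    calls.append(f'sync:{data}')


_subscribers.extend([async_handler, sync_handler])

emit(data='x')                     # no event loop needed
asyncio.run(emit_async(data='y'))  # needs a loop, reaches both handlers
```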
Staging
Staged events sit in a queue until you explicitly dispatch them.
broker.stage('file.saved', filename='render.exr', size=4096)
broker.stage('file.saved', filename='preview.jpg', size=512)
broker.stage('render.done', frame=42)

broker.emit_staged()              # dispatch all, then flush
await broker.emit_staged_async()  # same, but async (await inside a coroutine)
The use case: you’re doing a batch operation, you want to emit events about each piece, but you don’t want subscribers reacting one-at-a-time during the batch. Stage everything, do the work, then dispatch in one go.
By default, dispatching flushes the staging table. Pass flush=False to dispatch
without clearing, which is occasionally useful for heartbeats or replays:
broker.stage('session.heartbeat', status='ok')
broker.emit_staged(flush=False) # dispatch
broker.emit_staged(flush=False) # dispatch again
broker.emit_staged() # dispatch and flush
There’s also a subtle bit of behavior worth flagging: events staged from inside
a subscriber during dispatch are not consumed by the current emit_staged call.
They survive to the next one. This means a subscriber can stage a follow-up
event without infinite-looping its own dispatcher.
Pausing
The companion to staging. broker.paused() is a context manager that suppresses
all dispatch — both emit() and emit_async() — for the duration of the block.
with broker.paused():
    broker.emit('file.saved', filename='test.exr')    # suppressed
    await broker.emit_async('render.done', frame=42)  # suppressed

broker.emit('file.saved', filename='test.exr')  # delivered
Critically, staging is not suppressed by pause. You can stage events inside
a paused block and they will be there waiting when you call emit_staged()
later. Pause kills dispatch; staging is bookkeeping.
with broker.paused():
    broker.stage('file.saved', filename='test.exr')  # staged fine

broker.emit_staged()  # delivered
I mostly use this to silence noisy code paths on application startup. Initialization is when I care least about signals: I primarily use them to react to changes in state, and during initialization most systems don’t have state yet. I just want the application started.
Exception Handling
A subscriber raising an exception is a question, not an answer. Do you want the emission to abort? Do you want to log and continue to the next subscriber? Do you want to silently swallow it? Do you want to collect every exception across a batch for later review? Different applications want different things, and the broker doesn’t pick for you.
Typically, you want to fail fast and early, for optimization purposes, but you may also want to log for later auditing or debugging.
from broker import handlers

# Stop on the first error, log it, don't deliver to remaining subscribers
broker.set_subscriber_exception_handler(handlers.stop_and_log_subscriber_exception)

# Log and keep going
broker.set_subscriber_exception_handler(handlers.log_and_continue_subscriber_exception)

# Silent: swallow everything
broker.set_subscriber_exception_handler(handlers.silent_subscriber_exception)

# Collect into a list for batch review
broker.set_subscriber_exception_handler(handlers.collect_subscriber_exception)
broker.emit('event', data='test')
for error in handlers.exceptions_caught:
    print(f"Error in {error['namespace']}: {error['exception']}")
The handler is a single callable with the signature
(callback, namespace, exception) -> bool, where the return value decides
whether to stop or continue delivery. The bundled handlers are just the obvious
implementations of that contract; you can write your own:
from typing import Callable

def custom_handler(callback: Callable, namespace: str, exception: Exception) -> bool:
    if isinstance(exception, ValueError):
        return True  # stop
    return False  # continue

broker.set_subscriber_exception_handler(custom_handler)
Setting the handler to None reverts to “raise everything,” which is what you
want during development and almost never what you want in production.
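On the dispatch side, a handler with that contract slots into the loop like this. It is a sketch assuming True means stop and None means re-raise, as described above; the `dispatch` function and the toy subscribers are hypothetical.

```python
from typing import Any, Callable, Optional

Handler = Callable[[Callable[..., Any], str, Exception], bool]
_handler: Optional[Handler] = None


def set_subscriber_exception_handler(handler: Optional[Handler]) -> None:
    global _handler
    _handler = handler


def dispatch(namespace: str, callbacks: list[Callable[..., Any]], **kwargs: Any) -> None:
    for callback in callbacks:
        try:
            callback(**kwargs)
        except Exception as exc:
            if _handler is None:
                raise  # no handler installed: raise everything
            if _handler(callback, namespace, exc):
                break  # handler asked to stop delivery


ran: list[str] = []


def bad_value(data: str) -> None:
    raise ValueError('bad payload')


def bad_key(data: str) -> None:
    raise KeyError('missing')


def ok(data: str) -> None:
    ran.append(data)


def stop_on_value_error(callback: Callable[..., Any], namespace: str, exception: Exception) -> bool:
    return isinstance(exception, ValueError)


set_subscriber_exception_handler(stop_on_value_error)
dispatch('event', [bad_value, ok], data='first')   # ValueError: stop, ok never runs
ran_after_value_error = list(ran)
dispatch('event', [bad_key, ok], data='second')    # KeyError: continue, ok runs
```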
Transformers have their own parallel set of handlers
(set_transformer_exception_handler) with the same shape. I considered unifying
them and decided against it — a transformer raising is a different kind of
problem than a subscriber raising. The transformer sits in the delivery path;
its failure can prevent the event from reaching anyone. Letting it be configured
independently means you can be strict about transformer correctness while being
lenient about subscriber failures, or vice versa.
Weak References
The broker holds weak references to every callback it tracks. If the object owning a subscriber goes out of scope, the subscriber goes with it. No leaks, no zombie handlers firing into deleted widgets.
This is by far my favorite feature, and it is the most invisible when it works.
Naive pub-sub systems hold strong references and become the reason your Qt
widgets never actually get collected. The broker also emits
BROKER_ON_SUBSCRIBER_COLLECTED when it notices a subscriber has been culled,
which is useful for debugging “why did my callback stop firing” mysteries.
Introspection
The broker is observable. It can describe itself.
broker.get_namespaces() # all registered namespaces
broker.namespace_exists('file.saved') # check a specific one
broker.get_matching_namespaces('file.*') # wildcard match
broker.get_subscriber_count('file.saved') # count subscribers
broker.get_subscribers('file.saved') # the actual callbacks
broker.get_statistics() # overall stats
broker.to_dict() # full structure
broker.export('/tmp/broker.json') # dump to disk
I built these primarily because I wanted a visualization tool (a PySide6 node
graph showing the live state of the broker — that’s a different post) but they
turn out to be useful for general debugging. “Why isn’t my handler firing?”
Check get_live_subscribers. “Did this transformer get registered?” Check
get_transformers.
The tradeoff of indirection is that the more you add, the harder the system is to observe or understand.

Broker Self-Notifications
The broker can emit events about itself. Subscribers added or removed, namespaces created or deleted, emissions happening — all available as their own events.
broker.set_flag_states(on_subscribe=True, on_emit=True)

@broker.subscribe(broker.BROKER_ON_SUBSCRIBER_ADDED)
def on_subscriber_added(using: str) -> None:
    print(f'New subscriber to: {using}')

@broker.subscribe(broker.BROKER_ON_EMIT)
def on_emit(namespace: str, kwargs: dict) -> None:
    print(f'Event emitted: {namespace}')
These are off by default because they are loud. Turn them on selectively when you’re debugging or building a monitoring dashboard, leave them off in normal operation.
The same pattern — broker subscribing to itself — is what powers the
visualization tool. The node graph reacts to BROKER_ON_SUBSCRIBER_ADDED and
friends, redrawing as the broker’s state changes. Dogfooding the API to build
tools for the API.
Reimport Protection
This is the smallest feature and the one I’m most on the fence about. The broker is a singleton, and reimporting it would clear the global namespace and subscriber table. That is almost never wanted: it’s how you end up with half your application subscribed to the old broker and half to the new one, with no error message and a lot of confusion.
import broker
import importlib
importlib.reload(broker) # ImportError
Reimport raises. If you really want to clear the broker, there’s clear(). The
reload path is closed because I have been burned by it and I do not want to be
burned by it again.
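One way a module can refuse to be reloaded relies on the fact that `importlib.reload` re-executes the module's code inside the existing module namespace, so names set by the first import are still in `globals()`. The sketch below demonstrates that mechanism with a throwaway module; the `toy_broker` name and the `_ALREADY_IMPORTED` sentinel are made up for the demo, and the real broker may guard itself differently.

```python
import importlib
import pathlib
import sys
import tempfile

MODULE_SOURCE = '''
# On reload this file runs again in the same namespace, so a name set by
# the first import is still present in globals().
if "_ALREADY_IMPORTED" in globals():
    raise ImportError("toy_broker is a singleton and cannot be reloaded")
_ALREADY_IMPORTED = True
subscribers = {}
'''

with tempfile.TemporaryDirectory() as tmp:
    pathlib.Path(tmp, 'toy_broker.py').write_text(MODULE_SOURCE)
    sys.path.insert(0, tmp)
    try:
        importlib.invalidate_caches()
        toy_broker = importlib.import_module('toy_broker')
        toy_broker.subscribers['file.saved'] = ['callback']
        try:
            importlib.reload(toy_broker)
            reload_blocked = False
        except ImportError:
            reload_blocked = True
    finally:
        sys.path.remove(tmp)

# The guard fires before `subscribers = {}` runs again, so state survives.
surviving_state = dict(toy_broker.subscribers)
```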
Closing Soapbox
The whole thing is maybe 1800 lines of Python. PyPubSub is fine, and if PyPubSub fits your needs you should use it. But the version I needed has typed signature validation, transformers, staging, pause, weak references, both sync and async emit, configurable exception handling, and introspection deep enough to drive a visualization tool. By the time I’d finished evaluating whether each of those features was available, configurable, and would interact correctly with the others, I’d already written half of it in my head.
Build the lighter, first-party version. Tailor it. Own the failure modes. The dependency you don’t take on is the dependency you don’t have to maintain when its author abandons it in 2031.
tags: python - broker - event