Nate Maxwell

A blog of various experiments and projects of mine.

16 May 2026

[PY] Event Broker

by Nate Maxwell

Python Event Broker

“Engineers know the benefits of everything and the tradeoffs of nothing.” - Rich Hickey

I love a good pub-sub-like system. I strive for as much decoupling as I can get within my larger systems and between the systems in my applications. Over the Christmas break I began working on a Python code editor, Solaire, and once again I needed various widget components to react to updates in the state of others, as you do when building a GUI application. Every time I use PySide6/Qt Signals I regret it: components end up passing references around just to subscribe to each other’s signals. I find that when dealing with large amounts of state reaction, Qt Signals scale very, very poorly.

To get around this I typically set up something like baby’s first pub-sub system: a table mapping namespace keys to lists of callbacks, with a register(namespace, callback) function and an emit() function. This works fairly well, but I had grown tired of setting it up in every large application.

Additionally, I have been trying to convince others at work of the benefits of such a system over native Qt signals, but to no avail. While I was explaining the plan for the system, the team asked, “Why not just use PyPubSub?”

<soapbox>

As engineers, we often think of ourselves as creating code, but we don’t create code, we create features, and those features are only useful if they’re actually used. As such, we should eschew dependencies for things that can be easily made and maintained. Sometimes a third party dependency is correct, but it should be carefully measured and made sure to justify its weight. Otherwise, we could likely make a lighter weight first party version with features specifically tailored to the project.

</soapbox>

So, I set out to make my own event broker and pub-sub-like system.

The Shape of the Thing

The broker is a singleton with five moving parts:

That’s it. There is no class to instantiate, no broker object to thread through your application, no get_broker() factory to call. You import broker and everything is already wired up.

import broker

@broker.subscribe("file.saved")
def on_file_saved(filename: str, size: int) -> None:
    print(f'Saved: {filename} ({size} bytes)')

broker.emit('file.saved', filename='document.txt', size=1024)

That’s the entire hello-world. The decorator registers the callback against the file.saved namespace, and emit dispatches keyword arguments to anything subscribed there. No Signal objects passed around, no connect() calls, no parent widgets needing to know about child widgets needing to know about sibling widgets.

Namespaces and Wildcards

Namespaces are just dot-separated strings. There is no enforced hierarchy — the dots are a convention, not a tree structure — but the wildcard matcher treats them as one. You can subscribe to a single concrete namespace, or you can use * to fan out across a whole branch:

@broker.subscribe("file.*")
def on_any_file_event(**kwargs) -> None:
    print(f'File event: {kwargs}')

broker.emit('file.saved', filename='data.json', size=2048)
broker.emit('file.deleted', filename='temp.txt', size=512)

Wildcard subscribers have to accept **kwargs because they’re catching events with potentially different shapes. The signature validator (more on that in a moment) is smart enough to know this.
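One way to picture the matching is with the standard library’s fnmatch module. Whether the broker matches this way internally is an assumption, and matching_patterns is a hypothetical helper. Note that fnmatch’s * also crosses dots, which may not be how the broker treats deeper namespaces.

```python
from fnmatch import fnmatchcase


def matching_patterns(namespace: str, patterns: list[str]) -> list[str]:
    """Return every subscription pattern that matches an emitted namespace."""
    return [pattern for pattern in patterns if fnmatchcase(namespace, pattern)]


patterns = ['file.*', 'file.saved', 'user.*']

saved_matches = matching_patterns('file.saved', patterns)
render_matches = matching_patterns('render.done', patterns)
```

Here an emission to file.saved reaches both the concrete subscription and the file.* wildcard, while render.done matches nothing.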

Signature Validation

The broker enforces signature consistency per namespace. The first subscriber to register against a namespace sets the expected signature, and every subsequent subscriber and every emission gets checked against it.

@broker.subscribe('user.login')
def first_subscriber(username: str, user_id: int) -> None:
    pass

@broker.subscribe('user.login')
def wrong_signature(username: str, email: str) -> None:  # raises
    pass

broker.emit('user.login', username='alice', email='alice@example.com')  # also raises

This is the kind of bug that, in a naive pub-sub, you find at 11pm on a Friday when someone in another module quietly renamed user_id to uid and now a subscriber three layers away is silently getting None. The broker refuses to let you start down that path. The signature is the contract, and the contract is checked at registration and emission time, not when the subscriber blows up.

Priorities

Subscribers can be ordered with a priority value. Higher fires first.

@broker.subscribe('system.alert', priority=10)
def critical_handler(message: str) -> None:
    print('CRITICAL:', message)

@broker.subscribe('system.alert', priority=1)
def log_handler(message: str) -> None:
    print('Logged:', message)

I went back and forth on whether to include this. Priority-ordered subscribers are a footgun — it’s the kind of feature that, if leaned on too hard, turns your loosely-coupled pub-sub into an implicitly ordered procedure call with extra steps. But there are real cases where it matters (logging that should happen after all the real handlers ran, critical UI updates that need to land before anything else paints) and the alternative — manually sequencing emissions — is worse.
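Mechanically, priority ordering needs nothing more than a sort on registration. The sketch below is illustrative and not taken from the broker; the storage layout and the tie-breaking rule (registration order) are assumptions.

```python
from typing import Any, Callable

# (priority, callback) pairs, kept sorted so the highest priority fires first
_subscribers: list[tuple[int, Callable[..., Any]]] = []


def register(callback: Callable[..., Any], priority: int = 0) -> None:
    _subscribers.append((priority, callback))
    # list.sort is stable, so equal priorities keep their registration order.
    _subscribers.sort(key=lambda entry: entry[0], reverse=True)


def emit(**kwargs: Any) -> None:
    for _, callback in list(_subscribers):
        callback(**kwargs)


order: list[str] = []
register(lambda message: order.append('log'), priority=1)
register(lambda message: order.append('critical'), priority=10)

emit(message='disk full')
```

The critical handler runs first even though it was registered second.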

One-Shot Subscribers

The once=True flag registers a subscriber that unregisters itself after firing once:

@broker.subscribe('app.ready', once=True)
def on_first_ready(status: str) -> None:
    print(f'App came up: {status}')

broker.emit('app.ready', status='ok')   # fires
broker.emit('app.ready', status='ok')   # silent, already gone

This pattern shows up everywhere in GUI code — “wait for the project to load, then do one thing” — and the manual version is always a cleanup ritual of storing the handle, defining the callback to call unregister_subscriber on itself, and hoping you got the closure right.

Transformers

Transformers sit between emit() and the subscribers. When an event is emitted, every registered transformer for that namespace gets a crack at the payload before any subscriber sees it. A transformer receives the namespace and the kwargs dict, and returns either a dict (to forward) or None (to block delivery entirely).

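import datetime

def add_timestamp(namespace: str, kwargs: dict) -> dict:
    # timespec keeps a fractional part even when the microseconds happen to be zero
    kwargs['timestamp'] = datetime.datetime.now().time().isoformat(timespec='milliseconds')
    return kwargs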

broker.register_transformer('system.*', add_timestamp, priority=10)

It’s tempting to call this middleware, but that framing is too narrow. Middleware in the Express.js sense is a pipeline of payload mutators. Broker transformers can do that, but they can also:

The system.* example above is the enrichment case. Every emission under system. picks up a timestamp field, and subscribers that declare it in their signature receive it. Subscribers that don’t declare it simply don’t see it — transformers do not alter the tracked signature for matching. The signature is set by the first subscriber to register against a namespace and checked against the emit call; transformers run alongside that contract, not through it.

The blocking case is where transformers stop looking like middleware entirely:

def validate_user(namespace: str, kwargs: dict) -> dict | None:
    if 'user_id' not in kwargs or kwargs['user_id'] < 0:
        return None
    return kwargs

broker.register_transformer('user.*', validate_user)
broker.emit('user.login', user_id=-1)  # silently dropped

A transformer that returns None kills the event before any subscriber sees it. You can register a * transformer that logs every event in the system, a validator that drops malformed events at the door, a path normalizer, a database lookup that enriches user payloads with permissions — all without any subscriber having to know it’s happening.

Transformers have priority. Higher runs first. The canonical pattern is normalize-then-validate:

broker.register_transformer('input', normalize, priority=10)
broker.register_transformer('input', validate, priority=5)

Normalize cleans the input, validate decides whether to forward it. If you flipped the priorities, validate would see the raw input and reject things it shouldn’t.
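The ordering effect is easy to see in a stripped-down pipeline. This sketch assumes transformers run in descending priority and that a None return stops the chain. run_transformers, normalize, and validate here are hypothetical stand-ins, not the broker’s internals.

```python
from typing import Callable, Optional

Transformer = Callable[[str, dict], Optional[dict]]


def run_transformers(
    namespace: str,
    kwargs: dict,
    transformers: list[tuple[int, Transformer]],
) -> Optional[dict]:
    """Apply transformers from highest to lowest priority; None blocks delivery."""
    ordered = sorted(transformers, key=lambda entry: entry[0], reverse=True)
    for _, transform in ordered:
        result = transform(namespace, kwargs)
        if result is None:
            return None
        kwargs = result
    return kwargs


def normalize(namespace: str, kwargs: dict) -> dict:
    return {**kwargs, 'text': kwargs['text'].strip().lower()}


def validate(namespace: str, kwargs: dict) -> Optional[dict]:
    return kwargs if kwargs['text'] in {'yes', 'no'} else None


payload = {'text': '  YES '}
normalize_first = run_transformers('input', payload, [(10, normalize), (5, validate)])
validate_first = run_transformers('input', payload, [(5, normalize), (10, validate)])
```

With normalize at the higher priority the payload arrives cleaned; with the priorities flipped, validate sees the raw string and drops the event.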

Sync, Async, and Why Both

The broker has two emission methods: emit() and emit_async(). The split exists because async-in-Python is a viral coloring problem and pretending otherwise causes more pain than it saves.

import asyncio

async def async_handler(data: str) -> None:
    await asyncio.sleep(0.1)
    print(f'Async: {data}')

def sync_handler(data: str) -> None:
    print(f'Sync: {data}')

broker.register_subscriber('process.data', async_handler)
broker.register_subscriber('process.data', sync_handler)

broker.emit('process.data', data='test')        # only sync_handler runs
await broker.emit_async('process.data', data='test')  # both run

emit() is synchronous and fires synchronous subscribers only. emit_async() is a coroutine and fires both. This means you can call emit() from anywhere — inside a Qt slot, inside a thread, inside a function that has no business being async — and it does the right thing for the synchronous half of your subscribers. Async subscribers are simply skipped, not errored on.

This was a deliberate trade. The alternative would be to either force every emit site to be awaited (poisoning everything upstream) or to schedule sync emits onto an event loop (which requires there to be a loop, which there isn’t in a Qt main thread, which is exactly where I needed this to work). Splitting the methods means the sync path stays trivial and the async path is there when you want it.
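A rough sketch of how such a split can work: the sync path skips coroutine functions, and the async path awaits whatever comes back awaitable. This is an assumption about the approach, not the broker’s code, and it awaits handlers one at a time rather than concurrently.

```python
import asyncio
import inspect
from typing import Any

log: list[str] = []


async def async_handler(data: str) -> None:
    await asyncio.sleep(0)
    log.append(f'async: {data}')


def sync_handler(data: str) -> None:
    log.append(f'sync: {data}')


subscribers = [async_handler, sync_handler]


def emit(**kwargs: Any) -> None:
    """Synchronous path: coroutine functions are skipped, not errored on."""
    for callback in subscribers:
        if inspect.iscoroutinefunction(callback):
            continue
        callback(**kwargs)


async def emit_async(**kwargs: Any) -> None:
    """Async path: call everything and await whatever needs awaiting."""
    for callback in subscribers:
        result = callback(**kwargs)
        if inspect.isawaitable(result):
            await result


emit(data='test')                     # only sync_handler runs
sync_only = list(log)
asyncio.run(emit_async(data='test'))  # both run
```

The sync emit never touches an event loop, which is what makes it safe to call from a Qt slot.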

Staging

Staged events sit in a queue until you explicitly dispatch them.

broker.stage('file.saved', filename='render.exr', size=4096)
broker.stage('file.saved', filename='preview.jpg', size=512)
broker.stage('render.done', frame=42)

broker.emit_staged()              # dispatch all, then flush
await broker.emit_staged_async()  # same but async

The use case: you’re doing a batch operation, you want to emit events about each piece, but you don’t want subscribers reacting one-at-a-time during the batch. Stage everything, do the work, then dispatch in one go.

By default, dispatching flushes the staging table. Pass flush=False to dispatch without clearing, which is occasionally useful for heartbeats or replays:

broker.stage('session.heartbeat', status='ok')
broker.emit_staged(flush=False)  # dispatch
broker.emit_staged(flush=False)  # dispatch again
broker.emit_staged()             # dispatch and flush

There’s also a subtle bit of behavior worth flagging: events staged from inside a subscriber during dispatch are not consumed by the current emit_staged call. They survive to the next one. This means a subscriber can stage a follow-up event without infinite-looping its own dispatcher.
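One way to get that behavior is to snapshot the queue before dispatching, so anything staged during dispatch lands in the live queue and not in the batch being walked. The sketch below is a guess at the mechanism, not the broker’s implementation, and the cleanup.requested namespace is made up for the example.

```python
from typing import Any, Callable

subscribers: dict[str, list[Callable[..., Any]]] = {}
staged: list[tuple[str, dict]] = []
delivered: list[str] = []


def subscribe(namespace: str, callback: Callable[..., Any]) -> None:
    subscribers.setdefault(namespace, []).append(callback)


def emit(namespace: str, **kwargs: Any) -> None:
    for callback in list(subscribers.get(namespace, [])):
        callback(**kwargs)


def stage(namespace: str, **kwargs: Any) -> None:
    staged.append((namespace, kwargs))


def emit_staged(flush: bool = True) -> None:
    batch = list(staged)  # snapshot: later stage() calls do not join this batch
    if flush:
        staged.clear()
    for namespace, kwargs in batch:
        emit(namespace, **kwargs)


def on_render_done(frame: int) -> None:
    delivered.append(f'render.done {frame}')
    stage('cleanup.requested', frame=frame)  # follow-up staged mid-dispatch


def on_cleanup(frame: int) -> None:
    delivered.append(f'cleanup.requested {frame}')


subscribe('render.done', on_render_done)
subscribe('cleanup.requested', on_cleanup)

stage('render.done', frame=42)
emit_staged()
after_first = list(delivered)
pending = len(staged)  # the follow-up is still waiting
emit_staged()
```

The follow-up event is delivered by the second emit_staged() call, never by the one that was running when it was staged.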

Pausing

The companion to staging. broker.paused() is a context manager that suppresses all dispatch — both emit() and emit_async() — for the duration of the block.

with broker.paused():
    broker.emit('file.saved', filename='test.exr')   # suppressed
    await broker.emit_async('render.done', frame=42) # suppressed

broker.emit('file.saved', filename='test.exr')  # delivered

Critically, staging is not suppressed by pause. You can stage events inside a paused block and they will be there waiting when you call emit_staged() later. Pause kills dispatch; staging is bookkeeping.

with broker.paused():
    broker.stage('file.saved', filename='test.exr')  # staged fine

broker.emit_staged()  # delivered

I generally use this to silence noisy code paths on application startup. Initialization is when I care least about signals: I primarily use them to react to changes in state, and during initialization most systems don’t have state yet. I just want the application started.
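The pause-versus-stage distinction can be sketched with a small context manager. MiniBroker is a hypothetical stand-in, not the real module; the depth counter (so nested pauses behave) is my own assumption.

```python
from contextlib import contextmanager
from typing import Any, Iterator


class MiniBroker:
    """Toy stand-in showing pause suppressing dispatch but not staging."""

    def __init__(self) -> None:
        self._pause_depth = 0
        self._staged: list[tuple[str, dict]] = []
        self.delivered: list[tuple[str, dict]] = []

    @contextmanager
    def paused(self) -> Iterator[None]:
        self._pause_depth += 1
        try:
            yield
        finally:
            self._pause_depth -= 1  # restored even if the block raises

    def emit(self, namespace: str, **kwargs: Any) -> None:
        if self._pause_depth:
            return  # dispatch is suppressed while paused
        self.delivered.append((namespace, kwargs))

    def stage(self, namespace: str, **kwargs: Any) -> None:
        self._staged.append((namespace, kwargs))  # bookkeeping only, never paused

    def emit_staged(self) -> None:
        batch, self._staged = self._staged, []
        for namespace, kwargs in batch:
            self.emit(namespace, **kwargs)


mini = MiniBroker()
with mini.paused():
    mini.emit('file.saved', filename='suppressed.exr')  # dropped
    mini.stage('file.saved', filename='staged.exr')     # kept for later

mini.emit_staged()
```

Only the staged event is delivered; the emit made inside the paused block is gone for good.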

Exception Handling

A subscriber raising an exception is a question, not an answer. Do you want the emission to abort? Do you want to log and continue to the next subscriber? Do you want to silently swallow it? Do you want to collect every exception across a batch for later review? Different applications want different things, and the broker doesn’t pick for you.

Typically you want to fail fast and early, but you may also want to log failures for later auditing or debugging.

from broker import handlers

# Stop on the first error, log it, don't deliver to remaining subscribers
broker.set_subscriber_exception_handler(handlers.stop_and_log_subscriber_exception)

# Log and keep going
broker.set_subscriber_exception_handler(handlers.log_and_continue_subscriber_exception)

# Silent — swallow everything
broker.set_subscriber_exception_handler(handlers.silent_subscriber_exception)

# Collect into a list for batch review
broker.set_subscriber_exception_handler(handlers.collect_subscriber_exception)
broker.emit('event', data='test')
for error in handlers.exceptions_caught:
    print(f"Error in {error['namespace']}: {error['exception']}")

The handler is a single callable with the signature (callback, namespace, exception) -> bool, where the return value decides whether to stop or continue delivery. The bundled handlers are just the obvious implementations of that contract; you can write your own:

from typing import Callable

def custom_handler(callback: Callable, namespace: str, exception: Exception) -> bool:
    if isinstance(exception, ValueError):
        return True   # stop
    return False      # continue

broker.set_subscriber_exception_handler(custom_handler)

Setting the handler to None reverts to “raise everything,” which is what you want during development and almost never what you want in production.
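To make the contract concrete, here is a sketch of a dispatch loop that consults such a handler. The emit function and the failing subscribers below are illustrative assumptions, not the broker’s source; only the (callback, namespace, exception) -> bool shape comes from the description above.

```python
from typing import Any, Callable, Optional

Handler = Callable[[Callable[..., Any], str, Exception], bool]


def emit(
    namespace: str,
    subscribers: list[Callable[..., Any]],
    handler: Optional[Handler],
    **kwargs: Any,
) -> None:
    for callback in subscribers:
        try:
            callback(**kwargs)
        except Exception as exc:
            if handler is None:
                raise  # no handler configured: raise everything
            if handler(callback, namespace, exc):
                break  # True means stop delivery


def stop_on_value_error(callback: Callable, namespace: str, exception: Exception) -> bool:
    return isinstance(exception, ValueError)


calls: list[str] = []


def bad_key(data: str) -> None:
    calls.append('bad_key')
    raise KeyError(data)


def bad_value(data: str) -> None:
    calls.append('bad_value')
    raise ValueError(data)


def never_reached(data: str) -> None:
    calls.append('never_reached')


emit('event', [bad_key, bad_value, never_reached], stop_on_value_error, data='test')
```

The KeyError is swallowed and delivery continues; the ValueError stops the loop before the third subscriber runs.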

Transformers have their own parallel set of handlers (set_transformer_exception_handler) with the same shape. I considered unifying them and decided against it — a transformer raising is a different kind of problem than a subscriber raising. The transformer sits in the delivery path; its failure can prevent the event from reaching anyone. Letting it be configured independently means you can be strict about transformer correctness while being lenient about subscriber failures, or vice versa.

Weak References

The broker holds weak references to every callback it tracks. If the object owning a subscriber goes out of scope, the subscriber goes with it. No leaks, no zombie handlers firing into deleted widgets.

This is by far my favorite feature, and the most invisible when it works. Naive pub-sub systems hold strong references and become the reason your Qt widgets never actually get collected. The broker also emits BROKER_ON_SUBSCRIBER_COLLECTED when it notices a subscriber has been culled, which is useful for debugging “why did my callback stop firing” mysteries.

Introspection

The broker is observable. It can describe itself.

broker.get_namespaces()                    # all registered namespaces
broker.namespace_exists('file.saved')      # check a specific one
broker.get_matching_namespaces('file.*')   # wildcard match
broker.get_subscriber_count('file.saved')  # count subscribers
broker.get_subscribers('file.saved')       # the actual callbacks
broker.get_statistics()                    # overall stats
broker.to_dict()                           # full structure
broker.export('/tmp/broker.json')          # dump to disk

I built these primarily because I wanted a visualization tool (a PySide6 node graph showing the live state of the broker — that’s a different post) but they turn out to be useful for general debugging. “Why isn’t my handler firing?” Check get_live_subscribers. “Did this transformer get registered?” Check get_transformers.

The tradeoff of indirection is that the more you add, the harder the system is to observe or understand.

Broker Self-Notifications

The broker can emit events about itself. Subscribers added or removed, namespaces created or deleted, emissions happening — all available as their own events.

broker.set_flag_states(on_subscribe=True, on_emit=True)

@broker.subscribe(broker.BROKER_ON_SUBSCRIBER_ADDED)
def on_subscriber_added(using: str) -> None:
    print(f'New subscriber to: {using}')

@broker.subscribe(broker.BROKER_ON_EMIT)
def on_emit(namespace: str, kwargs: dict) -> None:
    print(f'Event emitted: {namespace}')

These are off by default because they are loud. Turn them on selectively when you’re debugging or building a monitoring dashboard, leave them off in normal operation.

The same pattern — broker subscribing to itself — is what powers the visualization tool. The node graph reacts to BROKER_ON_SUBSCRIBER_ADDED and friends, redrawing as the broker’s state changes. Dogfooding the API to build tools for the API.

Reimport Protection

This is the smallest feature and the one I’m most on the fence about. The broker is a singleton, and reimporting it would clear the global namespace and subscriber table. That is almost never wanted: it’s how you end up with half your application subscribed to the old broker and half to the new one, with no error message and a lot of confusion.

import broker
import importlib
importlib.reload(broker)  # ImportError

Reimport raises. If you really want to clear the broker, there’s clear(). The reload path is closed because I have been burned by it and I do not want to be burned by it again.
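A guard like this can be built on the fact that importlib.reload re-executes a module inside its existing namespace, so a sentinel set on first import is still there on reload. The sketch below demonstrates the technique with a throwaway module; the module name, the sentinel name, and the guard itself are assumptions, not the broker’s source.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in module: the sentinel survives in globals() across reload.
MODULE_SOURCE = '''
if '_ALREADY_LOADED' in globals():
    raise ImportError('mini_broker_demo is a singleton and cannot be reloaded')
_ALREADY_LOADED = True
subscribers = {}
'''

with tempfile.TemporaryDirectory() as folder:
    Path(folder, 'mini_broker_demo.py').write_text(MODULE_SOURCE)
    sys.path.insert(0, folder)
    importlib.invalidate_caches()
    try:
        demo = importlib.import_module('mini_broker_demo')
        demo.subscribers['file.saved'] = ['on_file_saved']
        try:
            importlib.reload(demo)
            reload_blocked = False
        except ImportError:
            reload_blocked = True
    finally:
        sys.path.remove(folder)
        sys.modules.pop('mini_broker_demo', None)

# The guard raised before `subscribers = {}` could run again.
table_after_reload = dict(demo.subscribers)
```

Because the guard sits at the very top of the module, the reload fails before the subscriber table is reassigned, so existing registrations survive.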

Closing Soapbox

The whole thing is maybe 1800 lines of Python. PyPubSub is fine, and if PyPubSub fits your needs you should use it. But the version I needed has typed signature validation, transformers, staging, pause, weak references, both sync and async emit, configurable exception handling, and introspection deep enough to drive a visualization tool. By the time I’d finished evaluating whether each of those features was available, configurable, and would interact correctly with the others, I’d already written half of it in my head.

Build the lighter, first-party version. Tailor it. Own the failure modes. The dependency you don’t take on is the dependency you don’t have to maintain when its author abandons it in 2031.

tags: python - broker - event