Messages Not Brokers

“Event sourcing”, “CQRS”, or just plain “messaging” - it has been years since I’ve talked to a developer working on a network-exposed (usually via HTTP) service without one of these coming up in the initial discussion. Over the years it has become an almost doctrinal position that services need to be emitting messages and that those messages must go through a message broker (e.g., RabbitMQ, Kafka, or some cloud-provider messaging system). Yet such systems can be incredibly complicated to deploy, run, scale, and diagnose. Additionally, developers often believe that certain behaviors (e.g., exactly-once delivery, total ordering) are present and guaranteed in message brokers when, in fact, they are not.

Before I dig in, let me start by saying that I am not, in principle, against the idea that it should be possible for Service B to become aware of state changes within Service A, i.e., the end goal of what most developers mean when they say “messaging”. I am, however, opposed to the idea that message brokers should be the default tool developers reach for when trying to accomplish this. So, I’d like to start this discussion there and then move on to how I prefer to implement messaging.

The Problem with Message Brokers

I don’t want to spend too much time talking about the difficulties in deploying, running, and scaling brokers because, today, I suspect most developers will be using solutions provided by their cloud providers. However, I do want to spend a few sentences on it because it partly informs what I think is the biggest problem with their usage. But please, if you are going to do brokered messages, do everything you can to avoid having someone in your company run your own brokers. Down every path where you choose to run your own lies far more frazzled and gray-haired staff.

So, what makes running a broker so hard? Well, most of it has to do with the fact that you probably won’t be running a broker; you’ll be running a cluster of them. Initially you’ll want the resiliency of multiple instances and maybe, just maybe, you’ll eventually generate enough load that you’ll need to worry about scaling them as well. As with any distributed/clustered service that must, at least temporarily, store data, there is a complex intra-cluster consensus protocol constantly running to coordinate the state of the cluster and help route requests. My experience has been that nothing is as thoroughly maddening as trying to diagnose issues in that intra-cluster traffic. Blips result in slowed or lost messages. Failures in nodes can result in temporarily lost messages that suddenly show back up minutes or hours later. Or, god forbid, your cluster loses enough nodes that it loses quorum and grinds to a halt, and you have to try to reassemble a coherent view of your message universe from nodes that have de-synced from each other.

That nightmare of intra-cluster work leads to the first thing that developers just get wrong about brokers - a cluster of nodes connected via a network and accessed via a network can make NO guarantees around delivery. If a publisher sends a message to a broker and does not receive a “message received” response, the publisher can never be sure if the message was lost en route to the broker or if the “message received” response was lost en route to the publisher. So, right from the start, you run into a sort of fog-of-war that is going to make detecting and diagnosing “lost” messages much harder. Once a message gets into the cluster, network blips within the intra-cluster traffic could prevent it from truly being staged for delivery. Then, on the consumer side, the broker can’t know whether the consumer received the message and the acknowledgement got lost or whether it never received the message at all. Different brokers do different things, some of them quite clever, to try and reduce the likelihood of these issues, but none of them can truly guarantee that these issues will never happen. As such, they can never truly guarantee that a message will move from Service A to Service B.

So, developers who write code assuming that a message sent from Service A to Service B will always be received (let alone received only once) or, maybe a bit more softly, that if a message isn’t received its absence will be detected, have just introduced a bug into their code. A subtle bug that, when it surfaces, will likely be very difficult to diagnose due to its intermittency and the opacity of what is going on in the broker cluster.

The other big mistake I see developers repeatedly make is to assume that messages will always be received in the order they are sent. If Service B assumes it will always see, for example, a “resource created” message before a “resource updated” message, it is making this mistake. All of the network issues mentioned above can easily cause the “updated” message to reach the cluster before the “created” message if they are issued in fairly quick succession, while cluster node failures can cause the delivery of the “created” message to happen minutes or hours after the “updated” message.

A common thing I see developers do here is to look at the timestamp that is often on messages and attempt to order them that way. Unfortunately, the publisher of messages usually isn’t a single publisher but rather a cluster of them. Each node in that cluster has a different system clock and while, hopefully, each publisher node is using something like NTP to keep its clock up to date, time drift on the order of high milliseconds to low seconds will occur between nodes. So, node 1 could publish its “create” message with a timestamp of 18:38:53.027 and node 2 could publish its “update” message, after node 1, with a timestamp of 18:38:52.852 (175ms “in the past”) simply because of clock drift. Things like vector clocks can solve this, but they then require a consensus protocol on the publisher side, and thus the publisher becomes susceptible to all the problems and operational overhead mentioned before.

Finally, there is another problem with using brokers: authorization. Service A probably has some set of logic about who can see its data; this could be coarse-grained you-can-see-all-or-nothing rules or more sophisticated field-level rules. Whatever rules Service A has are going to need to be replicated in the broker so that Service C doesn’t come in later, subscribe to a topic, and start getting data for which it is not authorized. While most brokers have support for expressing such rules, hopefully it doesn’t take much imagination to start thinking about the ways these rules could fall out of sync, nor the pain of coordinating new releases of Service A that expose new data with the broker configuration changes needed to protect that data.

There are a lot more issues that show up with brokers (e.g., poisoned messages, dead-letter queue management, how to deal with new Service D coming online without having seen any of the past messages from Service A, evolving the content of messages) but hopefully this begins to paint a picture of a more complex system than most developers seem to think about when they reach for a broker.

Messaging (maybe) without Brokers

So, how do I think about messaging? Well, I start by noting there are really three steps to messaging. Service A generates some data (a message) and denotes this should be “sent out” to other services. Next, some mechanism transports the message from Service A to Service B. Finally, Service B becomes aware that a new message is available for it to act upon.

For the first and third part, I recommend a simple outbox/inbox pattern. Service A publishes a message to an outbox. Service B dequeues messages from an inbox. The interfaces for an inbox and outbox are very simple and provide strong decoupling between the rest of Services A & B and whatever “magic” is going on to move messages around. Here are some example Outbox and Inbox APIs:

public interface IOutbox {
  Task Publish<T>(T message) where T : IMessage;
}

public interface IInbox {
  T Dequeue<T>() where T : IMessage;
  void Subscribe<T>(Action<T> subscriber) where T : IMessage;
}

public interface IMessage {
  // some properties (see "Anatomy of a Message" below)
}

The notion of an outbox in Service A and an inbox in Service B is probably pretty uncontroversial for many devs. It’s a pretty standard pattern. It’s when we get into the transports that I tend to start losing people. There are more ways to get messages from Service A to Service B than brokers. Service A could directly send messages to Service B (historically these have been called webhooks) or Service B could ask (poll) Service A for messages. My preferred starting point is the polling model.

Polling for Messages

In the polling model, Service A will expose one or more network endpoints from which interested services may pull messages. In the case of HTTP services, these are often just GET calls to some sort of /messages endpoint. Usually that endpoint will support a query parameter that fetches all messages created on or after a given timestamp1. So, in the polling model, Service B has a background job that periodically polls Service A’s message endpoint, gets the messages, and drops them into Service B’s inbox.
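
To make that concrete, here is a minimal sketch of what Service B’s polling job might look like. Everything in it is illustrative rather than prescriptive: the /messages?since=<timestamp> query shape, the PolledMessage record (just an ID and a timestamp for now; message contents get their own section below), and an IInboxWriter interface standing in for the write side of the inbox, which the IInbox interface above doesn’t show.

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading;
using System.Threading.Tasks;

// Illustrative wire shape; just the unique ID and timestamp for now.
public sealed record PolledMessage(Guid Id, DateTimeOffset Timestamp);

// Hypothetical write side of Service B's inbox; IInbox above only shows the read side.
public interface IInboxWriter {
  void Add(PolledMessage message);
}

public sealed class PollingTransport {
  private readonly HttpClient _serviceA;       // BaseAddress points at Service A
  private readonly IInboxWriter _inboxWriter;
  private DateTimeOffset _lastSeen = DateTimeOffset.MinValue;

  public PollingTransport(HttpClient serviceA, IInboxWriter inboxWriter) {
    _serviceA = serviceA;
    _inboxWriter = inboxWriter;
  }

  public async Task PollForeverAsync(TimeSpan interval, CancellationToken cancel) {
    while (!cancel.IsCancellationRequested) {
      try {
        var since = Uri.EscapeDataString(_lastSeen.ToString("o"));
        var messages = await _serviceA.GetFromJsonAsync<List<PolledMessage>>(
          $"/messages?since={since}", cancel) ?? new();

        foreach (var message in messages) {
          _inboxWriter.Add(message);    // the inbox de-dupes by message ID (see below)
          if (message.Timestamp > _lastSeen) _lastSeen = message.Timestamp;
        }
      } catch (HttpRequestException) {
        // The request failed outright - no fog-of-war, just try again on the next cycle.
      }
      await Task.Delay(interval, cancel);
    }
  }
}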

There are a few reasons I really like this model. First and foremost, it is very easy to detect network issues. Because Service B has a direct connection to Service A, it knows whether its request succeeded or failed and can act accordingly. Likewise, a message can’t get lost “in the ether”; it’s either in Service A’s outbox (usually a database table) or it is not, and that can be checked very easily. Similarly, after a polling run, the message is either in Service B’s inbox (again, usually a database) or it’s not, and that too can be checked easily.

This method is also “self-healing” and duplicate-detecting. When Service B polls, it’s going to get back a list of messages (or none if there have been none since the last query). Assuming those messages carry a unique ID (more to come on the contents of messages), when the poller is dumping messages into the inbox it can check whether each message has already been seen and act appropriately. For example, let’s assume an inbox with messages 1, 2, 3, and 5 in it. Service B does a polling run and gets back messages 3, 4, and 5. It goes to put message 3 in the inbox; that message already exists and is skipped. It goes to put message 4 in the inbox; that message doesn’t yet exist and it goes in. It goes to put message 5 in the inbox; that message already exists and is skipped. In this way, missed messages (I’ll discuss how that might have happened later) get caught and made available, and duplicate messages are not resurfaced to the rest of Service B.
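
Here’s a tiny, self-contained illustration of that exact sequence, with an in-memory set standing in for the inbox. In practice the “have I seen this ID before?” check is usually just a primary-key or unique constraint on the inbox table.

using System;
using System.Collections.Generic;

// The inbox already holds messages 1, 2, 3, and 5; a polling run returns 3, 4, and 5.
var inbox = new HashSet<int> { 1, 2, 3, 5 };

foreach (var id in new[] { 3, 4, 5 }) {
  if (inbox.Add(id))                  // Add returns false when the ID is already present
    Console.WriteLine($"message {id} added to the inbox");
  else
    Console.WriteLine($"message {id} already seen, skipped");
}

// Prints: 3 skipped, 4 added (the missed message is healed), 5 skipped.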

Additionally, this model allows Service A to reuse its authorization logic to control which consuming services see which messages. Service C isn’t supposed to get messages (or messages of a specific type)? Don’t return them on the request to /messages.

Now, the two major protestations against polling tend to be time-to-delivery to Service B and additional load on Service A. First, let me address the time-to-delivery concern. The general worry here is that Service B must wait for its next polling cycle to find out about new messages. True. But, here is the thing, it’s generally true for brokers too. Most consumers/subscribers, deep inside their client libraries, poll brokers for new messages. They do this so that they can manage their own local queue of messages, trading off the possibility that a consuming process could crash and lose messages against how quickly messages show up. Consumers will often poll a few times a second. Service B could do the same thing with Service A.

For those who were already concerned that polling was going to put a bunch of new load on Service A, the previous sentence was probably really scary. There are a few reasons I don’t worry about this. First, the cluster of nodes that makes up Service A can probably handle a few dozen extra calls a minute already. Second, as we’ll see below, the size of messages should, in my opinion, be quite small, so you’re not really sending that much traffic. Lastly, this model limits the blast radius of the damage if consumers really do generate too many requests.

Assume a model where you have a central broker cluster. If some set of consumers, likely because of a bug, generates enough requests to begin overwhelming the cluster, that is going to cause problems for all services using that cluster. In the polling model, if this happens, only Service A is overwhelmed. Now, Service A may be really important, but chances are pretty good that Service A becoming unavailable due to load is going to be far less damaging to your entire ecosystem than a centralized message broker becoming overwhelmed. Plus, Service A is probably a whole lot easier/quicker to scale out than the broker cluster in the event this actually happens.

So, no Brokers?

No, as I stated at the beginning, I’m not completely anti-broker. They can be a useful tool in a developer’s toolbox as long as it’s understood how they work and the developer takes that behavior into account. Indeed, you might notice that the polling mechanism seems to “cover” all the downsides of brokers that I mentioned. So you might already see where this is going.

Brokers are normally more efficient at transporting messages from Service A to Service B. They tend to have very sophisticated network management code that reuses established connections, responds to changes in cluster topology, implements zero-copy reads and writes when it can, etc. Indeed, my experience is that connection management tends to constitute over half the code in a “simple” broker client. So, now that we have a way to make up for some of the issues that can show up, we are in a position to have a real discussion on whether the complexity brought in by brokers is worth the more efficient/faster delivery of messages.

If you find that it is, that’s fine. You’ll write another transport that monitors Service A’s outbox and, when a new message shows up, immediately publishes it to the broker. The strong encapsulation provided by the outbox interface means the rest of the code doesn’t need to know anything new is happening. Service B can then implement its side of this new transport, a broker subscriber, and upon receiving a message, look to put it in the inbox with the same de-duping logic mentioned above.
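
A sketch of that outbox-monitoring publisher might look like the following. The IOutboxStore and IBrokerPublisher interfaces are stand-ins of my own invention (the latter would wrap whatever broker client you actually use); nothing here is implied by the IOutbox interface itself.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins: a read side of the outbox that surfaces not-yet-published
// messages, and a thin wrapper around your broker client of choice.
public interface IOutboxStore {
  IReadOnlyList<IMessage> GetUnpublished();
  void MarkPublished(IMessage message);
}

public interface IBrokerPublisher {
  Task PublishAsync(IMessage message, CancellationToken cancel);
}

public sealed class BrokerTransport {
  private readonly IOutboxStore _outbox;
  private readonly IBrokerPublisher _broker;

  public BrokerTransport(IOutboxStore outbox, IBrokerPublisher broker) {
    _outbox = outbox;
    _broker = broker;
  }

  public async Task PumpAsync(TimeSpan interval, CancellationToken cancel) {
    while (!cancel.IsCancellationRequested) {
      foreach (var message in _outbox.GetUnpublished()) {
        try {
          await _broker.PublishAsync(message, cancel);
          _outbox.MarkPublished(message);   // transport-specific state stays inside the outbox
        } catch (Exception) when (!cancel.IsCancellationRequested) {
          // Leave it marked unpublished; the polling path still covers delivery.
        }
      }
      await Task.Delay(interval, cancel);
    }
  }
}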

Once the broker path is in place, Service B can usually decrease the frequency at which it polls Service A. Note, it cannot fully eliminate the polling. That polling is still needed to mitigate the different ways in which messages might get lost “in the ether” along the broker path. But taken together, these provide both an efficient and resilient messaging system.

Anatomy of a Message

So far, I’ve talked about messages but, besides saying above that they should have a unique ID, I haven’t said much about what a message contains. Well, here’s another spot where I’m going to diverge from the norm. Most messages, I think, should contain four primitive pieces of data (sketched in code right after this list):

  • the aforementioned unique ID
  • a timestamp assigned by the emitting service
  • a message type which helps distinguish what type of resource changed
  • the unique ID of the resource that changed.
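
In code, a message carrying those four fields can be as small as this sketch; the names are purely illustrative and simply fill in the “some properties” placeholder from the IMessage interface earlier.

using System;

// A minimal, illustrative shape for those four fields.
public sealed record ResourceChangedMessage(
  Guid Id,                    // the unique message ID
  DateTimeOffset Timestamp,   // assigned by the emitting service
  string MessageType,         // e.g., "order.updated"
  string ResourceId           // the ID of the resource that changed
) : IMessage;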

That’s it? But what about all the other data for the changed resource?!? Yep, that’s it. And that data for the resource is available… in Service A. This model keeps messages really small which makes them much more efficient/quick to transfer (some of you were really worried about that before, remember?) but more importantly it prevents developers from making another common mistake.

See, if Service A were to put the current state of the resource into the message, it would be making an assumption that Service B is going to have the same context/state in which to interpret the message as Service A did when sending it. It also assumes that Service B wants the data for every resource managed by Service A. Neither is going to be consistently true, but developers on both the publishing and consuming sides behave as if they are. As already noted above, any developer who assumes they will receive one state change message (e.g., “created”) before another (e.g., “updated”) is making this mistake, and I see it somewhere in every messaging implementation I’ve encountered.

By sending only the ID of the resource, Service B now has a bit more work to do. First, it needs to determine if it cares about a change to the given resource at all. If not, it just ignores the message and moves on. But, if it does care, it then needs to figure out what to do about it. This almost always means it immediately turns around and queries Service A for the current state2. Upon receiving the latest state, it must determine what has changed and act upon it. This is the part that developers think could be addressed by messages that make explicit what changed. Note though, if you agree that total message ordering can’t be guaranteed, or that Service B might not have everything from Service A (e.g., because Service B came online after Service A or could have missed messages for some reason), then Service B already has to inspect what it currently knows about a given resource and compare it to what Service A sends it.
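
Here is a sketch of that consuming-side flow: decide whether you care, fetch the current state, compare it to what you already know, and act. The /resources/{id} endpoint, the ResourceDto shape, and the local store are all assumptions made up for the example.

using System.Net.Http;
using System.Net.Http.Json;
using System.Threading;
using System.Threading.Tasks;

public sealed record ResourceDto(string Id, string Name);   // illustrative resource shape

// Hypothetical view of what Service B already knows about Service A's resources.
public interface ILocalResourceStore {
  bool IsInteresting(string resourceId);
  ResourceDto? Get(string resourceId);
  void Upsert(ResourceDto resource);
}

public sealed class ResourceChangedHandler {
  private readonly HttpClient _serviceA;
  private readonly ILocalResourceStore _local;

  public ResourceChangedHandler(HttpClient serviceA, ILocalResourceStore local) {
    _serviceA = serviceA;
    _local = local;
  }

  public async Task HandleAsync(ResourceChangedMessage message, CancellationToken cancel) {
    if (!_local.IsInteresting(message.ResourceId)) return;   // don't care; ignore and move on

    // The message only said *that* something changed; ask Service A what the state is now.
    var current = await _serviceA.GetFromJsonAsync<ResourceDto>(
      $"/resources/{message.ResourceId}", cancel);
    if (current is null) return;

    // Compare against what we currently know and act on the difference.
    var known = _local.Get(message.ResourceId);
    if (known is null || known != current)
      _local.Upsert(current);
  }
}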

Now for Something Totally Different

So, now that I’ve described why I strongly prefer a model that isn’t built solely on a message broker transport and uses only minimal, anemic messages, let me note when I’d do something totally different.

All of the above, at its core, is based on two core assumptions:

  • messages must be delivered to Service B
  • messages are carrying state data about resources managed by the service

While I think this is the most common use case of messaging in general network-exposed services, it’s not the only use case.

Another common use case is Service A publishing telemetry (e.g., memory currently in use, clients currently connected, number of errors recently seen) about itself. The intent then is that some observability service will listen to these messages and provide some monitoring/alerting/visualization/response to this data. In this model, the messages usually contain the full point-in-time observation and, if a few messages get lost, it’s not a big deal. The observability service just waits for the next one to show up and keeps evaluating its rules.

As an example, in a previous job, we had a system that consumed all the WiFi client observations made by WiFi access points. We received and processed about 5M messages every 10 seconds3. A message contained things like a client ID and signal strength and noise information. We used this to triangulate or quadrangulate the clients in a 2D or 3D space respectively. We also used it to map the interior space and determine the building material of the walls (physics is fun and privacy is dead4!). If a couple messages were missed, the worst that happened is a client would suddenly “teleport” from one location to the next without an observable path between the two points.

Some Final Implementation Thoughts

So, to summarize, my preferred way of implementing messaging is for the publisher to publish through an Outbox interface that abstracts away any of the details about how messages move around. Start by implementing an HTTP polling mechanism in Service B that queries Service A for messages and places the messages into an Inbox in Service B. Later, and only after evaluating the tradeoffs, potentially implement some sort of message broker transport. The messages flowing between the services should only contain the ID of the resource that changed and Service B should query Service A for the current state of the resource if it cares about the resource at all. Now let me touch on some additional implementation details (in no particular order).

Within, but not exposed outside of, the Inbox and Outbox implementations, it should be possible for transports to associate key-value pairs with a given message. This lets a transport attach transport-specific state to the message. For example, when using a broker, the message publisher transport might want to record on the message whether publication was attempted and whether it succeeded. The Inbox might track whether the service has dequeued a message.
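
One way to do that, with purely illustrative names, is an internal key-value side channel keyed by message ID that only the transports and the Inbox/Outbox implementations ever touch:

using System;

// Internal to the Inbox/Outbox implementations; never exposed through IInbox or IOutbox.
internal interface ITransportMetadata {
  void Set(Guid messageId, string key, string value);
  string? Get(Guid messageId, string key);
}

// e.g., a broker publisher transport might record its own bookkeeping like:
//   metadata.Set(message.Id, "broker.publish-attempted", "true");
//   metadata.Set(message.Id, "broker.publish-succeeded", "false");
// while the Inbox might track:
//   metadata.Set(message.Id, "inbox.dequeued", "true");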

When implementing the GET /messages operation on Service A, Service A will usually want to support querying by message type as well as by its internal timestamp. This allows Service B to query only for the message types it cares about, should Service A produce multiple message types (which it often will). When Service B queries by timestamp, it should generally query for a time a bit earlier than the newest message it received the last time it queried. This allows for the case where multiple nodes were writing messages to the outbox and node 2, which was lagging a bit, wrote a message with an earlier timestamp than the one that landed just before the query. If Service B gets a couple of messages it has seen before, it’s no worry; it’ll just dedupe them.
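
Extending the earlier polling sketch, building that query might look something like this; the 30-second lookback and the type parameter name are both arbitrary, illustrative choices.

using System;

// Look back a little earlier than the newest timestamp seen so far; the inbox's
// de-duping makes any re-delivered messages harmless.
public static class PollingQuery {
  public static string Build(DateTimeOffset lastSeen, string? messageType = null) {
    var overlap = TimeSpan.FromSeconds(30);   // illustrative lookback window
    var since = Uri.EscapeDataString((lastSeen - overlap).ToString("o"));
    var typeFilter = messageType is null ? "" : $"&type={Uri.EscapeDataString(messageType)}";
    return $"/messages?since={since}{typeFilter}";
  }
}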

Service A needs a background job that will eventually purge old messages. Because messages are quite small in this model, it can keep a deep history, but space is never infinite and eventually it will need to delete some. Service A should balance how long it wants to support a current subscriber being offline against the slowdown that will occur as the outbox fills with more and more messages. My default is to support something like 5 days of history. This way, if a non-essential service goes down over a three-day weekend, it doesn’t suddenly require a page-out to recover the downed service before the messages it cares about get purged.
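
The core of that purge job can be very small. The sketch below assumes a PostgreSQL outbox table named outbox_messages with an occurred_at column; both names are made up for the example.

using System;
using Npgsql;

public static class OutboxPurge {
  // e.g., PurgeOlderThan(connectionString, TimeSpan.FromDays(5))
  public static void PurgeOlderThan(string connectionString, TimeSpan retention) {
    using var connection = new NpgsqlConnection(connectionString);
    connection.Open();

    // Delete everything older than the retention window.
    using var command = new NpgsqlCommand(
      "DELETE FROM outbox_messages WHERE occurred_at < @cutoff", connection);
    command.Parameters.AddWithValue("cutoff", DateTimeOffset.UtcNow - retention);
    command.ExecuteNonQuery();
  }
}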

Service A and B should be sure to monitor and alert on:

  • a substantial increase in the number of messages
  • a substantial increase in the polling for messages
  • a prolonged absence of any messages

What exactly constitutes a “substantial increase” and “prolonged absence” will depend on the service.

1 Note, this timestamp does not have the same problems as attempting to use timestamps to order messages. You’re not assuming any sort of clock synchronization between nodes; you’re just assuming that a “created at” time, once assigned, won’t later be changed. That’s usually a pretty safe bet.

2 Note, Service A and Service B both have ways to make this more efficient. First, Service A could support conditional GETs so that if Service B receives, in rapid succession, a number of messages saying a given resource has changed, it could pull back the latest record on the first request and receive “nothing has changed” responses for each subsequent query triggered by those messages. In addition, or as an alternative, Service B could add some complexity to its message-deduping logic: if a change message already exists for a given resource, it could further “dedupe” those into a single message.

3 So, when I talk about the difficulties of scaling broker clusters, keeping them healthy, and strange issues that can crop up on imperfect networks, I speak from some experience.

4 I was actually pretty proud of our team. We did quite a few things to help protect the privacy of individuals in this setup. It was by no means perfect anonymization (if such a thing exists), but it was a whole lot better than nothing.