NATS Weekly #22

Byron Ruth

Published on Apr 18th, 2022

Week of April 11 - 17, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates, and other news.

⚡ Releases

Official releases from NATS repos and others in the ecosystem.

📖 Articles

Blog posts, tutorials, or any other text-based content about NATS.

💬 Discussions

GitHub Discussions from various NATS repositories.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

What is the expected behavior when network interruptions occur?

Since it is such an informative answer, I wanted to highlight Todd Beets' response to a question (so it does not get lost in the Slack void). Everything below is quoted from a Slack thread Todd responded to.

In general, the behavior [split brain] is benign and quickly recoverable: nodes immediately adjacent to a gateway connection outage would see loss of subject interest from the other cluster's node. During this time, some message publishes would not be sent over a Gateway link that otherwise would. As the impacted nodes re-initiate a gateway connection, respective interest graphs would be automatically refreshed.

In NATS core (i.e. minus JetStream), there is gossiped state (e.g. interest graph and sometimes Account claims), but otherwise no central or special "brains" -- NATS servers are a mesh of peers essentially.

NATS JetStream brings some additional state and "brains" if you will. State is constantly replicated with RAFT consensus (i.e. there is no central database in the architecture), but at any given time there are elected leader nodes at both a JetStream and a JetStream "Meta" level. A given JetStream always lives in a single cluster (i.e. it does not physically span across more than one cluster in a supercluster).   A Gateway outage does not impact a JetStream directly, but may temporarily impact access to the JetStream (from another Cluster).  The JetStream's ability to be operationally changed during such an outage may also be impacted (see next).

JetStream "Meta" leader acts at a JetStream domain level (up to supercluster scope) as a scheduler of new JetStreams and a head for certain JetStream management APIs.  Certain management functions can be impacted by a Gateway outage in the case that the current leader cannot reach enough active nodes -- across the supercluster -- to gain RAFT consensus for a new operational change.  Electing a new Meta leader is one example of a function that requires consensus.  Placing a new JetStream instance is another example.   Expanding or shrinking the number of replicas for an existing JetStream still another.  Already deployed JetStreams continue to run, but in some cases may not be operationally changed in the case of no current "Meta" leader reachable and/or a reachable "Meta" leader no longer has connectivity with enough nodes for RAFT quorum.

Even with JetStream and a multi-region supercluster, there are strategies to ensure that JetStream management can continue. For instance, an odd number of clusters in a supercluster (assuming equal numbers of nodes per cluster) allows meta-leader level operations to continue in the remaining two clusters, i.e. where one cluster is for a time unreachable.
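To make the quorum arithmetic behind that last point concrete (this is my own illustration, not part of Todd's answer), here is a minimal sketch. It assumes every node in the supercluster is a JetStream-enabled member of the meta group and that a strict majority is needed for consensus; the function names are hypothetical.

```go
package main

import "fmt"

// hasQuorum reports whether the reachable nodes still form a strict majority
// of all nodes in the meta group.
func hasQuorum(totalNodes, reachableNodes int) bool {
	return reachableNodes >= totalNodes/2+1
}

// survivesClusterLoss models a supercluster of equally sized clusters and asks
// whether the remaining nodes keep quorum when one whole cluster is unreachable.
func survivesClusterLoss(clusters, nodesPerCluster int) bool {
	total := clusters * nodesPerCluster
	reachable := (clusters - 1) * nodesPerCluster
	return hasQuorum(total, reachable)
}

func main() {
	for _, clusters := range []int{2, 3} {
		fmt.Printf("%d clusters x 3 nodes, one cluster unreachable: quorum=%v\n",
			clusters, survivesClusterLoss(clusters, 3))
	}
}
```

With two clusters of three nodes, losing one cluster leaves 3 of 6 nodes, which is short of the 4 needed. With three clusters, 6 of 9 nodes remain and the 5-node majority is still reachable.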

What are the differences between the stream retention policies?

The original question posed in Slack asked what the difference is between a durable consumer with multiple subscribers (e.g. QueueSubscribe or PullSubscribe) on a "standard stream" and a "work queue stream". It boiled down to the semantics of the stream retention policies themselves.

First, let's quickly revisit stream limits and retention policy. As the docs state, all streams have limits, specifically upper bounds that can be set, including MaxMsgs, MaxBytes, and MaxAge. If, for example, you define a MaxAge of 12h, then only messages with a timestamp within the last 12 hours (a window that moves with the current time) will be available to consume.
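The "relative" part of MaxAge is easy to gloss over, so here is a small toy model of it. This is only a sketch of the idea, not how the server implements it; the message type and the visible and demoCounts functions are hypothetical names of my own.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a stored stream message.
type message struct {
	subject   string
	timestamp time.Time
}

// visible returns the messages whose age at time now does not exceed maxAge.
func visible(msgs []message, maxAge time.Duration, now time.Time) []message {
	cutoff := now.Add(-maxAge)
	var out []message
	for _, m := range msgs {
		if !m.timestamp.Before(cutoff) {
			out = append(out, m)
		}
	}
	return out
}

// demoCounts evaluates the same three messages at two points in time and
// returns how many are visible at each.
func demoCounts() (atNoon, twoHoursLater int) {
	noon := time.Date(2022, time.April, 18, 12, 0, 0, 0, time.UTC)
	msgs := []message{
		{"foo.a", noon.Add(-13 * time.Hour)},
		{"foo.b", noon.Add(-11 * time.Hour)},
		{"foo.c", noon.Add(-1 * time.Hour)},
	}
	maxAge := 12 * time.Hour
	atNoon = len(visible(msgs, maxAge, noon))
	twoHoursLater = len(visible(msgs, maxAge, noon.Add(2*time.Hour)))
	return atNoon, twoHoursLater
}

func main() {
	atNoon, later := demoCounts()
	fmt.Println("visible at noon:", atNoon)
	fmt.Println("visible two hours later:", later)
}
```

Nothing was published or consumed between the two checks, yet the message that was 11 hours old at noon has aged out two hours later.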

Regardless of the retention policy, these limits will be enforced. If the limits are all you need for retention, then the LimitsPolicy should be used when creating a stream (note that, unless explicitly set, all limits default to unlimited).

js.AddStream(&nats.StreamConfig{
  Retention: nats.LimitsPolicy,
  Subjects: []string{"foo.>"},
  // etc..
})

Any number of consumers can be created for the stream, with any subset of the subject space bound to the stream, including overlapping subjects. In other words, each consumer is completely independent of the others for a limits-based stream.

The second retention policy is called InterestPolicy. The interest applies to the set of consumers bound to the stream that have a FilterSubject matching a given message's subject. That is, when a message is received, it will be retained and available to be consumed until all consumers that match the subject (via their filter) have consumed (ack'ed) the message. Once this is true, the message will be unavailable for future consumption.

This policy is best when there is a known set of consumers ahead of time that all need to do something with that message AND, unlike the limits-based policy, you only want the message to be kept around until all the consumers have completed the work.

Like the limits-based policy, there can be an arbitrary number of consumers with overlapping subjects (that are subsets of the stream).
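The interest semantics can be sketched with a toy, in-memory model. This is an illustration under my own assumptions rather than the server's implementation: the interestStream type and its methods are hypothetical, and the matcher assumes well-formed subjects where "*" stands for exactly one token and a trailing ">" for one or more.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a concrete subject matches a consumer filter.
func matches(filter, subject string) bool {
	f, s := strings.Split(filter, "."), strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return len(s) > i // ">" needs at least one remaining token
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(f) == len(s)
}

// interestStream is a toy model of interest-based retention.
type interestStream struct {
	filters map[string]string          // consumer name -> filter subject
	pending map[uint64]map[string]bool // sequence -> consumers yet to ack
	lastSeq uint64
}

func newInterestStream(filters map[string]string) *interestStream {
	return &interestStream{filters: filters, pending: map[uint64]map[string]bool{}}
}

// publish keeps the message only if at least one consumer filter matches it.
func (s *interestStream) publish(subject string) uint64 {
	s.lastSeq++
	waiting := map[string]bool{}
	for name, filter := range s.filters {
		if matches(filter, subject) {
			waiting[name] = true
		}
	}
	if len(waiting) > 0 {
		s.pending[s.lastSeq] = waiting
	}
	return s.lastSeq
}

// ack records an acknowledgement and drops the message once no interested
// consumer is still waiting on it.
func (s *interestStream) ack(consumer string, seq uint64) {
	waiting, ok := s.pending[seq]
	if !ok {
		return
	}
	delete(waiting, consumer)
	if len(waiting) == 0 {
		delete(s.pending, seq)
	}
}

// retained reports whether the message is still stored.
func (s *interestStream) retained(seq uint64) bool {
	_, ok := s.pending[seq]
	return ok
}

func main() {
	s := newInterestStream(map[string]string{"audit": "foo.>", "billing": "foo.bar.*"})
	seq := s.publish("foo.bar.baz") // matches both (overlapping) filters
	s.ack("audit", seq)
	fmt.Println("retained after one ack:", s.retained(seq))
	s.ack("billing", seq)
	fmt.Println("retained after both acks:", s.retained(seq))
}
```

The two filters overlap, which is fine here: the message simply stays around until both the audit and billing consumers have acknowledged it.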

Moving on to the WorkQueuePolicy, this one is a bit more subtle. Since queue semantics expect a unit of work to be queued and then handled by one actor, consumers cannot have overlapping subjects, since this would result in NATS distributing a message more than once.

For example, while subjects like foo.> and foo.bar.* would be fine for the limits and interest-based policies, this combination is not allowed for the work queue policy since these subjects overlap. Instead we would need something like foo.bar.* and foo.qux.* (or any other disjoint subjects). This guarantees a message will only be delivered once to the respective consumer handling the subject space.
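If you want to check a set of consumer filters before creating them on a work queue stream, the overlap rule can be sketched as a token-by-token comparison. This is a minimal sketch of my own (the overlap and firstOverlap helpers are hypothetical names, not a nats.go API), and it assumes well-formed subjects where ">" only appears as the last token.

```go
package main

import (
	"fmt"
	"strings"
)

// overlap reports whether at least one concrete subject could match both
// filters, treating "*" as one token and a trailing ">" as one or more tokens.
func overlap(a, b string) bool {
	ta, tb := strings.Split(a, "."), strings.Split(b, ".")
	n := len(ta)
	if len(tb) < n {
		n = len(tb)
	}
	for i := 0; i < n; i++ {
		if ta[i] == ">" || tb[i] == ">" {
			return true // the other side has a token here, so ">" covers the rest
		}
		if ta[i] != "*" && tb[i] != "*" && ta[i] != tb[i] {
			return false // two different literal tokens can never match the same subject
		}
	}
	return len(ta) == len(tb)
}

// firstOverlap returns the first pair of overlapping filters, if any.
func firstOverlap(filters []string) (string, string, bool) {
	for i := range filters {
		for j := i + 1; j < len(filters); j++ {
			if overlap(filters[i], filters[j]) {
				return filters[i], filters[j], true
			}
		}
	}
	return "", "", false
}

func main() {
	fmt.Println(overlap("foo.>", "foo.bar.*"))     // the rejected pair from the text
	fmt.Println(overlap("foo.bar.*", "foo.qux.*")) // the disjoint pair from the text
}
```

The first call reports an overlap because foo.bar.baz, for example, matches both filters. The second pair differs on a literal token, so no subject can match both.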

To revisit the original question about a queue subscription, the set of subscribers bound to a consumer is independent of the stream retention policy. Assuming the consumer is configured properly relative to the retention policy constraints (e.g. non-overlapping subjects), then one or multiple subscribers can be used against that consumer. In other words, the subscriber topology and the stream config are orthogonal concerns.

What is a strategy to indicate the content-type of a message payload?

This is a short tip I have put into practice myself, and one that Todd Beets also called out in response to a question.

Since NATS messages are payload agnostic (they take a byte array), as a client consuming a message, I would need a priori knowledge of the encoding. If you are in a situation where a message must be self-describing, then it could be useful to take a cue from HTTP here to include Content-* headers like Content-Type, Content-Language and/or Content-Encoding as NATS headers.

This could be extended and tailored for messaging by defining a Content-Schema header that would be Content-Type dependent, such as the location of a JSON Schema that a JSON-encoded message is expected to adhere to. This could serve a double purpose of making these schemas available for metaprogramming or code generation on the consuming side.
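On the consuming side, that idea might look something like the sketch below. To keep it runnable without a server, headers are passed as a plain map[string][]string, which I am assuming mirrors the shape of nats.Header; the decode, first, and gzipBytes helpers are hypothetical, lookups are case-sensitive, and only exact Content-Type values (no parameters such as charset) are handled.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// first returns the first value stored under key, or "" if there is none.
func first(headers map[string][]string, key string) string {
	if vals := headers[key]; len(vals) > 0 {
		return vals[0]
	}
	return ""
}

// decode undoes the Content-Encoding (only gzip here) and then interprets the
// payload according to its Content-Type.
func decode(headers map[string][]string, data []byte) (interface{}, error) {
	if first(headers, "Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, fmt.Errorf("open gzip payload: %w", err)
		}
		defer zr.Close()
		if data, err = io.ReadAll(zr); err != nil {
			return nil, fmt.Errorf("read gzip payload: %w", err)
		}
	}
	switch ct := first(headers, "Content-Type"); ct {
	case "application/json":
		var v map[string]interface{}
		if err := json.Unmarshal(data, &v); err != nil {
			return nil, fmt.Errorf("decode JSON payload: %w", err)
		}
		return v, nil
	case "text/plain":
		return string(data), nil
	default:
		return nil, fmt.Errorf("unsupported Content-Type %q", ct)
	}
}

// gzipBytes compresses data so the example can build an encoded payload.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	payload, err := gzipBytes([]byte(`{"order": "abc", "qty": 2}`))
	if err != nil {
		panic(err)
	}
	headers := map[string][]string{
		"Content-Type":     {"application/json"},
		"Content-Encoding": {"gzip"},
	}
	v, err := decode(headers, payload)
	fmt.Println(v, err)
}
```

The consumer never has to guess: an unknown or missing Content-Type is an explicit error rather than a silent misparse.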

How can I subscribe to multiple streams?

An underrated and under-explored feature (to me) is the ability to define a stream that can source messages from other streams automatically. There are some additional configuration options per source that can be set up, but the base example is as follows:

js.AddStream(&nats.StreamConfig{
  Name: "stream-1-2-3",
  Sources: []*nats.StreamSource{
    {Name: "stream-1"},
    {Name: "stream-2"},
    {Name: "stream-3"},
  },
  // etc..
})

This creates a new stream that sources everything published to the source streams (as of the time of stream creation).

This is powerful because, today, you cannot create a consumer that spans multiple streams. You can of course create multiple consumers, one subscription per consumer, and then consume them separately in your application, but depending on your comfort level and programming language (some handle the concurrency more easily than others), it can still be tricky to get right.

Instead, you could just create a new stream with its own limits and retention policy for the use case at hand (i.e. why do you need to source from multiple streams in the first place?). And since it's a normal stream, it can be replicated and any number of consumers can be created.

Another interesting capability is that the sourcing stream can also have its own set of Subjects bound to it. This is effectively a shortcut that avoids defining a fourth standard stream and then sourcing from it.
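Putting these pieces together, a fuller version of the earlier snippet might look like the sketch below. It assumes a NATS server with JetStream enabled at the default URL and that stream-1, stream-2 and stream-3 already exist; the extra.> subject, the 12h MaxAge and the "combined" consumer name are placeholders of my own.

```go
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumption: a local server with JetStream enabled.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// One stream that sources three existing streams and also accepts
	// messages published directly to its own subjects.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:      "stream-1-2-3",
		Subjects:  []string{"extra.>"}, // hypothetical direct subjects
		Retention: nats.LimitsPolicy,
		MaxAge:    12 * time.Hour, // limits independent of the source streams
		Sources: []*nats.StreamSource{
			{Name: "stream-1"},
			{Name: "stream-2"},
			{Name: "stream-3"},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// A single durable consumer now covers everything from all sources.
	_, err = js.AddConsumer("stream-1-2-3", &nats.ConsumerConfig{
		Durable:   "combined", // hypothetical consumer name
		AckPolicy: nats.AckExplicitPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

The application then only has one consumer to subscribe to, rather than coordinating one subscription per source stream.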