NATS Weekly #12

Byron Ruth

Published on Feb 7th, 2022

Week of January 31 - February 6, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates and other news.

⚡Releases

⚙️ Projects️

  • Simple IoT - "Simple IoT cloud/edge application/framework." Just came across this on Twitter. v0.0.42 recently released!

🎬 Media

Alexandre Brandão Lustosa posted a series of videos on their YouTube Channel.

📖 Articles

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

How can I see the in-flight subjects that are being published to?

The need underlying this question was to determine whether some clients are publishing to the wrong subject due to a configuration error (e.g. a typo in the subject name). NATS does have a monitoring endpoint that can be enabled on servers; however, it only lists the active subscriptions, not the subjects that are being published to.

To augment this monitoring, one basic solution is to write a subscription that subscribes to all subjects that are in scope of your application. The basic form looks like this:

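// Assumes nc is a connected *nats.Conn and that the context, fmt,
// and log packages are imported.

// Define context for NextMsg
ctx := context.Background()

// Subscribe to everything.
sub, err := nc.SubscribeSync(">")
if err != nil {
    log.Fatal(err)
}
for {
    msg, err := sub.NextMsgWithContext(ctx)
    if err != nil {
        // Stop on any error, e.g. a canceled context or a closed connection.
        break
    }

    // Function to determine if the subject is of interest.
    if isRelevant(msg.Subject) {
        fmt.Println(msg.Subject)
    }
}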
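
The isRelevant function above is left undefined. As one possible sketch, it could flag any subject that matches none of the patterns the application is expected to use, which is where a typo would show up. The expected list and the matches helper are hypothetical names introduced for illustration; the matching follows the NATS wildcard rules, where * matches exactly one token and > matches one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// expected is a hypothetical allowlist of subject patterns that the
// application is supposed to publish to.
var expected = []string{"orders.*", "machines.>"}

// matches reports whether subject matches pattern using NATS wildcard
// rules: "*" matches exactly one token, ">" matches one or more
// trailing tokens.
func matches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one
			// subject token left to match.
			return i == len(p)-1 && i < len(s)
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// isRelevant reports true for subjects that match none of the expected
// patterns, i.e. the likely typos or misconfigurations.
func isRelevant(subject string) bool {
	for _, p := range expected {
		if matches(p, subject) {
			return false
		}
	}
	return true
}

func main() {
	for _, subj := range []string{"orders.new", "order.new", "machines.p.events"} {
		fmt.Println(subj, isRelevant(subj))
	}
}
```

With this allowlist, only order.new (missing the "s") is reported as relevant.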

Since this question comes up fairly often, I created a standalone CLI. There is a release with pre-compiled binaries so feel free to try it out. The basic usage looks like this:

$ nats-subject-profiler -server demo.nats.io:4222

This will just listen for all subjects until interrupted and print out unique subjects (it deduplicates them as they come in). Just note that if you have a lot of traffic, the de-dupe structure will grow in memory over time. Use the -help option to see other available options.
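
To make the memory caveat concrete, here is a minimal sketch of a de-dupe structure. It is an illustration only and not necessarily how the CLI implements it; the uniqueSubjects type is a hypothetical name. Since nothing is ever removed from the set, it grows with the number of distinct subjects seen.

```go
package main

import "fmt"

// uniqueSubjects remembers every subject it has seen. It is never
// pruned, so memory grows with the number of distinct subjects.
type uniqueSubjects map[string]struct{}

// Add records subject and reports whether it was seen for the first time.
func (u uniqueSubjects) Add(subject string) bool {
	if _, ok := u[subject]; ok {
		return false
	}
	u[subject] = struct{}{}
	return true
}

func main() {
	seen := uniqueSubjects{}
	for _, subj := range []string{"a.b", "a.c", "a.b"} {
		// Print each subject only the first time it appears.
		if seen.Add(subj) {
			fmt.Println(subj)
		}
	}
}
```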

What (publisher) ordering guarantees can I achieve with JetStream?

This was a question I thought about this week, so I figured I would write down a short summary. I plan to write a more thorough post with code examples in the future.

JetStream implements a custom Raft consensus protocol that is used any time a message is written to a stream. Streams that must be fault tolerant and/or highly available should be configured with either 3 or 5 replicas. Each stream is replicated independently and thus has its own round(s) of consensus when a write occurs.

Once a write is received and committed, the total order of the messages written to the stream is guaranteed to be stable. This guarantee ensures consumers will all observe the same order of messages no matter how many times the stream is consumed (not including re-deliveries).

However, what ordering guarantees can we achieve on the publisher side? If we have multiple clients publishing messages to the same stream, can we control ordering? And if so, why would we want to?

The general problem I am currently working on is designing a distributed network of what are, effectively, tiny state machines. NATS is being used for messaging (commands, queries), event sourcing (modeling state transitions), and event streaming (non-state machine consumers).

Focusing on a single state machine for now, let's assume that it does not have a permanent location. Meaning any node in the network could load and transition the state machine. If a command C1 is received by node A which targets state machine P, it must load the current state S1 and then assess whether the command can be accepted as a valid transition. Assuming it is, an event is produced modeling the state transition and written to the stream.

What happens if another command C2 comes in for machine P at the same time and node B receives it? It would do the same thing of loading state S1 and evaluating the transition.

The question is, what happens when both nodes try to write the new event to the stream? Well, by default the stream will happily accept both writes in whichever order they are received. From the standpoint of the state machine, accepting both commands may be invalid: had they been processed sequentially, the second one may have been rejected.

Fortunately, we can solve this specific problem out of the box with NATS.

One concrete subject per machine

If we model the event stream for the state machine with a concrete subject, say machines.p, then we can efficiently read from it (using a subject filter) as well as write to it with an ordering guarantee. The general algorithm is:

  • Read all events from subject machines.p, derive the state (or load a snapshot)
  • Keep track of the last sequence number consumed from that subject
  • Apply the logic and produce an event
  • Publish the event to the subject and set the Nats-Expected-Last-Subject-Sequence header with the last sequence number that was observed previously

This provides the publisher order guarantee since, among concurrent writes that carry the same expected sequence, only the first one succeeds and the rest are rejected. To recover from a rejection, we can simply re-run the same algorithm. The read will now include the latest event the other publisher wrote and thus the current node can re-assess the command against the (new) current state. If the transition is no longer valid, it can reject the command back to the client, otherwise it can produce a new event and attempt to write it to the stream again.
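
The read, decide, publish, and retry loop can be sketched with a toy in-memory stream. This is a model of the algorithm only and does not use the NATS client: the stream type, the handle function, and the decide callback are hypothetical names, and the expected argument of publish stands in for the Nats-Expected-Last-Subject-Sequence header. The model also assumes an expected sequence of 0 means "no events on this subject yet".

```go
package main

import (
	"errors"
	"fmt"
)

// ErrWrongLastSequence stands in for the server rejecting a publish
// whose expected last subject sequence is stale.
var ErrWrongLastSequence = errors.New("wrong last sequence")

type event struct {
	Seq     uint64
	Subject string
	Data    string
}

// stream is a toy, single-goroutine stand-in for a JetStream stream.
type stream struct {
	events  []event
	lastSeq map[string]uint64 // last stream sequence per subject
}

func newStream() *stream { return &stream{lastSeq: map[string]uint64{}} }

// read returns the events on subject and the last sequence observed
// for that subject (0 if there are none).
func (s *stream) read(subject string) ([]event, uint64) {
	var out []event
	for _, e := range s.events {
		if e.Subject == subject {
			out = append(out, e)
		}
	}
	return out, s.lastSeq[subject]
}

// publish appends the event only if expected equals the subject's
// current last sequence.
func (s *stream) publish(subject, data string, expected uint64) (uint64, error) {
	if s.lastSeq[subject] != expected {
		return 0, ErrWrongLastSequence
	}
	seq := uint64(len(s.events)) + 1
	s.events = append(s.events, event{Seq: seq, Subject: subject, Data: data})
	s.lastSeq[subject] = seq
	return seq, nil
}

// handle runs the algorithm and retries when the write is rejected.
// decide is the state machine logic: it returns the event to write, or
// an error if the command is invalid for the current state.
func handle(s *stream, subject string, decide func([]event) (string, error)) (uint64, error) {
	for {
		history, last := s.read(subject)
		data, err := decide(history)
		if err != nil {
			return 0, err // reject the command
		}
		seq, err := s.publish(subject, data, last)
		if !errors.Is(err, ErrWrongLastSequence) {
			return seq, err
		}
		// Another publisher won the race: re-read and re-assess.
	}
}

func main() {
	s := newStream()
	_, lastA := s.read("machines.p") // node A loads the state
	_, lastB := s.read("machines.p") // node B loads the same state
	_, errA := s.publish("machines.p", "started", lastA)
	_, errB := s.publish("machines.p", "stopped", lastB)
	fmt.Println(errA, errB) // node A succeeds, node B is rejected

	// Node B re-runs the algorithm against the new state.
	seq, err := handle(s, "machines.p", func(h []event) (string, error) {
		return "stopped", nil
	})
	fmt.Println(seq, err)
}
```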

The combination of a concrete subject and the header provides a linearizable guarantee across writes on that subject.

Not all commands need to be coordinated

There may be certain commands that can be accepted under any circumstance and it is known (per the state machine) that the order of the resulting event, with respect to other events, doesn't matter (this is actually a fairly deep topic, but we will defer it to another time).

In this situation, we can omit the header on publish, which means the write will not be rejected. This does not, however, prevent other publishers (who do care about order) from still being rejected and having to retry, since the latest sequence is now different. Usually, if you have a mixture of commands, some of which care about explicit order and some of which do not, it is easier to just always set the header rather than trying to design around it.

Tracking causal ordering

If commands can be accepted without the guarantee of the state being up-to-date, we may still want to track the last sequence observed by the machine. This can be done in a couple of ways, but the simplest one would be to add a header field such as Observed-Last-Subject-Sequence (for example).

An independent consumer can then be implemented to reconstruct the causal timeline and check if it is different from the stream order (thus detecting concurrent writes). This header-based tracking is especially convenient with the HeadersOnly consumer option to ignore the data altogether.
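
As a minimal sketch of that check, assume the consumer has already extracted, for each message on one subject, the stream sequence and the value of the (hypothetical) Observed-Last-Subject-Sequence header. The entry type and concurrentWrites function are illustrative names. A message was written concurrently if its publisher had observed something older than the message directly before it on the subject.

```go
package main

import "fmt"

// entry is what a headers-only consumer would see for one message on a
// single subject: its stream sequence and the value of the hypothetical
// Observed-Last-Subject-Sequence header (0 if nothing was observed).
type entry struct {
	Seq      uint64
	Observed uint64
}

// concurrentWrites walks the entries in stream order and returns the
// sequences of messages whose publisher had not observed the message
// directly before them on the subject.
func concurrentWrites(entries []entry) []uint64 {
	var out []uint64
	var prev uint64 // sequence of the previous message on the subject
	for _, e := range entries {
		if e.Observed < prev {
			out = append(out, e.Seq)
		}
		prev = e.Seq
	}
	return out
}

func main() {
	// Messages 4 and 5 were both written after observing message 1,
	// so 5 is concurrent with 4.
	entries := []entry{{1, 0}, {4, 1}, {5, 1}, {9, 5}}
	fmt.Println(concurrentWrites(entries))
}
```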

Conflict detection and compensating actions

If this causal ordering is the preferred default, we can take this tracking a step further and add conflict detection for the circumstances where ordering does matter. Detecting a conflict would result in either a new command to perform compensating actions or some kind of advisory event to notify a human to get involved.

Other read/write scenarios

Up to now, we have been focused on reading and writing to a single concrete subject within a stream, but the other scenario is reading from multiple subjects (pattern) and writing to one.

For example, if we had some cross-cutting command that needed to take into account state across all machines, the consumer might read from machines.* to derive its state. Given the command, it may then write to one of the concrete subjects, e.g. machines.p.

Again, depending on the guarantee we need, we can utilize the header noted above to guard the write at the subject level. However, if the command requires some kind of uniqueness guarantee across machines, we can use the Nats-Expected-Last-Sequence header, which is applied at the stream level.

Fortunately, this header does not prevent other publishers from making progress, whether they guard writes to other subjects with the subject-level header or do not use an expected-sequence header at all. The cost falls on the publisher using the stream-level header: any intervening write to the stream causes its publish to be rejected, and it must retry.
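
The difference between the two guards can be illustrated with another toy in-memory model. This does not use the NATS client; the guard and stream types are hypothetical, with the two guard fields standing in for the Nats-Expected-Last-Sequence and Nats-Expected-Last-Subject-Sequence headers (a nil field means the header is not set).

```go
package main

import (
	"errors"
	"fmt"
)

var errWrongLastSequence = errors.New("wrong last sequence")

// guard models the two optional expectations a publish can carry.
type guard struct {
	LastSeq        *uint64 // models Nats-Expected-Last-Sequence (stream level)
	LastSubjectSeq *uint64 // models Nats-Expected-Last-Subject-Sequence
}

// stream is a toy stand-in that tracks only sequence numbers.
type stream struct {
	last        uint64            // last sequence in the stream
	lastSubject map[string]uint64 // last sequence per subject
}

// publish checks whichever expectations are set, then assigns the next
// stream sequence to the message.
func (s *stream) publish(subject string, g guard) (uint64, error) {
	if g.LastSeq != nil && *g.LastSeq != s.last {
		return 0, errWrongLastSequence
	}
	if g.LastSubjectSeq != nil && *g.LastSubjectSeq != s.lastSubject[subject] {
		return 0, errWrongLastSequence
	}
	s.last++
	s.lastSubject[subject] = s.last
	return s.last, nil
}

// seq is a small helper to get a pointer to a sequence number.
func seq(n uint64) *uint64 { return &n }

func main() {
	s := &stream{lastSubject: map[string]uint64{}}

	// An unguarded write and a subject-level write both make progress.
	s.publish("machines.p", guard{})
	_, err := s.publish("machines.q", guard{LastSubjectSeq: seq(0)})
	fmt.Println(err)

	// A cross-cutting writer that read the stream up to sequence 1 is
	// rejected, because machines.q advanced the stream in the meantime.
	_, err = s.publish("machines.p", guard{LastSeq: seq(1)})
	fmt.Println(err)

	// After re-reading up to sequence 2, the retry succeeds.
	_, err = s.publish("machines.p", guard{LastSeq: seq(2)})
	fmt.Println(err)
}
```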

If causal ordering is sufficient, this header could be omitted and the last observed sequence header can be tracked with any necessary conflict detection.