NATS Weekly #15

Byron Ruth

Published on Feb 28th, 2022

Week of February 21 - 27, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates, and other news.

⚡ Releases

Official releases from NATS repos and others in the ecosystem.

Two notable features in this release are support for changing the number of replicas a stream has and tag-defined placement of streams across a cluster.

Changing stream replicas can be done through a simple stream config update. For the tag support, servers can now have a set of tags defined in server configuration and stream configuration can define a set of tags that can be used to place the stream on matching nodes. Previously, a set of JetStream-enabled nodes would be randomly selected for stream placement, but this now provides more control for specific clusters and/or geos.
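To make "matching nodes" a bit more concrete, here is a minimal stdlib-only sketch of how tag-based placement could be modeled. It assumes a node is eligible only when it carries every tag the stream asks for. The `server` type, the `eligible` function, and the tag values are all hypothetical and this is not the nats-server implementation.

```go
package main

import "fmt"

// server is a hypothetical stand-in for a JetStream-enabled node and its tags.
type server struct {
	name string
	tags []string
}

// eligible returns the names of the servers that carry every tag in want.
// Assumption: all requested tags must be present for a node to match.
func eligible(servers []server, want []string) []string {
	var out []string
	for _, s := range servers {
		have := make(map[string]bool, len(s.tags))
		for _, t := range s.tags {
			have[t] = true
		}
		ok := true
		for _, t := range want {
			if !have[t] {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, s.name)
		}
	}
	return out
}

func main() {
	servers := []server{
		{name: "n1", tags: []string{"region:us-west1", "ssd"}},
		{name: "n2", tags: []string{"region:us-west1"}},
		{name: "n3", tags: []string{"region:asia-northeast1", "ssd"}},
	}
	// Only n1 carries both requested tags.
	fmt.Println(eligible(servers, []string{"region:us-west1", "ssd"}))
}
```

In this model, a stream that asks for no tags can land on any node, which corresponds to the previous random placement.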

📖 Articles

💬 Discussions

GitHub Discussions from various NATS repositories.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

How big of a NATS cluster do I need?

The recommended standard setup consists of a three-node cluster within the same region, e.g. us-west1. This applies to JetStream-enabled nodes as well. It ensures that a stream with three replicas has one replica on each node and can tolerate a one-node failure. If you want up to five replicas, the cluster can be sized accordingly. Five is currently the hard limit on the number of replicas for a stream, both for practical reasons and for performance considerations.

Beyond the standard cluster setup, there are several use cases and options that could necessitate expanding the cluster. The two fairly well-known and documented options are gateways and leaf nodes.

Read the docs for a deeper dive, but a gateway essentially enables transparent connectivity across clusters in different regions. Gateways are what make it possible for a client in the US to send a request that is transparently routed to a handler deployed in Japan, all using subject-based routing.

Leaf nodes act as a local NATS server for an application and can connect into a larger cluster. This can be for consuming public services and streams in the cluster and/or for exposing services and streams from the leaf node itself, when connected.

Another design consideration is the combination of standard NATS nodes and JS-enabled nodes. Specifically, a cluster can support both types in the same topology. If there is a lot of non-JS messaging, then standard nodes can be added easily. It is worth noting that many JS-enabled nodes can be added to a cluster and this is independent of the max replicas of a stream. For example, 10 JS-enabled nodes could be deployed to support 100 streams, with the replicas spread across the 10 nodes. Whether you need that many nodes is a different question.. but I am just saying it is theoretically possible 😉.

However, what makes a deployment with this many JS-enabled nodes more interesting now is the newly added support for tag-based placement (see release notes above). If different JS-enabled nodes are deployed with different tags (even across clusters), then stream placement can be much more deliberate by choosing the tags corresponding to those nodes. Previously, stream placement was random.

Even with all these potential ways of modeling a perfect topology.. just start with a simple three-node cluster. Or if you are just getting started, a single node locally works perfectly well.

How can I bridge HTTP requests to NATS?

HTTP is the protocol of the Web. It has been the basis for serving basic HTML documents, SOAP, JSON APIs, etc. Other than a couple of extensions, including WebSockets and Server-Sent Events (SSE), HTTP is the foundation. Even protocols like gRPC and GraphQL rely on HTTP for transport, adding onto the protocol semantics.

All that said, the structure of an HTTP message is simple. Since HTTP fundamentally follows the request-reply pattern (clients always expect a response), it has two variations of the first line of the message. For requests, the "start line" indicates the method, the location, and the requested protocol version, e.g. GET /api/info HTTP/1.1. It is worth calling out that the location is a relative path since it is assumed the TCP connection to the server, e.g. https://example.org, has already been established. For a response, the "status line" indicates the protocol version used, the status code, and the corresponding text representation, e.g. HTTP/1.1 200 OK.
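As a quick illustration of these two first lines, the sketch below uses Go's standard library to parse a raw HTTP/1.1 request and response and pull out the parts described above. The raw messages and the `parseStartLine` and `parseStatusLine` helper names are made up for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// parseStartLine reads a raw HTTP/1.x request and returns the three
// parts of its start line: method, location, and protocol version.
func parseStartLine(raw string) (method, location, proto string, err error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return "", "", "", err
	}
	return req.Method, req.RequestURI, req.Proto, nil
}

// parseStatusLine reads a raw HTTP/1.x response and returns the
// protocol version and status code from its status line.
func parseStatusLine(raw string) (proto string, code int, err error) {
	resp, err := http.ReadResponse(bufio.NewReader(strings.NewReader(raw)), nil)
	if err != nil {
		return "", 0, err
	}
	defer resp.Body.Close()
	return resp.Proto, resp.StatusCode, nil
}

func main() {
	method, location, proto, err := parseStartLine("GET /api/info HTTP/1.1\r\nHost: example.org\r\n\r\n")
	fmt.Println(method, location, proto, err)

	respProto, code, err := parseStatusLine("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
	fmt.Println(respProto, code, err)
}
```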

The two other sections are the headers and the body. The types of headers and their semantics are not relevant for a simple adaptor implementation.

A NATS message has a subject, optional headers, and a body. The basic structure of an implementation looks like this.

nc, _ := nats.Connect("localhost:4222")

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  // Subject to bridge the request to. A generic approach
  // could be to encode the path to a dot-delimited structure
  // with the HTTP verb in the front, e.g. `get.api.info`. This
  // does not account for URL parameters
  //TODO..
  subject := "..."
  
  // Create a message with the header map allocated.
  req := nats.NewMsg(subject)
  
  // Set the headers from the request, if desired.
  for k, v := range r.Header {
    req.Header[k] = v
  }
  
  // Read the body and set on the message.
  // Note that the default allowed message size for NATS is 1MB
  // so the content-length should be checked or using a limit
  // reader to guard reading too large of a body. Another strategy
  // is to do chunking of the message if body are expected to be
  // large.
  data, _ := io.ReadAll(r.Body)
  r.Body.Close()
  req.Data = data
  
  // Send a NATS request-reply using the existing request
  // context for timeout/cancellation.
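  rep, err := nc.RequestMsgWithContext(r.Context(), req)
  if err != nil {
    // No reply (timeout, cancellation, no responders, etc.).
    http.Error(w, err.Error(), http.StatusBadGateway)
    return
  }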
  
  // Set HTTP response headers. These could be passed from
  // the reply header or set explicitly.
  w.Header().Set("content-type", "application/json")
  
  // An explicit X-Http-Status header could be used to relay
  // from the NATS handler if applicable. Otherwise an explicit
  // mapping would be required here.
  statusCode, err := strconv.Atoi(rep.Header.Get("X-Http-Status"))
  if err != nil {
    // Header missing or not a number, fall back to 200.
    statusCode = http.StatusOK
  }
  w.WriteHeader(statusCode)
  
  // Write the reply data to the HTTP response writer.
  w.Write(rep.Data)
})

The main decision is how to derive the subject from the request start line and relay status info from a reply message. Of course there is some error handling needed as well, but this is the basic structure.
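For the subject derivation, here is a minimal sketch of the generic encoding mentioned in the code comment: the lowercased HTTP verb followed by the path segments, joined by dots. The `subjectFor` name and the choice to replace characters that are special in NATS subjects with underscores are my own assumptions. It expects the path without the query string (e.g. `r.URL.Path`), so URL parameters are still not accounted for.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectSafe replaces characters that have meaning in NATS subjects
// (token separator, whitespace, wildcards) with underscores.
var subjectSafe = strings.NewReplacer(".", "_", " ", "_", "*", "_", ">", "_")

// subjectFor encodes an HTTP method and path as a dot-delimited subject,
// e.g. GET /api/info becomes get.api.info.
func subjectFor(method, path string) string {
	tokens := []string{strings.ToLower(method)}
	for _, seg := range strings.Split(path, "/") {
		if seg == "" {
			continue // skip empty segments from leading, trailing, or doubled slashes
		}
		tokens = append(tokens, subjectSafe.Replace(seg))
	}
	return strings.Join(tokens, ".")
}

func main() {
	fmt.Println(subjectFor("GET", "/api/info"))
	fmt.Println(subjectFor("GET", "/files/report.pdf"))
}
```

With this encoding, a NATS handler could subscribe to `get.api.info` for a single route or use a wildcard such as `get.api.>` to cover everything under `/api`.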

The design choices will largely depend on how this will be used. Simply forwarding HTTP headers to a NATS handler or writing HTTP headers back from a reply would likely only be relevant if NATS is acting as a proxy for HTTP messages.

More often than not, an adaptor is needed to support HTTP clients (including Web browsers) that cannot use the NATS clients directly (browsers do have the additional option of using the native NATS WebSockets adaptor). If this is the case, then an explicit translation is likely preferable to blindly translating the HTTP message into a NATS message.

What is the purpose of the pull consumer timeout?

A pull consumer requires a subscriber to explicitly request a batch of messages (one or more), accompanied by an optional max wait override.

msgs, err := sub.Fetch(10, nats.MaxWait(2 * time.Second))

What can be surprising is that this fetch will block until the full batch has been received or the max wait time has been reached. The behavior is that if any messages are received, these are returned without an error after max wait has been reached. If no messages have been received, then an error is returned.

With this behavior in mind, the question then becomes, how long do I want to wait for some batch size? The appropriate size largely depends on the expected message rate and the variability. In general, if the message rate is fairly constant, then N subscribers (for scale out) with a fixed batch size is likely fine. For example, 100 messages/sec with 10 subscribers means a batch size of 10 is likely suitable. Of course, if processing the messages takes some time, then that needs to be factored in.

If the message rate is irregular, then this is where the batch size and/or timeout can be a bit more dynamic to optimize latency or throughput. For bursts, we would want a larger batch size to reduce the chattiness of fetching, say, one message at a time. However, for sparse messages, we either want one message per fetch (reduce waiting, make some progress) or a large batch size with a lower timeout.

I have not done this personally, but in theory a sophisticated setup could rely on metrics to dynamically change the batch size or timeout based on the predicted message rate at a given time of day. However, it may also be simpler to just scale the number of subscribers on the consumer up or down, keeping the batch size and timeout stable.