How to Replace a Microservice: Swapping Out the Engine in Mid-Flight

Photo by Michael VH via Flickr (CC BY 2.0)

Introduction

So you’ve built an important new service. It satisfies current use cases and is also forward looking. Pretty soon it’s mission-critical, so you optimize where you can and keep it humming along.

Then the product changes, and your service needs to do something different. You don’t have time to rewrite from scratch (and you’re never supposed to do that anyway), so you refactor what you can, then start bolting stuff on.

A couple of years later, the product changes again. This time, refactoring isn’t going to cut it: the entire design needs to change. So you take a deep breath and make your case for the big rewrite. The team is justifiably skeptical, but you’re persuasive, and everyone signs off.

Great! The service is going to be so much better this time. With two years of domain knowledge, you know where all the landmines are, and you know the stack and the usage patterns like the back of your hand.

Still, it’s slow-going. A bunch of other services now depend on the existing interface in production. Downtime is unacceptable, and a flip-the-switch cutover is just asking for trouble.

Now what?

The Knewton Graph Store

Knewton’s mission is to personalize education for everyone. To do that, Knewton organizes educational content in graph form that shows how knowledge builds on itself. Graphing is a cornerstone of what gives our platform its power.

Knewton’s graph store initially used an event-based model. In order to modify a graph, internal client code would do the following operations:

  • Read graph data using a library built on Titan (a property graph library with transactions and pluggable datastores) and backed by a Cassandra cluster.
  • Make the desired modifications and commit the Titan transaction.
  • Intercept the Titan transaction, package the modifications up into a delta format and write the delta to a queue.
  • Consumers of the queue would pick up the delta messages and write them back into another Cassandra database in different forms.

By the middle of 2015, Knewton was using graphs in ways that required much lower latencies, which meant that frequent round trips to a database wouldn’t work any more. Graphs had grown significantly bigger, which had caused problems with storage and database schemas. Beyond that, many smaller problems also made the service difficult to work with. So we undertook a project to rewrite the graph store.

Design and Development

The design and development of the new service were relatively painless. It’s tempting to claim that this was purely due to how fantastic Knewton’s engineers are (although we are and you should come work with us), but it’s worth noting how important hindsight was to the process. Having seen how the graph store was used in the real world helped us know what to build, what to fix, and what to avoid. One of Knewton’s core values is to “ship and learn”, and after several years of seeing the graph store evolve, we knew what we needed and built the new service accordingly.

Migration

Migration was by far the most complicated phase of the project. It was composed of two parts: migrating the existing data to the new service, then migrating clients to the new service.

Migrating the data

Best practices for doing data migration usually specify a five-step process:

  1. Set up the new datastore (or new table or schema).
  2. Modify existing business logic to write to both the old and new datastores while still reading from the old one.
  3. Backfill historical data into the new datastore.
  4. Start reading from the new datastore.
  5. Stop writing to the old datastore.

Due to the event-sourced nature of writes, we were able to accomplish Step 2 by adding a new consumer to the events topic. Knewton uses Kafka as our main messaging platform, and one of its most useful features is the ability to have multiple consumers subscribe to the same message stream at the same time. This allowed us to mirror incoming data to both the new and old datastores with a minimum of trouble.
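
As an illustration, the dual-write step can be as simple as a second consumer group on the same topic that applies each delta to the new datastore. This is only a sketch: the topic name, the NewGraphStore interface, and the use of the newer Kafka consumer API are assumptions, not Knewton’s actual code.

// Sketch of a mirroring consumer: it subscribes to the same delta topic under
// its own consumer group and applies every delta to the new datastore, while
// the existing consumers keep feeding the old one.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewStoreMirror implements Runnable {

    // Hypothetical client for the new graph store; not Knewton's actual interface.
    public interface NewGraphStore {
        void apply(String graphId, byte[] delta);
    }

    private final KafkaConsumer<String, byte[]> consumer;
    private final NewGraphStore newGraphStore;

    public NewStoreMirror(NewGraphStore newGraphStore) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "graph-store-mirror"); // separate group, so existing consumers are unaffected
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        this.newGraphStore = newGraphStore;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList("graph-deltas"));
        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<String, byte[]> record : records) {
                newGraphStore.apply(record.key(), record.value());
            }
        }
    }
}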

Difficulties arose in Step 3 due to issues with old data. Midway through the migration, we discovered that data stored several years earlier didn’t match current schemas. The validation logic had evolved quite a bit, but the historical data had never been cleansed.

This absence of strict validation mechanisms is a downside of many NoSQL solutions: since the database isn’t enforcing any schemas or constraints, the burden falls entirely on application logic, which typically changes much more quickly than schemas do.
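
One way to handle this during the backfill is to re-check every historical record against the current application-level rules before writing it to the new store, and to set failures aside for manual review. A minimal sketch of that step, with deliberately generic record and handler types (Knewton’s actual validation logic is not shown):

import java.util.function.Consumer;
import java.util.function.Predicate;

public final class BackfillStep {

    // Run every historical record through the *current* validation rules.
    // Records that pass are written to the new datastore; records that fail
    // are quarantined for manual investigation instead of being copied over.
    public static <T> void backfill(Iterable<T> historicalRecords,
                                    Predicate<T> currentValidation,
                                    Consumer<T> writeToNewStore,
                                    Consumer<T> quarantine) {
        for (T record : historicalRecords) {
            if (currentValidation.test(record)) {
                writeToNewStore.accept(record);
            } else {
                quarantine.accept(record);
            }
        }
    }
}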

Migrating the data ended up requiring several weeks of time-consuming manual investigation and data introspection.

Migrating the clients

Though ultimately worth it, migrating the clients off of the old service took a lot of time and effort because of all the necessary cross-team coordination. When trying to switch clients onto a new interface for your services, it’s customary to leave the old interfaces available for a period of time to give other teams some breathing room to do their migration. While we were able to accommodate our clients using this strategy, unforeseen interdependencies between clients forced us to spend several more weeks than planned coordinating the final rollout.

Results

In the end, we were able to serve all of our clients’ needs while simplifying the system and cutting hosting expenses by more than 70%. Knewton completed the migration with no data loss, no downtime, and no outages. Knewton is now able to handle significantly larger graphs and build new features that enable our partners to do things that weren’t possible before.

Lessons Learned

Migrating takes longer than you think

Migration is often the phase of the project with the most uncertainty, so it’s difficult to forecast accurately. It’s easy to assume that the difficulty of migrating to a new service will be purely proportional to the amount of data being moved, but there are several other factors to keep in mind:

Datastore differences

Moving between different datastores makes the migration much more complicated. You have to account for the particular mechanisms, guarantees, and performance characteristics of your new datastore. Changing datastores also typically means that you have to rewrite tooling and documentation as well.

Data hygiene

Data will get dirty. There are always going to be a few objects or rows that don’t exactly conform to the rules you expect, especially in NoSQL databases, where hygiene mechanisms such as constraints and foreign keys often don’t exist. Unless you’ve taken great pains in the past to make sure your data is exactly what you think it is, migration time is when you’ll find out which of your assumptions aren’t true.

Clients

You have much less control over the clients of your service than you do over your own code. Clients have their own services and their own schedules, and you’re the one throwing in a monkey wrench by changing one of their dependencies. There are several ways to accommodate clients, but each approach has costs:

  • Be flexible about cutover deadlines
    • Clients have their own schedules to manage, so it’s a good idea to give them some breathing room to do the migration. A hard cutover date is a recipe for problems.
    • Tradeoff: total migration time (and thus the total time you have to maintain the old service for backwards compatibility) can stretch out longer.
  • Get feedback early and continuously
    • A good way to avoid any unforeseen issues in client migration is to continually solicit feedback on both technical considerations (like interface design and documentation) and project management considerations (like timing and scheduling).
    • Tradeoff: communication requires a lot of energy and effort from all involved parties, with a payoff that is not immediately visible.
  • Minimize the need for coordination
    • Coordination across multiple teams is difficult, and even more so at scale. In general, it’s good practice to minimize the need for coordination, because doing so reduces the amount of time any one team spends waiting for other teams to complete precursor tasks.
    • Trade-off: Enabling all clients to complete the migration independently requires more effort and planning from the service owner, especially in response to unforeseen events.

Rewrite only when you need to rearchitect

It’s sometimes difficult to judge when to rewrite a service from the ground up, but a good rule of thumb is to rewrite only if the system’s architecture itself is blocking your progress. It can be easy to convince yourself that the code may be confusing, or that the data model doesn’t make sense, or that there’s too much technical debt, but all those issues are solvable “in place.”

If you can refactor the code, refactor the code. If you can change your database schema incrementally, change your schema. When you can’t move forward any more without changing the architecture, it’s time to think about a rewrite.

Don’t look too far down the road

One of the easiest traps to fall into in software engineering is to build a service (or feature or library or tool) because “it might be needed down the road.” After all, software engineers are supposed to be forward looking and anticipate issues before they occur. But the further you look, the less clearly you can see. It’s important to resist temptation and build only those things that you know you need now (or, at most, in the immediate future).

Simplifying Cassandra Heap Size Allocation

As discussed previously, Knewton has a large Cassandra deployment to meet its data store needs. Despite best efforts to standardize configurations across the deployment, the systems are in near-constant flux. In particular, upgrading the version of Cassandra may be quick for a single cluster, but doing so for the entire deployment is usually a lengthy process, due to the overhead involved in bringing legacy stacks up to date.

At the time of this writing, Knewton is in the final stages of a system-wide Cassandra upgrade from Version 1.2 to 2.1. Due to a breaking change in the underlying SSTable format introduced in Version 2.1, this transition must be performed in stages, first from 1.2 to 2.0, then 2.0 to 2.1. While the SSTable format change does not have a direct bearing on the topic of interest here, another change introduced in Version 2.0 is of material importance.

This post describes some nuances of Cassandra’s JVM (maximum) heap memory allocation that we discovered along the way, and how we handled aspects that posed a threat to our systems.

A problem case

Our journey of discovery began with a struggle to address crashes on a particular cluster due to running out of heap memory. For ease of reference, let’s call this cluster Knerd. In our AWS infrastructure, the Knerd cluster was running on m4.large nodes having 8 GB of memory, and by default Cassandra was allocating 4 GB of heap to each node. After exhausting other options, we decided to migrate the cluster to m4.xlarge nodes with 16 GB of memory, believing that these larger nodes would be allocated 8 GB of heap.

Imagine our surprise to see the new nodes come up with 4 GB of heap, same as before.

Default heap allocation in Cassandra 1.2

This phenomenon forced us to take a harder look at the code behind the heap allocation, contained in the cassandra-env.sh config file. The relevant code block, as it ships with Cassandra 1.2, is contained in the function calculate_heap_sizes:

   # set max heap size based on the following
   # max(min(1/2 ram, 4096MB), min(1/4 ram, 8GB))
   # calculate 1/2 ram and cap to 4096MB
   # calculate 1/4 ram and cap to 8192MB
   # pick the max
   half_system_memory_in_mb=`expr $system_memory_in_mb / 2`
   quarter_system_memory_in_mb=`expr $half_system_memory_in_mb / 2`
   if [ "$half_system_memory_in_mb" -gt "4096" ]
   then
       half_system_memory_in_mb="4096"
   fi
   if [ "$quarter_system_memory_in_mb" -gt "8192" ]
   then
       quarter_system_memory_in_mb="8192"
   fi
   if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]
   then
       max_heap_size_in_mb="$half_system_memory_in_mb"
   else
       max_heap_size_in_mb="$quarter_system_memory_in_mb"
   fi
   MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

In short, this code block imposes a cap on each of two parameters, then uses the larger parameter as the maximum heap size.
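
Working through the formula for the Knerd cluster’s 16 GB nodes makes the surprise concrete: half of system memory is 8 GB, which is capped at 4 GB; a quarter of system memory is 4 GB, well under its 8 GB cap; the maximum of the two is therefore 4 GB, which is exactly the heap size we kept seeing.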

The graph below shows the heap size in GB as a function of the system memory (also in GB), up to the absolute cap of an 8 GB heap. (We’ll focus on integer values for ease of discussion.)

[Figure: default maximum heap size as a function of system memory]

The heap size scales linearly as half the system memory up to 4 GB, then plateaus for systems with total memory between 8 GB and 16 GB. This is precisely the range of values for which we observed a steady maximum heap size of 4 GB on the Knerd cluster.

This plateau is a direct consequence of the half-memory parameter in the shell code being capped at a smaller value than the quarter-memory parameter. While scrutiny of the shell code above will show exactly the behavior displayed in the graph, a new user is unlikely to see this coming, especially in the typical NoSQL climate with short time scales and developer-managed data stores. Indeed, since Cassandra’s heap scales with system memory, it is reasonable to assume that it will scale smoothly for all values of the system memory below the cap. The plateau is at odds with this assumption, and therefore constitutes behavior that might be deemed unpredictable or counterintuitive, and can disrupt provisioning and capacity planning. In other words, this internal Cassandra feature violates the Principle of Least Surprise.

Beyond the plateau, the curve begins to rise linearly again, this time scaling as one-quarter of system memory. Then, at a system memory of 32 GB, the heap size reaches the absolute cap of 8 GB.

Modified heap allocation in Cassandra 1.2

If you’re using Cassandra 1.2, we strongly recommend doing something about this plateau in order for your nodes to scale predictably. Here are some options.

The 8 GB cap is generally desirable, because if the heap is too large then stop-the-world garbage collection cycles will take longer and can cause downstream failures. On the other hand, the system memory value upon reaching the cap is not terribly important. It could be 32 GB, as in the default scheme, or it could be 24 GB with little noticeable difference. Thus the plateau can be eliminated in one of two ways:

  1. Keep the remaining features, shifting the transition points to lower system memory values.
  2. Take the average over the values less than system memory of 16 GB, which yields a simple one-quarter slope up to the cap.

Averaging is simpler, but it may not be suitable for small values of system memory. Either approach can be achieved with a simple set of conditionals specifying the scaling for each segment; for example, the first could be coded like this:

   if [ "$system_memory_in_mb" -lt "8192" ]
   then
       max_heap_size_in_mb="$half_system_memory_in_mb"
   elif [ "$system_memory_in_mb" -lt "24576" ]
   then
       max_heap_size_in_mb="$quarter_system_memory_in_mb"
   else
       max_heap_size_in_mb="8192"
   fi

Either approach eliminates the undesirable plateau. An upgrade to Cassandra 2.0 or above also effectively removes its impact.

Heap allocation in Cassandra 2.0+

Cassandra 2.0 introduced a change in heap allocation, although this change was easy to miss since it did not get highlighted in the What’s new in Cassandra post for 2.0.

The change is a small one: the cap of the one-half-memory variable was changed from 4 GB to 1 GB, such that the plateau occurs at a heap size of 1 GB and extends from system memory values of 2 GB to 4 GB. While the plateau still exists, it is no longer an issue for systems having memory greater than 4 GB. Production systems will doubtless be running with more memory than this, so upgrading alone will solve the scaling predictability issue in nearly all cases.

However, systems previously operating with system memory at or below 16 GB will have a smaller heap after upgrading from 1.2 to 2.0+. This is because the heap now scales as one-quarter of the system memory for a broader range of memory values than it did on Cassandra 1.2. Although the picture hasn’t changed for high-memory systems, this shift to smaller heaps can cause further problems on systems with relatively small node memory, such as the Knerd cluster.
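
For example, a node with 8 GB of system memory gets max(min(4 GB, 4 GB), min(2 GB, 8 GB)) = 4 GB of heap under Cassandra 1.2, but only max(min(4 GB, 1 GB), min(2 GB, 8 GB)) = 2 GB under Cassandra 2.0.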

Moving memtables off heap in Cassandra 2.1

The change in the cap in Cassandra 2.0 may be related to the effort to move more Cassandra components off of the JVM heap. Version 2.0 offered off-heap partition summaries, and version 2.1 introduced the ability to move memtables off heap. These features make a smaller heap size desirable in order to make room for the memtable components in off-heap memory.

Moving these components off heap can make the heap itself behave better, at the cost of possible out-of-memory events at the system level. Since each Java process allocates a separate heap, other Java processes running on the same machine (e.g., DataStax OpsCenter) leave less memory available for Cassandra’s off-heap components. In the extreme case, the operating system may kill the Cassandra process or crash altogether, producing behavior identical to the heap-crash case from the application’s perspective (i.e., Cassandra is not available on that node).

Knewton’s approach

At first, we applied a Band-Aid fix in Cassandra 1.2 to always allocate a heap size equal to half the system memory. This worked reasonably well until we migrated to 2.1: the relevant change to configuration templates had only been made for Cassandra 1.2, and other versions fell back to the default allocation code (which is how we discovered the properties described in this post).

That’s when we decided it was time to do some testing. We had three options:

  1. Move only the memtable buffers off heap (offheap_buffers);
  2. Move both the memtable buffers and the objects off heap (offheap_objects);
  3. Leave the memtables on heap and raise the heap size.
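
(These options correspond to the memtable_allocation_type setting in cassandra.yaml, which in Cassandra 2.1 accepts heap_buffers, offheap_buffers, or offheap_objects; option 3 keeps the default heap_buffers.)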

We regarded raising the heap size as our fallback, since we knew how the system would behave if we did that. Due to considerations related to scaling (e.g. data size growth), we weren’t very confident that any testing would yield a clear choice of how large to make the heap. In the end, then, this amounted to simply raising the maximum heap size to half the system memory. While doing so would be nice and simple, we hesitated to make our Cassandra installation deviate this much from the base installation.

So we experimented with off-heap memtables. As noted earlier, moving components off of the heap appears to be related to why the maximum heap size calculation was changed in the first place. Moreover, moving both the memtable buffers and the objects off heap is slated to become the default behavior in a future release of Cassandra, which led us to believe that this option’s performance is good enough that it is recommended in most cases.

Both offheap_buffers and offheap_objects showed good results in tests, reducing the stress on the heap and improving overall system performance relative to heap_buffers (the default configuration). As expected, the GC time was lowest for offheap_objects, as was the end-to-end processing time. To our surprise, however, offheap_buffers was the star performer. This setting yielded the best numbers while maintaining the most consistent read and write rates. In some of these measures, such as percentage of heap used and GC pause time, offheap_buffers narrowly edged out offheap_objects. In others, notably write latency, bloom filter false-positive ratio, and average response time, offheap_buffers was the clear winner. This performance was enough to convince us to move forward with offheap_buffers.

After deploying this change, we mostly saw positive results. Unfortunately, we did still sometimes see transient high-heap events, mostly on clusters with large data sets. On the other hand, these events were not accompanied by crashes. This caused us to reevaluate our longstanding alert threshold for heap usage, whereupon we concluded that the threshold needed some tuning. This resolved the remainder of the issue.

In summary, investigation of some puzzling behavior revealed unexpected heap allocation details in Cassandra’s internal configuration. Studying the configuration code allowed us to evaluate our options to address the behavior we originally observed. Our goal was to yield more predictable heap allocation results when scaling a cluster without impeding Cassandra’s performance. We achieved this by moving the memtable buffers off heap, which is possible in Cassandra 2.1+.

Digging Deep Into Cassandra Thrift Buffer Behavior

Everyone who works in tech has had to debug a problem. Hopefully it is as simple as looking into a log file, but many times it is not. Sometimes the problem goes away, and sometimes it only looks like it goes away. Other times it might not look like a problem at all. A lot of factors go into deciding whether you need to investigate and how deep you need to go. These investigations can take a lot of resources for an organization, and there is always the chance of coming up empty-handed, which should never be seen as a failure.

This post will summarize an investigation into some Cassandra memory behavior that the database team at Knewton conducted recently. It is a good illustration of the kind of multi-pronged approach needed to unravel strange behaviors low in the stack. While we will be specifically talking about Cassandra running on AWS, the takeaways from this article are applicable to systems running on other platforms as well.

Uncovering a Problem

Background

Knewton had a Cassandra cluster that was very overprovisioned. The instances were scaled up (each instance is made more powerful) rather than scaled out (more instances are added to the cluster).  Cassandra is a horizontally scalable datastore and it benefits from having more machines rather than better ones.

So we added more machines and reduced the power of each one. We now had 4GB of available memory for the Java heap instead of 8GB.  This configuration worked well in all of our tests and in other clusters but in this specific overprovisioned cluster we found we had scaled each node down too much, so we moved to machines that could accommodate 8GB of heap, m4.xlarge instances.

Anomalous Behavior on New Instances

After moving all nodes over to m4.xlarge instances, we saw our heap situation stabilize. However, we began to notice anomalous CPU and load averages across the cluster. Some nodes would run higher than others. The metrics showed that, out of the four cores on an m4.xlarge instance, one was completely saturated.

If you saw this load on its own, you would not think it was a problem: total CPU usage on the box was at 25%, and there were no catastrophically long GC pauses. However, the cluster was not uniform, which called for further investigation.

In the CPU graphs below, you can see nodes running at low CPU encounter something that rapidly promotes them into the high-CPU band, from which they almost never drop back down.

[Figure: CPU usage by node, showing nodes promoted into a high-CPU band]

We found that this graph was correlated with the graph of average garbage collection time.

[Figure: average garbage collection time by node]

When nodes promote themselves into the high CPU band, their GC times spike.

What is Cassandra holding in its heap that is causing GC issues? With plenty of memory and CPU headroom, crashes are not a risk, but performance is far from where we want it to be.

Before tackling the garbage collection question, we have two other questions that we can answer already:

Why is this behavior showing up now?

We had an 8GB heap before and should have seen the same behavior. The reason we only noticed this CPU and GC behavior once we were on m4.xlarge instances is twofold:

  1. Something unique to this cluster made a 4GB heap fatal but an 8GB heap adequate. Expanding the heap did not get rid of the underlying problem.
  2. The original cluster that had an 8GB heap was around for years and all nodes were promoted to the high CPU band.  The baseline operation of the cluster looked like the high CPU band.  It is only because of previous issues with provisioning this cluster that we were  watching closely when we moved to these m4.xlarge instances.  This highlights the importance of understanding your baseline resource utilization and not assuming that it means you are in a healthy state.

Why is this behavior a problem?

Even though the high-usage band is not a problematic load on its own, the behavior of the cluster was unexpected, and the promotion behavior is the real problem. The biggest reason it is problematic is that it means we have a nondeterministic factor in our latencies. We can expect different performance from a node in the high CPU band than from one in the low CPU band, but we cannot predict when a node will promote itself, because we don’t know the cause of this behavior. That leaves an unknown factor in our system, which is dangerous for repeatability and reliability.

Investigation

Investigating further is resource-intensive, and the investigation often has to be undertaken by senior engineering staff, as it requires some degree of independence and experience. So make sure the problem is actually worth your organization’s time before sinking days or weeks of engineering effort into it.

Logs

The first thing to do is to look through the relevant logs.  Looking through the Cassandra logs we found, unsurprisingly, a lot of garbage collections.  We’d had GC logging on and found several “large block” messages in the CMS garbage collection logs at about the time these promotions happened.  To get more visibility into what is actually in the heap, we turned on GC class histogram logs, but these said that almost all the memory was being held in byte arrays.

Not helpful.  Time to go even deeper and see what is in the heap.

Heap Dump

So we took a heap dump from a problematic node and from a node that was exhibiting good GC behavior, as a control. A heap dump file is the size of the used Java heap. Dumping the heap is a “stop the world” operation for the process being dumped, so when doing this in production, be sure that the service can tolerate being unresponsive for a minute or more. The file is binary and examining it is labor-intensive, so it is best to move the heap dump to a machine that isn’t serving production traffic before investigating.

We used the Eclipse Memory Analyzer Tool (MAT) to investigate the heap dumps and found that almost all of the memory was taken up by the write buffers of TFramedTransport objects. There were several hundred of these objects, and the write buffer sizes ranged from 1kB to 30MB, with many in the range of 20-30MB. We saw similar objects in the heap dump of the control node, but not nearly as many. The write buffer contains what is being written to the Thrift transport socket and does not correspond directly to Cassandra reads and writes; in this case, Cassandra is writing the output of an incoming read to this buffer to send to the client that requested the data.

It became pretty clear that this was a Thrift protocol issue, so we searched for related issues.

Literature Search

Any problem you find has been reported in some way or another by someone else, especially if you are using anything but the latest versions of open-source software. It is very useful to search the web for related problems at every step of the investigation; as you get more specific, you might uncover things you would not have encountered earlier. In this case the investigation led us to the Thrift protocol, which is not something we would have searched for at the start.

The Thrift library that our version of Cassandra used had some memory issues referenced in CASSANDRA-4896, which are discussed in more detail and resolved in CASSANDRA-8685. To save you reading through the tickets: the write buffer (the thing taking up all of our space and the likely cause of our issues) grows to accommodate a larger payload but never shrinks back down. So once an operation with a large payload is requested of Cassandra, such as a query returning a 20MB record, these buffers increase in size and never drop back down.
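
The essence of the problem can be illustrated with a simplified grow-only buffer. This is not the actual Thrift code, just a sketch of the behavior described in those tickets:

// Simplified illustration: the buffer grows to fit the largest frame ever
// written on the connection, but a "reset" only rewinds the write position,
// so the enlarged backing array stays allocated for the life of the connection.
public class GrowOnlyWriteBuffer {

    private byte[] buffer = new byte[1024]; // starts small
    private int position = 0;

    public void write(byte[] payload) {
        int required = position + payload.length;
        if (required > buffer.length) {
            byte[] bigger = new byte[Math.max(required, buffer.length * 2)];
            System.arraycopy(buffer, 0, bigger, 0, position);
            buffer = bigger; // grows, and never shrinks back
        }
        System.arraycopy(payload, 0, buffer, position, payload.length);
        position += payload.length;
    }

    public void resetAfterFlush() {
        position = 0; // the enlarged backing array is still referenced
    }
}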

This behavior lines up with what we saw in the cluster: a node could be operating smoothly until a large set of queries came in and expanded the write buffers on the Thrift frame objects. Once the frame size increases, it stays big for the duration of the connection with the client. (We’ll come back to this point shortly.)

This Thrift behavior is noted in THRIFT-1457, and a fix is out for it. The Thrift library fix is also rolled into Cassandra 2.1.3 and later. The issue exists only in the Thrift interface, so clients using CQL won’t see it.

Verify the Hypothesis

The link between this memory behavior and client connections was a working hypothesis. We got a very strong signal supporting the theory one morning when we deployed a new version of services that communicate with our cluster. This deployment meant closing all of the Cassandra connections on the client side during a low-traffic period and then reopening them. Cold caches cause a lot of queries to occur during a deploy, but these queries do not return large blocks of data, so the TFramedTransport write buffers do not expand. Memory stays low until we start getting production traffic, which then expands the buffers as described above.

[Figure: CPU usage by node around the 10:45 deploy]

The CPU and GC graphs from the deploy at 10:45 show that we have almost no traffic beforehand. We’re still holding onto these large memory objects, and they aren’t cleaned up because the TFramedTransport objects live as long as the corresponding client connection.

[Figure: garbage collection time around the 10:45 deploy]

Once the client connections close, the objects are garbage collected, and the new connections start with small TFramedTransport write buffers. So we see low heap usage and GC times at about 10:45.

[Figure: heap size by node around the 10:45 deploy]

There is a dramatic heap usage drop when the deploy happens at 10:45.

Conclusions

Technical Wrapup

The behavior of these TFramedTransport buffers makes the Thrift protocol in pre-2.1 Cassandra dangerous. Each connection holds a TFramedTransport object in the Cassandra heap that can grow depending on the queries executed but cannot shrink. Memory then begins to fill with these TFramedTransport objects, as they cannot be cleaned out as long as the connection to the client is open. If the client connection is severed by a client restart or a deploy, these objects get garbage collected. New connections begin with a small TFramedTransport write buffer, which behaves well with respect to GC, heap, and CPU on the node until a large query comes in and expands the write buffer again, promoting the node to a high-CPU-usage node and damaging its performance and response time. Eventually all nodes in the cluster reach this high-CPU-usage band, and the extra resources taken up by these large TFramedTransport objects become a burden on the performance of our database.

Next Steps

Because these buffers never decrease in size, the memory usage of the Cassandra instance is bounded only by the number of connections to the cluster, which is determined by the number of active clients. Our previous 4GB heap was unable to accommodate the number of clients in Knewton’s infrastructure, so it would perpetually crash; 8GB, however, is sufficient.

Once we have identified the source of the problem, we have to decide what, if anything, to do about it. Some options would be:

  • Upgrading to Cassandra 2.1.3 or later.
  • Migrating to CQL from Thrift.

We know that the behavior is not fatal to our processes, so depending on the opportunity cost of each of these options, we can decide whether to live with this behavior or to fix it.

Cost Analysis

It is difficult to quantify how much an investigation like this costs in terms of time, opportunity cost, and money to an organization.  Before starting an investigation, it is useful to timebox the project and restrict yourself to a certain number of engineering hours to investigate.  In hindsight one might quantify the cost of this investigation as:

Monetary Cost = Engineering Time Cost – Hardware Savings

In this case we will need less hardware once this memory issue is resolved, perhaps three fewer m4.xlarge instances. The one-year cost of a reserved m4.xlarge instance is currently $1,200. We spent about three engineer days investigating. Estimating engineering costs at $200 an hour (including overhead and opportunity cost to the organization), each engineering day comes to $1,600. So over the year, we spent about $1,200 more on the investigation than we recovered from optimizations.
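
Working that out: three engineer days at $1,600 per day comes to $4,800, while three fewer reserved m4.xlarge instances save roughly 3 × $1,200 = $3,600 per year, for a net cost of about $1,200 in the first year.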

Final Remarks

Beyond monetary considerations, we have a better understanding of our system, which benefits us in several ways.  We are able to adjust our alerts to better monitor the system.  We can justify our instance sizes.  And finally we are able to support larger engineering objectives, advising the organization on the best course of action for future projects.  For example, we decided to move aggressively to upgrade this cluster to 2.1.

Deep investigations like this do not come with guarantees of strong takeaways or definitive answers.  They are expensive in terms of time and often require senior engineers to dig into deep system internals.  Yet at Knewton we have found that these investigations are valuable for the database team and the wider tech organization.

Introducing the Knewton Client Library

Why Build a Client Library?

As part of Knewton’s mission to personalize learning for everyone, Knewton strives to provide an easy-to-use API that our partners can leverage in order to experience the power of adaptive learning. Knewton’s API follows industry norms and standards for authentication and authorization protocols, error handling, rate limiting, and API structure.

To authenticate incoming API requests from client applications, Knewton uses two-legged OAuth 2.0. Currently, each partner has to implement OAuth 2.0 on the client side from scratch, which may increase the potential for authentication failures when users try to log in.

Partners also need to be well-versed in Knewton’s API standards. Using the right API endpoint along with valid credentials is critical to ensuring that the client application scales effectively upon launching. Beyond the initial integration, partners are generally cautious about upgrading their applications when Knewton releases updates or new endpoints.

To help simplify and standardize development of client applications, Knewton is starting to pilot the Knewton Client Library (KCL). The KCL boilerplate code will allow partners to build client applications faster and more efficiently. Implementing KCL in client applications will allow partners to devote more resources to the actual client application and enriching the overall experience of the students and teachers who use it. Using KCL will also offer partners a smoother upgrade path, since Knewton will absorb any backwards compatibility complexities into the library.

Components of the Knewton Client Library

The Knewton Client Library uses the Swagger framework. Following the contract-first method of SOA development, Swagger provides a set of tools for REST APIs, including a standardized definition format that can be used to generate code and documentation.

A centralized API definition ties together 1) server side implementation, 2) client code generation, and 3) API documentation, ensuring that they stay in sync and reducing time spent writing boilerplate code.

As part of the integration process, our partners must create and register student accounts through the Knewton API. The following Swagger definition is a simplified example of a dummy endpoint to create a student account, POST /student.

# Path
/student:
  post:
    summary: Create a student
    operationId: createStudent  
    parameters:
      - name: StudentCreatePayload
        in: body
        required: true
        schema:
          properties:
            name:
              description: Student's first and last name
              type: string
            courses:
              description: Array of Course IDs
              type: array
              items:
                type: integer  
            date_created:
              description: Date student was added
              type: string
              format: date
    responses:
      200:
        description: On success, returns no content

This example describes a dummy POST REST endpoint which takes in a JSON object with a student’s name, courses, and creation date. A request to such an endpoint might look like:

curl -H "Content-Type: application/json" -X POST -d '{"name":"Knewton Student","courses":[19, 123],"date_created":"September 1, 2016"}' http://knewton-dummy-api.com/student

The Knewton Java Client Library

The swagger-codegen library can generate API clients in many popular programming languages from a definition file. Let’s take a look at how we use this to generate the Knewton Java Client Library.

We use the swagger-codegen library to generate a Java Retrofit API interface from our API definition. Retrofit uses annotations on interface methods and parameters to describe the API; these annotations are then turned into HTTP requests, which an underlying HTTP client executes.

Here is the Retrofit interface, which was automatically generated from the student creation endpoint definition. The interface matches the Swagger definition above: it defines a POST request to the “/student” endpoint, which takes a StudentCreatePayload (containing name, courses, and date created) as a parameter that is automatically serialized to JSON and added to the request body.

public interface Api {

    @POST("/student")
    Void createStudent(
        @Body StudentCreatePayload studentCreatePayload
    );
}

The auto-generated code provides a baseline client implementation. To make KCL as simple to use as possible, KCL wraps the auto-generated client with higher-level functionality such as authentication and rate limiting. The wrapper also refactors methods, parameters, and return objects to provide a more natural interface for Java developers.

For example, while Java has the java.util.Date object, the autogenerated code uses a String. The wrapper allows users to pass java.util.Date objects and converts them to the strings that the Swagger-generated API class expects.

Continuing with the student creation example above, we can further simplify the partner-facing interface. The API endpoint requires that the caller provides the date that the student record is created. There is no need for KCL to require the current date, as this can be determined automatically. Therefore the only parameters in our wrapped createStudent method are the student name and course list.

public void createStudent(String name, List<Integer> courses) {
    Date dateCreated = new Date();
    StudentCreatePayload studentCreatePayload = new StudentCreatePayload();
    studentCreatePayload.setName(name);
    studentCreatePayload.setCourses(courses);
    studentCreatePayload.setDateCreated(dateCreated.toString());
    generatedApi.createStudent(studentCreatePayload);
}

Instead of writing all of the code in the create student method, a partner application can now simply call wrapper.createStudent(name, courses);

OAuth, Retrying, and Logging

KCL uses OkHttp as its underlying HTTP client library. OkHttp integrates well with Retrofit and provides an easy way to extend its functionality via OkHttp’s interceptor interface. Each client can have multiple interceptors, and each interceptor can modify a request or response at any point. For example, an interceptor could monitor, amend, and retry requests based on information read from responses.

As the diagram below illustrates, OkHttp nests interceptors added to a client, so the first interceptor to process a request will be the last interceptor to process a response.

KCL uses interceptors for logging, OAuth and retrying on rate limiting.
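
As a rough sketch, wiring these interceptors into the client might look like the following. This assumes the Retrofit 1.x and OkHttp 2.x APIs that match the generated interface shown earlier, and the interceptor class names are illustrative stand-ins for the interceptors described below.

// Build one OkHttp client with logging, OAuth, and retry interceptors, then
// hand it to Retrofit so every call through the generated Api goes through them.
import com.squareup.okhttp.OkHttpClient;
import retrofit.RestAdapter;
import retrofit.client.OkClient;

public class KnewtonClientFactory {

    public static Api createApi(String baseUrl) {
        OkHttpClient client = new OkHttpClient();
        client.interceptors().add(new LoggingInterceptor()); // hypothetical class names for
        client.interceptors().add(new OAuthInterceptor());   // the interceptors described below
        client.interceptors().add(new RetryInterceptor());

        RestAdapter restAdapter = new RestAdapter.Builder()
                .setEndpoint(baseUrl)
                .setClient(new OkClient(client))
                .build();

        return restAdapter.create(Api.class);
    }
}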

Logging Interceptor

The logging interceptor is the simplest interceptor because it does not modify or react to a request or response. This interceptor logs information from the request made to Knewton and the response received from Knewton.

The following is a simplified example of the KCL logging interceptor. This sample interceptor logs the request headers and request body, as well as the response code.

@Override
public Response intercept(Chain chain) {
    Request request = chain.request();

    // log request information
    logger.log("Request Headers: " + request.headers());
    logger.log("Request Body: " + request.body());

    // Continue with the interceptor chain
    Response response = chain.proceed(request);

    // log response information
    logger.log("Response Code: " + response.code());

    return response;

}

OAuth Interceptor

OAuth is an industry-standard protocol used for API authentication.

KCL includes an OAuth interceptor to handle authentication. If a user is already authenticated, there will be an authorization header on the request and the interceptor will no-op. If there is no authorization header, re-authentication is required, and there are three cases: the user does not have an access token, in which case KCL requests one; the access token has expired, in which case KCL uses the refresh token to request a new access token; or the refresh token has expired, in which case KCL must request a new authorization altogether.

Here is a sample OAuth interceptor:

[Figure: OAuth interceptor decision flow]

@Override
public Response intercept(Chain chain) {
    Request request = chain.request();

    // Already authorized (no-op)
    if (request.header("Authorization") != null) {
        return chain.proceed(request);
    }

    // Get a new auth and add it to the request
    Auth auth = authCache.getAuth();
    request.addHeader("Authorization", auth);

    Response response = chain.proceed(request);

    // Access token expired
    if (response.code() == 401) {
        auth = authCache.getNewAccessToken();
        request.addHeader("Authorization", auth);
        response = chain.proceed(request);
    }

    // Refresh token expired
    if (response.code() == 401) {
        auth = authCache.getNewRefreshToken();
        request.addHeader("Authorization", auth);
        response = chain.proceed(request);
    }

    return response;

}

Rate Limiting and Entity Limiting

Knewton enforces a set of rate limits to ensure that no single user can overwhelm the API with traffic and shut out other partners. After a specified number of requests per minute for a given endpoint, the Knewton API will respond with HTTP 429 errors, indicating that the request has been rate-limited and rejected; it can be retried in the next minute.

Some requests to Knewton’s API are not so time-sensitive that a delay of up to a minute would affect users. Therefore, KCL has incorporated automatic configurable retrying after a time increment specified by the partner.

Here is a simplified example of a single retry (exception handling is omitted for brevity):

private static final int HTTP_TOO_MANY_REQUESTS = 429;

@Override
public Response intercept(Chain chain) {
    Request request = chain.request();
    Response response = chain.proceed(request);

    if (response.code() == HTTP_TOO_MANY_REQUESTS) {
        Thread.sleep(RETRY_WAIT_TIME);
        response = chain.proceed(request);
    }

    return response;

}

If KCL continues to receive HTTP 429s, it will repeat this process until it times out. Since partners can configure the timeline for this process, they do not have to worry about handling these errors in their client applications.
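
A sketch of that bounded retry loop is below. The maxWaitMillis and retryWaitMillis fields stand in for KCL’s partner-supplied configuration (they are not KCL’s actual names), and this version also handles the InterruptedException that Thread.sleep can throw:

// Illustrative bounded-retry interceptor: retry on HTTP 429 until a
// partner-configured deadline passes, then give up and return the response.
@Override
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    Response response = chain.proceed(request);

    long deadline = System.currentTimeMillis() + maxWaitMillis;
    while (response.code() == HTTP_TOO_MANY_REQUESTS
            && System.currentTimeMillis() < deadline) {
        try {
            Thread.sleep(retryWaitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop retrying if interrupted
            break;
        }
        response.body().close(); // release the rate-limited response before retrying
        response = chain.proceed(request);
    }

    return response;
}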

Conclusion

With the Knewton Client Library, partners can more easily integrate with the Knewton adaptive learning platform, and consequently focus their efforts on delivering a great user experience to students and teachers accessing their applications. KCL also allows partners to take advantage of new features as Knewton releases them.

Thank you to Aditya Subramaniam and Paul Sastrasinh for their contributions to this blog post.

System Design Documents at Knewton: RFCs

The team is in agreement: the Flimflamulator service is a mess of tech debt and something needs to be done. Other software teams at the company write code that depends on this service, however. They’ve built workarounds, and any changes will propagate into their systems. Flimflamulator provides some customer-facing functionality; the Product team will want to weigh in too.

How do you make sure you’re not creating new problems? What if someone has already thought through solutions to this? There are so many stakeholders that a design review meeting would be chaos. How do you collect feedback from everyone?

At Knewton we use the RFC process. You might call it design documents or architecture review, but the goal is the same.

What is an RFC?

RFC stands for Request for Comment; at its most basic level an RFC is a way to collect feedback about an idea. A document explaining the idea is shared with peers and stakeholders who provide feedback and ultimately sign off (or don’t) on the idea. RFCs can be product proposals, design documents, process changes, or any other thing you’d like someone’s opinion on.

RFC does not stand for Request for Consensus. People are rarely equally informed or equally affected by issues, and some team members naturally hold more sway than others. RFCs are not decided by majority vote, and that’s OK.

Knewton’s process for this is loosely based on the RFC process used by the Internet Engineering Task Force to discuss and set standards for internet protocols.

When to Write an RFC?

Write an RFC when you’re considering something that will be hard to change later: typically public interfaces, data schemas, frameworks, platforms, or human processes.

How to Write an RFC?

Knewton’s RFC template explains what an RFC should cover. We use Google Docs to share and discuss documents because it has great commenting tools.

RFC Ownership

Each RFC must have at least one owner, who should be listed at the top of the RFC along with the RFC’s status and any relevant dates (refer to the template for examples). The owner is responsible for:

  • doing research on prior art
  • identifying the right audience of stakeholders and peers
  • publishing the RFC to that audience
  • addressing all questions, concerns, and suggestions transparently and expediently
  • documenting any dissenting opinions

The “Right” RFC Audience

“An RFC is like a wedding proposal; if you’re not sure your stakeholders are going to say yes, it’s too early to write it,” as my colleague Rémi likes to say.

Not all RFCs require involvement of the entire tech team. In fact, most don’t.

The RFC owner should identify key stakeholders and solicit their opinions first. These are the people affected by what you’re proposing, subject matter experts, or people who can make the proposal happen/not happen. An informal chat before you start writing can save lots of time.

Once you have a doc, share it with a small audience who can give you rapid feedback. Your manager is often a good place to start. Focus on quick iteration and tell people when you expect to receive their feedback.  Early in the process, 24-hour turnaround is a reasonable request. Be proactive in soliciting feedback. You’ll probably get a lot of comments on your first draft, and an in-person review can be useful to speed things along.

As major issues get worked out and details solidified, expand the audience in a few rounds: more stakeholders, your whole team, tech-wide. It should be like rolling a snowball down a hill. Allow up to 5 business days for the final audience to sign off. This will be a judgment call based on the size of the audience and urgency of the proposal. At Knewton it’s customary to share your RFC with the entire tech team as an FYI even if it isn’t relevant to everyone.

How to Address Comments

It’s unlikely everyone will be in perfect agreement about your proposal, and you’ll probably receive some feedback that you disagree with. Disagreement is OK.

A few things to keep in mind when addressing dissenting comments:

  • What would it take to change your mind? If you think incorporating the dissenting feedback would cause problems, ask the commenter to provide a solution to those problems.
  • Does the commenter have skin in the game? Are they a subject matter expert? You don’t have to address every comment on your proposal if they’re not relevant.
  • Close out comments you won’t incorporate by briefly but respectfully saying you disagree, and mark the comment resolved. If the commenter feels strongly, they’ll let you know.
  • Long comment threads can be a time sink. Resolve them with a real-time conversation.

RFC Statuses

RFC documents should list their current status in the document header (see RFC template).

Proposed means open for comment, actively reviewed and publicly debated.

Accepted means closed for comment, all comments “resolved,” dissenting opinions documented, and work to implement the proposal has been scheduled. Doc status should be updated to “Accepted on [date].”

Cancelled means the RFC owner decided to not proceed and cancel the RFC. Doc status should be updated to “Cancelled on [date].”

Summing It All Up

  1. Decide on problem to solve
  2. Identify and talk to key stakeholders
  3. Write RFC
  4. Solicit feedback
    1. Primary stakeholder or mentor (no more than 1 day per iteration)
    2. Team or wider group of stakeholders (2-3 days)
    3. Full audience (5 business days)
  5. Resolve comments
  6. Close RFC
    1. Update doc to indicate it has been closed

API Infrastructure at Knewton: What’s in an Edge Service?

The Knewton API gives students and teachers access to personalized learning recommendations and analytics in real time. In this post, we will pull back the covers of our API to explain how we handle user requests. You will first learn how to build an edge service with Netflix Zuul, the framework we chose for its simplicity and flexibility. Then, we’ll dive into the Knewton edge service to show you how it improves API simplicity, flexibility, and performance.

What’s in an Edge Service

An edge service is a component which is exposed to the public internet. It acts as a gateway to all other services, which we will refer to as platform services. For example, consider an Nginx reverse proxy in front of some web resource servers. Here, Nginx acts as an edge service by routing public HTTP requests to the appropriate platform service.

An example edge service: Nginx as a reverse proxy for two resource servers. Requests for images are routed to one server, while requests for text are routed to another.

Beyond Routing: Edge Service Architecture

The reverse proxy pattern makes sense if every one of your resource servers is hardened to receive raw internet traffic. But in the typical service-oriented architecture, implementing a simple reverse proxy would duplicate a substantial amount of security and reliability code across platform services. For example, we do not want to reimplement user authentication in each platform service. Instead, we factor these responsibilities directly into the edge service: it validates and authenticates incoming requests, enforces rate limits to protect the platform from undue load, and routes requests to the appropriate upstream services. Factoring these high-level features into the edge service is a huge win for platform simplicity.

Building this service was a substantial engineering task. We wanted to leverage our experience developing and deploying JVM-based services by creating our edge service following the same pattern. This made Nginx and Apache’s Lua and PHP-based scripting capabilities unattractive. We would have had to rewrite our standard libraries and debug code in a new paradigm. Instead, we built our edge service on top of Netflix Zuul.

The edge service is Knewton’s interface to the public internet. It is registered directly with an AWS Elastic Load Balancer (ELB), and is responsible for sanitizing and routing requests for the Knewton Platform. To maintain high availability, our edge service runs as a cluster. On startup, edge service nodes register themselves with the load balancer, which then distributes requests across the cluster.

Netflix Zuul

Zuul is a framework created by Netflix to handle their own API and website traffic. The framework is structured around filters, a concept borrowed from the Java Servlet Filter pattern. A Zuul edge service is made up of a series of such filters, each performing some action on an HTTP request and / or response before passing control along to the next filter. For example, a filter might add authentication information to request headers, write an error response to an invalid request, or forward a request to an upstream service for further processing. In this section, we will walk through an example Zuul filter to show you the simplicity of the pattern.

Zuul Filters

We will consider three categories of Zuul filters: pre-filters, route-filters, and post-filters. (Zuul also supports error-filters and static-filters.) Pre-filters run before the edge service routes a request; route-filters forward requests to upstream services; and post-filters run after the proxied service returns its response.

The edge service consists of a series of Zuul filters which work together to write a response for a given request. The route-filters make requests to platform services to retrieve data and update state.

Filters are defined by three pieces of logic:

  • Filter execution order
  • Conditional filter execution
  • Execution logic

Let’s dive into our example. We’ve written this filter in Java, but Zuul is compatible with all JVM languages.

Filter Execution Order

Zuul filters run in the same order for every request. This enables successive filters to make assumptions about the validations run and to access accumulated state. For example, we might store the request’s user ID in one filter and use it to apply rate limits in the next. Zuul filter ordering is defined with two methods: filterType, which specifies whether a filter is a pre-filter, route-filter or post-filter, and filterOrder, which orders filters of the same type.

// Defining the execution order for a Zuul filter in Java

@Override
public String filterType() {
   // run this filter before routing
   return "pre";
}

@Override
public int filterOrder() {
   // run this filter first among pre-filters
   return 0;
}

Conditional Filter Execution

Filters may be run conditionally for each request. This gives the designer significant flexibility. For example, a rate limit filter may not run for an authenticated user. This conditional logic is factored into the shouldFilter method.

// Configuring a Zuul filter to run on every request

@Override
public boolean shouldFilter() {
   // Always run this filter
   return true;
}
// Configuring a Zuul filter to run on requests from unauthenticated users only
@Override
public boolean shouldfilter() {
   RequestContext context = RequestContext.getCurrentContext();
   return !(boolean)context.getOrDefault("IS_AUTHENTICATED", false);
}

Execution Logic

The code to execute in a filter is defined in the runFilter method. Here, we make use of the RequestContext object, obtained via its static getCurrentContext method, to store state for later filters or to write the response.

// Defining Zuul filter logic to record and check rate limits.

@Override
public ZuulFilterResult runFilter() {
   RequestContext context = RequestContext.getCurrentContext();
   boolean isOverRate = rateLimiter.recordRate(context.getRequest());
   if (isOverRate) {
       context.set("IS_RATE_LIMITED", true);
   }
   return null;
}

All together, this gives us an example filter:

// A simple Zuul filter that runs for unauthenticated users and records
// whether a rate limit has been exceeded.
public class RateLimitFilter extends ZuulFilter {

   @Override
   public String filterType() {
       // run this filter before routing
       return "pre";
   }

   @Override
   public int filterOrder() {
       return 0;
   }

   @Override
   public ZuulFilterResult runFilter() {
       // records the request with the rate limiter and checks if the
       // current rate is above the configured rate limit
       RequestContext context = RequestContext.getCurrentContext();
       boolean isOverRate = rateLimiter.recordRate(context.getRequest());
       if (isOverRate) {
           context.set("IS_RATE_LIMITED", true);
       }
       return null;
   }

   @Override
   public boolean shouldFilter() {
       // should only run if the user is not authenticated
       RequestContext context = RequestContext.getCurrentContext();
       return !(boolean)context.getOrDefault("IS_AUTHENTICATED", false);
   }
}

In this way, we build up a modular set of request-processing functionality. This pattern makes it easy for multiple engineers to contribute new features. At Knewton, engineers outside of the API team have committed code for edge service features.

For more information about the design and lifecycle of a Zuul service, see this Netflix blog post.

Zuul and the Knewton Edge Service

Zuul is an opinionated but barebones framework. While adding functionality is simple, you will have to implement the filters yourself. Now we will explain the most important filters we wrote for the Knewton edge service. You will learn how to reject bad requests, reverse-proxy good ones, and reduce interservice traffic.

Pre-filters

Pre-filters in our edge service validate, rate-limit, and authenticate incoming requests. This enables us to reject bad requests as early in the pipeline as possible.

Rate Limiting

Our edge service is responsible for protecting the Platform from bursts of requests. When the rate of requests from a given IP or user exceeds a specified threshold, the edge service responds with 429: Too Many Requests. This threshold helps to prevent harm to the platform from Denial of Service attacks and other excessive request load.

Rate limits are enforced in a pre-filter so that these excess requests will not place any load on platform services. This pre-filter tracks the number of requests made during a one-minute window. If this number exceeds the rate limit, the filter skips further request processing and immediately writes a “429” response.

Logic in the rate limiting Zuul prefilter. For a simple rate limiting implementation with Redis, see the Redis documentation.
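To make the short-circuit concrete, here is an illustrative companion to the RateLimitFilter above (a sketch, not our production filter): a later pre-filter checks the IS_RATE_LIMITED flag and, when it is set, tells Zuul to skip routing and writes the 429 response directly through the RequestContext.

// Illustrative pre-filter that rejects rate-limited requests with a 429.
// The filter order and response body are arbitrary choices for this sketch.
public class RejectRateLimitedFilter extends ZuulFilter {

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        // run after the rate-recording filter shown earlier
        return 1;
    }

    @Override
    public boolean shouldFilter() {
        RequestContext context = RequestContext.getCurrentContext();
        return (boolean) context.getOrDefault("IS_RATE_LIMITED", false);
    }

    @Override
    public ZuulFilterResult runFilter() {
        RequestContext context = RequestContext.getCurrentContext();
        context.setSendZuulResponse(false);        // skip the route-filters
        context.setResponseStatusCode(429);        // Too Many Requests
        context.setResponseBody("Too Many Requests");
        return null;
    }
}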

Rates are stored in memory on each edge service node to provide the lowest latency possible to each request. The rates recorded by each node are reconciled asynchronously using a shared Redis cache. This means that, no matter which node handles a given request, all nodes will eventually acknowledge it. In practice, this reconciliation happens quickly; convergence occurs within a constant factor of AWS’s region-internal network latency.

A request to an endpoint is rejected when more than the configured limit of requests have been made within a minute. Edge service nodes coordinate request counts through a shared Redis cache.
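As a rough illustration of the reconciliation idea (a sketch, not our implementation; the key layout and host name are made up), each node can periodically push its local count for the current one-minute window to Redis and read back the cluster-wide total using the Jedis client:

// Illustrative reconciliation of per-node request counts through a shared Redis.
public class RateReconciler {

    private final JedisPool pool =
            new JedisPool("rate-limit-redis.internal.example.com", 6379);

    // Push this node's count for the current window and return the global count.
    public long reconcile(String endpointKey, long localCount) {
        long window = System.currentTimeMillis() / 60_000;    // one-minute windows
        String key = "rate:" + endpointKey + ":" + window;
        try (Jedis jedis = pool.getResource()) {
            long globalCount = jedis.incrBy(key, localCount);
            jedis.expire(key, 120);                            // keep two windows around
            return globalCount;
        }
    }
}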

Surge Queuing

Load on the Knewton API is not constant. Even within the broad patterns of school days and lunch breaks, request rates vary. Rather than passing these bursts on to platform services, our edge service smooths traffic, so that the platform sees smaller peaks. To accomplish this, our rate limiter follows the same Leaky Bucket pattern used in Nginx. Requests exceeding the rate limit get added to a fixed-length queue. The queued requests are processed at the rate limit, ensuring that the platform does not see a surge. If the queue is full when the rate limit is exceeded, these excess requests are rejected with a 429 error. This approach has the added benefit of simplifying Knewton API clients, because requests can be made in reasonable bursts without being rejected. Read more about how we implement request queueing in our next blog post.

A request surge queue sits in front of the rate limiter to smooth out bursts of requests.
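The queueing behaviour itself can be sketched as follows (illustrative only; the queue size and drain rate are arbitrary, and our production implementation differs): requests that exceed the rate are parked in a bounded queue and drained at a fixed rate, and when the queue is full the caller responds with a 429.

// Illustrative leaky-bucket surge queue: excess requests wait in a bounded
// queue and are forwarded at a steady rate.
public class SurgeQueue {

    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
    private final ScheduledExecutorService drainer =
            Executors.newSingleThreadScheduledExecutor();

    public SurgeQueue(int requestsPerSecond) {
        long periodMicros = 1_000_000L / requestsPerSecond;
        // forward at most one queued request per tick, so the platform never sees a surge
        drainer.scheduleAtFixedRate(() -> {
            Runnable forward = queue.poll();
            if (forward != null) {
                forward.run();
            }
        }, 0, periodMicros, TimeUnit.MICROSECONDS);
    }

    // Returns false when the queue is full; the caller should then respond with 429.
    public boolean enqueue(Runnable forwardRequest) {
        return queue.offer(forwardRequest);
    }
}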

Authentication

At Knewton, we use OAuth to authenticate users. Each API request must contain a valid OAuth token. The edge service rejects requests which do not contain these tokens. This obviates the need for end-user authentication in upstream platform services and reduces interservice traffic by rejecting unauthenticated requests immediately at the edge.
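A minimal sketch of such a pre-filter (OAuthValidator is a hypothetical helper, and the filter order is arbitrary):

// Illustrative authentication pre-filter: reject requests without a valid
// OAuth token before they reach any upstream service.
public class AuthenticationFilter extends ZuulFilter {

    private final OAuthValidator validator = new OAuthValidator();  // hypothetical

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 2;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public ZuulFilterResult runFilter() {
        RequestContext context = RequestContext.getCurrentContext();
        String token = context.getRequest().getHeader("Authorization");
        if (token == null || !validator.isValid(token)) {
            // reject immediately so upstream services never see the request
            context.setSendZuulResponse(false);
            context.setResponseStatusCode(401);
        } else {
            context.set("IS_AUTHENTICATED", true);
        }
        return null;
    }
}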

Route-filters

The edge service is responsible for forwarding requests to the appropriate upstream microservice. Once a request has passed the gauntlet of Pre-filters, it is passed on to the Route-filters. Since Knewton services use Eureka for service discovery, we chose to use Netflix’s Ribbon HTTP client to make these upstream requests. Ribbon allows our edge service to automatically discover upstream services, load-balance traffic between them, and retry failed requests across instances of a service, all with minimal configuration.

// Creates a Ribbon client with Eureka discovery and round robin
// load balancing
AbstractLoadBalancerAwareClient setupRibbonClient(String upstreamName) {
    ServerList<DiscoveryEnabledServer> serverList =
            new DiscoveryEnabledNIWSServerList(upstreamName);
    IRule rule = new AvailabilityFilteringRule();
    ServerListFilter<DiscoveryEnabledServer> filter = new ZoneAffinityServerListFilter();
    ZoneAwareLoadBalancer<DiscoveryEnabledServer> loadBalancer =
            LoadBalancerBuilder.<DiscoveryEnabledServer>newBuilder()
                               .withDynamicServerList(serverList)
                               .withRule(rule)
                               .withServerListFilter(filter)
                               .buildDynamicServerListLoadBalancer();
    AbstractLoadBalancerAwareClient client =
            (AbstractLoadBalancerAwareClient) ClientFactory.getNamedClient(upstreamName);
    client.setLoadBalancer(loadBalancer);
    return client;
}
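A route-filter might use a client built this way roughly as follows. This is a hedged sketch: it uses Ribbon's HttpRequest/HttpResponse types and Zuul's RequestContext, omits error handling and header propagation, and names a made-up "recommendations" upstream; in practice the client would be created once and reused rather than per request.

// Illustrative route-filter body that proxies the current request to an
// upstream service discovered through Eureka.
@Override
public ZuulFilterResult runFilter() {
    RequestContext context = RequestContext.getCurrentContext();
    AbstractLoadBalancerAwareClient client = setupRibbonClient("recommendations");

    HttpRequest upstreamRequest = HttpRequest.newBuilder()
            .verb(HttpRequest.Verb.GET)
            .uri(URI.create(context.getRequest().getRequestURI()))
            .build();
    try {
        HttpResponse upstreamResponse =
                (HttpResponse) client.executeWithLoadBalancer(upstreamRequest);
        context.setResponseStatusCode(upstreamResponse.getStatus());
        context.setResponseDataStream(upstreamResponse.getInputStream());
    } catch (ClientException e) {
        context.setResponseStatusCode(502);        // Bad Gateway
    }
    return null;
}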

The Knewton edge service is not limited to endpoint-based routing. Our next blog post will cover specialized routing we have implemented to support shadow services and server-side API mocking.

Post-filters

After receiving the upstream responses, the post-filters record metrics and logs, and write a response back to the user.

Response Writing

The Post-filter stage is responsible for writing a response to the user. It is worth emphasizing that the edge service must always write back some response. If an unhandled exception or upstream timeout were to prevent this, the user would be left waiting indefinitely. This means catching exceptions and handling errors in all situations, so that the edge service always returns a timely answer, even if that answer is an error code (i.e., a 5xx response).
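One way to make that guarantee explicit is a last-resort post-filter along these lines (a sketch; the status code, ordering, and body are illustrative):

// Illustrative last-resort post-filter: if no earlier filter produced a body,
// write an error response so the client is never left waiting.
// filterType() returns "post" and filterOrder() places it last; both are omitted here.

@Override
public boolean shouldFilter() {
    RequestContext context = RequestContext.getCurrentContext();
    return context.getResponseBody() == null
            && context.getResponseDataStream() == null;
}

@Override
public ZuulFilterResult runFilter() {
    RequestContext context = RequestContext.getCurrentContext();
    context.setResponseStatusCode(502);
    context.setResponseBody("{\"error\": \"upstream did not respond\"}");
    return null;
}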

Metrics and Logging

Recording the right metrics and logs helps to ensure quality of service of the Knewton platform. The edge service encapsulates the entire lifecycle of every API request: it records the entire duration of each request and handles every failure. This uniquely positions the edge service to report on end-user experience. Our post-filters publish metrics and logs to Graphite relays and Splunk Indexers. The resulting dashboards, alerts, and ad-hoc queries give us all the information we need to investigate problems, optimize performance, and guide future development work. Watch for our upcoming blog post on API monitoring for more details.
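For illustration, the timing part of such a post-filter could look like the following (a sketch: the REQUEST_START_MS context key would be set by an early pre-filter, and MetricsPublisher stands in for whatever Graphite client is in use; neither name is from our code):

// Illustrative timing post-filter. REQUEST_START_MS and metricsPublisher are
// hypothetical; filterType(), filterOrder(), and shouldFilter() follow the usual pattern.
@Override
public ZuulFilterResult runFilter() {
    RequestContext context = RequestContext.getCurrentContext();
    long startMs = (long) context.getOrDefault("REQUEST_START_MS",
            System.currentTimeMillis());
    long durationMs = System.currentTimeMillis() - startMs;

    metricsPublisher.recordTimer("edge.request.duration_ms", durationMs);
    metricsPublisher.recordCounter("edge.response." + context.getResponseStatusCode());
    return null;
}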

Conclusion

The Knewton edge service validates and rate limits API requests, authenticates users, routes requests to upstream microservices, and records end-to-end metrics. Building this logic into the edge service simplifies our platform and provides performance and reliability guarantees to our users. We built our edge service on top of Zuul because its filter-based design is simple and extensible, its Java construction was compatible with our architecture, and because it has been proven in production at Netflix.

In this blog post, you learned how to build a Zuul edge service and how this design can improve your API. A follow-up post will describe custom logic we built to support surge queuing, shadow services, and API mocking in the Zuul framework.

Thanks to all the contributors on the Knewton API Platform team: Paul Sastrasinh, Rob Murcek, Daniel Singer, Bennett Wineholt, Robert Schultheis, Aditya Subramaniam, Stephanie Killian

Distributed Tracing: Observations in Production

Previous blog posts have explained Knewton’s motivation for implementing distributed tracing, and the architecture we put together for it. At Knewton, the major consumers of tracing are ~80 engineers working on ~50 services. A team consisting of three engineers handled designing and deploying the tracing infrastructure detailed in the previous sections.  This section highlights our experience and challenges running Zipkin at Knewton over the past few months.

Technical Hurdles

We elected to use Redis on Amazon Elasticache as our storage for spans, which are detailed in the previous section. This caused us a few problems:

Redis was inexplicably slow

Our initial rollout had an early version of the Zipkin Collector that wrote to a cache.r3.2xlarge (58.2 GB Ram, 8 vCPU) Redis Elasticache instance, which could not keep up with the traffic we were sending it. It could only handle ~250 requests per second. This seemed very slow to us. To set expectations, we dug into how to benchmark Redis, and stumbled upon the venerable redis-benchmark, which informed us that it could handle at least 120,000 requests per second with the 120 byte payloads that we were transmitting.

Looking at Amazon’s Cloudwatch metrics during the initial rollout, we quickly realized that, while the test was using multiple connections to Redis, the Zipkin code was using only a single connection. We modified the code to use multiple connections so we could make ~3000 requests per second and still keep up with incoming traffic.
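In Jedis terms, the difference is roughly the one below (illustrative only, not the actual Zipkin patch): a pool of connections lets collector workers write concurrently instead of serializing on one socket.

// Illustrative use of a Jedis connection pool so that concurrent collector
// workers do not share a single Redis connection. Key name and host are made up.
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(32);                  // roughly one connection per worker thread
JedisPool pool = new JedisPool(config, "zipkin-redis.internal.example.com", 6379);

Runnable storeSpan = () -> {
    try (Jedis jedis = pool.getResource()) {
        jedis.rpush("zipkin:spans", "serialized-span");
    }
};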

Redis lost all data stored on it every few days

After we got Redis running, we found that Redis would restart every few days, when the amount of data that we stored in Redis approached the memory allocation of the Elasticache instance.

It turned out that the memory listed on Amazon’s Elasticache documentation refers to the total memory on the instance that runs Redis, and not to the memory allocated to Redis. As we continued to load data into Elasticache Redis, the underlying instance would start swapping heavily, and the Redis process would be restarted. We speculated that the process was being OOM killed, but because we don’t have access to Linux logs on Elasticache instances, we couldn’t be certain. After setting the `reserved-memory` parameter to approximately 15% of the total memory on the cache instance, we no longer had this problem.

Amazon Kinesis Queue Library

We packaged TDist into a library responsible for collecting span data at every service and publishing it to our tracing message bus, Amazon Kinesis. During early testing, we had configured the AWS Kinesis stream to have only a single shard, limiting it to 1000 writes per second.

When we deployed a few services using TDist, we began seeing unusual growth in JVM heap sizes under bursty traffic. When traffic returned to baseline levels, the heap shrank back to a reasonable size.

The default configuration for the Kinesis queue producer assumes that the rate at which it receives data is lower than the rate at which it can submit data to the Kinesis queue. As such, it uses an unbounded in-memory queue to hold data awaiting submission. We decided to drop data when we are unable to keep up, and switched to a bounded queue. This resulted in predictable performance under load or network blips, at the expense of losing data during load spikes.
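In outline, the change amounts to something like this sketch (names and sizes are illustrative, not the actual TDist code):

// Illustrative bounded publish buffer: spans are dropped, and the drops counted,
// when the publisher cannot keep up with incoming traffic.
public class BoundedSpanBuffer {

    private final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(10_000);
    private final AtomicLong dropped = new AtomicLong();

    // Called on the request path; never blocks and never grows without bound.
    public void submit(byte[] encodedSpan) {
        if (!buffer.offer(encodedSpan)) {
            dropped.incrementAndGet();
        }
    }

    // Called by the Kinesis publisher threads.
    public byte[] nextSpan() throws InterruptedException {
        return buffer.take();
    }
}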

Now we have predictable performance, but during periods of high traffic, the Kinesis publishers fall behind and fill up the local queues, which has the unfortunate effect of dropping spans. This can be solved by increasing the rate at which we publish to Kinesis. The two options are to increase the number of publishers, and thus the number of calls we make to the Kinesis API, or to increase the number of spans published per API call. Because the span payload is small, most of the time spent on a Kinesis API call is network time. We intend to alleviate this problem by using batch Kinesis operations to submit multiple spans at once.

Metrics and Monitoring

When tracing was first rolled out to Knewton, it was hard to get an idea of how our client applications were doing. While the server and client side metrics were reported to our Graphite based metrics system, we had a difficult time correlating those with metrics for Kinesis, which are reported with different granularity, units, and retention periods using Amazon CloudWatch. Knewton contributed to the cloudwatch-to-graphite project to get consolidated metrics in Graphite.

Once this was done, we were able to focus on the metrics that gave us the most insight into how tracing was performing: the rate at which applications emit spans to Kinesis, and how quickly the Collector ships them off to Redis. These rates determine the time it takes for a trace to appear in the Zipkin web interface.

Organizational hurdles

User education

While we had stealthily instrumented the Thrift protocol to pass tracing data around, we found that developers were still looking at service logs instead of at tracing data while debugging. We now evangelize tracing when developers run into problems across services, and we gave a presentation on how developers can use tracing to pinpoint performance problems.

Successes

TDist allows Knewton to see internal dependencies between services clearly. Now, a service owner can see precisely which services depend on their service, and which functions they depend upon. This allows us to optimize service APIs to match usage patterns.

The week after we launched distributed tracing, we noticed that a particular external API call resulted in hundreds of calls to one internal service. We were able to add a new endpoint to the internal service, eliminating the large number of calls and speeding up the endpoint.

Conclusion

TDist proved that we could pass data across services in a noninvasive way by modifying the Thrift protocol. Since then, we have built out the TDist ecosystem and are using it in production. While we faced challenges getting TDist into the hands of developers, it has proven incredibly useful when debugging problems across services.

Distributed Tracing: Design and Architecture

The previous blog post talked about why Knewton needed a distributed tracing system and the value it can add to a company. This section will go into more technical detail as to how we implemented our distributed tracing solution.

General Architecture and Tracing Data Management

Our solution has two main parts: the tracing library that all services integrate with, and a place to  store and visualize the tracing data. We chose Zipkin, a scalable open-source tracing framework developed at Twitter, for storing and visualizing the tracing data. Zipkin is usually paired with Finagle, but as mentioned in Part I, we ruled it out due to complications with our existing infrastructure. Knewton built the tracing library, called TDist, from the ground up, starting as a company hack day experiment.

TDist-architecture-tracing-data-management

Tracing data model

For our solution, we chose to match the data model used in Zipkin, which in turn borrows heavily from Dapper. A trace tree is made up of a set of spans. Spans represent a particular call from client start through server receive, server send, and, ultimately, client receive. For example, the call-and-response between ServiceA and ServiceB would count as a single span:

TDist-RPC-span-knewton

Each span tracks three identifiers:

  • Trace ID: Every span in a trace will share this ID.
  • Span ID: The ID for a particular span. The Span ID may or may not be the same as the Trace ID.
  • Parent Span ID: An optional ID present only on child spans. The root span does not have a Parent Span ID.
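A minimal sketch of these identifiers as a data class (TDist's actual classes differ; this is only to make the relationships concrete):

// Minimal sketch of the identifiers carried by each span.
public class SpanIds {

    private final long traceId;        // shared by every span in the trace
    private final long spanId;         // unique to this span
    private final Long parentSpanId;   // null for the root span

    public SpanIds(long traceId, long spanId, Long parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    public long getTraceId() { return traceId; }

    public long getSpanId() { return spanId; }

    public Long getParentSpanId() { return parentSpanId; }

    public boolean isRoot() { return parentSpanId == null; }
}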

The diagram below shows how these IDs are applied to calls through the tree.  Notice that the Trace ID is consistent throughout the tree.

TDist-trace-tree-with-IDs-knewton

For more details, read the Dapper paper.

TDist

TDist is a Java library that Knewton developed. All of our services use it to enable tracing. TDist currently supports Thrift, HTTP, and Kafka, and it can also trace direct method invocations with the help of Guice annotations.

Each thread servicing or making a request to another service gets assigned a Span that is propagated and updated by the library in the background. Upon receipt of a request (or right before an outgoing request is made), the DataManager adds the tracing data to an internal queue and changes the name of the thread handling the request to include the Trace ID. Worker threads then consume from that queue, and the tracing data get published to our tracing message bus. Java's ThreadLocal makes it easy to store and access information assigned to a particular thread, and that is the approach the DataManager takes.
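A stripped-down sketch of that ThreadLocal bookkeeping, reusing the SpanIds sketch from above (the real DataManager does considerably more):

// Illustrative DataManager: the current span is bound to the thread, and the
// thread is renamed to include the Trace ID so log lines can be correlated.
public class DataManager {

    private static final ThreadLocal<SpanIds> CURRENT_SPAN = new ThreadLocal<>();
    private static final ThreadLocal<String> ORIGINAL_NAME = new ThreadLocal<>();

    public static void setCurrentSpan(SpanIds span) {
        CURRENT_SPAN.set(span);
        Thread current = Thread.currentThread();
        if (ORIGINAL_NAME.get() == null) {
            ORIGINAL_NAME.set(current.getName());
        }
        current.setName(ORIGINAL_NAME.get() + " [trace=" + span.getTraceId() + "]");
    }

    public static SpanIds getCurrentSpan() {
        return CURRENT_SPAN.get();
    }

    public static void clear() {
        if (ORIGINAL_NAME.get() != null) {
            Thread.currentThread().setName(ORIGINAL_NAME.get());
        }
        CURRENT_SPAN.remove();
        ORIGINAL_NAME.remove();
    }
}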

Often, threads offload the actual work to other threads, which then either do other remote calls or report back to the parent thread. Because of this, we also implemented thread factories as well as executors, which know how to retrieve the tracing data from the parent thread and assign it to the child thread so that the child thread can also be tracked.
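Such an executor can be sketched as a thin wrapper that captures the parent thread's span at submit time and installs it on the worker thread (again, illustrative rather than TDist's actual implementation):

// Illustrative tracing-aware executor built on the DataManager sketch above.
public class TracingExecutor {

    private final ExecutorService delegate;

    public TracingExecutor(ExecutorService delegate) {
        this.delegate = delegate;
    }

    public Future<?> submit(Runnable task) {
        SpanIds parentSpan = DataManager.getCurrentSpan();   // captured on the caller's thread
        return delegate.submit(() -> {
            if (parentSpan != null) {
                DataManager.setCurrentSpan(parentSpan);
            }
            try {
                task.run();
            } finally {
                DataManager.clear();
            }
        });
    }
}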

Zipkin

Once the tracing data arrive at the tracing message bus through TDist, the Zipkin infrastructure handles the rest of the process. Multiple instances of collectors, consuming from the message bus, store each record in the tracing data store. A separate set of query and web services, part of the Zipkin source code, in turn query the database for traces. We decided to join the query and the web service to keep things simpler, and also because this combined service is internal and has predictable traffic patterns. However, the collector is decoupled from the query and web service because the more Knewton services integrated with the collector, the more tracing data it would have to process.

Zipkin UI

Out of the box, Zipkin provides a simple UI to view traces across all services. While it’s easy to view logs for a particular Trace ID across all services, the Zipkin UI provides a summarized view of the duration of each call without having to look through hundreds of log statements. It’s also a useful way of identifying  the biggest or slowest traces over a given period of time. Finding these outliers allowed us to flag cases where we were making redundant calls to other services that were slowing down our overall SLA for certain call chains. Here’s a screenshot of a trace appearing in the Zipkin UI:

TDist-zipkin-UI-knewton

Of course, the UI doesn’t come without drawbacks. While it’s easy to look at distinct individual traces, we found the Zipkin UI lacking for examining aggregate data. For example, there’s currently no way to get aggregate timing information or aggregate data on most called endpoints, services etc.

Throughout the development process and rolling out of the Zipkin infrastructure, we made several open-source contributions to Zipkin, thanks to its active and growing community.

Splunk

As mentioned above, the thread name of the current thread servicing a request is also changed, and the trace ID is appended to it. Because of this we can query for logs across all of the trace-enabled services for a particular call. This makes debugging a lot easier, and it’s proven to be very useful in post mortem analysis, log aggregations, debugging for isolated problems, and explaining uncommon platform behavior.

Thrift

Thrift is a cross-language RPC framework for building scalable services. The user can define a service and data model spec in Thrift, and Thrift will compile the spec into many different languages. Users can then implement the generated service interfaces in the desired language. Thrift also automatically generates the client code and data structures for the services defined by the user.

Thrift is the most widely used RPC method between services at Knewton. Most of our services talk to each other through this framework, so supporting it while still maintaining backwards compatibility was critical for the success of this project. More precisely, we wanted services that were not tracing-enabled to be able to talk to tracing-enabled services.

When we started looking into adding tracing support to Thrift, we experimented with two different approaches. The first approach involved a modified Thrift compiler, and the second involved modified serialization protocols and server processors. Both methods had their advantages and disadvantages.

Custom Compiler

In this approach, we experimented with modifying the C++ Thrift compiler to generate additional service interfaces that could pass along the tracing data to the user. Modified thrift compilers are not uncommon; perhaps the most famous example is Scrooge. One advantage of a modified compiler was that clients would have to swap out fewer class implementations in their code, since tracing was supported right in the generated code. Clients could also get a reference of the tracing data from the service interface.

Although we didn’t benchmark, we also think that this approach would have been marginally faster, since there are fewer classes delegating to tracing implementations. However, we would have had to recompile all of our Thrift code and deviate from the open-source version, making it harder to upgrade in the future. We also soon realized that allowing the user access to the tracing data might not be desirable or safe, and data management might be better left to TDist for consistency.

Custom Protocols and Server Processors

We ended up using this approach in production. Not having to maintain a custom compiler lowered our development cost significantly. The biggest disadvantage to customizing protocols and server processors was that we had to upgrade to Thrift 0.9.0 (from 0.7.0) to take advantage of some features that would make it easier to plug in our tracing components to the custom Thrift processors and protocols. The upgrade required a lot of coordination across the organization. Thankfully, the newer version of Thrift was backwards-compatible with the older version, and we could work on TDist while Knewton services were being updated to the newer version. However, we still had to release all Knewton services before we could start integrating them with our distributed tracing solution.

Upgrading Thrift

Upgrading libraries when using a dependency framework is relatively easy, but for an RPC framework like Thrift in a service-oriented architecture with a deep call dependency chain, it gets a little more complicated. A typical server will have both server and client code, with the server code often depending on other client libraries. Because of this, upgrades needed to start from the leaf services and move up the dependency tree; otherwise an upgraded client might send tracing data to a destination service that could not yet detect it, introducing wire incompatibilities.

TDist-upgrading-thrift-knewton

Another hurdle was that certain dependencies, such as the Cassandra client library Astyanax, depended on third-party libraries that in turn depended on Thrift 0.7.0. For Astyanax, we had to shade the JARs using Maven and change package names so that they didn't collide with the newer Thrift library. All of this had to happen quickly and without downtime. And because we didn't want other teams at Knewton incurring the cost of this upgrade, the distributed tracing team had to implement and roll out all the changes.

Tracing Thrift Components: How It Works

Our Thrift solution consisted of custom, backwards-compatible protocols and custom server processors that extract tracing data and set them before routing them to the appropriate RPC call. Our protocols essentially write the tracing data at the beginning of each message. When the RPC call reaches the server, the processor identifies whether the incoming call has tracing data so it can respond appropriately. Calls with tracing data get responses with tracing data, and requests from non-integrated services that don't carry tracing data get responses without tracing data. This makes the tracing protocols backwards-compatible, since the server's outgoing protocols don't write tracing data if instructed not to by the processor servicing the request.

A tracing protocol can detect whether the payload contains tracing data based on the first few bytes. Thrift appends a protocol ID to the beginning, and if the reading protocol sees that the first few bytes do not indicate the presence of tracing data, the bytes are put back on the buffer and the payload is reread as a non-tracing payload. When reading a message, the protocols extract the tracing data and set them on a ThreadLocal for the thread servicing the incoming RPC call, using the DataManager. If that thread ever makes additional calls to other services downstream, the tracing data will be picked up from the DataManager automatically by TDist and appended to the outgoing message.

TDist-tracing-thrift-components-knewton

Here’s a diagram showing how the payload is modified to add tracing data:

TDist-tracing-payload-knewton
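As a rough sketch of the writing side only (the detection and fallback logic described above is omitted, and this is not Knewton's actual protocol), such a protocol can be built on Thrift's TProtocolDecorator:

// Illustrative tracing protocol: trace identifiers are written ahead of each
// outgoing message before delegating to the wrapped protocol.
public class TracingProtocol extends TProtocolDecorator {

    public TracingProtocol(TProtocol delegate) {
        super(delegate);
    }

    @Override
    public void writeMessageBegin(TMessage message) throws TException {
        SpanIds span = DataManager.getCurrentSpan();
        if (span != null) {
            super.writeI64(span.getTraceId());
            super.writeI64(span.getSpanId());
        }
        super.writeMessageBegin(message);
    }
}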

Kafka Tracing Support

When we were adding tracing support to Kafka, we wanted to keep the Kafka servers, also referred to as brokers, as a black box. In other words, we wanted to pass the data through the brokers without them necessarily knowing and therefore not having to modify the Kafka broker code at all. Similar to our approach with RPC services, we upgraded the consumers before upgrading the producers. The consumers are backwards-compatible and can detect when a payload contains tracing data, deserializing the content in the manner of the Thrift protocols described above. To achieve this, we require clients to wrap their serializers/deserializers in tracing equivalents that delegate reading and writing of the non-tracing payload to the wrapped ones.

TDist-kafka-tracing-support-knewton

TDist-kafka-tracing-payload-knewton
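The producer side of the wrapping approach can be sketched as follows (illustrative: the real TDist wrappers also handle the backwards-compatible detection performed on the consumer side):

// Illustrative tracing serializer: prepends trace and span IDs to the payload
// produced by the wrapped serializer.
public class TracingSerializer<T> implements Serializer<T> {

    private final Serializer<T> wrapped;

    public TracingSerializer(Serializer<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        wrapped.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] payload = wrapped.serialize(topic, data);
        SpanIds span = DataManager.getCurrentSpan();
        if (span == null) {
            return payload;                        // non-tracing payload, unchanged
        }
        ByteBuffer buffer = ByteBuffer.allocate(16 + payload.length);
        buffer.putLong(span.getTraceId());
        buffer.putLong(span.getSpanId());
        buffer.put(payload);
        return buffer.array();
    }

    @Override
    public void close() {
        wrapped.close();
    }
}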

HTTP Tracing Support

As some of Knewton’s internal infrastructure and all public facing endpoints are HTTP-based, we needed a simple way of instrumenting HTTP calls to carry tracing information.

This was quite simple, because HTTP supports putting arbitrary data in headers. According to section 5 of RFC 2047, the only guideline for adding custom headers is to prefix them with an `X-`.

We elected to continue the Zipkin tradition and use the following headers to propagate tracing information:

  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId

Services at Knewton primarily use the Jetty HTTP Server and the Apache HTTP Client. Both of these projects allow for easy header manipulation.

Jetty services requests by routing them to a Servlet. As part of this routing, Jetty allows the request and response to pass through a series of Filters. We felt this was the ideal place to deal with tracing data. When any incoming request comes with tracing data headers, we construct span data from it and submit it to the DataManager.

With the Apache HTTP Client, we use an HttpRequestInterceptor and an HttpResponseInterceptor, which were designed to interact with header contents and modify them. These interceptors read tracing data from headers and set them using the DataManager, and vice versa.
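For the outgoing direction, the request interceptor amounts to something like this sketch (built on the SpanIds and DataManager sketches above; the incoming Jetty filter does the reverse, reading the headers and handing them to the DataManager):

// Illustrative HttpRequestInterceptor that propagates B3 headers on outgoing requests.
public class TracingRequestInterceptor implements HttpRequestInterceptor {

    @Override
    public void process(HttpRequest request, HttpContext context) {
        SpanIds span = DataManager.getCurrentSpan();
        if (span == null) {
            return;                                // nothing to propagate
        }
        request.setHeader("X-B3-TraceId", Long.toHexString(span.getTraceId()));
        request.setHeader("X-B3-SpanId", Long.toHexString(span.getSpanId()));
        if (!span.isRoot()) {
            request.setHeader("X-B3-ParentSpanId", Long.toHexString(span.getParentSpanId()));
        }
    }
}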

Guice

For those unfamiliar with Guice, it's a dependency management framework developed at Google. To make the TDist integrations with our existing services easier and less error-prone, we relied on Guice and implemented several modules that our clients simply had to install. Guice handles dependency injection during object instantiation and makes it easy to swap implementations of interfaces. If throughout this article you have been thinking that integrating with TDist sounds complicated, much of the time all our clients needed to do was install additional Guice modules that would bind our tracing implementations to existing Thrift interfaces. This also meant that our clients never had to instantiate any of our tracing-enabled constructs. Whenever a TDist client forgets to bind something, Guice reports the missing binding as soon as the injector is created, rather than deep inside a request path. We put a lot of thought into how we laid out our Guice module hierarchies so that TDist didn't collide with our clients, and we were very careful whenever we had to expose elements to the outside world.
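For example, a client might enable Thrift tracing by installing a module along these lines (a sketch: TracingProtocolFactory and TracingExecutorService are hypothetical tracing-aware implementations, not TDist's actual class names):

// Illustrative Guice module: installing it swaps plain implementations for
// tracing-aware ones without the client instantiating anything directly.
public class TracingModule extends AbstractModule {

    @Override
    protected void configure() {
        bind(TProtocolFactory.class).to(TracingProtocolFactory.class);
        bind(ExecutorService.class).to(TracingExecutorService.class);
    }
}

A client would then create its injector with Guice.createInjector(new TracingModule(), ...) alongside its existing modules.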

The Tracing Message Bus

The tracing message bus is where all our client services place tracing data prior to its being consumed by the Zipkin collector and persisted. Our two options were Kafka and Kinesis, and we ended up choosing Kinesis. We were considering Kafka because Knewton has had a stable Kafka deployment for several years. At the time, our Kafka cluster, which we've been using as our student event bus, was ingesting over 300 messages per second in production. Our initial estimates put us in the range of over 400,000 tracing messages per second with only a partial integration. The idea of straining production systems with instrumentation data made us nervous. Kinesis seemed like an attractive alternative that would be isolated from our Kafka servers, which were only handling production, non-instrumentation data. At the time of implementation, Kinesis was a new AWS service and none of us were familiar with it. Its price, throughput capabilities, and the lack of maintenance on our end sealed the deal for us. Overall, we've been satisfied with its performance and stability. It's not as fast as Kafka, but the nature of our data made it acceptable to have an SLA of even a few minutes from publication to ingestion. Since we deployed the tracing message bus to production, we have also been able to easily scale up the number of Kinesis shards without incurring any downtime.

The Tracing Data Store

The tracing data store is where all our tracing data ends up. The data stay there for a configurable time and are queried by the Zipkin query service to display on the UI. Zipkin supports a lot of data stores out of the box, including Cassandra, Redis, MongoDB, Postgres and MySQL. We experimented with Cassandra and DynamoDB, mainly because of the institutional knowledge we have at Knewton, but ended up choosing Amazon’s Elasticache Redis. The most important reasons behind our decision were:

  • Time to production, given that we didn’t have to roll out and maintain a new cluster
  • Cost
  • Easier integration with Zipkin, with less code
  • Support for TTLs on the data

Conclusion

Our tracing solution at Knewton has been in all environments for a few months now. So far it has proven to be invaluable. We have only started to scratch the surface with what we can do with the tracing and timing data we’re collecting. We had a lot of fun implementing and rolling out tracing at Knewton, and we have come to understand the value of this data.

Distributed Tracing: Motivation

Knewton’s technology solutions comprise more than 50 different services. Such a complex system can be difficult to debug, which is why we set out to integrate distributed tracing into our architecture. A distributed tracing system tracks requests by assigning a unique identifier for the request that is maintained through all service layers, from initial reception through response to clients. Without a distributed tracing system, understanding issues occurring deep in the stack becomes extremely difficult.

Let’s walk through a typical debugging scenario without distributed tracing. Here is a simplified view of the architecture involved in receiving a student event and generating a subsequent recommendation:

distributed-tracing-simplified-flow-knewton

  1. Student completes some work. The results are posted to the HTTP server.
  2. The HTTP server reposts the event to the distributed queue for durability.
  3. The student event processor converts the information from client language to internal Knewton language.
  4. The student event processor posts the translated event to a distributed queue.
  5. The recommendation service reads the event, generates a recommendation, and returns it to the client.

Let’s say the client reports that an event was sent for User A, but no recommendation was received for what User A should work on next. The problem could exist at any of the five layers described above. The identifiers used within each service may be different, so to find where the problem occurred, we’d have to query the logs of each service in serial. Debugging becomes even more complicated in a non-linear dependency chain.

In the world of distributed tracing, debugging is reduced to two steps: find the incoming event and note the trace ID assigned to it, then search across all services for logs associated with that trace ID.

The distributed tracing system provides latency data for each step in the transaction. Before distributed tracing we were unable to calculate end-to-end time for a single transaction, but we could visualize the connections between our services. The Grafana graph below shows 95% latency for various steps in recommendation processing.

grafana-graph-recommended-processing-latency-knewton

To get the most out of a distributed tracing system, we identified key requirements:

Integration:

  • Adding tracing support to a service should require minimal or no code changes.
  • Adding tracing support to a service should not increase latency, nor should it affect service uptime or reliability.
  • The solution must be able to support our interservice communication protocols: Thrift, HTTP and Kafka.
  • The solution must provide a way for an external system to input the trace ID.  Current full-platform tests tell us only that an issue occurred, but have no indication as to where.  Smoke test alerts could include the trace ID, which would make debugging much quicker.
  • The trace ID and information should be accessible to a service for any purpose that the service finds useful, such as logging intermediary actions or including in error logs.

Data:

  • A solution that traces all events.  Some tracing solutions trace only a portion of traffic, and you never know when a particular event will require investigation.
  • A solution that will display an event from end to end, through each service that interacts with it.
  • Tracing information must include unique identification, system, action, timing and sequence.
  • System time across services cannot be guaranteed, so the solution must implement tracking of a logical order so that events can be displayed in the order in which they occurred.
  • Tracing data will not contain any Personally Identifiable Information or information proprietary to customers or Knewton.

Availability:

  • The solution must function in all environments: development, quality assurance, production.  Retention policy may be different for different environments.
  • Tracing reports must be available for immediate debugging and investigation.
  • Tracing data will be available for real-time debugging for one week.  All tracing data will be retained offline for historical analysis.

We analyzed the following distributed tracing solutions for their ability to satisfy our requirements:

Most of these products did not support all of the protocols we require. Kafka was most often missing, with Thrift a close second. And it would not be possible to enhance the proprietary products to fit our protocol needs. Brave was a particularly compelling solution, but ultimately we decided it was too invasive.

In the end we decided to use Zipkin without Finagle. Finagle is a great product, but it did not support Thrift 0.7, and reverting to an older version of Thrift would have been a large effort in the wrong direction. We ultimately upgraded to Thrift 0.9, but this version was wire-compatible between server and client, so it was much easier to roll out than a switch to Scrooge.

Our next blog post will explain how we were able to integrate distributed tracing compatible with Zipkin and Finagle into our code while meeting all of the above requirements.

Rolling Out the Mesos Slave Roller

A few months ago, Knewton started running most services via Docker containers, deployed to an Apache Mesos cluster with a Marathon scheduler. This new infrastructure makes it easy to deploy and manage services but adds complexity to maintaining them.

Applications running on a Mesos cluster run on a group of slave nodes. These slave nodes often need to be upgraded or reconfigured. Although it is usually possible to make modifications to a Mesos slave in place, the Infrastructure Team at Knewton prefers to replace slaves entirely, following the principles of immutable infrastructure. Mesos makes it simple to add and remove slaves and we run them on AWS EC2, so replacing a slave is really easy.

Moreover, immutable Mesos slaves have some nice properties:

  • Configuration management code can be much simpler, as we don’t need to uninstall old versions or configurations.
  • We can be sure that the state of configuration management at the time of the slave’s launch corresponds to the configuration of the Mesos slave.
  • Rollbacks are simpler in case of failures.
  • Replacing a Mesos slave is a good stress test. If we could not stand to lose one slave, we would be in trouble.

Still, relaunching all of the nodes in a Mesos slave cluster has to be done carefully to prevent outages. Marathon will relaunch tasks that were running on a slave that is being replaced, but if all of the remaining tasks of a service are on that one slave, there will be an outage.

When Marathon launches a task, it will launch it on any slave that fits the constraints of that application. For Knewton, the most useful set of constraints includes CPU, memory, and balanced distribution across availability zones. Since it is possible that any service task might be launched on any slave, changes need to be applied to the entire cluster at the same time.

The First Attempt

Shortly after switching to Mesos, we discovered that we needed to upgrade Docker. For the upgrade, we chose to bring up a new set of Mesos slaves, then start terminating the old slave instances one by one. We determined that would be safe because Marathon would relaunch tasks on the new slaves as the old ones were terminating.

The replacement process was successful, but not without its challenges:

It takes a while: Terminating an EC2 instance, deregistering a slave from its master cluster, and relaunching all of the service tasks took about four minutes. At the time, we had up to 55 slaves per environment. With the time it takes to launch new nodes, the process took around four hours.

Don’t trust your eyes: One health check failing can be an indication of a bigger problem. One bad service task may not cause an outage or integration test failures, but it can be an early indication that an upgrade needs to be rolled back. Having a human watch web interfaces for green dots or failure messages is ineffective and error-prone.

Validation: Marathon allows you to set a constraint that will prevent two tasks of the same application from launching on the same Mesos slave. That property seems appealing, but when you use a small set of large EC2 instances, you may have more tasks of an application than instances to run them on — in which case, the tasks won’t launch.

For that reason, we don’t enforce that constraint. So sometimes all tasks of some service will be running on a single slave. When that happens, taking that slave down may cause an overall service outage. Once again, having a human assert that is not the case takes time and is error prone.

Maintaining balance: Our Mesos slave clusters consist of EC2 instances that are part of an Auto Scaling Group. The ASG AZRebalance process keeps the instances in that ASG balanced across Availability Zones. The ASG maintains balance when launching new instances, capacity permitting. Terminating an instance can also trigger AZRebalance, launching a new instance and terminating an existing one. If you have services that have two tasks, terminating an instance and triggering AZRebalance may cause an outage. The AZRebalance process can be turned off during an upgrade. An ASG will still maintain balance when launching instances (unless there is only one task, in which case an outage must be acceptable), but once the original set of nodes has been removed, instances may no longer be evenly distributed across availability zones. When AZRebalance is turned back on, some instances may need to be terminated. For example if instances can be launched across three availability zones, one instance may be terminated. (See appendix for details.)

Given our first experience, we decided to automate the process of replacing slaves and built the Mesos Slave Roller.

Mesos Slave Roller

The Mesos Slave Roller (MSR) is an automated tool that replaces all the slaves in a cluster. It ensures that all components stay healthy and permits its user to stop and start at will.

mesos-slave-roller

The Mesos Slave Roller takes as input the environment to configure and the number of slaves that there should be when it’s done.

It compares this desired end-configuration to the existing state of the Mesos slave ASG. It then generates a list of actions that will produce the desired configuration:

  1. Scale up the slave ASG by the number of instances in the desired end-configuration.
  2. For each of the original Mesos slaves:
    1. Trigger a termination event
    2. Log the termination in a checkpoint file
    3. Wait for Mesos to deregister that slave
    4. Wait for Marathon to report that all services are healthy

Updating the minimum and maximum instance counts for the ASG is a separate process that the Mesos Slave Roller does not perform. If a checkpoint file were accidentally deleted, rerunning the Mesos Slave Roller could result in a costly and potentially dangerous state, with many more instances running than expected.

Scaling and terminating is done by directly invoking the autoscaling API. To scale, we just update the desired count for the ASG. To terminate, we use the terminate-instance-in-autoscaling-group function and tell it to decrement the capacity (desired count).
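With the AWS SDK for Java, those two calls look roughly like this (a sketch: the ASG name, counts, and instance ID are placeholders, and the real tool also writes checkpoints and waits on health checks between steps):

// Illustrative autoscaling calls: grow the slave ASG, then terminate one of the
// original slaves while decrementing the desired capacity. Values are placeholders.
AmazonAutoScaling autoscaling = AmazonAutoScalingClientBuilder.defaultClient();

int currentCount = 55;                            // existing slaves
int newSlaveCount = 55;                           // desired end-configuration
String oldSlaveInstanceId = "i-0123456789abcdef0";

// Step 1: scale up by the number of instances in the desired end-configuration.
autoscaling.setDesiredCapacity(new SetDesiredCapacityRequest()
        .withAutoScalingGroupName("mesos-slaves-production")
        .withDesiredCapacity(currentCount + newSlaveCount));

// Step 2 (repeated for each original slave): terminate and shrink the desired count.
autoscaling.terminateInstanceInAutoScalingGroup(
        new TerminateInstanceInAutoScalingGroupRequest()
                .withInstanceId(oldSlaveInstanceId)
                .withShouldDecrementDesiredCapacity(true));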

Health Checks: While running, the Mesos Slave Roller queries the autoscaling, Mesos, and Marathon APIs to make sure all parts of the system are functioning as expected. The following checks are performed:

  1. Checks the Mesos /slaves endpoint to confirm that the appropriate number of Mesos slaves have been registered after increasing the size of the ASG.
  2. Checks the Mesos /slaves endpoint to confirm that the correct slave disconnected.
  3. Checks the Marathon /v2/apps endpoint to see that all apps are healthy. Our services are interconnected, so it is necessary to check that everything still works, not just the apps that had tasks on the terminated slave.

We consider an app healthy if there are no deployments in progress and the number of tasks reporting healthy is equal to the number of desired instances. If a given app has no health check, then the number of tasks running must be equal to the number of desired instances.
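In code, that predicate amounts to something like the following sketch (MarathonApp is a hypothetical holder for the relevant fields of Marathon's /v2/apps response):

// Illustrative health predicate over an app as reported by Marathon's /v2/apps.
public boolean isHealthy(MarathonApp app) {
    if (!app.getDeployments().isEmpty()) {
        return false;                             // a deployment is still in progress
    }
    if (app.getHealthChecks().isEmpty()) {
        // apps without health checks: all desired tasks must be running
        return app.getTasksRunning() == app.getInstances();
    }
    // apps with health checks: all desired tasks must report healthy
    return app.getTasksHealthy() == app.getInstances();
}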

Checkpoints: All of the Mesos Slave Roller’s operations have some timeout associated with them. If they take longer than expected, the Mesos Slave Roller will pause and wait for a human to kick it off again. If some error is thrown or if an upgrade goes poorly and a human stops it, the Mesos Slave Roller can be resumed from the checkpoint file that it has created. When it resumes, it will use the checkpoint file to determine the remaining work it has to do.

Auto-balancing Availability Zones: To keep things simple and to minimize surprises, we turn off the AZRebalance process before starting the Mesos Slave Roller. When replacement is complete, we turn AZRebalance back on. We limit our ASG to three AZs, so the potential extra balancing operation is safe, as this entire process rests on being able to take down one node at a time.

Planned Enhancements

The Mesos Slave Roller has proven to be useful for maintaining our infrastructure. There are several enhancements we’d like to make:

Offer Draining: Mesos 0.25 introduced a useful suite of maintenance primitives, including the ability to drain Mesos slaves of offers. That would allow the Mesos Slave Roller to tell a slave to evict all of its applications instead of relying on termination. If we can shut down service tasks gracefully, we can minimize the impact of losing a task.

Tagging Mesos Slaves and moving tasks: When a Mesos slave is terminated and Marathon receives a notification that a task died, it redeploys that task on a healthy Mesos slave. Some tasks may relaunch on “old” Mesos slaves a few times before finally winding up on a new one. This process works, but leads to more churn than necessary.

An alternate approach would involve some sort of indicator on each slave that shows when it was launched. Ideally the indicator could just alternate between green and blue, but it may need to be more complicated. With an indicator, we can ask Marathon to relaunch apps on a new slave. This approach has the added benefit that, once all tasks are moved to new slaves, we can terminate all of the old slaves without paying attention to when the termination process finishes. Given that waiting for deregistration takes most of the four minutes per node, this approach should cut the time required to replace a cluster by more than half.

PagerDuty Integration: When the Mesos Slave Roller fails, it pauses and waits for a human to restart it. For now, a human has to periodically check whether it is running. Improving error handling to trigger a PagerDuty alert will make the Mesos Slave Roller a truly hands-off tool.

Appendix

Calculating how many moves need to be made:

Balanced state:  The difference in the number of instances between any two availability zones is at most 1.

Claim: Because the ASG is balanced before scaling and after scaling, the difference in the number of instances between any two AZs after removing the original set can be at most 2.

Let:

  • A_x \leftarrow the number of instances in AZ x at the start of this process
  • D_x \leftarrow the number of instances added to AZ x

We can say that for any pair of AZs (i, j) that are in some ASG:

Before scale: \lvert A_i - A_j \rvert \le 1 — Property of ASG balancing

After scale: \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \rvert  \le 1 — Property of ASG balancing

Proof:

\lvert D_i - D_j \rvert = \lvert D_i - D_j + A_i - A_i + A_j - A_j \rvert

= \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right )  + \left ( A_j - A_i \right ) \rvert

which, by the triangle inequality and the statements above means that

\lvert D_i - D_j \rvert \le \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \rvert  + \lvert \left ( A_j  -A_i \right ) \rvert \rightarrow

\lvert D_i - D_j \rvert \le 2

Because of those properties, the number of final instances in each AZ can be one of three numbers: x - 1 (low), x (mid), and x + 1 (high) for some value x. In the case that there are occurrences of both the high and low values, some instances will have to move. The number of moves necessary is equal to the lesser of the number of highs and the number of lows. A move terminates an instance in one of the AZs with a high count and launches one in an AZ with low count. Once the counts remaining are only mids and lows or only mids and highs, the ASG is balanced.

In the worst case, half of the counts will be highs, and half of the counts will be lows. In that case, there are \tfrac{n}{2} moves, where n is the number of AZs. If the number of AZs is odd, at most \tfrac{n-1}{2} moves are needed, because the last AZ cannot have a complementary low or high.