Redux at Knewton

About two years ago it became apparent that our frontend architecture was showing its limits. We dreaded adding new features because of side-effect bugs. Changing a dropdown could break unrelated parts of the UI. These issues occurred mostly because of the way we managed our application state, or actually didn’t: the DOM was the state.

It’s always the state

It’s easy to fall into this trap: you start with a simple frontend application with little to no JavaScript, because you set out to make an application that could work without JavaScript enabled. Then one day you need some form of widget that native HTML doesn’t support, so you add a jQuery plugin, and then another, and then a wizard form (we called it warlock because we’re funny). Before you know it, your application is a patchwork minefield of jQuery plugins, orchestrated by a master script that looks for data attributes or, even worse, class names.


Knewton, we have a problem


So we started looking into React. This is a commit from March 2015:

commit 89bca222aa25aca6f405be3b608acc189da9c739
Author: Desperate Engineer
Date:   Fri Mar 27 15:06:27 2015 -0400

    [FIX] Making the class warlock behave
    React save our souls

The next week we started rewriting our code base to use React. In our new state management approach, container components were in charge of maintaining internal state and defining callbacks, while presentational components turned that state into pretty boxes and widgets. Container components retrieved the data from the backend, stored it along with some metadata (like the request state) in their internal state, and then rendered a loader icon or the content. Our React templates were declarative and composable, and lived in the same place as the branching logic. We were even unit testing frontend code with Jest!

Then the state got out of hand again.

The container component approach works well when the presentational components that require data are close (in the React component tree) to the container components that maintain it. But what happens if two distant components need the same data? You have to either duplicate the data, or make a container component high enough in the tree to be a common ancestor. The first option is bad because you can have inconsistencies and you must maintain the state in two places; the second is tedious because you have to pass props down through a lot of components that don’t need them.

An alternative is to extract the state out of the component tree, and make containers subscribe to it. This way you can have container components close to the presentational components, but the container components are only replicas of the actual state. This is the approach in the Flux architecture. This state outside of the component tree is called a store in Flux.

Store in a component tree

Adding a store

Sort of Flux

In addition to having a global store, Flux uses a one-way data flow, which is a fancy way of saying that components are not allowed to update the store directly: components must send actions that a dispatcher will process to update the store. This approach was new to a lot of web engineers, but it was not exactly a new idea.
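
This flow can be sketched without any library. In this simplified stand-in (all names are illustrative, not the actual Flux or Redux API), the store lives outside the component tree, components subscribe to it, and dispatching an action is the only way to change the state:

```javascript
// A store living outside the component tree. Components never write to it
// directly: they dispatch actions, and only the update function changes state.
function createFluxStore(update, initialState) {
  let state = initialState;
  const listeners = [];
  return {
    getState: () => state,
    // Container components register here instead of owning the state.
    subscribe: (listener) => listeners.push(listener),
    dispatch: (action) => {
      state = update(state, action);
      listeners.forEach((listener) => listener());
    }
  };
}

const store = createFluxStore(
  (state, action) =>
    action.type === 'ADD_COURSE'
      ? { courses: state.courses.concat(action.name) }
      : state,
  { courses: [] }
);

let renders = 0;
store.subscribe(() => { renders += 1; }); // a distant container re-rendering
store.dispatch({ type: 'ADD_COURSE', name: 'Algebra I' });
```

Any number of distant containers can subscribe to the same store, which is exactly what the component-tree approach couldn’t give us.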

Our first attempt at (partially) implementing Flux relied on a store mixin, a deprecated React pattern that shares behaviors across components by overloading component lifecycle methods.

It solved our problems for simple use cases, but our Teacher dashboard data could be updated by multiple components. Our mixin solution did not implement unidirectional data flow, so ownership of the state was shared between the container components and even some presentational components. In spite of code reviews, we also had slightly different styles in the way we wrote components, and the sources lacked structure. We did not always know where a callback that changed the state was implemented.

Flux pattern

Because the state didn’t have a cohesive way of being updated, we had to rely on more mixins and shared functions to make components interact with the store. We needed the actions and the dispatcher from Flux.


Redux is a JavaScript library implementing the ideas of Flux. It uses React’s context to pass the store down to container components, each wrapped in a higher-order component. A higher-order component is a component that wraps another one and relies on its own lifecycle methods, instead of overloading the wrapped component’s lifecycle the way mixins did. Facebook explained why they decided to deprecate mixins in this post: Mixins Considered Harmful.
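
To make the higher-order wrapper idea concrete, here is a React-free sketch. All names and shapes below are our own simplifications, not the react-redux API: the wrapper owns the store subscription so the wrapped component stays purely presentational.

```javascript
// Presentational "component": a pure function from props to markup string.
const CourseList = (props) => `Courses: ${props.courses.join(', ')}`;

// Minimal store stand-in with subscribe (illustrative, not the Redux API).
function createMiniStore(initialState) {
  let state = initialState;
  const listeners = [];
  return {
    getState: () => state,
    subscribe: (listener) => listeners.push(listener),
    setState: (next) => { state = next; listeners.forEach((l) => l()); }
  };
}

// Higher-order wrapper: subscribes to the store and re-renders the wrapped
// component with fresh props on every state change.
function connectToStore(store, mapStateToProps, component) {
  let output = component(mapStateToProps(store.getState()));
  store.subscribe(() => {
    output = component(mapStateToProps(store.getState()));
  });
  return () => output; // read the latest rendered output
}

const store = createMiniStore({ courses: ['Algebra I'] });
const view = connectToStore(store, (s) => ({ courses: s.courses }), CourseList);
store.setState({ courses: ['Algebra I', 'Geometry'] });
```

The real `connect` does this with React lifecycle methods and context, but the division of labor is the same: subscription logic in the wrapper, rendering in the wrapped component.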

Programming modern web applications is hard. Just like React, Redux is not a silver bullet, and just like React, Redux has helped us solve problems that could have been solved without it. It brings its own set of challenges, and after about 8 months of Redux, there are things that we’re happy about, and things that still need improvement.

Open Source versus Homemade, and the occasional contributor

Unless you have a lot of resources, open source solutions will often be superior to homemade solutions like our store mixin. It’s simple really: more people are using it, more people are encountering the same bugs, and more people have struggled with it, so there is better documentation. In the case of Redux, the documentation is exemplary, and is probably one of the biggest reasons behind Redux’s success over other Flux libraries. Dan Abramov (co-author of Redux) also released a series of video tutorials. This is critical for our team, where both backend and frontend engineers contribute to the frontend code. Having documented standards and tutorials helps speed up on-boarding, and requires less support from frontend engineers. It becomes easier for occasional contributors to jump into the code, look at the way things were done, and fix a bug or implement a new feature.

Redux encourages splitting out features into multiple files: one for the reducer (which holds the state-update logic that a Flux store and dispatcher would contain), one for the actions and the action creators, and one for the connected containers. In addition to these files, we also have route files for react-router, and saga files (more on this later). This is tedious at first, because you need to maintain more files, but it creates an explicit pattern that makes the sources easier to navigate. This structure makes it harder to write code in the wrong place.

Another benefit of using an open source library is the tooling and the middleware extensions developed by the community. The Redux Chrome extension displays the actions and the state in real time, and can undo actions or display the diffs between two states. We use middleware to handle things that Redux does not support, like asynchronous actions.

How we finally tamed state management

Redux embraces the immutable app architecture (described in this excellent talk by Lee Byron). To keep state management simple, Redux recommends returning a new state after every action rather than mutating the current one. We do not use immutable data structures; instead we rely on a combination of code reviews and a Redux middleware in our dev environment to catch mistakes: redux-freeze. It is not as robust as using immutable data structures like the ones offered by ImmutableJs, but it allows us to use native JavaScript data structures, and we’re ok with the trade-offs.

Every action processed by a reducer produces a new state instead of mutating the old one. This can be costly in terms of performance if you use the wrong approach, like deep copies. ImmutableJs can help enforce immutability with little to no performance penalty, but at the cost of having to translate your data structures back and forth. Since we don’t use ImmutableJs, we have to make sure every reducer function is a pure function that copies only what changes. By using structural sharing, like ImmutableJs does under the hood, we can achieve decent performance: most of the state tree is reused, and only the nodes with changes are replaced.

const defaultState = {
    step: 0,
    finalStep: 4
}

function carouselReducer(state = defaultState, action) {
    switch (action.type) {
        case CAROUSEL_NEXT:
            return {
                ...state,
                step: Math.min(state.step + 1, state.finalStep)
            }
        case CAROUSEL_PREVIOUS:
            return {
                ...state,
                step: Math.max(state.step - 1, 0)
            }
        default:
            return state
    }
}

The object property spread feature ({...state}) is helpful to keep reducers concise and readable. It’s not a standard yet, but it is cleaner than the vanilla JavaScript equivalent:

    return Object.assign({}, state, {
        step: Math.min(state.step + 1, state.finalStep)
    })

Normalization and routing

Redux also helped us normalize our state. Instead of making nested payloads, we now return flatter payloads that will be normalized in the state. This normalization translates into reducers storing dictionaries of entities indexed by id. They can be used as a cache: if the entity does not need to be fresh and is already in the store, we can reuse it. Our payloads are smaller since entities can be referenced by id.
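
A sketch of what such an indexed reducer might look like. The action and field names here are hypothetical, but the shape (a dictionary of entities keyed by id) is the pattern described above:

```javascript
// Hypothetical reducer storing course entities indexed by id, so any screen
// can reuse an already-loaded course instead of re-fetching it.
const COURSES_LOADED = 'COURSES_LOADED';

function coursesByIdReducer(state = {}, action) {
  switch (action.type) {
    case COURSES_LOADED: {
      const next = { ...state };
      action.courses.forEach((course) => {
        next[course.id] = course; // newer payloads overwrite stale entries
      });
      return next;
    }
    default:
      return state;
  }
}

let state = coursesByIdReducer(undefined, {
  type: COURSES_LOADED,
  courses: [{ id: 'c1', name: 'Algebra I' }]
});
state = coursesByIdReducer(state, {
  type: COURSES_LOADED,
  courses: [{ id: 'c2', name: 'Geometry' }]
});
// state now acts as a cache holding both courses, keyed by id
```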

It seems like a minor victory for the memory footprint, but the biggest difference comes from the number of requests we need to make. Requests to the backend slow down the experience considerably, especially expensive analytics calls or history requests in our dashboards. Since our application is mostly single page with react-router, we were frequently loading data that had already been loaded. By normalizing it in the state, we are able to reuse it across the screens, and only load the missing analytics. This also means that you can start showing some of the data before everything is loaded, giving the user a snappier experience.

A reducer with courses indexed by id

Although we do not use it at Knewton, normalizr can help automate the normalization by describing the API’s schemas.

Asynchronous actions

With React, the standard approach to loading data is to use componentDidMount() in a container component. When the component mounts, componentDidMount() loads the data asynchronously with jQuery or the fetch API. You can use the state of the component to keep track of the status of the request to display a loading icon or store an error message.

With Redux, however, the idea is to store as little data as possible in the state of components. What if two components need access to the same data? Which one should trigger a request? How do you even trigger a request and update the store?

We have two problems on our hands here:
1. How to make asynchronous requests to a backend using redux (what actions are you firing?)
2. What is triggering these requests to load data (componentDidMount in a container? which one is responsible then?)

It’s easier to explain Redux with a todo list app, but as soon as you start working on a real world application you will invariably run into these problems.

To solve them, most people add one of these middleware libraries to their Redux setup:
– The Redux docs suggest using redux-thunk. With redux-thunk, components can dispatch functions (thunks) instead of plain actions, and the function is in charge of dispatching the actions.
– redux-saga’s approach is to watch the actions being fired and trigger side effects (sagas). This is a simple yet powerful approach, and it means components still dispatch regular actions instead of dispatching complex flows. Interestingly, redux-saga uses generators to handle asynchronicity, which makes the sagas look more like synchronous code.
– Netflix uses redux-observable. Like redux-saga, it focuses on triggering side effects when certain actions happen: the middleware monitors action types and triggers “epics”, similar to the sagas in redux-saga.

The side-effect approach is powerful because it decouples the action (usually dispatched in your view) from the effects of the action in your application. It allows us to support behaviors like: “When a user navigates to screen A, update the page title in the browser and load the required data if not already available”. We were able to dramatically reduce the number of requests made to the backend to load data by combining sagas and our normalized redux state.

Although we will not dive into testing details, sagas are straightforward to test because you can easily control what the generator returns at every step. One of the downsides of sagas is that the sources transpiled for generators are hard to read: the transpiler turns all the possible states of the generator into a state machine (a big switch statement in a loop).
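
Both the side-effect flow and the testing approach can be sketched without the library. The effect creators below are hand-rolled stand-ins for illustration, not the real redux-saga API, but the mechanics are the same: the generator yields plain objects describing what should happen, and a test steps through it, feeding in fake results.

```javascript
// Hand-rolled effect descriptions (redux-saga's call/put/select are richer).
const call = (fn, ...args) => ({ type: 'CALL', fn, args });
const put = (action) => ({ type: 'PUT', action });
const select = (selector) => ({ type: 'SELECT', selector });

function fetchCourse(id) { /* would hit the backend in real code */ }

// "When a user navigates to screen A, load the data if not already available."
function* loadCourseSaga(courseId) {
  const cached = yield select((state) => state.coursesById[courseId]);
  if (cached) {
    return; // already in the normalized store: skip the request
  }
  const course = yield call(fetchCourse, courseId);
  yield put({ type: 'COURSE_LOADED', course });
}

// Testing is just stepping the generator and feeding it fake results:
const saga = loadCourseSaga('c1');
saga.next();                               // yields the SELECT effect
const effect = saga.next(undefined).value; // cache miss, so a CALL effect
```

Because the generator only yields descriptions, no network or store mocking is needed to test the whole flow.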

Backend requests and the spinner problem

Sagas still don’t solve one of our problems: how to make asynchronous requests to the backend? We also need to be able to support loading indicators like our React app did. This means that the application needs to be aware of the current state of a request (not started, started, completed, error).

To make the actual request, we replaced our jQuery calls with the Fetch API. It uses native promises, so we had to do a little plumbing to make the saga dispatch actions describing the state of the request. The end result is a function similar to the one described in this GitHub issue. This abstraction is important to reduce boilerplate code.

It takes a promise-based api function, the name of the action to dispatch, and arguments to make the request:

yield* apiCall(getAssignmentAnalytics, API_GET_ASSIGNMENT_ANALYTICS, assignmentId)
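A minimal sketch of what such a helper might look like. The @kn.api action prefixes mirror the ones mentioned below; the effect shape and field names are assumptions for illustration, not Knewton’s actual code:

```javascript
const put = (action) => ({ type: 'PUT', action }); // illustrative effect shape

// Wraps a promise-based API function and dispatches init/success/error
// actions describing the request lifecycle.
function* apiCall(apiFn, actionName, ...args) {
  yield put({ type: `@kn.api.init/${actionName}`, args });
  try {
    const payload = yield apiFn(...args); // the middleware awaits this promise
    yield put({ type: `@kn.api.success/${actionName}`, payload });
    return payload;
  } catch (error) {
    yield put({ type: `@kn.api.error/${actionName}`, error });
  }
}

// Stepping through it by hand, the way a test (or the middleware) would:
const gen = apiCall((id) => Promise.resolve({ id }), 'API_GET_ASSIGNMENT_ANALYTICS', 'a1');
const init = gen.next().value;    // the init action comes first
const request = gen.next().value; // then the pending promise itself
```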

The sequence of actions that happens when an instructor opens an assignment analytics screen:


These @ prefixes are used as namespaces for the actions. They are not part of any standard, but this practice is now common in Redux libraries.

If the analytics for this assignment were already loaded by a previous screen, the saga will skip the call, and the screen will load faster.

The loading indicator problem is now solved. The reducers can look for the api init action (@kn.api.init/API_GET_ASSIGNMENT_ANALYTICS) to flip an isLoading property in the state. When the success or error action is dispatched, the property is reset to false and the spinner is hidden.
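
A sketch of such a reducer. The state shape is hypothetical; the action types follow the @kn.api pattern above:

```javascript
const defaultState = { isLoading: false, analytics: null, error: null };

// Flips isLoading on the init action and resets it on success or error,
// which is all the spinner needs.
function assignmentAnalyticsReducer(state = defaultState, action) {
  switch (action.type) {
    case '@kn.api.init/API_GET_ASSIGNMENT_ANALYTICS':
      return { ...state, isLoading: true, error: null };
    case '@kn.api.success/API_GET_ASSIGNMENT_ANALYTICS':
      return { ...state, isLoading: false, analytics: action.payload };
    case '@kn.api.error/API_GET_ASSIGNMENT_ANALYTICS':
      return { ...state, isLoading: false, error: action.error };
    default:
      return state;
  }
}

let state = assignmentAnalyticsReducer(undefined, {
  type: '@kn.api.init/API_GET_ASSIGNMENT_ANALYTICS'
});
const loadingDuringRequest = state.isLoading; // show the spinner
state = assignmentAnalyticsReducer(state, {
  type: '@kn.api.success/API_GET_ASSIGNMENT_ANALYTICS',
  payload: { average: 0.82 }
});
```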

You can keep waiting, it won't go away

A lot of work for a loading indicator

Redux now

After a year using Redux we can draw a couple conclusions:

Things we’re happy about:

– We can share the state across components thanks to the one global state, instead of having multiple local component states.
– It provides a strong structure for our file hierarchy, and has opinions on how to do things the Right Way.
– Components are decoupled from the state logic, which makes them easier to write and reuse.
– Testing stateless components and pure reducer functions is a breeze. Since we started using Redux, our test coverage has expanded dramatically.

Things we still need to work on:

– Sanitizing API payloads is still a hard task, and finding the right compromise between multiple entity-specific endpoints and bloated API payloads is still a problem. GraphQL offers a good alternative, but the cost of migrating existing REST services is high.
– We don’t use selectors and memoized selectors enough (see reselect); they should be the standard way of accessing the state.
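
To illustrate the pattern, here is a tiny memoized selector in the spirit of reselect. This createSelector is a simplified stand-in, not the real reselect implementation:

```javascript
// Recomputes the derived value only when one of its inputs changes,
// returning the cached result (same reference) otherwise.
function createSelector(inputSelectors, compute) {
  let lastArgs = null;
  let lastResult = null;
  return (state) => {
    const args = inputSelectors.map((sel) => sel(state));
    const changed = lastArgs === null || args.some((a, i) => a !== lastArgs[i]);
    if (changed) {
      lastArgs = args;
      lastResult = compute(...args);
    }
    return lastResult;
  };
}

const selectCoursesById = (state) => state.coursesById;
const selectCourseNames = createSelector(
  [selectCoursesById],
  (coursesById) => Object.values(coursesById).map((c) => c.name)
);

const state = { coursesById: { c1: { name: 'Algebra I' } } };
const first = selectCourseNames(state);
const second = selectCourseNames(state); // same input: memoized result
```

Returning the same reference matters: connected components can then skip re-rendering when the derived data hasn’t actually changed.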

Some downsides:

– A lot more files need to be maintained, and there is an overhead for some features that would have been trivial to implement with vanilla react
– There is a learning curve for React, Redux and Redux-saga. The good news is that they each have solid documentation, a lot of blog articles, videos, and tutorials to help get started.
– Library churn is perceived negatively. React and Redux libraries have very short development cycles and their APIs are deprecated quickly. Updating our react-router files for the third time in less than 2 years can be discouraging, but the upside is that bugs get fixed quickly and the APIs are always improving.

The JavaScript landscape changes quickly, and we engineers sometimes tend to blindly adopt the hot new frameworks. If you are starting a new React project, think about whether you really need a state management library, and whether Redux is the best fit for you. Is GraphQL better suited? Is React component state enough? Do you favor fast but potentially dirty iterations, or a more stable and structured architecture at a higher cost?

In our case, Redux was a good solution: it proved easy enough to update an existing code base without changing the backend endpoints, and we were able to make it coexist with the old system while we updated.

Analyzing Java “Garbage First Garbage Collection” (G1GC) Logs

Garbage collection can take a big toll on any Java application, so it’s important to understand its behavior and impact. After a JVM upgrade of Knewton’s Cassandra database, we needed a tool to compare the performance and impact of different garbage collection strategies, but we couldn’t find an existing tool that would parse GC logs and analyze them.

This post explains the process we followed and discusses some results that we believe may be useful when evaluating Java garbage collection strategies.

Evolution of Garbage Collection on Java

One of the biggest challenges for any programming language is the way it allocates memory while executing a task. Garbage collection is the process of reclaiming sectors of memory that are no longer in use. These mechanisms are usually evaluated in three dimensions known as the three legs of the performance stool: latency, throughput, and footprint. In real life, there is always a tradeoff, and if all the stars align, you can optimize for two of the three dimensions.


Source: Explorations of the three legged performance stool

Over the course of its history, Java has handled garbage collection in different ways. The early JDK (before 1.4) had a single thread to sweep memory to find out which areas were in use and which ones could be reclaimed.

Java 4 introduced more complex algorithms based on tracing collectors. The idea was to find root objects and identify all instances that were still in use. If an object couldn’t be traced from a root, it was considered not in use, and that area of memory would be reclaimed. The trouble with this approach is that applications had to cease all activity to allow the analysis. Examples of these “stop the world” algorithms include Mark and Sweep, Heap Compaction, and Copying Collector.

Garbage collection strategies before Java 7 relied on the definition of fixed regions of memory named generations:

  • Objects created recently were gathered in the Eden Generation.
  • When the garbage collector acted, objects that could still be traced as in use were “promoted” to the Survivor Generation, while objects no longer referenced by any other object were removed.
  • Objects that survived multiple passes eventually reached the Tenured (Old) Generation.

One disadvantage of these garbage collection algorithms is that they require copying and deleting objects during promotions, temporarily using more memory and in some cases causing an overflow that could crash an application. As the size of those memory buckets is inflexible, this approach could limit applications when they needed to scale. Examples of these strategies are Concurrent Mark-Sweep (CMS) and Parallel Collector.

Java 7 update 4 came with a new algorithm: Garbage First (G1), which divides memory into heap regions of the same size. Unlike CMS, while sweeping memory, G1 uses a pause prediction model to identify areas likely to contain objects not in use and targets them first for evacuation, hence the name. Only the evacuation process, when objects from the heap that are still in use are compacted and copied to a different region to avoid fragmentation, has a stop-the-world behavior. In our tests, however, the stop-the-world phase has been imperceptible.

G1 doesn’t sort objects by generation, so there are no limits on generation size. The result is a strategy that frees up large portions of memory in less time and that keeps adjusting based on previous garbage collections.

Java 8 supports G1, CMS, and Parallel Collector, and G1 has been proposed as the default strategy for Java 9.

Why Knewton Created a Parser/Analyzer

While upgrading the Cassandra database, the database team searched for a tool that could objectively evaluate the performance of G1 vs. CMS. Although G1 was released in April 2012, the utilities available for analyzing garbage collection logs, like Netflix’s gcviz, didn’t support G1 logs. We also tried an approach proposed by Jeff Taylor of Oracle, but his code was not able to interpret all the information from our servers. Our scripts are an update of Taylor’s approach: with AWK they collect the logs and format them as a table so they can be interpreted and analyzed with R.

The JVM options that we have been using are:

-Xms1995M -Xmx1995M
-Xss256k -XX:StringTableSize=1000003
-Xloggc:/<valid path>/gc.log

The resulting logs contain entries like the following:

2016-05-12T20:42:14.917+0000: 2.438: Total time for which application threads were stopped: 0.0204068 seconds, Stopping threads took: 0.0000649 seconds
{Heap before GC invocations=2 (full 0):
garbage-first heap  total 2043904K, used 120064K [0x0000000083400000, 0x0000000083503e60, 0x0000000100000000) region size 1024K, 111 young (113664K), 7 survivors (7168K)
Metaspace     used 18551K, capacity 18694K, committed 18944K, reserved 1067008K 
class space    used 2265K, capacity 2299K, committed 2304K, reserved 1048576K
2016-05-12T20:42:15.303+0000: 2.824: [GC pause (G1 Evacuation Pause) (young) 
Desired survivor size 7340032 bytes, new threshold 1 (max 1) - age   1:    6402368 bytes,    6402368 total, 0.0183564 secs]
  [Parallel Time: 17.2 ms, GC Workers: 2]
     [GC Worker Start (ms): Min: 2824.3, Avg: 2824.3, Max: 2824.3, Diff: 0.0]
     [Ext Root Scanning (ms): Min: 2.2, Avg: 2.2, Max: 2.2, Diff: 0.0, Sum: 4.5]
     [Update RS (ms): Min: 0.4, Avg: 0.4, Max: 0.4, Diff: 0.1, Sum: 0.8]
        [Processed Buffers: Min: 2, Avg: 3.5, Max: 5, Diff: 3, Sum: 7]
     [Scan RS (ms): Min: 0.0, Avg: 0.1, Max: 0.1, Diff: 0.1, Sum: 0.2]
     [Code Root Scanning (ms): Min: 0.0, Avg: 0.1, Max: 0.1, Diff: 0.1, Sum: 0.2]
     [Object Copy (ms): Min: 9.2, Avg: 11.7, Max: 14.2, Diff: 5.0, Sum: 23.4]
     [Termination (ms): Min: 0.0, Avg: 2.6, Max: 5.2, Diff: 5.2, Sum: 5.2]
        [Termination Attempts: Min: 1, Avg: 1.0, Max: 1, Diff: 0, Sum: 2]
     [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
     [GC Worker Total (ms): Min: 17.1, Avg: 17.1, Max: 17.1, Diff: 0.0, Sum: 34.3]
     [GC Worker End (ms): Min: 2841.5, Avg: 2841.5, Max: 2841.5, Diff: 0.0]
  [Code Root Fixup: 0.0 ms]
  [Code Root Purge: 0.0 ms]
  [Clear CT: 0.1 ms]
  [Other: 1.1 ms]
     [Choose CSet: 0.0 ms]
     [Ref Proc: 0.6 ms]
     [Ref Enq: 0.0 ms]
     [Redirty Cards: 0.0 ms]
     [Humongous Register: 0.1 ms]
     [Humongous Reclaim: 0.0 ms]
     [Free CSet: 0.1 ms]
  [Eden: 104.0M(104.0M)->0.0B(1189.0M) Survivors: 7168.0K->8192.0K Heap: 117.3M(1996.0M)->15.0M(1996.0M)]
Heap after GC invocations=3 (full 0):
garbage-first heap   total 2043904K, used 15360K [0x0000000083400000, 0x0000000083503e60, 0x0000000100000000) region size 1024K, 8 young (8192K), 8 survivors (8192K)
Metaspace       used 18551K, capacity 18694K, committed 18944K, reserved 1067008K
class space     used 2265K, capacity 2299K, committed 2304K, reserved 1048576K
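
Our actual tooling collects these logs with AWK and analyzes them with R (the code is linked at the end of this post). Purely as an illustration, here is a sketch in JavaScript of extracting pause information from the application-stopped lines in such a log (the field names are our own):

```javascript
// Matches lines like the first one in the sample above:
// "2016-05-12T20:42:14.917+0000: 2.438: Total time for which application
//  threads were stopped: 0.0204068 seconds, ..."
const STOPPED_RE = /^([\d-]+T[\d:.]+\+\d{4}): ([\d.]+): Total time for which application threads were stopped: ([\d.]+) seconds/;

function parseStoppedLine(line) {
  const m = STOPPED_RE.exec(line);
  if (!m) return null; // not an application-stopped line
  return {
    timestamp: m[1],             // wall-clock time of the event
    uptimeSec: parseFloat(m[2]), // seconds since JVM start
    stoppedSec: parseFloat(m[3]) // duration threads were paused
  };
}

const sample =
  '2016-05-12T20:42:14.917+0000: 2.438: Total time for which application ' +
  'threads were stopped: 0.0204068 seconds, Stopping threads took: 0.0000649 seconds';
const parsed = parseStoppedLine(sample);
```

Running this over every line of a gc.log yields a table of pause durations over time, which is essentially what the AWK stage of our pipeline produces for R.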

The Cassandra upgrade was a consequence of a fix for a Thrift Buffer anomaly; this is another example of the continuous efforts of Knewton’s database team to tune and optimize our database servers.


Knewton has conducted extensive analysis of the performance and tuning of our Cassandra nodes. The following is some of the information collected from a cluster of 24 nodes, presented as the hypothetical Knerd Cassandra cluster also described in Dr. Josh Wickman’s post. Each node has 2 CPUs and 8 GB of RAM.

The following examples show ways to analyze the gc.log files. They consider only the logs obtained from production Cassandra nodes during a 24-hour window; during that period, a full repair of the database and an automated backup were also performed. Note that the results shown come from after the change was made; for the exploratory phases we used the cassandra-stress tool for functional testing, and Locust for an end-to-end load test.

The following histogram compares the durations of garbage collection executions and groups them in three categories:

  1. Real Time: the total wall-clock time from start to finish of the garbage collection.
  2. User Time: the time spent on processing not related to the kernel (operating system) or pauses.
  3. Sys Time: the time spent on kernel operations, most of it corresponding to time when the CPU is locked, also known as the stop-the-world scenario.


Time spent on garbage collection in a “stop-the-world” phase (Sys Time) is only a fraction of the time when user processes are blocked. The garbage collection adapts to minimize impact on the application.

We can also compare the sizes for each generation before and after the garbage collection and confirm the impact that a Cassandra nodetool repair action can have on the garbage collection.

First, a brief explanation of how repairs work in Cassandra: Cassandra is a masterless database, which means that each server that is part of the same cluster will share data (sometimes several copies of the same record). Each node is responsible for knowing where to find a record for a fraction of the data; each record will have a unique token identification; and each node will be responsible for keeping track of all the copies of specific tokens.

When a repair is requested, each node will be responsible for validating the consistency of all copies of the data. (If there are differences the latest version will prevail.) Depending on the amount of data and the replication factor, this can be a very expensive operation on CPU and memory.

The graph below shows the total memory used before and after a garbage collection.


At (1), the command nodetool repair executed a sequential repair of the entire cluster, after which more memory remained in use following garbage collection, since the anti-entropy mechanism of the repair put more objects in use.


Garbage collection has a non-deterministic behaviour: it does not start at the exact time that the memory threshold is surpassed (2), as sometimes it can start fractions of a second afterward. There can also be a delay while G1 determines the areas that will release the most memory; that delay will depend on the heap size allocated and how much of it is in use. This can be an area of concern for environments with limited resources (e.g., development virtual environments or Docker containers).


After the repair, the pattern changed, with garbage collections that effectively decreased the memory used to below 0.25 GB (3); one explanation is that the garbage collector learned from its runs while the repair was being executed.


Finally, there is an additional spike (4) caused by our automated backup mechanism.


From the total heap size, we can also analyze and compare components; in the example we’ll show only the parts of the New Generation: Eden and Survivor spaces, as they usually are the earliest indicators of issues with memory consumption.

Garbage collection is triggered by the total heap size, so garbage collection will happen even when the Eden and Survivor spaces are relatively small.

After garbage collection, the Eden space should be at or close to zero, while the Survivor space gets bigger. The Eden space is also larger than the Survivor space; in this example, by 20:1. This ratio will differ depending on the application.

If needed, we can get more granular information about the different operations (like scanning memory sections or copying objects) that take place during a garbage collection; in the same way as before, we can also identify the change of the behavior that occurs during a repair.

As explained before, a Cassandra repair can consume a lot of resources, but this is necessary to ensure that the data is consistent (especially when there are constant updates). One finding from this breakdown is a change in the behavior pattern: the Object Copy operation (in green) took longer for hours after the repair was completed, which could also be an effect of the adaptive nature of the G1 algorithm.


The algorithms and strategies for garbage collection will keep evolving and changing, as we have not yet found one that maximizes throughput with minimum latency and footprint. In the meantime, Java applications will continue to rely upon G1GC for versions 7, 8, and, most likely, 9. By sharing some experiences with logs from Garbage First Garbage Collection (G1GC) and open source data modeling tools, we hope to help other database teams understand their systems better.


For Garbage Collection

IBM – Java theory and practice: A brief history of garbage collection

infoq – G1: One Garbage Collector To Rule Them All

Oracle – Getting Started with the G1 Garbage Collector

Oracle – Java SE 6 Garbage Collection Tuning

jaxenter – Java 9’s new garbage collector: What’s changing? What’s staying?

Oracle – Understanding G1 GC Logs

For visualization tools of Garbage Collection logs


Netflix’s gcviz

For data modeling with R


Code is available at this gist

How to Replace a Microservice: Swapping Out the Engine in Mid-Flight

Photo by Michael VH via Flickr (CC BY 2.0)


So you’ve built an important new service. It satisfies current use cases and is also forward looking. Pretty soon it’s mission-critical, so you optimize where you can and keep it humming along.

Then the product changes, and your service needs to do something different. You don’t have time to rewrite from scratch (and you’re never supposed to do that anyway), so you refactor what you can, then start bolting stuff on.

A couple of years later, the product changes again. This time, refactoring isn’t going to cut it: the entire design needs to change. So you take a deep breath and make your case for the big rewrite. The team is justifiably skeptical, but you’re persuasive, and everyone signs off.

Great! The service is going to be so much better this time. With two years of domain knowledge, you know where all the landmines are, and you know the stack and the usage patterns like the back of your hand.

Still, it’s slow-going. A bunch of other services now depend on the existing interface in production. Downtime is unacceptable, and a flip-the-switch cutover is just asking for trouble.

Now what?

The Knewton Graph Store

Knewton’s mission is to personalize education for everyone. To do that, Knewton organizes educational content in graph form that shows how knowledge builds on itself. Graphing is a cornerstone of what gives our platform its power.

Knewton’s graph store initially used an event-based model. In order to modify a graph, internal client code would do the following operations:

  • Read graph data using a library built on Titan (a property graph library with transactions and pluggable datastores) and backed by a Cassandra cluster.
  • Make the desired modifications and commit the Titan transaction.
  • Intercept the Titan transaction, package the modifications up into a delta format and write the delta to a queue.
  • Consumers of the queue would pick up the delta messages and write them back into another Cassandra database in different forms.

By the middle of 2015, Knewton was using graphs in ways that required much lower latencies, which meant that frequent round trips to a database wouldn’t work any more. Graphs had grown significantly bigger, which had caused problems with storage and database schemas. Beyond that, many smaller problems also made the service difficult to work with. So we undertook a project to rewrite the graph store.

Design and Development

The design and development of the new service were relatively painless. It’s tempting to claim that this was purely due to how fantastic Knewton’s engineers are (although we are and you should come work with us), but it’s worth noting how important hindsight was to the process. Having seen how the graph store was used in the real world helped us know what to build, what to fix, and what to avoid. One of Knewton’s core values is to “ship and learn”, and after several years of seeing the graph store evolve, we knew what we needed and built the new service accordingly.


Migration was by far the most complicated phase of the project. It was composed of two parts: migrating the existing data to the new service, then migrating clients to the new service.

Migrating the data

Best practices for doing data migration usually specify a five-step process:

  1. Set up the new datastore (or new table or schema).
  2. Modify existing business logic to write to both the old and new datastores while still reading from the old one.
  3. Backfill historical data into the new datastore.
  4. Start reading from the new datastore.
  5. Stop writing to the old datastore.

Due to the event-sourced nature of writes, we were able to accomplish Step 2 by adding a new consumer to the events topic. Knewton uses Kafka as our main messaging platform, and one of its most useful features is the ability to have multiple consumers subscribe to the same message stream at the same time. This allowed us to mirror incoming data to both the new and old datastores with a minimum of trouble.
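The mirroring in Step 2 can be sketched as two independent consumers replaying one event stream. The `List` stands in for the Kafka topic and the `Map`s for the old and new datastores; names are illustrative:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of dual-writing via a second consumer: because writes are already
// events on a topic, each consumer group sees every event independently,
// so the old and new datastores receive identical write streams.
public class DualWrite {
    public interface Consumer { void accept(String key, String value); }

    public static void replay(List<String[]> topic, Consumer... consumers) {
        for (String[] event : topic)        // every event on the topic...
            for (Consumer c : consumers)    // ...is delivered to every consumer
                c.accept(event[0], event[1]);
    }

    public static void main(String[] args) {
        Map<String, String> oldStore = new HashMap<>();
        Map<String, String> newStore = new HashMap<>();
        List<String[]> topic = List.of(
                new String[] {"graph-1", "delta-a"},
                new String[] {"graph-2", "delta-b"});
        replay(topic, oldStore::put, newStore::put);
        System.out.println(oldStore.equals(newStore));  // both stores saw all writes
    }
}
```

The design point is that neither consumer knows about the other: the old write path is untouched, and the new datastore can be torn down and re-backfilled without affecting production.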

Difficulties arose in Step 3 due to issues with old data. Midway through the migration, we discovered that data stored several years earlier didn’t match current schemas. The validation logic had evolved quite a bit, but the historical data had never been cleansed.

This absence of strict validation mechanisms is a downside of many NoSQL solutions: since the database isn’t enforcing any schemas or constraints, the burden falls entirely on application logic, which typically changes much more quickly than schemas do.

Migrating the data ended up requiring several weeks of time-consuming manual investigation and data introspection.

Migrating the clients

Though ultimately worth it, migrating the clients off of the old service took a lot of time and effort because of all the necessary cross-team coordination. When trying to switch clients onto a new interface for your services, it’s customary to leave the old interfaces available for a period of time to give other teams some breathing room to do their migration. While we were able to accommodate our clients using this strategy, unforeseen interdependencies between clients forced us to spend several more weeks than planned coordinating the final rollout.


In the end, we were able to serve all of our clients’ needs while simplifying the system and cutting hosting expenses by more than 70%. Knewton completed the migration with no data loss, no downtime, and no outages. Knewton is now able to handle significantly larger graphs and build new features that enable our partners to do things that weren’t possible before.

Lessons Learned

Migrating takes longer than you think

Migration is often the phase of the project with the most uncertainty, so it’s difficult to forecast accurately. It’s easy to assume that the difficulty of migrating to a new service will be purely proportional to the amount of data being moved, but there are several other factors to keep in mind:

Datastore differences

Moving between different datastores makes the migration much more complicated. You have to account for the particular mechanisms, guarantees, and performance characteristics of your new datastore. Changing datastores also typically means that you have to rewrite tooling and documentation as well.

Data hygiene

Data will get dirty. There are always going to be a few objects or rows that don’t exactly conform to the rules you expect them to, especially in NoSQL databases where hygiene mechanisms such as constraints and foreign keys often don’t exist. Unless you’ve taken great pains in the past to make sure your data is exactly what you think it is, migration time is when you’ll find out which of your assumptions aren’t true.


Clients

You have much less control over the clients of your service than you do over your own code. Clients have their own services and their own schedules, and you’re the one throwing in a monkey wrench by changing one of their dependencies. There are several ways to accommodate clients, but each approach has costs:

  • Be flexible about cutover deadlines
    • Clients have their own schedules to manage, so it’s a good idea to give them some breathing room to do the migration. A hard cutover date is a recipe for problems.
    • Tradeoff: total migration time (and thus the total time you have to maintain the old service for backwards compatibility) can stretch out longer.
  • Get feedback early and continuously
    • A good way to avoid any unforeseen issues in client migration is to continually solicit feedback on both technical considerations (like interface design and documentation) and project management considerations (like timing and scheduling).
    • Tradeoff: communication requires a lot of energy and effort from all involved parties, with a payoff that is not immediately visible.
  • Minimize the need for coordination
    • Coordination across multiple teams is difficult, and even more so at scale. In general, it’s good practice to minimize coordination, because doing so reduces the time any one team spends waiting for other teams to complete precursor tasks.
    • Trade-off: Enabling all clients to complete the migration independently requires more effort and planning from the service owner, especially in response to unforeseen events.

Rewrite only when you need to rearchitect

It’s sometimes difficult to judge when to rewrite a service from the ground up, but a good rule of thumb is to rewrite only if the system’s architecture itself is blocking your progress. It can be easy to convince yourself that the code may be confusing, or that the data model doesn’t make sense, or that there’s too much technical debt, but all those issues are solvable “in place.”

If you can refactor the code, refactor the code. If you can change your database schema incrementally, change your schema. When you can’t move forward any more without changing the architecture, it’s time to think about a rewrite.

Don’t look too far down the road

One of the easiest traps to fall into in software engineering is to build a service (or feature or library or tool) because “it might be needed down the road.” After all, software engineers are supposed to be forward looking and anticipate issues before they occur. But the further you look, the less clearly you can see. It’s important to resist temptation and build only those things that you know you need now (or, at most, in the immediate future).

Simplifying Cassandra Heap Size Allocation

As discussed previously, Knewton has a large Cassandra deployment to meet its data store needs. Despite best efforts to standardize configurations across the deployment, the systems are in a near-constant flux. In particular, upgrading the version of Cassandra may be quick for a single cluster but doing so for the entire deployment is usually a lengthy process, due to the overhead involved in bringing legacy stacks up to date.

At the time of this writing, Knewton is in the final stages of a system-wide Cassandra upgrade from Version 1.2 to 2.1. Due to a breaking change in the underlying SSTable format introduced in Version 2.1, this transition must be performed in stages, first from 1.2 to 2.0, then 2.0 to 2.1. While the SSTable format change does not have a direct bearing on the topic of interest here, another change introduced in Version 2.0 is of material importance.

This post describes some nuances of Cassandra’s JVM (maximum) heap memory allocation that we discovered along the way, and how we handled aspects that posed a threat to our systems.

A problem case

Our journey of discovery began with a struggle to address crashes on a particular cluster due to running out of heap memory. For ease of reference, let’s call this cluster Knerd. In our AWS infrastructure, the Knerd cluster was running on m4.large nodes having 8 GB of memory, and by default Cassandra was allocating 4 GB of heap to each node. After exhausting other options, we decided to migrate the cluster to m4.xlarge nodes with 16 GB of memory, believing that these larger nodes would be allocated 8 GB of heap.

Imagine our surprise to see the new nodes come up with 4 GB of heap, same as before.

Default heap allocation in Cassandra 1.2

This phenomenon forced us to take a harder look at the code behind the heap allocation, which lives in the cassandra-env.sh config file. The relevant code block, as it ships with Cassandra 1.2, is the function calculate_heap_sizes:

   # set max heap size based on the following
   # max(min(1/2 ram, 4096MB), min(1/4 ram, 8GB))
   # calculate 1/2 ram and cap to 4096MB
   # calculate 1/4 ram and cap to 8192MB
   # pick the max
   half_system_memory_in_mb=`expr $system_memory_in_mb / 2`
   quarter_system_memory_in_mb=`expr $half_system_memory_in_mb / 2`
   if [ "$half_system_memory_in_mb" -gt "4096" ]
   then
       half_system_memory_in_mb="4096"
   fi
   if [ "$quarter_system_memory_in_mb" -gt "8192" ]
   then
       quarter_system_memory_in_mb="8192"
   fi
   if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]
   then
       max_heap_size_in_mb="$half_system_memory_in_mb"
   else
       max_heap_size_in_mb="$quarter_system_memory_in_mb"
   fi

In short, this code block imposes a cap on each of two parameters, then uses the larger parameter as the maximum heap size.
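For illustration, the same rule can be ported to Java and tabulated (this is a stand-in for the shell logic above, not Cassandra’s code):

```java
// A Java port of Cassandra 1.2's max-heap rule, for illustration only:
// max(min(1/2 ram, 4096 MB), min(1/4 ram, 8192 MB)).
public class HeapCalc12 {
    public static long maxHeapMb(long systemMemoryMb) {
        long half = Math.min(systemMemoryMb / 2, 4096);    // 1/2 ram, capped at 4 GB
        long quarter = Math.min(systemMemoryMb / 4, 8192); // 1/4 ram, capped at 8 GB
        return Math.max(half, quarter);                    // pick the larger one
    }

    public static void main(String[] args) {
        // Note the plateau: 8, 12, and 16 GB of ram all get a 4 GB heap.
        for (long gb : new long[] {4, 8, 12, 16, 24, 32}) {
            System.out.println(gb + " GB ram -> " + maxHeapMb(gb * 1024) + " MB heap");
        }
    }
}
```

Running this reproduces exactly what we saw on the Knerd cluster: an m4.xlarge’s 16 GB of memory yields the same 4096 MB heap as an m4.large’s 8 GB.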

The graph below shows the heap size in GB as a function of the system memory (also in GB), up to the absolute cap of an 8 GB heap. (We’ll focus on integer values for ease of discussion.)


The heap size scales linearly as half the system memory up to 4 GB, then plateaus for systems with total memory between 8 GB and 16 GB. This is precisely the range of values for which we observed a steady maximum heap size of 4 GB on the Knerd cluster.

This plateau is a direct consequence of the half-memory parameter in the shell code being capped at a smaller value than the quarter-memory parameter. While scrutiny of the shell code above will show exactly the behavior displayed in the graph, a new user is unlikely to see this coming, especially in the typical NoSQL climate with short time scales and developer-managed data stores. Indeed, since Cassandra’s heap scales with system memory, it is reasonable to assume that it will scale smoothly for all values of the system memory below the cap. The plateau is at odds with this assumption, and therefore constitutes behavior that might be deemed unpredictable or counterintuitive, and can disrupt provisioning and capacity planning. In other words, this internal Cassandra feature violates the Principle of Least Surprise.

Beyond the plateau, the curve begins to rise linearly again, this time scaling as one-quarter of system memory. Then, at a system memory of 32 GB, the heap size reaches the absolute cap of 8 GB.

Modified heap allocation in Cassandra 1.2

If you’re using Cassandra 1.2, we strongly recommend doing something about this plateau in order for your nodes to scale predictably. Here are some options.

The 8 GB cap is generally desirable, because if the heap is too large then stop-the-world garbage collection cycles will take longer and can cause downstream failures. On the other hand, the system memory value upon reaching the cap is not terribly important. It could be 32 GB, as in the default scheme, or it could be 24 GB with little noticeable difference. Thus the plateau can be eliminated in one of two ways:

  1. Keep the remaining features, shifting the transition points to lower system memory values.
  2. Take the average over the values less than system memory of 16 GB, which yields a simple one-quarter slope up to the cap.


Averaging is simpler, but it may not be suitable for small values of system memory.  Either approach can be achieved with a simple set of conditionals specifying the scaling for each segment; for example, the first could be coded like

   if [ "$system_memory_in_mb" -lt "8192" ]
   then
       # up to 8 GB of system memory: scale as half the system memory
       max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
   elif [ "$system_memory_in_mb" -lt "24576" ]
   then
       # 8-24 GB: quarter slope, offset to keep the curve continuous up to the cap
       max_heap_size_in_mb=`expr $system_memory_in_mb / 4 + 2048`
   else
       max_heap_size_in_mb="8192"
   fi

Either approach eliminates the undesirable plateau. An upgrade to Cassandra 2.0 or above also effectively removes its impact.

Heap allocation in Cassandra 2.0+

Cassandra 2.0 introduced a change in heap allocation, although this change was easy to miss since it did not get highlighted in the What’s new in Cassandra post for 2.0.

The change is a small one: the cap of the one-half-memory variable was changed from 4 GB to 1 GB, such that the plateau occurs at a heap size of 1 GB and extends from system memory values of 2 GB to 4 GB. While the plateau still exists, it is no longer an issue for systems having memory greater than 4 GB. Production systems will doubtless be running with more memory than this, so upgrading alone will solve the scaling predictability issue in nearly all cases.

However, systems previously operating with system memory at or below 16 GB will have a smaller heap after upgrading from 1.2 to 2.0+. This is because the heap now scales as one-quarter of the system memory for a broader range of memory values than it did on Cassandra 1.2. Although the picture hasn’t changed for high-memory systems, this shift to smaller heaps can cause further problems on systems with relatively small node memory, such as the Knerd cluster.

Moving memtables off heap in Cassandra 2.1

The change in the cap in Cassandra 2.0 may be related to the effort to move more Cassandra components off of the JVM heap. Version 2.0 offered off-heap partition summaries, and version 2.1 introduced the ability to move memtables off heap. These features make a smaller heap size desirable in order to make room for the memtable components in off-heap memory.

Moving components off heap can make the heap behave better, at the cost of possible out-of-memory events at the system level. Since each Java process allocates a separate heap, other Java processes running on the same machine (e.g. DataStax OpsCenter) leave less memory available for Cassandra’s off-heap components. In the extreme case, the operating system may kill the Cassandra process or crash altogether, which looks identical to a heap crash from the application’s perspective (i.e. Cassandra is not available on that node).

Knewton’s approach

At first, we applied a Band-Aid fix in Cassandra 1.2 to always allocate a heap size equal to half the system memory. This worked reasonably well, until migrating to 2.1. The relevant change to configuration templates had only been made for Cassandra 1.2, and other versions fell back to the default allocation code (which is how we discovered the properties described in this post).

That’s when we decided it was time to do some testing. We had three options:

  1. Move only the memtable buffers off heap (offheap_buffers);
  2. Move both the memtable buffers and the objects off heap (offheap_objects);
  3. Leave the memtables on heap and raise the heap size.
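In Cassandra 2.1, the first two options correspond to the `memtable_allocation_type` setting in cassandra.yaml (the third is simply a larger MAX_HEAP_SIZE in cassandra-env.sh):

```yaml
# cassandra.yaml (2.1+): where memtable contents are allocated
#   heap_buffers    - on the JVM heap (the default)
#   offheap_buffers - memtable buffers in off-heap memory   (option 1)
#   offheap_objects - buffers and objects off heap          (option 2)
memtable_allocation_type: offheap_buffers
```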

We regarded raising the heap size as our fallback, since we knew how the system would behave if we did that. Due to considerations related to scaling (e.g. data size growth), we weren’t very confident that any testing would yield a clear choice of how large to make the heap. In the end, then, this amounted to simply raising the maximum heap size to half the system memory. While doing so would be nice and simple, we hesitated to make our Cassandra installation deviate this much from the base installation.

So we experimented with off-heap memtables. As noted earlier, moving components off of the heap appears to be related to why the maximum heap size calculation was changed in the first place. Moreover, moving both the memtable buffers and the objects off heap is slated to become the default behavior in a future release of Cassandra, which led us to believe that this option’s performance is good enough that it is recommended in most cases.

Both offheap_buffers and offheap_objects showed good results in tests, reducing the stress on the heap and system performance overall relative to heap_buffers (the default configuration). As expected, the GC time was lowest for offheap_objects, as was the end-to-end processing time. To our surprise, however, offheap_buffers was the star performer. This setting yielded the best numbers while maintaining the most consistent read and write rates. In some of these measures, such as percentage of heap used and GC pause time, offheap_buffers narrowly edged out offheap_objects. In others, notably write latency, bloom filter false-positive ratio, and average response time, offheap_buffers was the clear winner. This performance was enough to convince us to move forward with offheap_buffers.

After deploying this change, we mostly saw positive results. Unfortunately, we did still sometimes see transient high-heap events, mostly on clusters with large data sets. On the other hand, these events were not accompanied by crashes. This caused us to reevaluate our longstanding alert threshold for heap usage, and we concluded that the threshold needed tuning. That resolved the remainder of the issue.

In summary, investigation of some puzzling behavior revealed unexpected heap allocation details in Cassandra’s internal configuration. Studying the configuration code allowed us to evaluate our options to address the behavior we originally observed. Our goal was to yield more predictable heap allocation results when scaling a cluster without impeding Cassandra’s performance. We achieved this by moving the memtable buffers off heap, which is possible in Cassandra 2.1+.

Digging Deep Into Cassandra Thrift Buffer Behavior

Everyone who works in tech has had to debug a problem. Hopefully it is as simple as looking into a log file, but many times it is not. Sometimes the problem goes away, and sometimes it only looks like it goes away. Other times it might not look like a problem at all. Many factors go into deciding whether an investigation is warranted and how deep it needs to go. These investigations can consume a lot of an organization’s resources, and there is always a chance of coming up empty-handed, which should never be seen as a failure.

This post will summarize an investigation into some Cassandra memory behavior that the database team at Knewton conducted recently. It is a good illustration of the kind of multi-pronged approach needed to unravel strange behaviors low in the stack. While we will specifically be talking about Cassandra running on AWS, the takeaways are applicable to systems running on other platforms as well.

Uncovering a Problem


Knewton had a Cassandra cluster that was very overprovisioned. The instances were scaled up (each instance is made more powerful) rather than scaled out (more instances are added to the cluster).  Cassandra is a horizontally scalable datastore and it benefits from having more machines rather than better ones.

So we added more machines and reduced the power of each one. We now had 4GB of available memory for the Java heap instead of 8GB.  This configuration worked well in all of our tests and in other clusters but in this specific overprovisioned cluster we found we had scaled each node down too much, so we moved to machines that could accommodate 8GB of heap, m4.xlarge instances.

Anomalous Behavior on New Instances

After moving all nodes over to m4.xlarge instances, we saw our heap situation stabilize. However, we began to notice anomalous CPU and load averages across the cluster: some nodes would run higher than others. The metrics showed that, out of the four cores on an m4.xlarge instance, one was completely saturated.

If you saw this load on its own, you would not think it a problem: total CPU usage on the box is at 25% and there are no catastrophically long GC pauses. However, the cluster was not uniform, which called for further investigation.

In these CPU graphs, you can see nodes running on low CPU that encountered something that would rapidly promote them into the high CPU band, and they would almost never drop back down.



We found that this graph was correlated with the graph of average garbage collection time.


When nodes promote themselves into the high CPU band, their GC times spike.

What is Cassandra holding in its heap that is causing GC issues?  With a lot of overhead in memory and CPU, crashes are not a risk, but performance is far from where we want to be.

Before tackling the garbage collection question, we have two other questions that we can answer already:

Why is this behavior showing up now?

We had an 8GB heap before and should have seen the same behavior. The reason we only saw this CPU and GC behavior once we were on m4.xlarge instances is twofold:

  1. Something unique in this cluster caused a 4GB heap to be fatal but an 8GB heap to be adequate.  Expanding the heap did not get rid of the problem.
  2. The original cluster that had an 8GB heap was around for years, and all of its nodes had been promoted to the high CPU band. The baseline operation of the cluster looked like the high CPU band. It is only because of previous issues with provisioning this cluster that we were watching closely when we moved to these m4.xlarge instances. This highlights the importance of understanding your baseline resource utilization and not assuming that it means you are in a healthy state.

Why is this behavior a problem?

Even though the high-usage band is not itself a problematic load, the behavior of the cluster was unexpected, and the promotion behavior is the real problem: it meant we had a nondeterministic factor in our latencies. We can expect different performance from a node in the high CPU band than from one in the low CPU band, but we cannot predict when a node will promote itself, because we don’t know the cause of this behavior. So we had an unknown factor in our system, which is dangerous for repeatability and reliability.


Investigating further is resource intensive and often falls to your senior engineering staff, as it requires some degree of independence and experience. So make sure the problem is actually worth your organization’s time before sinking days or weeks of engineering effort into it.


The first thing to do is to look through the relevant logs.  Looking through the Cassandra logs we found, unsurprisingly, a lot of garbage collections.  We’d had GC logging on and found several “large block” messages in the CMS garbage collection logs at about the time these promotions happened.  To get more visibility into what is actually in the heap, we turned on GC class histogram logs, but these said that almost all the memory was being held in byte arrays.

Not helpful.  Time to go even deeper and see what is in the heap.

Heap Dump

So we took a heap dump from a problematic node and, as a control, from a node that was exhibiting good GC behavior. A heap dump file is the size of the used Java heap. Dumping the heap is a “stop the world” operation for the process you are dumping, so when doing this in production be sure that the service can be unresponsive for a minute or more. The file is binary and examining it is labor intensive, so it is best to move the heap dump to a machine that’s not being used in production to investigate.

We used the Eclipse Memory Analyzer Tool (MAT) to investigate the heap dumps and found that almost all of the memory was taken up by the write buffers of the TFramedTransport objects. There were several hundred of these objects, with write buffer sizes ranging from 1kB to 30MB, and many in the range of 20-30MB. We saw similar objects in the heap dump of the control node, but not nearly as many. The write buffer contains what is being written to the Thrift transport socket and does not correspond to Cassandra reads and writes. In this case Cassandra is writing the output of an incoming read to this buffer to send to the client that requested the data.

It became pretty clear that this was a Thrift protocol issue so we searched for related issues.


Literature Search

Any problem you find has likely been reported in some way by someone else, especially if you are using anything but the latest versions of open-source software. It is very useful to search the web for related problems at every step of the investigation; as you get more specific, you may uncover things you would not have encountered earlier. In this case the investigation led us to the Thrift protocol, which is not something we would have searched for at the outset.

The Thrift library that our version of Cassandra used had some memory issues referenced in CASSANDRA-4896, which are discussed in more detail and resolved in CASSANDRA-8685. The short version: the write buffer — the thing taking up all of our space and the likely cause of our issues — grows to accommodate a larger payload but never shrinks back down. So once an operation with a large payload is requested of Cassandra, such as a query returning a 20MB record, these buffers increase in size and stay there.
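The pathology can be modeled in a few lines. This `GrowOnlyBuffer` is a simplified stand-in for the pre-fix TFramedTransport write buffer, not the actual Thrift code:

```java
import java.util.Arrays;

// A simplified model of the pre-fix Thrift write buffer: it grows to fit the
// largest payload ever written on the connection, and never shrinks while
// the connection (and thus the buffer) stays alive.
public class GrowOnlyBuffer {
    private byte[] buf = new byte[1024];   // fresh connection: small buffer

    public void write(byte[] payload) {
        if (payload.length > buf.length)
            buf = Arrays.copyOf(buf, payload.length);  // grow to fit the payload...
        System.arraycopy(payload, 0, buf, 0, payload.length);
        // ...but the backing array is never shrunk while the connection lives
    }

    public int capacity() { return buf.length; }

    public static void main(String[] args) {
        GrowOnlyBuffer b = new GrowOnlyBuffer();
        b.write(new byte[20 * 1024 * 1024]);  // one 20 MB response...
        b.write(new byte[100]);               // ...then tiny ones forever
        System.out.println(b.capacity());     // capacity stays at 20 MB
    }
}
```

One large response is enough to pin megabytes of heap for the remaining life of the connection, which is exactly the retention pattern the heap dumps showed.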

This behavior lines up with what we are seeing in the cluster, where a node could be operating smoothly until a large set of queries come in and expand the write buffers on the Thrift frame objects.  Once the frame size increases, it stays big for the duration of the connection with the client. (We’ll come back to this point shortly).

This Thrift behavior is noted in THRIFT-1457 and a fix is out for it. The Thrift library fix is also rolled into Cassandra 2.1.3 and later.  This is also only in Thrift so any clients using CQL won’t see this.

Verify the Hypothesis

The link between this memory behavior and client connections was a working hypothesis. We got a very strong signal supporting the theory one morning when we deployed a new version of the services that communicate with our cluster. The deployment meant closing all of the Cassandra connections on the client side during a low-traffic period and then reopening them. Cold caches cause a lot of queries during a deploy, but these queries do not return large blocks of data, so the TFramedTransport write buffers do not expand. Memory stays low until we start getting production traffic, which then expands the buffers as described above.


The CPU and GC graphs from the deploy at 10:45 show that we have almost no traffic beforehand. We’re still holding onto these large memory objects, and they aren’t cleaned up because the TFramedTransport objects live as long as the corresponding client connection.


Once the client connections close, the objects are garbage collected and the new connections start with small TFramedTransport write buffers. So we see low heap and GC times at about 10:45.


There is a dramatic heap usage drop when the deploy happens at 10:45.


Technical Wrapup

The behavior of these TFramedTransport buffers makes the Thrift protocol in pre-2.1 Cassandra dangerous. Each connection holds a TFramedTransport object in the Cassandra heap that can grow with the queries executed but cannot shrink. Memory fills with these objects, which cannot be cleaned out as long as the client connection is open. If the connection is severed by a client restart or a deploy, the objects get garbage collected. New connections begin with a small TFramedTransport write buffer, which behaves well with GC, heap, and CPU on the node until a large query comes in and expands the buffer again, promoting the node to the high-CPU-usage band and damaging its performance and response time. Eventually all nodes in the cluster reach this band, and the extra resources consumed by these large TFramedTransport objects become a burden on the performance of our database.

Next Steps

Because these buffers never decrease in size, the memory usage of a Cassandra instance is bounded only by the number of connections to the cluster, which is determined by the number of active clients. Our previous 4GB heap was unable to accommodate the number of clients in Knewton’s infrastructure, so it would perpetually crash; 8GB is sufficient.
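A back-of-the-envelope calculation with the rough figures from the heap dump makes the bound concrete. The connection count and buffer size here are illustrative round numbers in the range we observed, not measured Knewton figures:

```java
// Resident buffer memory is bounded by
// (open client connections) x (largest payload each connection has seen).
public class BufferFootprint {
    public static long footprintMb(int connections, int bufferMb) {
        return (long) connections * bufferMb;
    }

    public static void main(String[] args) {
        // ~300 connections with ~20 MB write buffers each:
        long mb = footprintMb(300, 20);
        System.out.println(mb + " MB of buffers");  // ~6 GB: overwhelms a 4 GB
        System.out.println(mb > 4096);              // heap, fits in an 8 GB one
    }
}
```

This is consistent with the crash pattern: a 4 GB heap cannot hold the steady-state buffer footprint, while 8 GB leaves headroom for the rest of Cassandra’s heap usage.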

Once we have identified the source of the problem, we have to decide what, if anything, to do about it. Some options would be:

  • Upgrading to Cassandra 2.1.3 or later.
  • Migrating to CQL from Thrift.

We know that the behavior is not fatal to our processes, so depending on the opportunity cost of each of these processes, we can decide if we will live with this behavior or if we are going to fix it.

Cost Analysis

It is difficult to quantify how much an investigation like this costs in terms of time, opportunity cost, and money to an organization.  Before starting an investigation, it is useful to timebox the project and restrict yourself to a certain number of engineering hours to investigate.  In hindsight one might quantify the cost of this investigation as:

Monetary cost = (cost of engineering time) – (savings from hardware optimization)

In this case we will need less hardware once this memory issue is resolved, perhaps three fewer m4.xlarge instances. The one-year cost of a reserved m4.xlarge instance is currently about $1,200. We spent about three engineer days investigating. Estimating engineering costs at $200 an hour (including overhead and opportunity cost to the organization), each engineering day costs $1,600. So over the year, we spent about $1,200 more on the investigation than we recovered from optimizations.
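The arithmetic above works out as follows (8-hour engineer days assumed):

```java
// The cost arithmetic from the paragraph above, assuming 8-hour engineer days.
public class InvestigationCost {
    public static int netCostDollars(int engineerDays, int hourlyRate,
                                     int instancesSaved, int yearlyInstanceCost) {
        int engineeringCost = engineerDays * 8 * hourlyRate;        // 3 x $1,600
        int hardwareSavings = instancesSaved * yearlyInstanceCost;  // 3 x $1,200
        return engineeringCost - hardwareSavings;
    }

    public static void main(String[] args) {
        // 3 engineer days at $200/hour vs. 3 reserved m4.xlarge at $1,200/year
        System.out.println("Net cost: $" + netCostDollars(3, 200, 3, 1200));
    }
}
```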

Final Remarks

Beyond monetary considerations, we have a better understanding of our system, which benefits us in several ways.  We are able to adjust our alerts to better monitor the system.  We can justify our instance sizes.  And finally we are able to support larger engineering objectives, advising the organization on the best course of action for future projects.  For example, we decided to move aggressively to upgrade this cluster to 2.1.

Deep investigations like this do not come with guarantees of strong takeaways or definitive answers.  They are expensive in terms of time and often require senior engineers to dig into deep system internals.  Yet at Knewton we have found that these investigations are valuable for the database team and the wider tech organization.

Introducing the Knewton Client Library

Why Build a Client Library?

As part of Knewton’s mission to personalize learning for everyone, Knewton strives to provide an easy-to-use API that our partners can leverage in order to experience the power of adaptive learning. Knewton’s API follows industry norms and standards for authentication and authorization protocols, error handling, rate limiting, and API structure.

To authenticate incoming API requests from client applications, Knewton uses two-legged OAuth 2.0. Currently, each partner has to implement OAuth 2.0 on the client side from scratch, which increases the potential for authentication failures when users try to log in.
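Two-legged OAuth 2.0 is commonly implemented with the client-credentials grant, where the client POSTs its own credentials with no end user involved. The sketch below just builds the standard form-encoded token request body; the client id and secret are placeholders, and Knewton’s actual token endpoint is not shown:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Sketch of an OAuth 2.0 client-credentials token request body, the kind of
// boilerplate KCL absorbs so partners don't implement it from scratch.
public class ClientCredentials {
    public static String tokenRequestBody(String clientId, String clientSecret) {
        return "grant_type=client_credentials"
                + "&client_id=" + URLEncoder.encode(clientId, StandardCharsets.UTF_8)
                + "&client_secret=" + URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // This body would be POSTed to the provider's token endpoint with
        // Content-Type: application/x-www-form-urlencoded.
        System.out.println(tokenRequestBody("partner-app", "s3cret"));
    }
}
```

Getting details like this right on every client is exactly the kind of repeated, error-prone work a shared client library eliminates.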

Partners also need to be well-versed in Knewton’s API standards. Using the right API endpoint along with valid credentials is critical to ensuring that the client application scales effectively upon launching. Beyond the initial integration, partners are generally cautious about upgrading their applications when Knewton releases updates or new endpoints.

To help simplify and standardize development of client applications, Knewton is starting to pilot the Knewton Client Library (KCL). The KCL boilerplate code will allow partners to build client applications faster and more efficiently. Implementing KCL in client applications will allow partners to devote more resources to the actual client application and enriching the overall experience of the students and teachers who use it. Using KCL will also offer partners a smoother upgrade path, since Knewton will absorb any backwards compatibility complexities into the library.

Components of the Knewton Client Library

The Knewton Client Library uses the Swagger framework. Following the contract-first method of SOA development, Swagger provides a set of tools for REST APIs, including a standardized definition format that can be used to generate code and documentation.

A centralized API definition ties together 1) server side implementation, 2) client code generation, and 3) API documentation, ensuring that they stay in sync and reducing time spent writing boilerplate code.


As part of the integration process, our partners must create and register student accounts through the Knewton API. The following Swagger definition is a simplified example of a dummy endpoint to create a student account, POST /student.

# Path
paths:
  /student:
    post:
      summary: Create a student
      operationId: createStudent
      parameters:
        - name: StudentCreatePayload
          in: body
          required: true
          schema:
            type: object
            properties:
              name:
                description: Student's first and last name
                type: string
              courses:
                description: Array of Course IDs
                type: array
                items:
                  type: integer
              date_created:
                description: Date student was added
                type: string
                format: date
      responses:
        204:
          description: On success, returns no content

This example describes a dummy POST REST endpoint which takes in a JSON object with a student’s name, courses, and creation date. A request to such an endpoint might look like:

curl -H "Content-Type: application/json" -X POST -d '{"name":"Knewton Student","courses":[19, 123],"date_created":"September 1, 2016"}'

The Knewton Java Client Library

The swagger-codegen library can generate API clients in many popular programming languages from a definition file. Let’s take a look at how we use this to generate the Knewton Java Client Library.

We use the swagger-codegen library to generate a Java Retrofit API interface from our API definition. Retrofit uses annotations on interface methods and parameters to describe the API.  These are then parsed by an HTTP client to execute requests.

Here is the Retrofit interface, which was automatically generated from the student creation endpoint definition. The interface matches the Swagger definition above: it defines a POST request to the “/student” endpoint, which takes a StudentCreatePayload (containing name, courses, and date created) as a parameter that is automatically serialized to JSON and added to the request body.

public interface Api {

    @POST("/student")
    Void createStudent(
        @Body StudentCreatePayload studentCreatePayload
    );
}

The auto-generated code provides a baseline client implementation. To make KCL as simple to use as possible, KCL wraps the auto-generated client with higher-level functionality such as authentication and rate limiting. The wrapper also refactors methods, parameters, and return objects to provide a more natural interface for Java developers.

For example, while Java has the java.util.Date object, the autogenerated code uses a String. The wrapper allows users to pass java.util.Date objects and will restructure them to the strings that the Swagger-generated API class expects.
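As a sketch of that conversion (the helper class name and the date pattern are illustrative, chosen to match the example request body above):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Illustrative helper: format a java.util.Date as the date string the
// Swagger-generated API class expects ("September 1, 2016" in the example above).
public class DateConverter {
    private static final SimpleDateFormat API_DATE_FORMAT =
            new SimpleDateFormat("MMMM d, yyyy", Locale.US);

    public static String toApiString(Date date) {
        return API_DATE_FORMAT.format(date);
    }
}
```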

Continuing with the student creation example above, we can further simplify the partner-facing interface. The API endpoint requires that the caller provides the date that the student record is created. There is no need for KCL to require the current date, as this can be determined automatically. Therefore the only parameters in our wrapped createStudent method are the student name and course list.

public void createStudent(String name, List<Integer> courses) {
    Date dateCreated = new Date();
    StudentCreatePayload studentCreatePayload = new StudentCreatePayload();
    // Populate the generated payload (bean-style setters assumed) and call the generated API;
    // formatDate is a hypothetical Date-to-String helper
    studentCreatePayload.setName(name);
    studentCreatePayload.setCourses(courses);
    studentCreatePayload.setDateCreated(formatDate(dateCreated));
    api.createStudent(studentCreatePayload);
}

Instead of writing all of the code in the create student method, a partner application can now simply call wrapper.createStudent(name, courses);

OAuth, Retrying, and Logging

KCL uses OkHttp as its underlying client library.  OkHttp integrates well with Retrofit and provides an easy way to extend its functionality via Retrofit’s interceptor interface. Each client can have multiple interceptors. Each interceptor can modify a request or response at any point. For example, an interceptor could monitor, amend, and retry requests based on information read from responses.

As the diagram below illustrates, OkHttp nests interceptors added to a client, so the first interceptor to process a request will be the last interceptor to process a response.

KCL uses interceptors for logging, OAuth and retrying on rate limiting.


Logging Interceptor

The logging interceptor is the simplest interceptor because it does not modify or react to a request or response. This interceptor logs information from the request made to Knewton and the response received from Knewton.

The following is a simplified example of the KCL logging interceptor. This sample interceptor logs the request headers and request body, as well as the response code.

public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    // log request information
    logger.log("Request Headers: " + request.headers());
    logger.log("Request Body: " + request.body());

    // Continue with the interceptor chain
    Response response = chain.proceed(request);

    // log response information
    logger.log("Response Code: " + response.code());

    return response;
}

OAuth Interceptor

OAuth is an industry-standard protocol for API authentication.

KCL includes an OAuth interceptor to handle authentication. If a request already carries an authorization header, the interceptor is a no-op. If not, KCL authenticates before proceeding: if the user has no access token, KCL requests one; if the access token has expired, KCL uses the refresh token to request a new access token; and if the refresh token has also expired, KCL requests a new authorization altogether.

Here is a sample OAuth interceptor:


public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    // Already authorized (no-op)
    if (request.header("Authorization") != null) {
        return chain.proceed(request);
    }

    // Get a new auth and add it to the request
    Auth auth = authCache.getAuth();
    request.addHeader("Authorization", auth);

    Response response = chain.proceed(request);

    // Access token expired: refresh it and retry
    if (response.code() == 401) {
        auth = authCache.getNewAccessToken();
        request.addHeader("Authorization", auth);
        response = chain.proceed(request);
    }

    // Refresh token expired: re-authorize and retry
    if (response.code() == 401) {
        auth = authCache.getNewRefreshToken();
        request.addHeader("Authorization", auth);
        response = chain.proceed(request);
    }

    return response;
}


Rate Limiting and Entity Limiting

Knewton enforces a set of rate limits to ensure that no single user can overwhelm the API with traffic and shut out other partners. After a specified number of requests per minute for a given endpoint, the Knewton API will respond with HTTP 429 errors, indicating that the request has been rate-limited and rejected. It can be retried in the next minute.

Some requests to Knewton’s API are not so time-sensitive that a delay of up to a minute would affect users. Therefore, KCL has incorporated automatic configurable retrying after a time increment specified by the partner.

This is an example of one retry.

private static final int HTTP_TOO_MANY_REQUESTS = 429;

public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    Response response = chain.proceed(request);

    // Rate limited: retry the request once
    if (response.code() == HTTP_TOO_MANY_REQUESTS) {
        response = chain.proceed(request);
    }

    return response;
}

If KCL continues to receive HTTP 429s, it will repeat this process until it times out. Since partners can configure the retry interval and timeout, they do not have to worry about handling these errors in their client applications.
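The repeat-until-timeout behavior can be sketched in plain Java as follows. The parameter names stand in for the partner-configured interval and timeout, and the `IntSupplier` stands in for an HTTP call returning a status code; none of these names come from KCL itself.

```java
import java.util.function.IntSupplier;

// Illustrative sketch of a configurable retry loop: keep retrying while the
// API returns HTTP 429, waiting between attempts, until the timeout passes.
public class RetryPolicy {
    private static final int HTTP_TOO_MANY_REQUESTS = 429;

    // Returns true once a call is accepted; false if still rate limited at timeout.
    public static boolean retryUntilTimeout(IntSupplier call, long waitMillis, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (call.getAsInt() != HTTP_TOO_MANY_REQUESTS) {
                return true;  // request was accepted
            }
            if (System.currentTimeMillis() + waitMillis > deadline) {
                return false;  // another wait would exceed the timeout; give up
            }
            try {
                Thread.sleep(waitMillis);  // wait for the next rate-limit window
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```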


With the Knewton Client Library, partners can more easily integrate with the Knewton adaptive learning platform, and consequently focus their efforts on delivering a great user experience to students and teachers accessing their applications. KCL also allows partners to take advantage of new features as Knewton releases them.

Thank you to Aditya Subramaniam and Paul Sastrasinh for their contributions to this blog post.

System Design Documents at Knewton: RFCs

The team is in agreement: the Flimflamulator service is a mess of tech debt and something needs to be done. Other software teams at the company write code that depends on this service, however. They’ve built workarounds, and any changes will propagate into their systems. Flimflamulator provides some customer-facing functionality; the Product team will want to weigh in too.

How do you make sure you’re not creating new problems? What if someone has already thought through solutions to this? There are so many stakeholders that a design review meeting would be chaos. How do you collect feedback from everyone?

At Knewton we use the RFC process. You might call it design documents or architecture review, but the goal is the same.

What is an RFC?

RFC stands for Request for Comment; at its most basic level an RFC is a way to collect feedback about an idea. A document explaining the idea is shared with peers and stakeholders who provide feedback and ultimately sign off (or don’t) on the idea. RFCs can be product proposals, design documents, process changes, or any other thing you’d like someone’s opinion on.

RFC does not stand for Request for Consensus. People are rarely equally informed or equally affected by issues, and some team members naturally hold more sway than others. RFCs are not decided by majority vote, and that’s OK.

Knewton’s process for this is loosely based on the RFC process used by the Internet Engineering Task Force to discuss and set standards for internet protocols.

When to Write an RFC?

Write an RFC when you’re considering something that will be hard to change later: typically public interfaces, data schemas, frameworks, platforms, or human processes.

How to Write an RFC?

Knewton’s RFC template explains what an RFC should cover. We use Google Docs to share and discuss documents because it has great commenting tools.

RFC Ownership

Each RFC must have at least one owner, who should be listed at the top of the RFC along with the RFC’s status and any relevant dates (refer to the template for examples). The owner is responsible for:

  • doing research on prior art
  • identifying the right audience of stakeholders and peers
  • publishing the RFC to that audience
  • addressing all questions, concerns, and suggestions transparently and expediently
  • documenting any dissenting opinions

The “Right” RFC Audience

“An RFC is like a wedding proposal; if you’re not sure your stakeholders are going to say yes, it’s too early to write it,” as my colleague Rémi likes to say.

Not all RFCs require involvement of the entire tech team. In fact, most don’t.

The RFC owner should identify key stakeholders and solicit their opinions first. These are the people affected by what you’re proposing, subject matter experts, or people who can make the proposal happen/not happen. An informal chat before you start writing can save lots of time.

Once you have a doc, share it with a small audience who can give you rapid feedback. Your manager is often a good place to start. Focus on quick iteration and tell people when you expect to receive their feedback.  Early in the process, 24-hour turnaround is a reasonable request. Be proactive in soliciting feedback. You’ll probably get a lot of comments on your first draft, and an in-person review can be useful to speed things along.

As major issues get worked out and details solidified, expand the audience in a few rounds: more stakeholders, your whole team, tech-wide. It should be like rolling a snowball down a hill. Allow up to 5 business days for the final audience to sign off. This will be a judgment call based on the size of the audience and urgency of the proposal. At Knewton it’s customary to share your RFC with the entire tech team as an FYI even if it isn’t relevant to everyone.

How to Address Comments

It’s unlikely everyone will be in perfect agreement about your proposal, and you’ll probably receive some feedback that you disagree with. Disagreement is OK.

A few things to keep in mind when addressing dissenting comments:

  • What would it take to change your mind? If you think incorporating the dissenting feedback would cause problems, ask the commenter to provide a solution to those problems.
  • Does the commenter have skin in the game? Are they a subject matter expert? You don’t have to address every comment on your proposal if they’re not relevant.
  • Close out comments you won’t incorporate by briefly but respectfully saying you disagree, and mark the comment resolved. If the commenter feels strongly, they’ll let you know.
  • Long comment threads can be a time sink. Resolve them with a real-time conversation.

RFC Statuses

RFC documents should list their current status in the document header (see RFC template).

Proposed means open for comment, actively reviewed and publicly debated.

Accepted means closed for comment, all comments “resolved,” dissenting opinions documented, and work to implement the proposal has been scheduled. Doc status should be updated to “Accepted on [date].”

Cancelled means the RFC owner decided to not proceed and cancel the RFC. Doc status should be updated to “Cancelled on [date].”

Summing It All Up

  1. Decide on problem to solve
  2. Identify and talk to key stakeholders
  3. Write RFC
  4. Solicit feedback
    1. Primary stakeholder or mentor (no more than 1 day per iteration)
    2. Team or wider group of stakeholders (2-3 days)
    3. Full audience (5 business days)
  5. Resolve comments
  6. Close RFC
    1. Update doc to indicate it has been closed

API Infrastructure at Knewton: What’s in an Edge Service?

The Knewton API gives students and teachers access to personalized learning recommendations and analytics in real time. In this post, we will pull back the covers of our API to explain how we handle user requests. You will first learn how to build an edge service with Netflix Zuul, the framework we chose for its simplicity and flexibility. Then, we’ll dive into the Knewton edge service to show you how it improves API simplicity, flexibility, and performance.

What’s in an Edge Service

An edge service is a component which is exposed to the public internet. It acts as a gateway to all other services, which we will refer to as platform services. For example, consider an Nginx reverse proxy in front of some web resource servers. Here, Nginx acts as an edge service by routing public HTTP requests to the appropriate platform service.
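A minimal Nginx config for this routing pattern might look like the following sketch (the paths and upstream names are made up for illustration):

```nginx
# Route image requests to one upstream and text requests to another.
server {
    listen 80;

    location /images/ {
        proxy_pass http://image-server;
    }

    location /text/ {
        proxy_pass http://text-server;
    }
}
```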

An example edge service: Nginx as a reverse proxy for two resource servers. Requests for images are routed to one server, while requests for text are routed to another.


Beyond Routing: Edge Service Architecture

The reverse proxy pattern makes sense if every one of your resource servers is hardened to receive raw internet traffic. But in the typical service-oriented architecture, implementing a simple reverse proxy would duplicate a substantial amount of security and reliability code across platform services. For example, we do not want to reimplement user authentication in each platform service. Instead, we factor these responsibilities directly into the edge service. It validates and authenticates incoming requests; it enforces rate limits to protect the platform from undue load, and it routes requests to the appropriate upstream services. Factoring these high-level features into the edge service is a huge win for platform simplicity.

Building this service was a substantial engineering task. We wanted to leverage our experience developing and deploying JVM-based services by creating our edge service following the same pattern. This made Nginx and Apache’s Lua and PHP-based scripting capabilities unattractive. We would have had to rewrite our standard libraries and debug code in a new paradigm. Instead, we built our edge service on top of Netflix Zuul.

The edge service is Knewton’s interface to the public internet. It is registered directly with an AWS Elastic Load Balancer (ELB), and is responsible for sanitizing and routing requests for the Knewton Platform. To maintain high availability, our edge service runs as a cluster. On startup, edge service nodes register themselves with the load balancer, which then distributes requests across the cluster.


Netflix Zuul

Zuul is a framework created by Netflix to handle their own API and website traffic. The framework is structured around filters, a concept borrowed from the Java Servlet Filter pattern. A Zuul edge service is made up of a series of such filters, each performing some action on an HTTP request and / or response before passing control along to the next filter. For example, a filter might add authentication information to request headers, write an error response to an invalid request, or forward a request to an upstream service for further processing. In this section, we will walk through an example Zuul filter to show you the simplicity of the pattern.

Zuul Filters

We will consider three categories of Zuul filters: pre-filters, route-filters, and post-filters. (Zuul also supports error-filters and static-filters.) Pre-filters run before the edge service routes a request; route-filters forward requests to upstream services; and post-filters run after the proxied service returns its response.

The edge service consists of a series of Zuul filters which work together to write a response for a given request. The route-filters make requests to platform services to retrieve data and update state.


Filters are defined by three pieces of logic:

  • Filter execution order
  • Conditional filter execution
  • Execution logic

Let’s dive into our example. We’ve written this filter in Java, but Zuul is compatible with all JVM languages.

Filter Execution Order

Zuul filters run in the same order for every request. This enables successive filters to make assumptions about the validations run and to access accumulated state. For example, we might store the request’s user ID in one filter and use it to apply rate limits in the next. Zuul filter ordering is defined with two methods: filterType, which specifies whether a filter is a pre-filter, route-filter or post-filter, and filterOrder, which orders filters of the same type.

// Defining the execution order for a Zuul filter in Java

public String filterType() {
   // run this filter before routing
   return "pre";
}

public int filterOrder() {
   // run this filter first among pre-filters
   return 0;
}

Conditional Filter Execution

Filters may be run conditionally for each request. This gives the designer significant flexibility. For example, a rate limit filter may not run for an authenticated user. This conditional logic is factored into the shouldFilter method.

// Configuring a Zuul filter to run on every request

public boolean shouldFilter() {
   // Always run this filter
   return true;
}

// Configuring a Zuul filter to run on requests from unauthenticated users only
public boolean shouldFilter() {
   RequestContext context = RequestContext.getCurrentContext();
   return !(boolean) context.getOrDefault("IS_AUTHENTICATED", false);
}

Execution Logic

The code to execute in a filter is defined in the runFilter method. Here, we make use of the static RequestContext object to store state for later filters, or to write the response.

// Defining Zuul filter logic to record and check rate limits.

public ZuulFilterResult runFilter() {
   RequestContext context = RequestContext.getCurrentContext();
   boolean isOverRate = rateLimiter.recordRate(context.getRequest());
   if (isOverRate) {
       context.set("IS_RATE_LIMITED", true);
   }
   return null;
}

All together, this gives us an example filter:

// A simple Zuul filter that runs for unauthenticated users and records
// whether a rate limit has been exceeded.
public class RateLimitFilter extends ZuulFilter {

   public String filterType() {
       // run this filter before routing
       return "pre";
   }

   public int filterOrder() {
       return 0;
   }

   public ZuulFilterResult runFilter() {
       // records the request with the rate limiter and checks if the
       // current rate is above the configured rate limit
       RequestContext context = RequestContext.getCurrentContext();
       boolean isOverRate = rateLimiter.recordRate(context.getRequest());
       if (isOverRate) {
           context.set("IS_RATE_LIMITED", true);
       }
       return null;
   }

   public boolean shouldFilter() {
       // should only run if the user is not authenticated
       RequestContext context = RequestContext.getCurrentContext();
       return !(boolean) context.getOrDefault("IS_AUTHENTICATED", false);
   }
}
In this way, we build up a modular set of request-processing functionality. This pattern makes it easy for multiple engineers to contribute new features. At Knewton, engineers outside of the API team have committed code for edge service features.

For more information about the design and lifecycle of a Zuul service, see this Netflix blog post.

Zuul and the Knewton Edge Service

Zuul is an opinionated but barebones framework. While adding functionality is simple, you will have to implement the filters yourself. Now we will explain the most important filters we wrote for the Knewton edge service. You will learn how to reject bad requests, reverse-proxy good ones, and reduce interservice traffic.


Pre-filters

Pre-filters in our edge service validate, rate-limit, and authenticate incoming requests. This enables us to reject bad requests as early in the pipeline as possible.

Rate Limiting

Our edge service is responsible for protecting the Platform from bursts of requests. When the rate of requests from a given IP or user exceeds a specified threshold, the edge service responds with 429: Too Many Requests. This threshold helps to prevent harm to the platform from Denial of Service attacks and other excessive request load.

Rate limits are enforced in a pre-filter so that these excess requests will not place any load on platform services. This pre-filter tracks the number of requests made during a one-minute window. If this number exceeds the rate limit, the filter skips further request processing and immediately writes a “429” response.
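A simplified, per-node sketch of that one-minute-window check might look like the following (class and method names are illustrative, not the edge service's actual code):

```java
// Simplified sketch of a per-node, fixed one-minute-window request counter,
// the kind of check a rate-limiting pre-filter performs before writing a 429.
public class MinuteWindowLimiter {
    private final int limit;
    private long windowStart;
    private int count;

    public MinuteWindowLimiter(int limit) {
        this.limit = limit;
        this.windowStart = System.currentTimeMillis();
    }

    // Returns true if the request may proceed, false if it should be rejected with 429.
    public synchronized boolean allow() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 60_000) {
            windowStart = now;  // start a new one-minute window
            count = 0;
        }
        return ++count <= limit;
    }
}
```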

Logic in the rate limiting Zuul prefilter. For a simple rate limiting implementation with Redis, see the Redis documentation.


Rates are stored in memory on each edge service node to provide the lowest latency possible to each request. The rates recorded by each node are reconciled asynchronously using a shared Redis cache. This means that, no matter which node handles a given request, all nodes will eventually acknowledge it. In practice, this reconciliation happens quickly; convergence occurs within a constant factor of AWS’s region-internal network latency.

A request to an endpoint is rejected when more than the configured limit of requests have been made within a minute. edge service nodes coordinate request counts through a shared Redis cache.


Surge Queuing

Load on the Knewton API is not constant. Even within the broad patterns of school days and lunch breaks, request rates vary. Rather than passing these bursts on to platform services, our edge service smooths traffic, so that the platform sees smaller peaks. To accomplish this, our rate limiter follows the same Leaky Bucket pattern used in Nginx. Requests exceeding the rate limit get added to a fixed-length queue. The queued requests are processed at the rate limit, ensuring that the platform does not see a surge. If the queue is full when the rate limit is exceeded, these excess requests are rejected with a 429 error. This approach has the added benefit of simplifying Knewton API clients, because requests can be made in reasonable bursts without being rejected. Read more about how we implement request queueing in our next blog post.
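The leaky-bucket pattern described above can be sketched with a fixed-length queue; this is an illustrative stand-in, not the edge service's implementation:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative leaky-bucket surge queue: excess requests are buffered up to a
// fixed capacity and drained at a steady rate; overflow is rejected (HTTP 429).
public class SurgeQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;

    public SurgeQueue(int capacity) {
        this.capacity = capacity;
    }

    // Returns false when the queue is full, meaning the request must be rejected.
    public synchronized boolean enqueue(T request) {
        if (queue.size() >= capacity) {
            return false;
        }
        return queue.add(request);
    }

    // Called at the configured drain rate; returns null when there is no queued work.
    public synchronized T drainOne() {
        return queue.poll();
    }
}
```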

A request surge queue sits in front of the rate limiter to smooth out bursts of requests.



Authentication

At Knewton, we use OAuth to authenticate users. Each API request must contain a valid OAuth token. The edge service rejects requests which do not contain these tokens. This obviates the need for end-user authentication in upstream platform services and reduces interservice traffic by rejecting unauthenticated requests immediately at the edge.


Route-filters

The edge service is responsible for forwarding requests to the appropriate upstream microservice. Once a request has passed the gauntlet of Pre-filters, it is passed on to the Route-filters. Since Knewton services use Eureka for service discovery, we chose to use Netflix’s Ribbon HTTP client to make these upstream requests. Ribbon allows our edge service to automatically discover upstream services, load-balance traffic between them, and retry failed requests across instances of a service, all with minimal configuration.

// Creates a Ribbon client with Eureka discovery and round robin
// load balancing
AbstractLoadBalancerAwareClient setupRibbonClient(String upstreamName) {
    ServerList<DiscoveryEnabledServer> serverList =
            new DiscoveryEnabledNIWSServerList(upstreamName);
    IRule rule = new AvailabilityFilteringRule();
    ServerListFilter<DiscoveryEnabledServer> filter = new ZoneAffinityServerListFilter();
    // Build the load balancer from the Eureka server list, rule, and filter
    // (builder usage sketched from Ribbon's LoadBalancerBuilder API)
    ZoneAwareLoadBalancer<DiscoveryEnabledServer> loadBalancer =
            LoadBalancerBuilder.<DiscoveryEnabledServer>newBuilder()
                    .withDynamicServerList(serverList)
                    .withRule(rule)
                    .withServerListFilter(filter)
                    .buildDynamicServerListLoadBalancer();
    AbstractLoadBalancerAwareClient client =
            (AbstractLoadBalancerAwareClient) ClientFactory.getNamedClient(upstreamName);
    client.setLoadBalancer(loadBalancer);
    return client;
}

The Knewton edge service is not limited to endpoint-based routing. Our next blog post will cover specialized routing we have implemented to support shadow services and server-side API mocking.


Post-filters

After receiving the upstream responses, the post-filters record metrics and logs, and write a response back to the user.

Response Writing

The Post-filter stage is responsible for writing a response to the user. It is worth emphasizing that the edge service must always write back some response. If an unhandled exception or upstream timeout were to prevent this, the user would be left idling. This means catching exceptions and handling errors in all situations, so that the edge service always returns a timely answer, even if that answer is an error code (i.e., a 5xx response).

Metrics and Logging

Recording the right metrics and logs helps to ensure quality of service of the Knewton platform.  The edge service encapsulates the entire lifecycle of every API request. It records the entire duration of each request and handles every failure. This uniquely positions the edge service to report on end-user experience. Our post-filters publish metrics and logs to Graphite relays and Splunk Indexers. The resulting dashboards, alerts, and ad-hoc queries give us all the information we need to investigate problems, optimize performance, and guide future development work. Watch for our upcoming blogpost on API monitoring for more details.


The Knewton edge service validates and rate limits API requests, authenticates users, routes requests to upstream microservices, and records end-to-end metrics. Building this logic into the edge service simplifies our platform and provides performance and reliability guarantees to our users. We built our edge service on top of Zuul because its filter-based design is simple and extensible, its Java construction was compatible with our architecture, and because it has been proven in production at Netflix.

In this blog post, you learned how to build a Zuul edge service and how this design can improve your API. A follow-up post will describe custom logic we built to support surge queuing, shadow services, and API mocking in the Zuul framework.

Thanks to all the contributors on the Knewton API Platform team: Paul Sastrasinh, Rob Murcek, Daniel Singer, Bennett Wineholt, Robert Schultheis, Aditya Subramaniam, Stephanie Killian

Distributed Tracing: Observations in Production

Previous blog posts have explained Knewton’s motivation for implementing distributed tracing, and the architecture we put together for it. At Knewton, the major consumers of tracing are ~80 engineers working on ~50 services. A team consisting of three engineers handled designing and deploying the tracing infrastructure detailed in the previous sections.  This section highlights our experience and challenges running Zipkin at Knewton over the past few months.

Technical Hurdles

We elected to use Redis on Amazon Elasticache as our storage for spans, which are detailed in the previous section. This caused us a few problems:

Redis was inexplicably slow

Our initial rollout had an early version of the Zipkin Collector that wrote to a cache.r3.2xlarge (58.2 GB Ram, 8 vCPU) Redis Elasticache instance, which could not keep up with the traffic we were sending it. It could only handle ~250 requests per second. This seemed very slow to us. To set expectations, we dug into how to benchmark Redis, and stumbled upon the venerable redis-benchmark, which informed us that it could handle at least 120,000 requests per second with the 120 byte payloads that we were transmitting.

Looking at Amazon’s Cloudwatch metrics during the initial rollout, we quickly realized that, while the test was using multiple connections to Redis, the Zipkin code was using only a single connection. We modified the code to use multiple connections so we could make ~3000 requests per second and still keep up with incoming traffic.

Redis lost all data stored on it every few days

After we got Redis running, we found that Redis would restart every few days, when the amount of data that we stored in Redis approached the memory allocation of the Elasticache instance.

It turned out that the memory listed on Amazon’s Elasticache documentation refers to the total memory on the instance that runs Redis, and not to the memory allocated to Redis. As we continued to load data into Elasticache Redis, the underlying instance would start swapping heavily, and the Redis process would be restarted. We speculated that the process was being OOM killed, but because we don’t have access to Linux logs on Elasticache instances, we couldn’t be certain. After setting the `reserved-memory` parameter to approximately 15% of the total memory on the cache instance, we no longer had this problem.

Amazon Kinesis Queue Library

We packaged TDist into a library responsible for collecting span data at every service and publishing it to our tracing message bus, Amazon Kinesis. During early testing, we had configured the AWS Kinesis stream to have only a single shard, limiting it to 1000 writes per second.

When we deployed a few services using TDist, we began seeing unusual growth in JVM heap sizes under bursty traffic. When traffic reached baseline levels, the heap shrunk to reasonable sizes.

The culprit was the default configuration of the Kinesis queue producer, which assumes that the rate at which it receives data is lower than the rate at which it can submit data to the Kinesis queue. As such, it uses an unbounded in-memory queue to buffer data for Kinesis. We decided to drop data when we are unable to keep up, and changed to a bounded queue. This resulted in predictable performance under load or network blips, at the expense of losing data during load spikes.
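The change amounts to swapping the unbounded queue for a bounded one and dropping on overflow. A minimal sketch, with illustrative names rather than the actual TDist API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// A bounded buffer in front of the Kinesis publisher: offer() never
// blocks, so under load spikes spans are dropped instead of growing
// the heap without limit.
public class SpanBuffer {
    private final BlockingQueue<byte[]> queue;
    private final AtomicLong dropped = new AtomicLong();

    public SpanBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false (and counts a drop) when the buffer is full.
    public boolean submit(byte[] span) {
        if (queue.offer(span)) {
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    public long droppedCount() {
        return dropped.get();
    }
}
```

Tracking the drop count lets the dropped-span rate be exported as a metric, which matters for the monitoring discussed below.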

Now we have predictable performance, but during periods of high traffic, the Kinesis publishers fall behind and fill up the local queues, with the unfortunate effect of dropping spans. This can be solved by increasing the rate at which we publish to Kinesis. The two options are to increase the number of publishers, and thus the number of calls we make to the Kinesis API, or to increase the number of spans published per API call. Because the span payload is small, most of the time spent on a Kinesis API call is network time. We intend to alleviate this problem by using batch Kinesis operations to submit multiple spans at once.

Metrics and Monitoring

When tracing was first rolled out to Knewton, it was hard to get an idea of how our client applications were doing. While the server- and client-side metrics were reported to our Graphite-based metrics system, we had a difficult time correlating those with metrics for Kinesis, which are reported with different granularity, units, and retention periods via Amazon CloudWatch. Knewton contributed to the cloudwatch-to-graphite project to get consolidated metrics in Graphite.

Once this was done, we were able to focus on the metrics that gave us the most insight into how tracing was performing: the rate at which applications emit spans to Kinesis, and how quickly the Collector is able to ship them off to Redis. These rates determine the time it takes for a trace to appear in the Zipkin web interface.

Organizational hurdles

User education

While we had stealthily instrumented the Thrift protocol to pass tracing data around, we found that developers were still looking at service logs instead of tracing data while debugging. We evangelized tracing whenever developers ran into problems that crossed services, and gave a presentation on how developers can use tracing to pinpoint performance problems.


TDist allows Knewton to see internal dependencies between services clearly. Now, a service owner can see precisely which services depend on their service, and which functions they depend upon. This allows us to optimize service APIs to match usage patterns.

The week after we launched distributed tracing, we noticed a particular external API call that resulted in hundreds of calls to one internal service. We were able to add a new endpoint to the internal service to eliminate this large number of calls and speed up the external endpoint.


TDist proved that we could pass data across services in a noninvasive way by modifying the Thrift protocol. Since then, we’ve built out the TDist ecosystem and are using it in production. While we faced problems getting TDist into the hands of developers, it has proven incredibly useful when debugging problems across services.

Distributed Tracing: Design and Architecture

The previous blog post talked about why Knewton needed a distributed tracing system and the value it can add to a company. This section will go into more technical detail as to how we implemented our distributed tracing solution.

General Architecture and Tracing Data Management

Our solution has two main parts: the tracing library that all services integrate with, and a place to store and visualize the tracing data. We chose Zipkin, a scalable open-source tracing framework developed at Twitter, for storing and visualizing the tracing data. Zipkin is usually paired with Finagle, but as mentioned in Part I, we ruled it out due to complications with our existing infrastructure. Knewton built the tracing library, called TDist, from the ground up, starting as a company hack day experiment.


Tracing data model

For our solution, we chose to match the data model used in Zipkin, which in turn borrows heavily from Dapper. A trace tree is made up of a set of spans. Spans represent a particular call from client start through server receive, server send, and, ultimately, client receive. For example, the call-and-response between ServiceA and ServiceB would count as a single span.


Each span tracks three identifiers:

  • Trace ID: Every span in a trace will share this ID.
  • Span ID: The ID for a particular span. The Span ID may or may not be the same as the Trace ID.
  • Parent Span ID: An optional ID present only on child spans. The root span does not have a Parent Span ID.
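The relationship between the three identifiers can be sketched as a minimal data class (field and method names here are illustrative; Zipkin’s actual model carries additional annotations):

```java
// The three identifiers carried by every span. The root span of a
// trace has no parent span ID.
public class Span {
    private final long traceId;       // shared by every span in the trace
    private final long spanId;        // identifies this particular span
    private final Long parentSpanId;  // null for the root span

    public Span(long traceId, long spanId, Long parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    public boolean isRoot() {
        return parentSpanId == null;
    }

    // Derive the span for a downstream call: same trace, new span ID,
    // parented to this span.
    public Span child(long childSpanId) {
        return new Span(traceId, childSpanId, spanId);
    }

    public long traceId() { return traceId; }
    public Long parentSpanId() { return parentSpanId; }
}
```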

The diagram below shows how these IDs are applied to calls through the tree.  Notice that the Trace ID is consistent throughout the tree.


For more details, read the Dapper paper.


TDist

TDist is a Java library that Knewton developed. All of our services use it to enable tracing. TDist currently supports Thrift, HTTP, and Kafka, and it can also trace direct method invocations with the help of Guice annotations.

Each thread servicing or making a request to another service gets assigned a Span that is propagated and updated by the library in the background. Upon receipt of a request (or right before an outgoing request is made), the tracing data are added to an internal queue by the DataManager, and the name of the thread handling the request is changed to include the Trace ID. Worker threads then consume from that queue and publish the tracing data to our tracing message bus. Java’s ThreadLocal makes it easy to store and access information tied to a particular thread, and that is the approach we used in the DataManager.
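The ThreadLocal pattern behind the DataManager can be sketched as follows (a simplification, not TDist’s actual API):

```java
// Per-thread storage for the active span, in the style of TDist's
// DataManager. Each request-handling thread sees only its own span.
public class DataManager {
    private static final ThreadLocal<Span> CURRENT = new ThreadLocal<>();

    public static void setSpan(Span span) {
        CURRENT.set(span);
        // Tag the thread name with the trace ID so log lines emitted by
        // this thread can be correlated with the trace.
        Thread.currentThread().setName(
            Thread.currentThread().getName() + " trace:" + span.traceId);
    }

    public static Span getSpan() {
        return CURRENT.get();
    }

    public static void clear() {
        CURRENT.remove();
    }

    // Minimal span carrying just the trace ID for this sketch.
    public static class Span {
        public final long traceId;
        public Span(long traceId) { this.traceId = traceId; }
    }
}
```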

Often, threads offload the actual work to other threads, which then either make other remote calls or report back to the parent thread. Because of this, we also implemented thread factories and executors that know how to retrieve the tracing data from the parent thread and assign it to the child thread, so that the child thread can also be tracked.
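The propagation trick is to capture the parent thread’s tracing data when the task is created and restore it on the worker thread. A sketch using a plain ThreadLocal (again a simplification of TDist’s thread factories and executors):

```java
// Wraps submitted tasks so the thread that runs them inherits the
// submitting thread's tracing context.
public class TracingRunnable implements Runnable {
    // Stand-in for the DataManager's per-thread span storage.
    public static final ThreadLocal<Long> TRACE_ID = new ThreadLocal<>();

    private final Long capturedTraceId; // read on the submitting thread
    private final Runnable delegate;

    public TracingRunnable(Runnable delegate) {
        this.capturedTraceId = TRACE_ID.get(); // capture at submit time
        this.delegate = delegate;
    }

    @Override
    public void run() {
        TRACE_ID.set(capturedTraceId); // restore on the worker thread
        try {
            delegate.run();
        } finally {
            TRACE_ID.remove(); // don't leak context into pooled threads
        }
    }
}
```

A tracing thread factory or executor would simply wrap every submitted task in a `TracingRunnable` before handing it to the pool.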


Once the tracing data arrive at the tracing message bus through TDist, the Zipkin infrastructure handles the rest of the process. Multiple instances of collectors, consuming from the message bus, store each record in the tracing data store. A separate set of query and web services, part of the Zipkin source code, in turn query the database for traces. We decided to join the query and the web service to keep things simpler, and also because this combined service is internal and has predictable traffic patterns. However, the collector is decoupled from the query and web service because the more Knewton services integrated with the collector, the more tracing data it would have to process.

Zipkin UI

Out of the box, Zipkin provides a simple UI to view traces across all services. While it’s easy to view logs for a particular Trace ID across all services, the Zipkin UI provides a summarized view of the duration of each call without having to look through hundreds of log statements. It’s also a useful way of identifying the biggest or slowest traces over a given period of time. Finding these outliers allowed us to flag cases where we were making redundant calls to other services that were slowing down our overall SLA for certain call chains. Here’s a screenshot of a trace appearing in the Zipkin UI:


Of course, the UI doesn’t come without drawbacks. While it’s easy to look at distinct individual traces, we found the Zipkin UI lacking for examining aggregate data. For example, there’s currently no way to get aggregate timing information or aggregate data on the most-called endpoints, services, etc.

Throughout the development process and rolling out of the Zipkin infrastructure, we made several open-source contributions to Zipkin, thanks to its active and growing community.


As mentioned above, the name of the thread servicing a request is changed so that the Trace ID is appended to it. Because of this, we can query logs across all of the trace-enabled services for a particular call. This makes debugging a lot easier, and it’s proven very useful in post-mortem analysis, log aggregation, debugging of isolated problems, and explaining uncommon platform behavior.


Thrift

Thrift is a cross-language RPC framework for building scalable services. The user can define a service and data model spec in Thrift, and Thrift will compile the spec into many different languages. Users can then implement the generated service interfaces in the desired language. Thrift also automatically generates the client code and data structures for the services defined by the user.

Thrift is the most widely used RPC method between services at Knewton. Most of our services talk to each other through this framework, so supporting it while still maintaining backwards compatibility was critical for the success of this project. More precisely, we wanted services that were not tracing-enabled to be able to talk to tracing-enabled services.

When we started looking into adding tracing support to Thrift, we experimented with two different approaches. The first approach involved a modified Thrift compiler, and the second involved modified serialization protocols and server processors. Both methods had their advantages and disadvantages.

Custom Compiler

In this approach, we experimented with modifying the C++ Thrift compiler to generate additional service interfaces that could pass along the tracing data to the user. Modified Thrift compilers are not uncommon; perhaps the most famous example is Scrooge. One advantage of a modified compiler was that clients would have to swap out fewer class implementations in their code, since tracing was supported right in the generated code. Clients could also get a reference to the tracing data from the service interface.

Although we didn’t benchmark, we also think that this approach would have been marginally faster, since there are fewer classes delegating to tracing implementations. However, we would have had to recompile all of our Thrift code and deviate from the open-source version, making it harder to upgrade in the future. We also soon realized that allowing the user access to the tracing data might not be desirable or safe, and data management might be better left to TDist for consistency.

Custom Protocols and Server Processors

We ended up using this approach in production. Not having to maintain a custom compiler lowered our development cost significantly. The biggest disadvantage to customizing protocols and server processors was that we had to upgrade to Thrift 0.9.0 (from 0.7.0) to take advantage of some features that would make it easier to plug in our tracing components to the custom Thrift processors and protocols. The upgrade required a lot of coordination across the organization. Thankfully, the newer version of Thrift was backwards-compatible with the older version, and we could work on TDist while Knewton services were being updated to the newer version. However, we still had to release all Knewton services before we could start integrating them with our distributed tracing solution.

Upgrading Thrift

Upgrading libraries when using a dependency framework is relatively easy, but for an RPC framework like Thrift in a service-oriented architecture with a deep call dependency chain, it gets more complicated. A typical service has both server and client code, with the server code often depending on other client libraries. Because of this, upgrades needed to start from the leaf services and move up the tree, to avoid introducing wire incompatibilities: an outgoing service might not know whether the destination service would be able to detect the tracing data coming through the wire.


Another hurdle was that certain dependencies, such as the Cassandra client library Astyanax, depended on third-party libraries that in turn depended on Thrift 0.7.0. For Astyanax, we had to shade the JARs using Maven and change package names so that they didn’t collide with the newer Thrift library. All of this had to happen quickly and without downtime. And because we didn’t want other teams at Knewton incurring the cost of this upgrade, the distributed tracing team implemented and rolled out all the changes.

Tracing Thrift Components: How It Works

Our Thrift solution consists of custom, backwards-compatible protocols and custom server processors that extract tracing data and set them before routing the message to the appropriate RPC call. Our protocols essentially write the tracing data at the beginning of each message.

When the RPC call reaches the server, the processor identifies whether the incoming call carries tracing data and responds accordingly: calls with tracing data get responses with tracing data, and requests from non-integrated services that don’t carry tracing data get responses without it. This makes the tracing protocols backwards-compatible, since the server’s outgoing protocols don’t write tracing data if instructed not to by the processor servicing the request. A tracing protocol can detect whether the payload contains tracing data based on its first few bytes: Thrift prepends a protocol ID to each message, and if the reading protocol sees that the first few bytes do not indicate the presence of tracing data, the bytes are put back on the buffer and the payload is reread as a non-tracing payload.

When reading a message, the protocols extract the tracing data and set them on a ThreadLocal for the thread servicing the incoming RPC call, using the DataManager. If that thread ever makes additional calls to other services downstream, the tracing data will be picked up from the DataManager automatically by TDist and appended to the outgoing message.
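The detect-and-fall-back logic can be sketched independently of Thrift: prepend a magic marker plus the tracing IDs, and on read, peek at the first bytes to decide whether tracing data is present. The marker value and byte layout below are invented for illustration; the real protocols work with Thrift’s own framing:

```java
import java.nio.ByteBuffer;

// Sketch of the framing trick: a tracing-aware payload starts with a
// magic marker followed by trace/span IDs; a non-tracing payload is
// passed through untouched.
public class TracingFraming {
    static final int MAGIC = 0x7D157D15; // invented marker for this sketch

    public static byte[] wrap(long traceId, long spanId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 8 + payload.length);
        buf.putInt(MAGIC).putLong(traceId).putLong(spanId).put(payload);
        return buf.array();
    }

    // Returns the original payload bytes; if tracing data is present,
    // the IDs are written into ids[0] (trace) and ids[1] (span).
    public static byte[] unwrap(byte[] message, long[] ids) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        // Peek without consuming: absolute read at offset 0.
        if (message.length >= 4 && buf.getInt(0) == MAGIC) {
            buf.getInt();           // consume the marker
            ids[0] = buf.getLong(); // trace ID
            ids[1] = buf.getLong(); // span ID
        }
        // Whatever remains is the non-tracing payload.
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```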


Here’s a diagram showing how the payload is modified to add tracing data:


Kafka Tracing Support

When we were adding tracing support to Kafka, we wanted to keep the Kafka servers, also referred to as brokers, as a black box: we wanted to pass the data through the brokers without them knowing about it, and therefore without modifying the Kafka broker code at all. Similar to our approach with RPC services, we upgraded the consumers before the producers. The consumers are backwards-compatible and can detect when a payload contains tracing data, deserializing the content in the manner of the Thrift protocols described above. To achieve this, we require clients to wrap their serializers/deserializers in tracing equivalents that delegate reading and writing of the non-tracing payload to the wrapped ones.
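The wrapping approach can be sketched with a minimal serializer interface. Kafka’s real `Serializer` interface differs; the stand-in below only exists to show the delegation pattern:

```java
import java.nio.ByteBuffer;

// Sketch of wrapping a serializer so tracing IDs travel with the Kafka
// payload. Serializer here is a minimal stand-in interface, not Kafka's
// actual one; the real wrapper also handles topic names and nulls.
public class TracingSerializer<T> {
    public interface Serializer<T> {
        byte[] serialize(T value);
    }

    private final Serializer<T> delegate;

    public TracingSerializer(Serializer<T> delegate) {
        this.delegate = delegate;
    }

    // Prefix the delegate's bytes with the trace and span IDs; the
    // wrapped serializer never sees the tracing data.
    public byte[] serialize(long traceId, long spanId, T value) {
        byte[] body = delegate.serialize(value);
        ByteBuffer buf = ByteBuffer.allocate(16 + body.length);
        buf.putLong(traceId).putLong(spanId).put(body);
        return buf.array();
    }
}
```

A matching tracing deserializer strips the prefix, hands the IDs to the DataManager, and delegates the rest of the bytes to the wrapped deserializer.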



HTTP Tracing Support

As some of Knewton’s internal infrastructure and all public-facing endpoints are HTTP-based, we needed a simple way of instrumenting HTTP calls to carry tracing information.

This was quite simple, because HTTP supports putting arbitrary data in headers. According to section 5 of RFC 2047, the only guideline for adding custom headers is to prefix them with `X-`.

We elected to continue the Zipkin tradition and use the following headers to propagate tracing information:

  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId

Services at Knewton primarily use the Jetty HTTP Server and the Apache HTTP Client. Both of these projects allow for easy header manipulation.

Jetty services requests by routing them to a Servlet. As part of this routing, Jetty allows the request and response to pass through a series of Filters. We felt this was the ideal place to handle tracing data: when an incoming request carries tracing headers, we construct span data from them and submit it to the DataManager.
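The filter-side extraction can be sketched against a plain header map rather than the Servlet API (class and field names are illustrative):

```java
import java.util.Map;

// Reads the B3 headers from an incoming request. Returns null when the
// caller is not tracing-enabled, in which case a new root span would be
// started instead.
public class B3Headers {
    public static final String TRACE_ID = "X-B3-TraceId";
    public static final String SPAN_ID = "X-B3-SpanId";
    public static final String PARENT_SPAN_ID = "X-B3-ParentSpanId";

    // Minimal holder for the extracted identifiers.
    public static class Span {
        public final long traceId, spanId;
        public final Long parentSpanId; // null on the root span
        Span(long t, long s, Long p) { traceId = t; spanId = s; parentSpanId = p; }
    }

    public static Span fromHeaders(Map<String, String> headers) {
        String trace = headers.get(TRACE_ID);
        String span = headers.get(SPAN_ID);
        if (trace == null || span == null) {
            return null; // no tracing data on this request
        }
        String parent = headers.get(PARENT_SPAN_ID);
        // B3 IDs are hex-encoded on the wire.
        return new Span(Long.parseLong(trace, 16),
                        Long.parseLong(span, 16),
                        parent == null ? null : Long.parseLong(parent, 16));
    }
}
```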

With the Apache HTTP Client, we use an HttpRequestInterceptor and HttpResponseInterceptor, which were designed to interact with header contents and modify them. These interceptors read tracing data from headers and set them using the DataManager, and, on outgoing requests, write the current tracing data into the headers.


Guice

For those unfamiliar with Guice, it’s a dependency management framework developed at Google. To make the TDist integrations with our existing services easier and less error-prone, we relied on Guice and implemented several modules that our clients simply had to install. Guice handles dependency injection during object instantiation and makes it easy to swap implementations of interfaces. If throughout this article you have been thinking that integrating with TDist sounds complicated, in practice all our clients usually needed to do was install additional Guice modules that bound our tracing implementations to existing Thrift interfaces. This also meant that our clients never had to instantiate any of our tracing-enabled constructs. Whenever a TDist client forgot to bind something, Guice would report the missing binding at startup, when the injector is created. We put a lot of thought into how we laid out our Guice module hierarchies so that TDist didn’t collide with our clients, and we were very careful whenever we had to expose elements to the outside world.

The Tracing Message Bus

The tracing message bus is where all our client services place tracing data prior to its being consumed by the Zipkin collector and persisted. Our two options were Kafka and Kinesis, and we ended up choosing Kinesis. We considered Kafka because Knewton has had a stable Kafka deployment for several years. At the time, our Kafka cluster, which we’ve been using as our student event bus, was ingesting over 300 messages per second in production. Our initial estimates put tracing traffic in the range of over 400,000 messages per second with only a partial integration. The idea of straining production systems with instrumentation data made us nervous. Kinesis seemed like an attractive alternative that would be isolated from our Kafka servers, which were only handling production, non-instrumentation data. At the time of implementation, Kinesis was a new AWS service and none of us were familiar with it. Its price, throughput capabilities, and the lack of maintenance on our end sealed the deal for us. Overall, we’ve been satisfied with its performance and stability. It’s not as fast as Kafka, but the nature of our data made an SLA of even a few minutes from publication to ingestion acceptable. Since we deployed the tracing message bus to production, we have also been able to easily scale up the number of Kinesis shards without incurring any downtime.

The Tracing Data Store

The tracing data store is where all our tracing data ends up. The data stay there for a configurable time and are queried by the Zipkin query service for display in the UI. Zipkin supports many data stores out of the box, including Cassandra, Redis, MongoDB, Postgres, and MySQL. We experimented with Cassandra and DynamoDB, mainly because of the institutional knowledge we have at Knewton, but ended up choosing Amazon’s ElastiCache Redis. The most important reasons behind our decision were:

  • Time to production, given that we didn’t have to roll out and maintain a new cluster
  • Cost
  • Easier integration with Zipkin, with less code
  • Support for TTLs on the data


Our tracing solution at Knewton has been running in all environments for a few months now, and so far it has proven invaluable. We have only started to scratch the surface of what we can do with the tracing and timing data we’re collecting. We had a lot of fun implementing and rolling out tracing at Knewton, and we have come to understand the value of this data.