Simplifying Cassandra Heap Size Allocation

As discussed previously, Knewton has a large Cassandra deployment to meet its data store needs. Despite best efforts to standardize configurations across the deployment, the systems are in near-constant flux. In particular, upgrading the version of Cassandra may be quick for a single cluster, but doing so for the entire deployment is usually a lengthy process, due to the overhead involved in bringing legacy stacks up to date.

At the time of this writing, Knewton is in the final stages of a system-wide Cassandra upgrade from Version 1.2 to 2.1. Due to a breaking change in the underlying SSTable format introduced in Version 2.1, this transition must be performed in stages, first from 1.2 to 2.0, then 2.0 to 2.1. While the SSTable format change does not have a direct bearing on the topic of interest here, another change introduced in Version 2.0 is of material importance.

This post describes some nuances of Cassandra’s JVM (maximum) heap memory allocation that we discovered along the way, and how we handled aspects that posed a threat to our systems.

A problem case

Our journey of discovery began with a struggle to address crashes on a particular cluster due to running out of heap memory. For ease of reference, let’s call this cluster Knerd. In our AWS infrastructure, the Knerd cluster was running on m4.large nodes having 8 GB of memory, and by default Cassandra was allocating 4 GB of heap to each node. After exhausting other options, we decided to migrate the cluster to m4.xlarge nodes with 16 GB of memory, believing that these larger nodes would be allocated 8 GB of heap.

Imagine our surprise to see the new nodes come up with 4 GB of heap, same as before.

Default heap allocation in Cassandra 1.2

This phenomenon forced us to take a harder look at the code behind the heap allocation, contained in the cassandra-env.sh config file. The relevant code block, as it ships with Cassandra 1.2, is contained in the function calculate_heap_sizes:

   # set max heap size based on the following
   # max(min(1/2 ram, 4096MB), min(1/4 ram, 8GB))
   # calculate 1/2 ram and cap to 4096MB
   # calculate 1/4 ram and cap to 8192MB
   # pick the max
   half_system_memory_in_mb=`expr $system_memory_in_mb / 2`
   quarter_system_memory_in_mb=`expr $half_system_memory_in_mb / 2`
   if [ "$half_system_memory_in_mb" -gt "4096" ]
   then
       half_system_memory_in_mb="4096"
   fi
   if [ "$quarter_system_memory_in_mb" -gt "8192" ]
   then
       quarter_system_memory_in_mb="8192"
   fi
   if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]
   then
       max_heap_size_in_mb="$half_system_memory_in_mb"
   else
       max_heap_size_in_mb="$quarter_system_memory_in_mb"
   fi
   MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

In short, this code block imposes a cap on each of two parameters, then uses the larger parameter as the maximum heap size.

The graph below shows the heap size in GB as a function of the system memory (also in GB), up to the absolute cap of an 8 GB heap. (We’ll focus on integer values for ease of discussion.)

[Figure: cassandra-2-1-default-heap-allocation]

The heap size scales linearly as half the system memory up to 4 GB, then plateaus for systems with total memory between 8 GB and 16 GB. This is precisely the range of values for which we observed a steady maximum heap size of 4 GB on the Knerd cluster.

This plateau is a direct consequence of the half-memory parameter in the shell code being capped at a smaller value than the quarter-memory parameter. While scrutiny of the shell code above will show exactly the behavior displayed in the graph, a new user is unlikely to see this coming, especially in the typical NoSQL climate with short time scales and developer-managed data stores. Indeed, since Cassandra’s heap scales with system memory, it is reasonable to assume that it will scale smoothly for all values of the system memory below the cap. The plateau is at odds with this assumption, and therefore constitutes behavior that might be deemed unpredictable or counterintuitive, and can disrupt provisioning and capacity planning. In other words, this internal Cassandra feature violates the Principle of Least Surprise.

Beyond the plateau, the curve begins to rise linearly again, this time scaling as one-quarter of system memory. Then, at a system memory of 32 GB, the heap size reaches the absolute cap of 8 GB.
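To see the rule as executable logic, here is a minimal Java sketch (the class, method, and sample values are ours, not Cassandra code) that mirrors the shell logic above and reproduces the plateau:

   // Sketch of the Cassandra 1.2 default rule: max(min(ram/2, 4096MB), min(ram/4, 8192MB)).
   public class DefaultHeapCassandra12 {
       static long maxHeapMb(long systemMemoryMb) {
           long half = Math.min(systemMemoryMb / 2, 4096);     // half of RAM, capped at 4 GB
           long quarter = Math.min(systemMemoryMb / 4, 8192);  // quarter of RAM, capped at 8 GB
           return Math.max(half, quarter);
       }

       public static void main(String[] args) {
           // Prints 2048, 4096, 4096, 4096, 6144, 8192: flat between 8 GB and 16 GB of RAM.
           for (long gb : new long[] {4, 8, 12, 16, 24, 32}) {
               System.out.println(gb + " GB RAM -> " + maxHeapMb(gb * 1024) + " MB heap");
           }
       }
   }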

Modified heap allocation in Cassandra 1.2

If you’re using Cassandra 1.2, we strongly recommend doing something about this plateau in order for your nodes to scale predictably. Here are some options.

The 8 GB cap is generally desirable, because if the heap is too large then stop-the-world garbage collection cycles will take longer and can cause downstream failures. On the other hand, the system memory value upon reaching the cap is not terribly important. It could be 32 GB, as in the default scheme, or it could be 24 GB with little noticeable difference. Thus the plateau can be eliminated in one of two ways:

  1. Keep the remaining features, shifting the transition points to lower system memory values.

[Figure: cassandra-2-1-modified-heap-no-plateau]

  2. Take the average over the values for system memory below 16 GB, which yields a simple one-quarter slope up to the cap.

[Figure: cassandra-2-1-modified-heap-constant-slope]

Averaging is simpler, but it may not be suitable for small values of system memory.  Either approach can be achieved with a simple set of conditionals specifying the scaling for each segment; for example, the first could be coded like this:

   if [ "$system_memory_in_mb" -lt "8192" ]
   then
       max_heap_size_in_mb="$half_system_memory_in_mb"
   elif [ "$system_memory_in_mb" -lt "24576" ]
   then
       max_heap_size_in_mb="$quarter_system_memory_in_mb"
   else
       max_heap_size_in_mb="8192"
   fi

Either approach eliminates the undesirable plateau. An upgrade to Cassandra 2.0 or above also effectively removes its impact.

Heap allocation in Cassandra 2.0+

Cassandra 2.0 introduced a change in heap allocation, although this change was easy to miss since it did not get highlighted in the What’s new in Cassandra post for 2.0.

The change is a small one: the cap of the one-half-memory variable was changed from 4 GB to 1 GB, such that the plateau occurs at a heap size of 1 GB and extends from system memory values of 2 GB to 4 GB. While the plateau still exists, it is no longer an issue for systems having memory greater than 4 GB. Production systems will doubtless be running with more memory than this, so upgrading alone will solve the scaling predictability issue in nearly all cases.

However, systems with less than 16 GB of memory (and more than 2 GB) will have a smaller heap after upgrading from 1.2 to 2.0+. This is because the heap now scales as one-quarter of the system memory over a broader range of memory values than it did in Cassandra 1.2. Although the picture hasn’t changed for high-memory systems, this shift to smaller heaps can cause further problems on systems with relatively small node memory, such as the Knerd cluster.
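As a concrete example, consider a node with 12 GB of memory, written in the max/min notation from the comments in cassandra-env.sh:

   Cassandra 1.2:  max(min(6 GB, 4 GB), min(3 GB, 8 GB)) = max(4 GB, 3 GB) = 4 GB
   Cassandra 2.0+: max(min(6 GB, 1 GB), min(3 GB, 8 GB)) = max(1 GB, 3 GB) = 3 GB

The same node loses a quarter of its heap simply by upgrading, unless MAX_HEAP_SIZE is set explicitly.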

Moving memtables off heap in Cassandra 2.1

The change in the cap in Cassandra 2.0 may be related to the effort to move more Cassandra components off of the JVM heap. Version 2.0 offered off-heap partition summaries, and version 2.1 introduced the ability to move memtables off heap. These features make a smaller heap size desirable in order to make room for the memtable components in off-heap memory.

Your heap may behave better, but the tradeoff is possible out-of-memory events at the system level. Since each Java process allocates a separate heap, other Java processes running on the same machine (e.g. Datastax OpsCenter) leave less memory available for Cassandra’s off-heap components. In the extreme case, the operating system may kill the Cassandra process or crash altogether, which looks identical to the heap-crash case from the perspective of the application (i.e. Cassandra is not available on that node).

Knewton’s approach

At first, we applied a Band-Aid fix in Cassandra 1.2 to always allocate a heap size equal to half the system memory. This worked reasonably well, until migrating to 2.1. The relevant change to configuration templates had only been made for Cassandra 1.2, and other versions fell back to the default allocation code (which is how we discovered the properties described in this post).

That’s when we decided it was time to do some testing. We had three options:

  1. Move only the memtable buffers off heap (offheap_buffers);
  2. Move both the memtable buffers and the objects off heap (offheap_objects);
  3. Leave the memtables on heap and raise the heap size.

We regarded raising the heap size as our fallback, since we knew how the system would behave if we did that. Due to considerations related to scaling (e.g. data size growth), we weren’t very confident that any testing would yield a clear choice of how large to make the heap. In the end, then, this amounted to simply raising the maximum heap size to half the system memory. While doing so would be nice and simple, we hesitated to make our Cassandra installation deviate this much from the base installation.

So we experimented with off-heap memtables. As noted earlier, moving components off of the heap appears to be related to why the maximum heap size calculation was changed in the first place. Moreover, moving both the memtable buffers and the objects off heap is slated to become the default behavior in a future release of Cassandra, which led us to believe that this option’s performance is good enough that it is recommended in most cases.

Both offheap_buffers and offheap_objects showed good results in tests, reducing heap pressure and improving overall system performance relative to heap_buffers (the default configuration). We expected GC time and end-to-end processing time to be lowest for offheap_objects. To our surprise, however, offheap_buffers was the star performer. This setting yielded the best numbers while maintaining the most consistent read and write rates. In some measures, such as percentage of heap used and GC pause time, offheap_buffers narrowly edged out offheap_objects. In others, notably write latency, bloom filter false-positive ratio, and average response time, offheap_buffers was the clear winner. This performance was enough to convince us to move forward with offheap_buffers.

After deploying this change, we mostly saw positive results. Unfortunately, we did still sometimes see transient high-heap events, mostly on clusters with large data sets. These events, however, were not accompanied by crashes. That prompted us to reevaluate our longstanding alert threshold for heap usage; after some tuning of the threshold, the remainder of the issue was resolved.

In summary, investigation of some puzzling behavior revealed unexpected heap allocation details in Cassandra’s internal configuration. Studying the configuration code allowed us to evaluate our options to address the behavior we originally observed. Our goal was to yield more predictable heap allocation results when scaling a cluster without impeding Cassandra’s performance. We achieved this by moving the memtable buffers off heap, which is possible in Cassandra 2.1+.

Digging Deep Into Cassandra Thrift Buffer Behavior

Everyone who works in tech has had to debug a problem.  Hopefully it is as simple as looking into a log file, but many times it is not.  Sometimes the problem goes away, and sometimes it only looks like it goes away.  Other times it might not look like a problem at all.  A lot of factors go into deciding whether you need to investigate and how deep you need to go.  These investigations can take a lot of resources for an organization, and there is always the chance of coming up empty handed, which should never be seen as a failure.

This post will summarize an investigation into some Cassandra memory behavior that the database team at Knewton conducted recently.  It is a good illustration of the kind of multi-pronged approach needed to unravel strange behaviors low in the stack.  While we will specifically be talking about Cassandra running on AWS, the takeaways from this article are applicable to systems running on other platforms as well.

Uncovering a Problem

Background

Knewton had a Cassandra cluster that was very overprovisioned. The instances were scaled up (each instance is made more powerful) rather than scaled out (more instances are added to the cluster).  Cassandra is a horizontally scalable datastore and it benefits from having more machines rather than better ones.

So we added more machines and reduced the power of each one. We now had 4GB of available memory for the Java heap instead of 8GB.  This configuration worked well in all of our tests and in other clusters but in this specific overprovisioned cluster we found we had scaled each node down too much, so we moved to machines that could accommodate 8GB of heap, m4.xlarge instances.

Anomalous Behavior on New Instances

After moving all nodes over to m4.xlarge instances, we saw our heap situation stabilize. However, we began to notice anomalous CPU and load averages across the cluster.  Some nodes would run higher than others.  The metrics showed that, out of the four cores on an m4.xlarge instance, one was completely saturated.

Seen on its own, this load would not look like a problem: total CPU usage on the box was at 25%, and there were no catastrophically long GC pauses.  However, the cluster was not uniform, which called for further investigation.

In the CPU graphs below, you can see nodes running at low CPU encounter something that rapidly promotes them into the high-CPU band, from which they almost never drop back down.

[Figure: cassandra-thrift-buffer-high-cpu-band]

We found that this graph was correlated with the graph of average garbage collection time.

[Figure: cassandra-thrift-buffer-garbage-collection-high-cpu]

When nodes promote themselves into the high CPU band, their GC times spike.

What is Cassandra holding in its heap that is causing GC issues?  With plenty of memory and CPU headroom, crashes are not a risk, but performance is far from where we want it to be.

Before tackling the garbage collection question, we have two other questions that we can answer already:

Why is this behavior showing up now?

We had an 8GB heap before and should have seen the same behavior.  The reason we only saw this CPU and GC behavior once we moved to m4.xlarge instances is twofold:

  1. Something unique in this cluster caused a 4GB heap to be fatal but an 8GB heap to be adequate.  Expanding the heap stopped the crashes but did not get rid of the underlying problem.
  2. The original cluster that had an 8GB heap had been around for years, and all of its nodes had been promoted to the high CPU band.  The baseline operation of that cluster looked like the high CPU band.  It is only because of previous issues with provisioning this cluster that we were watching closely when we moved to these m4.xlarge instances.  This highlights the importance of understanding your baseline resource utilization and not assuming that it means you are in a healthy state.

Why is this behavior a problem?

Even though the high-usage band does not represent a problematic load on its own, the promotion behavior was unexpected, and that is a problem.  The biggest reason is that it introduces a nondeterministic factor into our latencies.  We can expect different performance from a node in the high CPU band than from one in the low CPU band, but we cannot predict when a node will promote itself, because we do not know the cause of this behavior.  That leaves an unknown factor in our system, which is dangerous for repeatability and reliability.

Investigation

Investigating further is resource intensive, and often your senior engineering staff has to undertake the investigation, as it requires some degree of independence and experience.  So make sure you decide that the  problem is  actually worth the time of your organization before sinking days or weeks of engineering time into it.

Logs

The first thing to do is to look through the relevant logs.  Looking through the Cassandra logs we found, unsurprisingly, a lot of garbage collections.  We’d had GC logging on and found several “large block” messages in the CMS garbage collection logs at about the time these promotions happened.  To get more visibility into what is actually in the heap, we turned on GC class histogram logs, but these said that almost all the memory was being held in byte arrays.

Not helpful.  Time to go even deeper and see what is in the heap.

Heap Dump

So we took a heap dump from a problematic node and from a node that was exhibiting good GC behavior, as a control.  A heap dump file is roughly the size of the used Java heap. Dumping the heap is a “stop the world” operation for the process you are dumping, so when doing this in production be sure that the service can be unresponsive for a minute or more.  The file is binary and examining it is labor intensive, so it is best to move the heap dump to a machine that is not serving production traffic to investigate.

We used the Eclipse Memory Analyzer Tool (MAT) to investigate the heap dumps and found that almost all of the memory was taken up by the write buffers of the TFramedTransport objects.  There were several hundred of these objects, and the write buffer sizes ranged from 1kB to 30MB, with many in the range of 20-30MB.  We saw similar objects in the heap dump of the control node, but not nearly as many.  The write buffer contains what is being written to the Thrift transport socket and does not correspond to Cassandra reads and writes.  In this case Cassandra is writing the output of an incoming read to this buffer to send to the client that has requested the data.

It became pretty clear that this was a Thrift protocol issue so we searched for related issues.

[Figure: cassandra-thrift-buffer-related-issues]

Literature Search

Any problem you find has most likely been reported in some way or another by someone else, especially if you are using anything but the latest versions of open-source software.  It is very useful to search the web for related problems at every step of the investigation; as you get more specific, you might uncover things that you would not have encountered earlier.  In this case the investigation led us to the Thrift protocol, which is not something we would have searched for earlier.

The Thrift library that our version of Cassandra used had some memory issues referenced in CASSANDRA-4896. These are referenced again in more detail and resolved in CASSANDRA-8685.  So that you don’t have to read through the tickets: the write buffer — the thing taking up all of our space and the likely cause of our issues — grows to accommodate a larger payload but never shrinks back down. So once an operation with a large payload is requested of Cassandra, such as a query returning a 20MB record, these buffers increase in size and never drop back down.
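The resizing policy is easy to picture with a small sketch. The class below is not Thrift’s actual code; it just mimics the grow-but-never-shrink behavior described in the tickets:

   import java.util.Arrays;

   // Mimics the write-buffer behavior described above: capacity grows to fit the
   // largest frame ever written on the connection and is never released.
   public class GrowOnlyWriteBuffer {
       private byte[] buf = new byte[1024];  // starts small (about 1 kB)
       private int len = 0;

       public void write(byte[] payload) {
           if (len + payload.length > buf.length) {
               // Grow to fit the largest payload seen so far...
               buf = Arrays.copyOf(buf, Math.max(buf.length * 2, len + payload.length));
           }
           System.arraycopy(payload, 0, buf, len, payload.length);
           len += payload.length;
       }

       public void flush() {
           // ...but flushing a frame only resets the length; the backing array keeps
           // its expanded capacity for the life of the connection.
           len = 0;
       }

       public int capacityBytes() {
           return buf.length;
       }
   }

After a single 20MB response, capacityBytes() stays above 20MB even though the buffer is logically empty, which matches the per-connection footprint we saw in the heap dumps.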

This behavior lines up with what we were seeing in the cluster, where a node could be operating smoothly until a large set of queries came in and expanded the write buffers on the Thrift frame objects.  Once the frame size increases, it stays big for the duration of the connection with the client. (We’ll come back to this point shortly.)

This Thrift behavior is noted in THRIFT-1457, and a fix is out for it. The Thrift library fix is also rolled into Cassandra 2.1.3 and later.  The issue exists only in Thrift, so clients using CQL won’t see it.

Verify the Hypothesis

The link between this memory behavior and client connections was a working hypothesis. We got a very strong signal supporting the theory one morning when we deployed a new version of the services that communicate with our cluster.  This deployment meant closing all of the Cassandra connections on the client side during a low-traffic period and then reopening them.  Cold caches cause a lot of queries to occur during a deploy, but these queries do not return large blocks of data, so the TFramedTransport write buffers do not expand. Memory stays low until we start getting production traffic, which then expands the buffers as described above.

[Figure: cassandra-thrift-buffer-cpu-usage]

The CPU and GC graphs from the deploy at 10:45 show that we have almost no traffic beforehand.  We’re still holding onto these large memory objects, and they aren’t cleaned up because the TFramedTransport objects live as long as the corresponding client connection.

[Figure: cassandra-thrift-buffer-garbage-collection-time]

Once the client connections close, the objects are garbage collected and the new connections start with small TFramedTransport write buffers. So we see low heap usage and GC times at about 10:45.

[Figure: cassandra-thrift-buffer-heap-size-by-node]

There is a dramatic heap usage drop when the deploy happens at 10:45.

Conclusions

Technical Wrapup

The behavior of these TFramedTransport buffers makes the Thrift protocol in pre-2.1 Cassandra dangerous.  Each connection holds a TFramedTransport object in the Cassandra heap that can grow depending on the queries executed but cannot shrink.  Memory then fills with these TFramedTransport objects, which cannot be cleaned out as long as the connection to the client is open.  If the client connection is severed by a client restart or a deploy, these objects get garbage collected.  New connections begin with a small TFramedTransport object, which behaves well with respect to GC, heap, and CPU on the node until a large query comes in and expands the write buffer again, promoting the node to a high-CPU-usage node and damaging its performance and response time.  Eventually all nodes in the cluster reach this high-CPU-usage band, and the extra resources taken up by these large TFramedTransport objects become a burden on the performance of our database.

Next Steps

Because these buffers never decrease in size, the memory usage of the Cassandra instance is bounded only by the number of connections to the cluster, which is determined by the number of active clients.  Our previous 4GB heap was unable to accommodate the number of clients in Knewton’s infrastructure, so it would perpetually crash; 8GB, however, is sufficient.

Once we have identified the source of the problem, we have to decide what, if anything, to do about it. Some options would be:

  • Upgrading to Cassandra 2.1.3 or later.
  • Migrating to CQL from Thrift

We know that the behavior is not fatal to our processes, so depending on the opportunity cost of each of these options, we can decide whether we will live with this behavior or fix it.

Cost Analysis

It is difficult to quantify how much an investigation like this costs in terms of time, opportunity cost, and money to an organization.  Before starting an investigation, it is useful to timebox the project and restrict yourself to a certain number of engineering hours to investigate.  In hindsight one might quantify the cost of this investigation as:

Monetary Cost = Engineering Time – Hardware Optimization

In this case we will need less hardware once this memory issue is resolved, perhaps three fewer m4.xlarge instances.  The yearlong cost of a reserved m4.xlarge instance is currently $1,200. We spent about three engineer days investigating.  Estimating engineering costs at $200 an hour (including overhead and opportunity cost to the organization), each engineering day comes to about $1,600.  So over the year, we spent about $1,200 more on the investigation than we recovered from optimizations.
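Plugging the numbers into the formula above:

   Engineering time:       3 days × $1,600/day       = $4,800
   Hardware optimization:  3 instances × $1,200/year = $3,600
   Monetary cost (year 1): $4,800 - $3,600           = $1,200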

Final Remarks

Beyond monetary considerations, we have a better understanding of our system, which benefits us in several ways.  We are able to adjust our alerts to better monitor the system.  We can justify our instance sizes.  And finally we are able to support larger engineering objectives, advising the organization on the best course of action for future projects.  For example, we decided to move aggressively to upgrade this cluster to 2.1.

Deep investigations like this do not come with guarantees of strong takeaways or definitive answers.  They are expensive in terms of time and often require senior engineers to dig into deep system internals.  Yet at Knewton we have found that these investigations are valuable for the database team and the wider tech organization.

Distributed Tracing: Observations in Production

Previous blog posts have explained Knewton’s motivation for implementing distributed tracing, and the architecture we put together for it. At Knewton, the major consumers of tracing are ~80 engineers working on ~50 services. A team consisting of three engineers handled designing and deploying the tracing infrastructure detailed in the previous sections.  This section highlights our experience and challenges running Zipkin at Knewton over the past few months.

Technical Hurdles

We elected to use Redis on Amazon Elasticache as our storage for spans, which are detailed in the previous section. This caused us a few problems:

Redis was inexplicably slow

Our initial rollout had an early version of the Zipkin Collector that wrote to a cache.r3.2xlarge (58.2 GB RAM, 8 vCPU) Redis ElastiCache instance, which could not keep up with the traffic we were sending it. It could only handle ~250 requests per second. This seemed very slow to us. To set expectations, we dug into how to benchmark Redis and stumbled upon the venerable redis-benchmark, which informed us that it could handle at least 120,000 requests per second with the 120-byte payloads that we were transmitting.

Looking at Amazon’s Cloudwatch metrics during the initial rollout, we quickly realized that, while the test was using multiple connections to Redis, the Zipkin code was using only a single connection. We modified the code to use multiple connections so we could make ~3000 requests per second and still keep up with incoming traffic.

Redis lost all data stored on it every few days

After we got Redis running, we found that Redis would restart every few days, when the amount of data that we stored in Redis approached the memory allocation of the Elasticache instance.

It turned out that the memory listed on Amazon’s Elasticache documentation refers to the total memory on the instance that runs Redis, and not to the memory allocated to Redis. As we continued to load data into Elasticache Redis, the underlying instance would start swapping heavily, and the Redis process would be restarted. We speculated that the process was being OOM killed, but because we don’t have access to Linux logs on Elasticache instances, we couldn’t be certain. After setting the `reserved-memory` parameter to approximately 15% of the total memory on the cache instance, we no longer had this problem.

Amazon Kinesis Queue Library

We packaged TDist into a library responsible for collecting span data at every service and publishing it to our tracing message bus, Amazon Kinesis. During early testing, we had configured the AWS Kinesis stream to have only a single shard, limiting it to 1000 writes per second.

When we deployed a few services using TDist, we began seeing unusual growth in JVM heap sizes under bursty traffic. When traffic returned to baseline levels, the heap shrank back to reasonable sizes.

The default configuration for the Kinesis queue producer assumes that the rate at which it receives data is less than the rate at which it can submit data to the Kinesis queue. As such, it uses an unbounded in-memory queue to hold data awaiting submission to Kinesis. We decided to drop data when we are unable to keep up, and changed to a bounded queue. This resulted in predictable performance under load or network blips, at the expense of losing data during load spikes.
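A minimal sketch of that bounded-queue approach is below. The class and method names are ours, not the actual TDist or Kinesis producer code; the point is that the request path never blocks and never grows the heap, it just counts drops:

   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.atomic.AtomicLong;

   // Bounded buffer between request threads and the background Kinesis publisher.
   public class BoundedSpanQueue {
       private final BlockingQueue<byte[]> queue;
       private final AtomicLong dropped = new AtomicLong();

       public BoundedSpanQueue(int capacity) {
           this.queue = new ArrayBlockingQueue<>(capacity);
       }

       /** Called on the request path; never blocks. */
       public void submit(byte[] encodedSpan) {
           if (!queue.offer(encodedSpan)) {
               dropped.incrementAndGet();  // shed load instead of growing the heap
           }
       }

       /** Called by the background publisher thread. */
       public byte[] take() throws InterruptedException {
           return queue.take();
       }

       public long droppedCount() {
           return dropped.get();
       }
   }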

Now we have predictable performance, but during periods of high traffic, the Kinesis publishers fall behind and fill up the local queues, which has the unfortunate effect of dropping spans. This can be solved by increasing the rate at which we publish to Kinesis. The two options are to increase the number of publishers, and thus the number of calls we make to the Kinesis API, or to increase the number of spans published per API call. Because the span payload is small, most of the time spent on a Kinesis API call is network time. We intend to alleviate this problem by using batch Kinesis operations to submit multiple spans at once.
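As a rough sketch of what that batching might look like with the AWS SDK for Java v1 (the stream name and class names here are placeholders, not our production code):

   import com.amazonaws.services.kinesis.AmazonKinesis;
   import com.amazonaws.services.kinesis.model.PutRecordsRequest;
   import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
   import com.amazonaws.services.kinesis.model.PutRecordsResult;

   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   // Publishes many spans per API call via PutRecords instead of one PutRecord each.
   public class SpanBatchPublisher {
       private final AmazonKinesis kinesis;
       private final String streamName = "trace-spans";  // placeholder stream name

       public SpanBatchPublisher(AmazonKinesis kinesis) {
           this.kinesis = kinesis;
       }

       /** Publishes up to 500 spans (the PutRecords per-call limit) in one API call. */
       public int publish(List<byte[]> encodedSpans, String partitionKey) {
           List<PutRecordsRequestEntry> entries = new ArrayList<>();
           for (byte[] span : encodedSpans) {
               entries.add(new PutRecordsRequestEntry()
                       .withData(ByteBuffer.wrap(span))
                       .withPartitionKey(partitionKey));
           }
           PutRecordsResult result = kinesis.putRecords(
                   new PutRecordsRequest().withStreamName(streamName).withRecords(entries));
           return result.getFailedRecordCount();  // failed entries can be retried or counted as drops
       }
   }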

Metrics and Monitoring

When tracing was first rolled out at Knewton, it was hard to get an idea of how our client applications were doing. While the server- and client-side metrics were reported to our Graphite-based metrics system, we had a difficult time correlating those with metrics for Kinesis, which are reported with different granularity, units, and retention periods via Amazon CloudWatch. Knewton contributed to the cloudwatch-to-graphite project to get consolidated metrics in Graphite.

Once this was done, we were able to focus on the metrics that gave us the most insight into how tracing was performing: the rate at which we are emitting spans from applications to Kinesis, and how quickly the Collector is able to ship them off to Redis. These rates determine the time it takes for a trace to appear in the Zipkin web interface.

Organizational hurdles

User education

While we had stealthily instrumented the Thrift protocol to pass tracing data around, we found that developers were still looking at service logs instead of at tracing data while debugging. We now evangelize tracing whenever developers run into problems across services, and we gave a presentation on how developers can use tracing to pinpoint performance problems.

Successes

TDist allows Knewton to see internal dependencies between services clearly. Now, a service owner can see precisely which services depend on their service, and which functions they depend upon. This allows us to optimize service APIs to match usage patterns.

The week after we launched distributed tracing, we noticed that a particular external API call resulted in hundreds of calls to one internal service. We were able to add a new endpoint to the internal service to eliminate this large number of calls and speed up the external endpoint.

Conclusion

TDist proved that we could pass data across services in a noninvasive way by modifying the Thrift protocol. Since then, we’ve built out the TDist ecosystem and are using it in production. While we faced problems getting TDist into the hands of developers, it has proven incredibly useful when debugging problems across services.

Distributed Tracing: Design and Architecture

The previous blog post talked about why Knewton needed a distributed tracing system and the value it can add to a company. This section will go into more technical detail as to how we implemented our distributed tracing solution.

General Architecture and Tracing Data Management

Our solution has two main parts: the tracing library that all services integrate with, and a place to  store and visualize the tracing data. We chose Zipkin, a scalable open-source tracing framework developed at Twitter, for storing and visualizing the tracing data. Zipkin is usually paired with Finagle, but as mentioned in Part I, we ruled it out due to complications with our existing infrastructure. Knewton built the tracing library, called TDist, from the ground up, starting as a company hack day experiment.

[Figure: TDist-architecture-tracing-data-management]

Tracing data model

For our solution, we chose to match the data model used in Zipkin, which in turn borrows heavily from Dapper. A trace tree is made up of a set of spans.  Spans represent a particular call from client start through server receive, server send, and, ultimately, client receive.  For example, the call-and-response between ServiceA and ServiceB would count as a single span, as pictured below:

[Figure: TDist-RPC-span-knewton]

Each span tracks three identifiers:

  • Trace ID: Every span in a trace will share this ID.
  • Span ID: The ID for a particular span. The Span ID may or may not be the same as the Trace ID.
  • Parent Span ID: An optional ID present only on child spans. The root span does not have a Parent Span ID.

The diagram below shows how these IDs are applied to calls through the tree.  Notice that the Trace ID is consistent throughout the tree.

[Figure: TDist-trace-tree-with-IDs-knewton]

For more details, read the Dapper paper.
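To make the data model concrete, here is a minimal value class that mirrors the three identifiers described above; it is an illustration rather than the actual TDist or Zipkin span class:

   // Minimal span record mirroring the identifiers above (illustrative only).
   public class Span {
       private final long traceId;       // shared by every span in the trace
       private final long spanId;        // unique to this client/server call
       private final Long parentSpanId;  // null for the root span

       public Span(long traceId, long spanId, Long parentSpanId) {
           this.traceId = traceId;
           this.spanId = spanId;
           this.parentSpanId = parentSpanId;
       }

       public Span newChild(long childSpanId) {
           // A child keeps the trace ID and records this span as its parent.
           return new Span(traceId, childSpanId, spanId);
       }

       public long getTraceId() { return traceId; }
       public long getSpanId() { return spanId; }
       public Long getParentSpanId() { return parentSpanId; }
   }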

TDist

TDist is a Java library that Knewton developed. All of our services use it to enable tracing. TDist currently supports Thrift, HTTP, and Kafka, and it can also trace direct method invocations with the help of Guice annotations.

Each thread servicing or making a request to another service gets assigned a Span that is propagated and updated by the library in the background. Upon receipt of a request (or right before an outgoing request is made), the tracing data are added to an internal queue, and the DataManager changes the name of the thread handling the request to include the Trace ID. Worker threads then consume from that queue, and the tracing data get published to our tracing message bus. Java ThreadLocals make it easy to globally store and access information assigned to a particular thread, and that is the approach we used in the DataManager.
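A stripped-down sketch of such a DataManager is shown below. The real TDist class does more (queueing, annotations, timing), but the ThreadLocal core looks roughly like this, reusing the Span sketch from the data model section:

   // Simplified ThreadLocal-based DataManager; not the actual TDist implementation.
   public class DataManager {
       private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();

       public static void setSpan(Span span) {
           CURRENT_SPAN.set(span);
           // Tag the thread name with the trace ID so it shows up in logs.
           Thread current = Thread.currentThread();
           current.setName(current.getName() + " [traceId=" + span.getTraceId() + "]");
       }

       public static Span getSpan() {
           return CURRENT_SPAN.get();
       }

       public static void clear() {
           CURRENT_SPAN.remove();  // avoid leaking spans across pooled threads
       }
   }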

Often, threads offload the actual work to other threads, which then either do other remote calls or report back to the parent thread. Because of this, we also implemented thread factories as well as executors, which know how to retrieve the tracing data from the parent thread and assign it to the child thread so that the child thread can also be tracked.
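Building on the DataManager sketch above, a tracing-aware executor wrapper can be as small as the following (again illustrative, not TDist’s actual implementation):

   import java.util.concurrent.Executor;

   // Decorates tasks so the submitting thread's span follows the work onto worker threads.
   public class TracingExecutor implements Executor {
       private final Executor delegate;

       public TracingExecutor(Executor delegate) {
           this.delegate = delegate;
       }

       @Override
       public void execute(Runnable task) {
           final Span parentSpan = DataManager.getSpan();  // captured on the submitting thread
           delegate.execute(() -> {
               if (parentSpan != null) {
                   DataManager.setSpan(parentSpan);  // child thread now reports under the same trace
               }
               try {
                   task.run();
               } finally {
                   DataManager.clear();  // don't leak the span into the next pooled task
               }
           });
       }
   }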

Zipkin

Once the tracing data arrive at the tracing message bus through TDist, the Zipkin infrastructure handles the rest of the process. Multiple instances of collectors, consuming from the message bus, store each record in the tracing data store. A separate set of query and web services, part of the Zipkin source code, in turn query the database for traces. We decided to join the query and the web service to keep things simpler, and also because this combined service is internal and has predictable traffic patterns. However, the collector is decoupled from the query and web service because the more Knewton services integrated with the collector, the more tracing data it would have to process.

Zipkin UI

Out of the box, Zipkin provides a simple UI to view traces across all services. While it’s easy to view logs for a particular Trace ID across all services, the Zipkin UI provides a summarized view of the duration of each call without having to look through hundreds of log statements. It’s also a useful way of identifying  the biggest or slowest traces over a given period of time. Finding these outliers allowed us to flag cases where we were making redundant calls to other services that were slowing down our overall SLA for certain call chains. Here’s a screenshot of a trace appearing in the Zipkin UI:

[Figure: TDist-zipkin-UI-knewton]

Of course, the UI doesn’t come without drawbacks. While it’s easy to look at distinct individual traces, we found the Zipkin UI lacking for examining aggregate data. For example, there’s currently no way to get aggregate timing information or aggregate data on most called endpoints, services etc.

Throughout the development process and rolling out of the Zipkin infrastructure, we made several open-source contributions to Zipkin, thanks to its active and growing community.

Splunk

As mentioned above, the thread name of the current thread servicing a request is also changed, and the trace ID is appended to it. Because of this we can query for logs across all of the trace-enabled services for a particular call. This makes debugging a lot easier, and it’s proven to be very useful in post mortem analysis, log aggregations, debugging for isolated problems, and explaining uncommon platform behavior.

Thrift

Thrift is a cross-language RPC framework for building scalable services. The user can define a service and data model spec in Thrift, and Thrift will compile the spec into many different languages. Users can then implement the generated service interfaces in the desired language. Thrift also automatically generates the client code and data structures for the services defined by the user.

Thrift is the most widely used RPC method between services at Knewton. Most of our services talk to each other through this framework, so supporting it while still maintaining backwards compatibility was critical for the success of this project. More precisely, we wanted services that were not tracing-enabled to be able to talk to tracing-enabled services.

When we started looking into adding tracing support to Thrift, we experimented with two different approaches. The first approach involved a modified Thrift compiler, and the second involved modified serialization protocols and server processors. Both methods had their advantages and disadvantages.

Custom Compiler

In this approach, we experimented with modifying the C++ Thrift compiler to generate additional service interfaces that could pass along the tracing data to the user. Modified thrift compilers are not uncommon; perhaps the most famous example is Scrooge. One advantage of a modified compiler was that clients would have to swap out fewer class implementations in their code, since tracing was supported right in the generated code. Clients could also get a reference of the tracing data from the service interface.

Although we didn’t benchmark, we also think that this approach would have been marginally faster, since there are fewer classes delegating to tracing implementations. However, we would have had to recompile all of our Thrift code and deviate from the open-source version, making it harder to upgrade in the future. We also soon realized that allowing the user access to the tracing data might not be desirable or safe, and data management might be better left to TDist for consistency.

Custom Protocols and Server Processors

We ended up using this approach in production. Not having to maintain a custom compiler lowered our development cost significantly. The biggest disadvantage to customizing protocols and server processors was that we had to upgrade to Thrift 0.9.0 (from 0.7.0) to take advantage of some features that would make it easier to plug in our tracing components to the custom Thrift processors and protocols. The upgrade required a lot of coordination across the organization. Thankfully, the newer version of Thrift was backwards-compatible with the older version, and we could work on TDist while Knewton services were being updated to the newer version. However, we still had to release all Knewton services before we could start integrating them with our distributed tracing solution.

Upgrading Thrift

Upgrading libraries when using a dependency framework is relatively easy, but for an RPC framework like Thrift and a service-oriented architecture with a deep call dependency chain, it gets a little more complicated. A typical server will have server and client code, with the server code often depending on other client libraries. Because of this, upgrades needed to start from the leaf services and move up the tree to avoid introducing wire incompatibilities since the outgoing services might not know if the destination service will be able to detect the tracing data coming through the wire.

[Figure: TDist-upgrading-thrift-knewton]

Another hurdle was that certain dependencies, such as the Cassandra client library Astyanax, depended on third-party libraries that in turn depended on Thrift 0.7.0. For Astyanax, we had to shade the JARs using Maven and change package names so that they didn’t collide with the newer Thrift library. All of this had to happen quickly and without downtime. And because we didn’t want other teams at Knewton incurring the cost of this upgrade, the distributed tracing team had to implement and roll out all the changes.

Tracing Thrift Components: How It Works

Our Thrift solution consisted of custom, backwards-compatible protocols and custom server processors that extract tracing data and set them before routing them to the appropriate RPC call. Our protocols essentially write the tracing data at the beginning of each message. When the RPC call reaches the server, the processor will identify and note whether the incoming call has tracing data so it can respond appropriately. Calls with tracing data get responses with tracing data, and requests from non-integrated services that don’t carry tracing data get responses without tracing data. This makes the tracing protocols backwards-compatible, since the server’s outgoing protocols don’t write tracing data if instructed not to by the processor servicing the request.

A tracing protocol can detect whether the payload contains tracing data based on the first few bytes. Thrift writes a protocol ID at the beginning, and if the reading protocol sees that the first few bytes do not indicate the presence of tracing data, the bytes are put back on the buffer and the payload is reread as a non-tracing payload. When reading a message, the protocols extract the tracing data and set them on a ThreadLocal for the thread servicing the incoming RPC call, using the DataManager. If that thread ever makes additional calls to other services downstream, the tracing data will be picked up from the DataManager automatically by TDist and appended to the outgoing message.

[Figure: TDist-tracing-thrift-components-knewton]

Here’s a diagram showing how the payload is modified to add tracing data:

[Figure: TDist-tracing-payload-knewton]
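The exact protocol classes are Thrift-specific, but the framing idea can be sketched on its own. The magic marker and byte layout below are invented for illustration; the real TDist protocols lean on Thrift’s protocol ID handling as described above:

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;

   // Illustrative framing only: prepend a marker plus the three IDs, and fall back
   // to a plain payload when the marker is absent. Not the actual TDist protocol.
   public class TracingFraming {
       private static final int TRACING_MAGIC = 0x7D157D15;  // invented marker value

       public static byte[] frame(Span span, byte[] payload) throws IOException {
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           DataOutputStream data = new DataOutputStream(out);
           data.writeInt(TRACING_MAGIC);
           data.writeLong(span.getTraceId());
           data.writeLong(span.getSpanId());
           data.writeLong(span.getParentSpanId() == null ? 0L : span.getParentSpanId());
           data.write(payload);
           return out.toByteArray();
       }

       /** Returns the application payload; populates the DataManager if tracing data is present. */
       public static byte[] unframe(byte[] message) throws IOException {
           DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
           in.mark(4);
           if (in.readInt() == TRACING_MAGIC) {
               long traceId = in.readLong();
               long spanId = in.readLong();
               long parentId = in.readLong();
               DataManager.setSpan(new Span(traceId, spanId, parentId == 0 ? null : parentId));
           } else {
               in.reset();  // no tracing marker: put the bytes back and read a plain payload
           }
           return in.readAllBytes();
       }
   }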

Kafka Tracing Support

When we were adding tracing support to Kafka, we wanted to keep the Kafka servers, also referred to as brokers, as a black box. In other words, we wanted to pass the data through the brokers without their necessarily knowing about it, and therefore without having to modify the Kafka broker code at all. Similar to our approach with RPC services, we upgraded the consumers before upgrading the producers. The consumers are backwards-compatible and can detect when a payload contains tracing data, deserializing the content in the manner of the Thrift protocols described above. To achieve this, we require clients to wrap their serializers/deserializers in tracing equivalents that delegate reading and writing of the non-tracing payload to the wrapped ones.

[Figure: TDist-kafka-tracing-support-knewton]

[Figure: TDist-kafka-tracing-payload-knewton]
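TDist targeted the Kafka clients Knewton ran at the time, but purely to illustrate the wrapping idea, here is what a tracing serializer could look like against the modern org.apache.kafka.common.serialization.Serializer interface, reusing the framing sketch from the Thrift section:

   import org.apache.kafka.common.serialization.Serializer;

   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.util.Map;

   // Wraps a client-provided serializer so every produced message carries the
   // tracing frame from the sketch above. Not the actual TDist Kafka integration.
   public class TracingSerializer<T> implements Serializer<T> {
       private final Serializer<T> delegate;

       public TracingSerializer(Serializer<T> delegate) {
           this.delegate = delegate;
       }

       @Override
       public void configure(Map<String, ?> configs, boolean isKey) {
           delegate.configure(configs, isKey);
       }

       @Override
       public byte[] serialize(String topic, T data) {
           byte[] payload = delegate.serialize(topic, data);
           Span span = DataManager.getSpan();
           if (span == null) {
               return payload;  // producer not traced: emit a plain payload
           }
           try {
               return TracingFraming.frame(span, payload);  // brokers pass this through untouched
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       @Override
       public void close() {
           delegate.close();
       }
   }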

HTTP Tracing Support

As some of Knewton’s internal infrastructure and all public facing endpoints are HTTP-based, we needed a simple way of instrumenting HTTP calls to carry tracing information.

This was quite simple, because HTTP supports putting arbitrary data in headers. According to section 5 of rfc2047, the only guideline for adding custom headers is to prefix them with `X-`.

We elected to continue the Zipkin tradition and use the following headers to propagate tracing information:

  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId

Services at Knewton primarily use the Jetty HTTP Server and the Apache HTTP Client. Both of these projects allow for easy header manipulation.

Jetty services requests by routing them to a Servlet. As part of this routing, Jetty allows the request and response to pass through a series of Filters. We felt this was the ideal place to deal with tracing data. When any incoming request comes with tracing data headers, we construct span data from it and submit it to the DataManager.

With the Apache HTTP Client, we use an HttpRequestInterceptor and HttpResponseInterceptor, which were designed to interact with header contents and modify them. These interceptors read tracing data from headers and set them using the DataManager, and vice versa.
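A minimal client-side interceptor that injects the three headers listed above might look like this; the real TDist interceptors also create spans and handle responses, so treat this as a sketch of the core idea:

   import org.apache.http.HttpException;
   import org.apache.http.HttpRequest;
   import org.apache.http.HttpRequestInterceptor;
   import org.apache.http.protocol.HttpContext;

   import java.io.IOException;

   // Adds B3 headers to outgoing requests when the current thread carries a span.
   public class TracingRequestInterceptor implements HttpRequestInterceptor {
       @Override
       public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
           Span span = DataManager.getSpan();
           if (span == null) {
               return;  // untraced request: send no tracing headers
           }
           request.setHeader("X-B3-TraceId", Long.toHexString(span.getTraceId()));
           request.setHeader("X-B3-SpanId", Long.toHexString(span.getSpanId()));
           if (span.getParentSpanId() != null) {
               request.setHeader("X-B3-ParentSpanId", Long.toHexString(span.getParentSpanId()));
           }
       }
   }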

Guice

For those unfamiliar with Guice, it’s a dependency management framework developed at Google. To make the TDist integrations with our existing services easier and less error-prone, we relied on Guice and implemented several modules that our clients simply had to install. Guice handles dependency injection during object instantiation and makes it easy to swap implementations of interfaces. If throughout this article you have been thinking that integrating with TDist sounds complicated, in practice a lot of the time all our clients needed to do was install additional Guice modules that would bind our tracing implementations to existing Thrift interfaces. This also meant that our clients never had to instantiate any of our tracing-enabled constructs. Whenever a TDist client forgot to bind something, Guice would surface the missing binding as soon as the injector was created, rather than at some arbitrary point later at runtime. We put a lot of thought into how we laid out our Guice module hierarchies so that TDist didn’t collide with our clients, and we were very careful whenever we had to expose elements to the outside world.
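The modules themselves are small. The sketch below uses invented placeholder types, but it shows the binding pattern clients install:

   import com.google.inject.AbstractModule;

   // Sketch of the kind of module clients install; the nested types are placeholders
   // standing in for a generated Thrift client interface and its tracing-aware wrapper.
   public class TracingClientModule extends AbstractModule {
       interface ThriftClientFactory { }
       static class TracingThriftClientFactory implements ThriftClientFactory { }

       @Override
       protected void configure() {
           // Application code keeps injecting ThriftClientFactory and never has to
           // construct tracing-enabled classes itself.
           bind(ThriftClientFactory.class).to(TracingThriftClientFactory.class);
       }
   }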

The Tracing Message Bus

The tracing message bus is where all our client services place tracing data prior to its being consumed by the Zipkin collector and persisted. Our two options were Kafka and Kinesis, and we ended up choosing Kinesis. We considered Kafka because Knewton has had a stable Kafka deployment for several years. At the time, our Kafka cluster, which we’ve been using as our student event bus, was ingesting over 300 messages per second in production. Our initial estimates put tracing traffic in the range of over 400,000 messages per second with only a partial integration. The idea of straining production systems with instrumentation data made us nervous. Kinesis seemed like an attractive alternative that would be isolated from our Kafka servers, which were only handling production, non-instrumentation data. At the time of implementation, Kinesis was a new AWS service and none of us were familiar with it. Its price, throughput capabilities, and the lack of maintenance on our end sealed the deal for us. Overall, we’ve been satisfied with its performance and stability. It’s not as fast as Kafka, but the nature of our data made it acceptable to have an SLA of even a few minutes from publication to ingestion. Since we deployed the tracing message bus to production, we have also been able to easily scale up the number of Kinesis shards without incurring any downtime.

The Tracing Data Store

The tracing data store is where all our tracing data ends up. The data stay there for a configurable time and are queried by the Zipkin query service to display on the UI. Zipkin supports a lot of data stores out of the box, including Cassandra, Redis, MongoDB, Postgres and MySQL. We experimented with Cassandra and DynamoDB, mainly because of the institutional knowledge we have at Knewton, but ended up choosing Amazon’s ElastiCache Redis. The most important reasons behind our decision were:

  • Time to production, given that we didn’t have to roll out and maintain a new cluster
  • Cost
  • Easier integration with Zipkin, with less code
  • Support for TTLs on the data

Conclusion

Our tracing solution at Knewton has been in all environments for a few months now. So far it has proven to be invaluable. We have only started to scratch the surface with what we can do with the tracing and timing data we’re collecting. We had a lot of fun implementing and rolling out tracing at Knewton, and we have come to understand the value of this data.

Rolling Out the Mesos Slave Roller

A few months ago, Knewton started running most services via Docker containers, deployed to an Apache Mesos cluster with a Marathon scheduler. This new infrastructure makes it easy to deploy and manage services but adds complexity to maintaining them.

Applications running on a Mesos cluster run on a group of slave nodes. These slave nodes often need to be upgraded or reconfigured. Although it is usually possible to make modifications to a Mesos slave in place, the Infrastructure Team at Knewton prefers to replace slaves entirely, following the principles of immutable infrastructure. Mesos makes it simple to add and remove slaves and we run them on AWS EC2, so replacing a slave is really easy.

Moreover, immutable Mesos slaves have some nice properties:

  • Configuration management code can be much simpler, as we don’t need to uninstall old versions or configurations.
  • We can be sure that the state of configuration management at the time of the slave’s launch corresponds to the configuration of the Mesos slave.
  • Rollbacks are simpler in case of failures.
  • Replacing a Mesos slave is a good stress test. If we could not stand to lose one slave, we would be in trouble.

Still, relaunching all of the nodes in a Mesos slave cluster has to be done carefully to prevent outages. Marathon will relaunch tasks that were running on a slave that is being replaced, but if all of the remaining tasks of a service are on that one slave, there will be an outage.

When Marathon launches a task, it will launch it on any slave that fits the constraints of that application. For Knewton, the most useful set of constraints includes CPU, memory, and balanced distribution across availability zones. Since it is possible that any service task might be launched on any slave, changes need to be applied to the entire cluster at the same time.

The First Attempt

Shortly after switching to Mesos, we discovered that we needed to upgrade Docker. For the upgrade, we chose to bring up a new set of Mesos slaves, then start terminating the old slave instances one by one. We determined that would be safe because Marathon would relaunch tasks on the new slaves as the old ones were terminating.

The replacement process was successful, but not without its challenges:

It takes a while: Terminating an EC2 instance, deregistering a slave from its master cluster, and relaunching all of the service tasks took about four minutes. At the time, we had up to 55 slaves per environment. With the time it takes to launch new nodes, the process took around four hours.

Don’t trust your eyes: One health check failing can be an indication of a bigger problem. One bad service task may not cause an outage or integration test failures, but it can be an early indication that an upgrade needs to be rolled back. Having a human watch web interfaces for green dots or failure messages is ineffective and error-prone.

Validation: Marathon allows you to set a constraint that will prevent two tasks of the same application from launching on the same Mesos slave. That property seems appealing, but when you use a small set of large EC2 instances, you may have more tasks of an application than instances to run them on — in which case, the tasks won’t launch.

For that reason, we don’t enforce that constraint. So sometimes all tasks of some service will be running on a single slave. When that happens, taking that slave down may cause an overall service outage. Once again, having a human assert that is not the case takes time and is error prone.

Maintaining balance: Our Mesos slave clusters consist of EC2 instances that are part of an Auto Scaling Group. The ASG AZRebalance process keeps the instances in that ASG balanced across Availability Zones. The ASG maintains balance when launching new instances, capacity permitting. Terminating an instance can also trigger AZRebalance, launching a new instance and terminating an existing one. If you have services that have two tasks, terminating an instance and triggering AZRebalance may cause an outage. The AZRebalance process can be turned off during an upgrade. An ASG will still maintain balance when launching instances (unless there is only one task, in which case an outage must be acceptable), but once the original set of nodes has been removed, instances may no longer be evenly distributed across availability zones. When AZRebalance is turned back on, some instances may need to be terminated. For example if instances can be launched across three availability zones, one instance may be terminated. (See appendix for details.)

Given our first experience, we decided to automate the process of replacing slaves and built the Mesos Slave Roller.

Mesos Slave Roller

The Mesos Slave Roller (MSR) is an automated tool that replaces all the slaves in a cluster. It ensures that all components stay healthy and permits its user to stop and start at will.

[Figure: mesos-slave-roller]

The Mesos Slave Roller takes as input the environment to configure and the number of slaves that there should be when it’s done.

It compares this desired end-configuration to the existing state of the Mesos slave ASG. It then generates a list of actions that will produce the desired configuration:

  1. Scale up the slave ASG by the number of instances in the desired end-configuration.
  2. For each of the original Mesos slaves:
    1. Trigger a termination event
    2. Log the termination in a checkpoint file
    3. Wait for Mesos to deregister that slave
    4. Wait for Marathon to report that all services are healthy

Updating minimum and maximum instance counts for the ASG is kept as a separate, manual process: if a checkpoint file were to accidentally be deleted, rerunning the Mesos Slave Roller could otherwise result in a costly and potentially dangerous state, with many more instances running than expected.

Scaling and terminating are done by directly invoking the Auto Scaling API. To scale, we just update the desired count for the ASG. To terminate, we use the terminate-instance-in-auto-scaling-group call and tell it to decrement the capacity (desired count).
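For illustration, these are the two Auto Scaling calls expressed with the AWS SDK for Java v1; the Mesos Slave Roller itself is not necessarily written this way, but the API surface is the same:

   import com.amazonaws.services.autoscaling.AmazonAutoScaling;
   import com.amazonaws.services.autoscaling.model.SetDesiredCapacityRequest;
   import com.amazonaws.services.autoscaling.model.TerminateInstanceInAutoScalingGroupRequest;

   // The two ASG operations described above, shown as direct API calls.
   public class SlaveAsgOperations {
       private final AmazonAutoScaling autoScaling;

       public SlaveAsgOperations(AmazonAutoScaling autoScaling) {
           this.autoScaling = autoScaling;
       }

       /** Scale up: simply raise the ASG's desired count. */
       public void scaleUp(String asgName, int desiredCount) {
           autoScaling.setDesiredCapacity(new SetDesiredCapacityRequest()
                   .withAutoScalingGroupName(asgName)
                   .withDesiredCapacity(desiredCount));
       }

       /** Terminate one old slave and decrement the desired count in the same call. */
       public void terminateSlave(String instanceId) {
           autoScaling.terminateInstanceInAutoScalingGroup(new TerminateInstanceInAutoScalingGroupRequest()
                   .withInstanceId(instanceId)
                   .withShouldDecrementDesiredCapacity(true));
       }
   }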

Health Checks: While running, the Mesos Slave Roller queries the autoscaling, Mesos, and Marathon APIs to make sure all parts of the system are functioning as expected. The following checks are performed:

  1. Checks the Mesos /slaves endpoint to confirm that the appropriate number of Mesos slaves has been registered after increasing the size of the ASG.
  2. Checks the Mesos /slaves endpoint to confirm that the correct slave disconnected.
  3. Checks the Marathon /v2/apps endpoint to see that all apps are healthy. Our services are interconnected, so it is necessary to check that everything still works, not just the apps that had tasks on the terminated slave.

We consider an app healthy if there are no deployments in progress and the number of tasks reporting healthy is equal to the number of desired instances. If a given app has no health check, then the number of tasks running must be equal to the number of desired instances.
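Applied to one app object from Marathon’s /v2/apps response, that health rule can be sketched as follows (field names as exposed by the Marathon REST API; HTTP plumbing and error handling omitted):

   import com.fasterxml.jackson.databind.JsonNode;

   // Encodes the "healthy" definition above for a single Marathon app JSON object.
   public class MarathonHealthRule {
       public static boolean isHealthy(JsonNode app) {
           boolean noDeployments = app.path("deployments").size() == 0;
           int desired = app.path("instances").asInt();
           boolean hasHealthCheck = app.path("healthChecks").size() > 0;
           int satisfied = hasHealthCheck
                   ? app.path("tasksHealthy").asInt()   // apps with a health check must report healthy tasks
                   : app.path("tasksRunning").asInt();  // otherwise running tasks are good enough
           return noDeployments && satisfied == desired;
       }
   }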

Checkpoints: All of the Mesos Slave Roller’s operations have some timeout associated with them. If they take longer than expected, the Mesos Slave Roller will pause and wait for a human to kick it off again. If some error is thrown or if an upgrade goes poorly and a human stops it, the Mesos Slave Roller can be resumed from the checkpoint file that it has created. When it resumes, it will use the checkpoint file to determine the remaining work it has to do.

Auto-balancing Availability Zones: To keep things simple and to minimize surprises, we turn off the AZRebalance process before starting the Mesos Slave Roller. When replacement is complete, we turn AZRebalance back on. We limit our ASG to three AZs, so the potential extra balancing operation is safe, as this entire process rests on being able to take down one node at a time.

Planned Enhancements

The Mesos Slave Roller has proven to be useful for maintaining our infrastructure. There are several enhancements we’d like to make:

Offer Draining: Mesos 0.25 introduced a useful suite of maintenance primitives, including the ability to drain Mesos slaves of offers. That would allow the Mesos Slave Roller to tell a slave to evict all of its applications instead of relying on termination. If we can shut down service tasks gracefully, we can minimize the impact of losing a task.

Tagging Mesos Slaves and moving tasks: When a Mesos slave is terminated and Marathon receives a notification that a task died, it redeploys that task on a healthy Mesos slave. Some tasks may relaunch on “old” Mesos slaves a few times before finally winding up on a new one. This process works, but leads to more churn than necessary.

An alternate approach would involve some sort of indicator on each slave that shows when it was launched. Ideally the indicator could just alternate between green and blue, but it may need to be more complicated. With an indicator, we can ask Marathon to relaunch apps on a new slave. This approach has the added benefit that, once all tasks are moved to new slaves, we can terminate all of the old slaves without paying attention to when the termination process finishes. Given that waiting for deregistration takes most of the four minutes per node, this approach should cut the time required to replace a cluster by more than half.

PagerDuty Integration: When the Mesos Slave Roller fails, it will pause and wait for a human to restart it. For now, a human has to periodically check whether it is still running. Improving error handling to trigger a PagerDuty alert would make the Mesos Slave Roller into a truly hands-off tool.

Appendix

Calculating how many moves need to be made:

Balanced state:  The difference in the number of instances between any two availability zones is at most 1.

Claim: Because the ASG is balanced both before and after scaling, the difference between any two AZs in the number of new instances (those added on top of the original set) can be at most 2.

Let:

  • A_x \leftarrow the number of instances in the AZ called (x) at the start of this process
  • D_x \leftarrow the number of instances added to the AZ called (x)

We can say that for any pair of AZs (i, j) that are in some ASG:

Before scale: \lvert A_i - A_j \rvert \le 1 — Property of ASG balancing

After scale: \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \rvert  \le 1 — Property of ASG balancing

Proof:

\lvert D_i - D_j \rvert = \lvert D_i - D_j + A_i - A_i + A_j - A_j \rvert

= \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right )  + \left ( A_j - A_i \right ) \rvert

which, by the triangle inequality and the statements above means that

\lvert D_i - D_j \rvert \le \lvert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \rvert  + \lvert \left ( A_j  -A_i \right ) \rvert \rightarrow

\lvert D_i - D_j \rvert \le 2

Because of those properties, the number of final instances in each AZ can be one of three numbers: x - 1 (low), x (mid), and x + 1 (high) for some value x. In the case that there are occurrences of both the high and low values, some instances will have to move. The number of moves necessary is equal to the lesser of the number of highs and the number of lows. A move terminates an instance in one of the AZs with a high count and launches one in an AZ with low count. Once the counts remaining are only mids and lows or only mids and highs, the ASG is balanced.

In the worst case, half of the counts will be highs and half will be lows. In that case, there are \tfrac {n}{2} moves, where n is the number of AZs. If the number of AZs is odd, the number of moves is at most \tfrac {n-1}{2}, because the last AZ cannot have a complementary low or high.
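To make the counting argument concrete, here is a small illustrative sketch (not part of the Mesos Slave Roller itself) that computes the number of moves from the final per-AZ instance counts:

import java.util.Collections;
import java.util.List;

public final class AzRebalanceMath {

    /** Number of moves needed: the lesser of the number of "high" AZs (x + 1)
     *  and "low" AZs (x - 1) among the final counts. */
    public static int movesNeeded(List<Integer> instancesPerAz) {
        int min = Collections.min(instancesPerAz);
        int max = Collections.max(instancesPerAz);
        if (max - min < 2) {
            return 0; // only mids and lows, or mids and highs: already balanced
        }
        int lows = Collections.frequency(instancesPerAz, min);
        int highs = Collections.frequency(instancesPerAz, max);
        return Math.min(lows, highs);
    }
}

For example, movesNeeded(Arrays.asList(4, 6, 5)) returns 1: terminate one instance in the AZ holding 6 and launch one in the AZ holding 4.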

How Knewton Cutover the Core of its Infrastructure from Kafka 0.7 to Kafka 0.8

Kafka has been a key component of the Knewton architecture for several years now. Knewton has 17 Kafka topics consumed by 33 services. So when it came time to upgrade from Kafka 0.7 to Kafka 0.8 it was important to consider the cutover plan carefully. The needs of each producer/consumer pair were unique, so it was important to identify constraints by which to evaluate the plans. The rollout plans and the considerations by which they were evaluated are the focus of this blog post.

Producer vs. Consumer Cutover

There are two main approaches that one can take when doing a cutover from Kafka 0.7 to 0.8. The first option, call this the “producer cutover,” is to let the producer service cut over from producing to 0.7 to producing to 0.8, while having consumer services consume from both 0.7 and 0.8 simultaneously. The second approach, call this the “consumer cutover,” is to let the consumer services cut over from consuming from 0.7 to consuming from 0.8 while having the producer service produce to both 0.7 and 0.8 simultaneously.


Rollout risk, consumer tolerance and rollback options differ for each approach and should be considered before choosing a cutover approach.

Rollout Risk

When evaluating rollout risk, consider how many consumers are involved. If there are many consumers, a rollout plan that switches all consumers at one time may be riskier than an approach that switches a single consumer at a time.

Consumer Tolerance

In order to determine the appropriate cutover method, it must first be made clear what conditions the consumer services tolerate.

  • How many duplicate messages are acceptable during a cutover?
  • How much message loss is acceptable during a cutover?
  • How much message delay is acceptable during a cutover?

Rollback

Unless message loss is of no concern, it is important to have a rollback plan in case there are complications. It is not obvious how to roll back the cutover procedure without losing messages or, alternatively, re-consuming a large number of duplicates. How to plan a rollback depends entirely on which cutover approach is used.

Inconsistent Partitioning

Another aspect of the consumer service to consider is whether inconsistent partitioning is acceptable, i.e., is it acceptable for two different consumer service instances to receive messages for the same key for a short period of time during a cutover? For example, this could be a problem if it leads to two different consumer threads writing to the same index in a data store.

Producer Cutover Details

The steps for a producer cutover will most likely look like this:

  1. Deploy consumer service that consumes from both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Deploy producer service that produces to the 0.8 topic instead of the 0.7 topic.
  3. After the cutover is completed, cleanup by deploying consumer service that only consumes from the 0.8 topic.

A producer cutover has high rollout risk because it is all or nothing. All consumers will switch to consuming from 0.8 at the same time. If the topic to be cutover has few consumers, the risk of consumers failing during the cutover may be acceptable, and a producer cutover might be a good option.

If successful, consumer tolerance is not a factor, and no messages will be duplicated, lost or delayed.

If, for some reason, messages are not going through the system properly after the cutover, the system must be rolled back to a previous configuration that is known to function properly.

To ensure that no messages are lost during that rollback, the messages produced to 0.8 that failed to be processed properly must somehow be replayed to 0.7. This means that a producer cutover approach must have a reliable replay process that copies messages from 0.8 to 0.7 if message loss is not acceptable.

Any rollback scenario will be more complicated if there are many consumers. If the cutover fails for some consumers but succeeds for others, production must still revert to 0.7, and replaying messages would then cause duplication for the consumers that cut over successfully.

Consumer Cutover Details

The consumer cutover approach is recommended by the Kafka team, which provides the KafkaMigrationTool for this purpose.

In the words of the Kafka team, a consumer cutover using the KafkaMigrationTool would look like:

  • “Setup the migration tool to consume messages from the 0.7 cluster and re-publish them to the 0.8 cluster.”
  • “Move the 0.7 consumers consuming from the 0.7 cluster to 0.8.”
  • “Move the 0.7 producers to 0.8.”

Rollout risk is mitigated with the consumer cutover because each consumer service may move from 0.7 to 0.8 individually.

The problem with the approach, as described on the migration page, is that there is a very real risk of some message loss or, alternatively, some message duplication. The page has no discussion of consumer tolerance, including the risk of message loss and message duplication, which is important to consider. Assuming that the consumer is configured with auto.offset.reset=largest, some messages will be lost if the 0.7 consumers are stopped before the 0.8 consumers are started. If the 0.7 consumers are stopped after the 0.8 consumers are started, there is a possibility of significant message duplication. Kafka’s default “at least once” delivery semantics never guarantee the absence of duplicates, but for a cutover there are things that can be done to eliminate the duplicates that would otherwise be guaranteed.

To avoid the pitfalls mentioned above, the concept of tagging and filtering can be introduced. A tag is an additional piece of information the producer adds to each message, and the consumer then applies a filter based on this tag. There are a few different ways this tagging can be done; the suggestion made here is to filter based on a timestamp. The producer adds an additional key to each message recording the time at which it was put onto the queue. Based on some cutover time X, 0.7 consumers would only process messages with timestamps before X, and 0.8 consumers would only process messages with timestamps after X.
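As a sketch of what such a filter might look like on the consumer side (the property name and boundary handling are illustrative assumptions, not Knewton's actual implementation):

// Decides whether this consumer should process a message, given the producer-added
// timestamp tag. The 0.7 consumer keeps messages from before the cutover time X;
// the 0.8 consumer keeps messages from X onward.
public final class CutoverFilter {

    private final long cutoverTimeMillis; // the agreed cutover time X, in epoch millis
    private final boolean isNewConsumer;  // true for the 0.8 consumer, false for 0.7

    public CutoverFilter(long cutoverTimeMillis, boolean isNewConsumer) {
        this.cutoverTimeMillis = cutoverTimeMillis;
        this.isNewConsumer = isNewConsumer;
    }

    public boolean shouldProcess(long producedAtMillis) {
        return isNewConsumer
                ? producedAtMillis >= cutoverTimeMillis  // 0.8 side: at or after X
                : producedAtMillis < cutoverTimeMillis;  // 0.7 side: strictly before X
    }
}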

The benefit of the tag-and-filter approach is that, once the filtering consumer service has been deployed, it is possible to verify that the 0.8 consumer is up and running correctly, even though it is still filtering out messages that should only be processed by the 0.7 consumer.

A consumer cutover process based on timestamp filtering would look like this:

  1. Deploy producer service that produces to both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Determine the cutover time X.
  3. Deploy consumer service that consumes with a time filter from both the 0.7 topic and the 0.8 topic. The 0.7 consumers only process messages with timestamps before X and the 0.8 consumers only process messages with timestamps after X.
  4. Wait for the cutover time.
  5. Deploy consumer service version that only consumes from the 0.8 topic.
  6. Deploy producer service version that only produces to the 0.8 topic.

Using the KafkaMigrationTool to accomplish production to both 0.7 and 0.8 is possible, but it adds lag to the message delivery and only replaces one of the steps listed in the tag-and-filter approach. If many consumers are to be cut over, not necessarily all around the same time, this tool may have to run for a long time. A tool that runs for a long period of time and provides critical functionality should probably be treated the same way as other critical parts of the system, which typically means that it requires monitoring and alerting. The alternative — creating a producer service version that produces to both 0.7 and 0.8 — might require less work.

If for some unforeseen reason messages are still not being processed properly by the 0.8 consumer after the cutover, a rollback plan should be executed. Ideally, as the 0.7 consumer starts to filter out messages, it should also stop committing offsets. A rollback would then only entail re-deploying the old consumer service version that consumes only from 0.7, and message consumption would resume from the time of the cutover. Due to bug KAFKA-919, stopping the offsets from being committed might require some extra work (discussed more later).

If the cutover is successful, but issues with the 0.8 setup (e.g., performance issues) are discovered days later, a “non-immediate” rollback plan should be executed. Unless several days’ worth of message duplicates are acceptable, a different approach from the “immediate” rollback plan is required. For this scenario, a reverse cutover is recommended. Simply configure the consumer service to perform a reverse timestamp-based cutover, with 0.7 consumers now only processing messages with timestamps after some reverse-cutover time Y, and 0.8 consumers only processing messages with timestamps before Y.

Inconsistent Partitioning

When consumer tolerance was previously discussed, the issue of having two different consumer service instances receiving messages for the same key was mentioned. It would be difficult (if even possible) to make 0.7 and 0.8 message partitioning match perfectly. A perfect matching here means that with a double producer and double consumer setup, each message, produced to both 0.7 and 0.8, will be routed to the same consumer service instance. Without this guarantee, two messages with the same key, one created just before the cutover time and one just after, might go to two different consumer service instances. As previously mentioned, this could be a problem if, for example, it leads to two different consumer threads writing to the same index in a data store.

If inconsistent partitioning is an issue, in order to prevent two threads from processing messages with the same key at the same time, the only option might be to let the 0.7 message queue “drain” before starting to process the 0.8 messages. This cutover method could be accomplished by either a producer or a consumer cutover, the additional key step being a pause before starting the 0.8 consumer service. Pausing the consumer service will obviously cause some message delay.

KAFKA-919

Ticket KAFKA-919 describes a major bug in the 0.7 client where “disabling of auto commit is ignored during consumer group rebalancing.” In order to be able to do an “immediate” rollback, the 0.7 consumer must stop committing offsets at the cutover time. This bug causes offsets to be committed during a consumer rebalance, even if consumers have set the auto-commit property to false.

The easiest workaround for this problem is preventing a consumer rebalance from taking place by scaling down the consumer service to have only one consumer in the consumer group. In this rollback scenario, the one consumer service instance is restarted with a configuration to only consume from 0.7 (because the 0.8 setup is not working, for some unexpected reason). Restarting all consumers in a consumer group usually triggers a consumer rebalance, unless there is only one consumer.

Conclusion

For most topic cutovers, Knewton employed a time-based tag-and-filter approach. This approach was deemed the least risky for the requirements of those producers and consumers, as well as the easiest to reason about and roll back. Hopefully the considerations outlined above will help you choose the best cutover plan for your system.

Written by Christofer Hedbrandh and Sarah Haskins

Kankoku: A Distributed Framework for Implementing Statistical Models

As future-facing as Knewton’s adaptive learning platform may be, the concept of a personalized classroom has a surprisingly rich history. The idea has intrigued educators and philosophers for decades. In 1954, behavioral psychologist B.F. Skinner invented the concept of programmed instruction, along with a working mechanical prototype to boot. His “teaching machine” consisted of a wooden box on which questions were displayed to students on strips of paper controlled by turning knobs. One would only progress upon answering a question correctly. A crucial feature of the teaching machine was that “the items were arranged in a special sequence, so that, after completing the material in frame 1, the students were better able to tackle frame 2, and their behavior became steadily more effective as they passed from frame to frame.” The argument upon which Skinner’s teaching machine was founded still holds water today: that “what is taught to a large group cannot be precisely what each student is ready just at that moment to learn.”1


Sixty years later, examining Skinner’s prototype still provides an insightful frame of reference. Knewton’s platform is responsible for tracking the individual learning states of each student at the granularity of individual concepts and questions. Like the teaching machine, we must deliver relevant recommendations in real-time and classroom analytics in near real-time. Those recommendations and analytics serve as a tool for both students and teachers to improve student outcomes. Considerations like these influence the engineering decisions we make on a daily basis, including the decision to use a stream-processing framework to power several of our statistical models. In this blog post, we will open the hood of our own teaching machine to explore the tradeoffs behind the design of Knewton’s scientific computing platform.

Why Stream Processing?

Knewton’s recommendation engine faces the task of providing recommendations to millions of students in real-time. As one of the pioneers of behaviorism, Skinner certainly understood the importance of delivering the right feedback at the right time.2 Respond to a student event (e.g., finishing an article) just two minutes late, and the impact of a recommendation diminishes rapidly. But what goes into each recommendation under the hood? A recommendation is essentially a ranked selection of instructional content that is most relevant to the subject matter that a student is studying at any particular time. Every student’s learning history (the data representing their interactions with content and their activity on the system) is taken into account. Knewton’s recommendation engine also considers other factors, such as each student’s learning goals and deadlines. All of this data is processed through a variety of psychometric and statistical models that estimate various characteristics of students (e.g., their proficiency or engagement level) and content (e.g., its difficulty or effectiveness). While some of these computations can be performed ahead of time, there are still numerous models that must be computed on the spot in response to a student interaction.3 Combining and processing all of this data results in a very large sequence of actions that must be performed in a small period of time.

Knewton is much more than just a differentiated learning app. Imagine if Skinner’s teaching machine knew every student’s individual learning history, knowledge state, habits, strengths, and upcoming goals, and could take into account goals set by teachers or administrators.

To handle all this data, Knewton has built Kankoku4, a stream processing framework that can respond to individual events in real-time.5 Stream processing systems operate under the requirement that inputs must be processed “straight-through” — that is, real-time feeds must trigger a set of downstream outputs without necessarily having to resort to polling or any intermediate storage. Stream processing systems are also characterized by their support of real-time querying, fault-tolerance, and ability to scale horizontally.6 The primary complement to stream processing is batch processing, consisting of programming models such as MapReduce that execute groups of events scheduled as jobs. Batch computing is fantastic for efficiently performing heavy computations that don’t require immediate response times.

However, these advantages of batch processing are also what make it less suitable for responsive, high availability systems like Knewton’s.7

Kankoku

Kankoku is a scientific computing Java framework developed in-house that provides a programming model for developing decoupled scientific models that can be composed to create any kind of computable result. The framework aims to abstract away the details of retrieving and storing data from databases, reliability, scalability, and data durability, letting model writers concentrate on creating accurate and efficient models. In the example workflow below, the nodes (or Kankokulators, as we call them) represent individual (or sets of) calculations. Streams are fed into Kankoku from a queue, which serves as a message broker by publishing received student events into various topics to which Kankoku subscribes.

[Figure: an example Kankoku workflow of Kankokulator nodes fed by a queue]

With this framework, complex multi-stage computations can be expressed as networks of smaller, self-contained calculations. This style of programming is especially well-suited for data analysis where the outputs of an arbitrary statistical model could be used as inputs to another. One example of this could be aggregating student psychometrics as inputs for modeling student ability using Item Response Theory (IRT).
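Kankoku itself is not open source, so the snippet below only illustrates the flavor of this composition with invented types: one small calculation feeding another, as in the psychometrics-into-IRT example above. Both calculations are toys, not Knewton's models.

import java.util.function.Function;

public final class ComposedModelsSketch {

    /** Hypothetical stand-in for a Kankokulator: one small, self-contained calculation. */
    interface Calculator<I, O> extends Function<I, O> {
        default <R> Calculator<I, R> then(Calculator<O, R> next) {
            return input -> next.apply(this.apply(input));
        }
    }

    public static void main(String[] args) {
        // Toy aggregate of a student's responses (fraction correct).
        Calculator<double[], Double> aggregateResponses = responses -> {
            double sum = 0;
            for (double r : responses) sum += r;
            return sum / responses.length;
        };
        // Toy ability estimate: the logit of the fraction correct.
        Calculator<Double, Double> abilityEstimate = p -> Math.log(p / (1 - p));

        Calculator<double[], Double> pipeline = aggregateResponses.then(abilityEstimate);
        System.out.println(pipeline.apply(new double[] {1, 0, 1, 1})); // ~1.10
    }
}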

Speed and horizontal scalability are also important in developing a stream processing framework for real-time events. One of the many ways Knewton achieves horizontal scalability is by partitioning the input data stream using a partitioning key in the queue.8

“Kankoku” Means “Recommendation”

Similar to how Skinner’s teaching machine immediately responds to individual inputs, Kankoku streamlines responsive event processing for arbitrary, unbounded data streams. Both serve a complex need — providing personalized learning recommendations — yet have internal mechanisms that are easily decomposable, and execution that is reproducible.

But Kankoku is very different from the teaching machine. The software it powers is capable of understanding and analyzing the learning mechanisms of millions of students. Ensuring that Knewton doesn’t sacrifice quality to meet the demands of quantity or speed is a top priority. To meet these ends, we are continually revising and extending our models to run more efficiently while delivering better results. Kankoku’s design is a strength here. Not only does it help Knewton break down a complex task into smaller pieces, it also makes it simpler to understand and tweak each component. Monitoring these models requires complex visibility tools that allow Knewton to examine intermediate computation in real-time. Kankoku is less like one teaching machine than it is hundreds of small machines working together in concert.

So What?

In his exposition “Programming Instruction Revisited,” Skinner spoke of his dream of creating technology that would help classrooms evolve beyond the “phalanx formation” by helping teachers become even more attuned to every student’s individual needs. As history has shown us, implementing such technology at scale is an extremely difficult problem. Truly understanding student needs and providing feedback in real-time is a non-trivial challenge for any person, much less a computer program. Practical machine learning and “artificial intelligence” is in many ways a systems engineering challenge — building models that can handle real-time workloads at scale is crucial to creating a service that will actually be useful to students and teachers. Well-designed systems will never replace teaching, but they can provide an automated, responsive, and unified platform to expose insights about student learning to teachers and parents around the world, who do understand how to best act on those insights.


Acknowledgements

I’d like to thank the creators of Kankoku — Nikos Michalakis, Ferdi Adeputra, Jordan Lewis, Erion Hasanbelliu, Rafi Shamim, Renee Revis, Paul Kernfeld, Brandon Reiss, George Davis, and Kevin Wilson — for their tireless work as well as letting me play with such an awesome piece of technology. Stay tuned for part 2 of this blog post for more details on my internship project (extending the Kankoku framework with Apache Storm).


  1. B.F. Skinner. Programming Instruction Revisited

  2. Knewton is not preaching or practicing behaviorism. This is only meant to be an analogy. 

  3. http://en.wikipedia.org/wiki/Online_algorithm 

  4. Kankoku means “advice” or “recommendation” in Japanese. It also means “Korea.” 

  5. In addition to powering Knewton’s recommendation engine, stream processing suits a variety of applications, ranging from powering Google Trends to supporting fraud detection and “ubiquitous computing” systems built on cheap micro-sensor technology that demand high-volume and low-latency requirements. Other applications include powering bank transactions (which require exactly-once delivery), image processing for Google Street View, and command-and-control in military environments. See: Akidau, et al. MillWheel: Fault-Tolerant Stream Processing at Internet Scale

  6. Stonebraker, et al. The 8 Requirements of Real-Time Stream Processing

  7. Frameworks such as the Lambda Architecture exist that unite both programming models. There is also technically a gray zone between batch and streaming processing frameworks – for instance, Spark Streaming processes events in microbatches. Some of our models can’t be implemented with microbatching, but it is an interesting idea worth exploring. 

  8. Alternative terminology for “grouping”: sharding, shuffling. 

Kankoku: A Distributed Framework For Implementing Statistical Models (Part 2)

The focus of my internship project this summer was to extend Kankoku (Knewton’s scientific computing framework) to operate in a more distributed fashion. A few reasons drove this change — namely, increased functionality in model expression and greater operational control in model execution. In this blog post I will analyze these objectives in detail and explore why they necessitated tradeoffs from both the systems engineering and business perspectives.

Kankoku is a scientific computing Java framework developed in-house at Knewton that provides a stream-processing programming model for developing decoupled scientific models that can be composed to create any kind of computable result. For a more detailed discussion on stream processing and Kankoku, see part one of this blog post.

Weathering the Storm: Introducing Distributed Kankoku

Partitioning, or dividing a set of inputs into a collection of subsets, is a key problem in any distributed system. Mod hashing and consistent hashing are two common ways to implement a shuffle grouping, in which keys are distributed roughly uniformly across partitions in a pseudorandom but deterministic way. Kankoku currently performs a shuffle grouping before model execution, which allows workloads to be balanced across separate machine stacks that each run independent instances of the same topology.1 However, the calculation of certain psychometrics may require additional partitioning (i.e., multi-partitioning).
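For reference, the mod-hashing flavor of a shuffle grouping amounts to something like the following; this is illustrative only and not Kankoku's actual partitioner.

public final class ModHashPartitioner {

    /** Deterministically maps a key to one of numPartitions partitions. */
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so a negative hashCode cannot yield a negative index.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Consistent hashing serves the same purpose but keeps most keys on their existing partitions when the number of partitions changes.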

Recall that Knewton performs online analysis of both the student and the classroom. Consider the scenario in which the output of Kankokulator node A (calculating a student metric) serves as the input to Kankokulator node B (calculating a classroom metric). Since A processes events per student, the initial grouping must happen by student ID. However, B must process events per classroom. This presents a problem, since there is no guarantee that two students in the same class are grouped to the same partition. A simple solution might be to route the output of A through a queue serving as an intermediate message broker. This queue can then regroup the data stream based on class ID:

[Figure: regrouping the per-student stream by class ID through an intermediate queue]

However, this approach scales poorly for several reasons. Creating new queue shards to track each new multi-partition can be difficult to maintain from a development standpoint. Rerouting the data stream to an intermediate broker with every grouping also introduces extra overhead and network latency. There is also no guarantee that the models execute deterministically. Previously, each instantiation of a Kankoku topology ran on its own machine, processing each input in a topologically-ordered fashion. With intermediate queues, keys may be processed out of order due to varying latency. A more general-purpose solution is preferable.

This is where the Apache Storm framework (originally developed by Twitter) comes in as a possible candidate. Like Kankoku, Storm is a general stream-processing framework, but with one crucial design difference: it is strongly distributed, in that nodes in the same topology need not run sequentially on the same machine. As a result, Storm supports the ability to perform arbitrary groupings between each node, and multiple groupings within the same topology.2 Nodes in a Storm topology are referred to as bolts, and data sources are referred to as spouts.

Using Storm’s Trident API, declaring a new grouping within the topology is as simple as calling the function partitionBy. The example below shows how our hypothetical scenario above might be implemented using Storm instead of rerouting through a queue:

[Figure: the same scenario expressed as a Storm Trident topology using partitionBy]
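To give a code-level flavor of the figure above, here is an illustrative Trident snippet with invented spout, field, and class names (using the pre-1.0 backtype.storm and storm.trident packages); it is a sketch, not Knewton's actual topology.

import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class RegroupingTopologySketch {

    /** Stand-in for Kankokulator A: emits (classId, studentMetric) per student event. */
    public static class StudentMetricFn extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String classId = tuple.getStringByField("classId");
            double metric = tuple.getDoubleByField("score"); // placeholder per-student calculation
            collector.emit(new Values(classId, metric));
        }
    }

    /** Stand-in for Kankokulator B: sees all metrics for a class on one partition. */
    public static class ClassroomMetricFn extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // ... update the classroom-level model here ...
            collector.emit(new Values(tuple.getDoubleByField("studentMetric")));
        }
    }

    public static TridentTopology build(IRichSpout studentEventSpout) {
        TridentTopology topology = new TridentTopology();
        topology.newStream("studentEvents", studentEventSpout)
                // upstream events arrive partitioned by studentId
                .each(new Fields("studentId", "classId", "score"),
                      new StudentMetricFn(), new Fields("metricClassId", "studentMetric"))
                // regroup mid-topology so each class's metrics land on one bolt instance
                .partitionBy(new Fields("metricClassId"))
                .each(new Fields("metricClassId", "studentMetric"),
                      new ClassroomMetricFn(), new Fields("classroomMetric"));
        return topology;
    }
}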

Kankoku can therefore be extended by “wrapping” subtopologies (individual Kankokulators or groups of Kankokulators) within Storm bolts. Bolts will encompass contiguous Kankokulators expecting data streams partitioned by a common key type, and a new bolt will be created whenever an additional partitioning operation is required. This interaction introduces the functionality of multi-partitioning while still preserving our original model execution; bolts do not define how data is managed and arbitrary Kankokulator code can still run within a bolt. Hence, in this architecture Kankoku provides a higher-level programming model built upon Storm.

Another use case for this particular design arises from Storm’s convenient parallelism hint feature. Parallelism hints are the initial number of executor threads allocated to a particular bolt, which can be rebalanced during runtime. Tuning the parallelism hint of bolts gives us additional operational control over executing topologies by weighting CPU resources differently for separate subtopologies. Therefore, subtopologies that we expect to be more computationally expensive can be allocated more processing power, which in turn helps increase throughput.
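Continuing the sketch above (same hypothetical StudentMetricFn and ClassroomMetricFn classes), uneven weighting might look like the method below; the specific numbers are arbitrary.

    public static TridentTopology buildWithHints(IRichSpout studentEventSpout) {
        TridentTopology topology = new TridentTopology();
        topology.newStream("studentEvents", studentEventSpout)
                .each(new Fields("studentId", "classId", "score"),
                      new StudentMetricFn(), new Fields("metricClassId", "studentMetric"))
                .parallelismHint(4)   // cheap per-student subtopology
                .partitionBy(new Fields("metricClassId"))
                .each(new Fields("metricClassId", "studentMetric"),
                      new ClassroomMetricFn(), new Fields("classroomMetric"))
                .parallelismHint(16); // heavier per-class subtopology
        return topology;
    }

The hints can later be adjusted on a running topology with Storm's rebalance command rather than redeploying.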

[Figure: a combined Storm-Kankoku topology, with Kankokulator subtopologies wrapped in Storm bolts]

The topology above shows how a Storm-Kankoku topology might be represented. Within each bolt, the Kankoku subtopology will run deterministically so as to take advantage of data locality. Hence, it is advantageous to wrap as many Kankokulators as possible within each given bolt while still fitting the constraints imposed by weighted parallelism and multi-partitioning.

Tradeoffs of Operating In A Distributed Environment

My internship project this summer consisted of implementing a prototype of the integrated Storm-Kankoku framework similar to the sample topology displayed above in addition to examining the tradeoffs behind extending the Kankoku framework using Storm. Introducing added parallelism at a platform level can have sweeping effects on the behavior of our statistical models, affecting both integrity and performance. A few considerations we explored:

A) Bolt-level deterministic execution. Although Storm may not be able to recover the state of execution within an individual bolt if it fails, Storm’s “Transactional Topologies” guarantee that “multiple batches can be processed in parallel, but commits are guaranteed to be ordered.” Hence, topological ordering still applies and we expect reproducible execution.

B) Fault-tolerance. Storm provides fault tolerance with clear guarantees across bolt execution and state-saving operations (either exactly-once or at-least-once delivery). By assigning a monotonically increasing transaction ID to each commit of events, Storm provides the semantics needed to detect and filter out duplicate events replayed by Storm in the event of a failure. Fault tolerance is especially important when the outputs of Kankokulator nodes are saved or transmitted during execution — without Storm’s guarantees, events might be lost or duplicated.
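A minimal sketch of that filtering idea, independent of Storm's actual state APIs: track the last transaction ID applied and skip any replayed batch at or below it.

public final class TxIdDedupingState {

    private long lastCommittedTxId = -1;

    /** Applies the batch only if its transaction ID has not been applied before. */
    public synchronized boolean commitIfNew(long txId, Runnable applyBatch) {
        if (txId <= lastCommittedTxId) {
            return false; // replayed batch: already applied, so filter it out
        }
        applyBatch.run();
        lastCommittedTxId = txId;
        return true;
    }
}

In practice the last committed transaction ID would live in the same store as the state being updated, so the check and the write commit together.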

C) Horizontal Scalability. Any implementation must take care to increase throughput without increasing latency. One possible performance pitfall faced in a distributed environment is the added latency introduced by redundant computations that must be performed by each bolt (such as loading the Knewton knowledge graph). This could potentially be solved by an off-node cache such as ElastiCache, at the cost of introducing additional complexity. In general, careful load testing must be performed to determine the ideal method of data processing — whether to pass values across the wire or to utilize intermediate checkpointing and storage structures.

As expected, many of these tradeoffs don’t point to a single right answer. For instance, depending on the scenario Knewton might leverage Storm’s exactly-once functionality at the expense of introducing more latency. In situations like these, it becomes less a question of which approach to take and more so a question of how important each requirement is. How important is it to filter out duplicate events? What is the cost of producing a recommendation that is stale, possibly by just a few seconds? How important is it for Knewton to keep its latency as low as possible? These questions strike at the heart of both Knewton’s internal systems design and its core business value-add, and encapsulate much of what made my internship intellectually engaging and rewarding.

Sources


  1. By topology, we mean a directed acyclic graph (DAG) that defines the workflow of calculations. 

  2. Storm implements the ability to partition by using an abstraction called an Active Distributed Hash Table (Active DHT). Active DHTs extend distributed hash tables to allow an arbitrary user defined function (UDF) to be executed on a key-value pair. Source: A. Goel, Algorithms for Distributed Stream Processing

Cassandra and Hadoop – Introducing the KassandraMRHelper

Here at Knewton we use Cassandra for storing a variety of data. Since we follow a service-oriented architecture, many of our internal services are backed by their own data store. Some of the types of data we store include student events, recommendations, course graphs, and parameters for models. Many of these services and clusters are often deployed in more than two environments, increasing the total number of Cassandra clusters deployed.

On the Analytics team at Knewton we are constantly working on improving a lot of the inferential models that go into our platform, while at the same time building new ones. This often involves munging a lot of data in short periods of time. For a lot of our ad-hoc analysis we use a data warehouse where analysts can query and extract data relatively quickly. One of the challenges we’ve faced at Knewton — and specifically in Analytics — involved how to go about populating our data warehouse with data from Cassandra clusters that predated our data warehouse. To solve this problem, we implemented an internal library for bulk extracting data out of Cassandra into Hadoop with zero hits to the Cassandra cluster. A few months later we open sourced it here and called it the KassandraMRHelper.

KassandraMRHelper takes a slightly different approach than the constructs contained in the Hadoop package in the Cassandra source code (e.g. AbstractColumnFamilyInputFormat), in that it doesn’t require a live Cassandra cluster to extract the data from. This allows us to re-run map-reduce jobs multiple times without worrying about any performance degradation of our production services. This means that we don’t have to accommodate more traffic for these offline analyses, which keeps costs down.

How does it work?
The KassandraMRHelper includes specialized Input Formats and Record Readers for SSTables. First, here’s a little bit about SSTables:

  • SSTables are immutable; once they’re written they never change.
  • SSTables can exist independently of each other, but collectively they form the complete data set.
  • SSTables consist of four to five components, depending on which version you’re using:

  • Data.db files store the data, compressed or uncompressed depending on the configuration.
  • Index.db is an index into the Data.db file.
  • Statistics.db stores various statistics about that particular SSTable (times accessed, etc.).
  • Filter.db is a Bloom filter that is loaded into memory by the Cassandra node and can quickly tell whether a key is in the SSTable or not.
  • CompressionInfo.db may or may not be present, depending on whether compression is enabled for a ColumnFamily. It contains information about the compressed chunks in the Data.db file.

Data in columns and rows are essentially key value pairs, with rows as the keys and columns as values to the rows. The columns are also key value pairs consisting of a name and a value.

Given how data are stored, Cassandra is in fact a really good fit for MapReduce. The same partitioning schemes that Cassandra uses can also be used in MapReduce. Columns and rows can be the keys and values that get passed in the Mappers or Reducers in a MapReduce job.

Some key components of KassandraMRHelper are:

  • The SSTableInputFormat: The SSTableInputFormat describes how to read the SSTables and filters through all the components and ColumnFamilies, keeping only what’s necessary for processing using a custom PathFilter. There are two types of SSTableInputFormats depending on how you want to represent key/value pairs in MapReduce. The SSTableColumnInputFormat constructs an SSTableColumnRecordReader in which keys in the Mapper represent the row keys in Cassandra and values represent a single column under that row key. Similarly, the SSTableRowInputFormat constructs an SSTableRowRecordReader in which keys in the Mappers are the Cassandra row keys and values are column iterators over all the columns under a single row key.
  • The SSTableRecordReader: It internally uses an SSTableScanner and a Descriptor similar to the one contained in recent version of Cassandra but with backwards compatibility to identify all the parts of an SSTable. As described previously, subclasses of the SSTableRecordReader can represent values as single columns under a row or entire column iterators.
  • The SSTableColumnMapper: This abstract mapper inherits from the default Hadoop mapper but adds a bit more structure to deal with the ByteBuffers and columns contained in the SSTables. It can also skip tombstoned columns.
  • The SSTableRowMapper: This is similar to the mapper above that deals with column iterators.

Example
Setting up a MapReduce job for reading a Cassandra cluster becomes very simple. The only missing piece is finding an easy way to get all the SSTables into a Hadoop cluster. At Knewton we found Netflix’s Priam to be a good match. Priam backs up our Cassandra cluster multiple times a day into S3 making it really easy to transfer the data to Elastic MapReduce (EMR).

This simple MapReduce job shows a complete example job that consumes student event data from backed up Cassandra SSTables. The example can also be found here.

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  SSTableInputFormat.setPartitionerClass(
      RandomPartitioner.class.getName(), job);
  SSTableInputFormat.setComparatorClass(LongType.class.getName(), job);
  SSTableInputFormat.setColumnFamilyName("StudentEvents", job);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(StudentEventWritable.class);
  job.setMapperClass(StudentEventMapper.class);
  job.setReducerClass(StudentEventReducer.class);
  job.setInputFormatClass(SSTableColumnInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  SSTableInputFormat.addInputPaths(job, args[0]);
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
public class StudentEventMapper extends SSTableColumnMapper
      <Long, StudentEvent, LongWritable, StudentEventWritable> {
  @Override
  public void performMapTask(Long key, StudentEvent value, Context context) {
    // key is the student ID extracted by getMapperKey() below; value is the decoded event.
    context.write(new LongWritable(key),
                  new StudentEventWritable(value));
  }
  @Override
  protected Long getMapperKey(ByteBuffer key, Context context) {
    ByteBuffer dup = key.slice();
    Long studentId = dup.getLong();
    return studentId;
  }
  @Override
  protected StudentEvent getMapperValue(IColumn iColumn, Context context) {
    return getStudentEvent(iColumn, context);
  }
}

Notice that the mapper extends from a specialized SSTableColumnMapper which can be used in conjunction with the SSTableColumnRecordReader.

The example above uses the identity reducer to simply write the data as comma separated values by calling the toString() method on the StudentEventWritable objects. The only additional task you have to worry about in the Reducer is deduping the data, since you will probably have a replication factor of > 1.
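A hedged sketch of that dedup step is shown below. It assumes, as in the example, that StudentEventWritable’s toString() produces the CSV line and that replicated copies of the same event serialize identically; it is not the example project’s actual reducer.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// StudentEventWritable comes from the example project; its import is omitted here.
public class DedupingStudentEventReducer
        extends Reducer<LongWritable, StudentEventWritable, LongWritable, StudentEventWritable> {

    @Override
    protected void reduce(LongWritable studentId, Iterable<StudentEventWritable> events,
                          Context context) throws IOException, InterruptedException {
        Set<String> seen = new HashSet<String>();
        for (StudentEventWritable event : events) {
            // Copies of the same event (replication factor > 1) render to the same CSV,
            // so the string form works as a dedup key.
            if (seen.add(event.toString())) {
                context.write(studentId, event);
            }
        }
    }
}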

Automating this job becomes an easy task given that SSTables are immutable and older tables don’t have to be read if they were already read once. Enabling incremental snapshots can also help here.

Conclusion

If you want to get started on using the KassandraMRHelper you can check out the code here: https://github.com/Knewton/KassandraMRHelper. Contributions are more than welcome and encouraged.

If you’re interested in additional topics in Cassandra and Hadoop you should check out the presentation on bulk reading and writing Cassandra using Hadoop here with the slides shared here.