Distributed Tracing: Observations in Production

Previous blog posts have explained Knewton’s motivation for implementing distributed tracing, and the architecture we put together for it. At Knewton, the major consumers of tracing are ~80 engineers working on ~50 services. A team consisting of three engineers handled designing and deploying the tracing infrastructure detailed in the previous sections.  This section highlights our experience and challenges running Zipkin at Knewton over the past few months.

Technical Hurdles

We elected to use Redis on Amazon Elasticache as our storage for spans, which are detailed in the previous section. This caused us a few problems:

Redis was inexplicably slow

Our initial rollout had an early version of the Zipkin Collector that wrote to a cache.r3.2xlarge (58.2 GB Ram, 8 vCPU) Redis Elasticache instance, which could not keep up with the traffic we were sending it. It could only handle ~250 requests per second. This seemed very slow to us. To set expectations, we dug into how to benchmark Redis, and stumbled upon the venerable redis-benchmark, which informed us that it could handle at least 120,000 requests per second with the 120 byte payloads that we were transmitting.

Looking at Amazon’s Cloudwatch metrics during the initial rollout, we quickly realized that, while the test was using multiple connections to Redis, the Zipkin code was using only a single connection. We modified the code to use multiple connections so we could make ~3000 requests per second and still keep up with incoming traffic.

Redis lost all data stored on it every few days

After we got Redis running, we found that Redis would restart every few days, when the amount of data that we stored in Redis approached the memory allocation of the Elasticache instance.

It turned out that the memory listed on Amazon’s Elasticache documentation refers to the total memory on the instance that runs Redis, and not to the memory allocated to Redis. As we continued to load data into Elasticache Redis, the underlying instance would start swapping heavily, and the Redis process would be restarted. We speculated that the process was being OOM killed, but because we don’t have access to Linux logs on Elasticache instances, we couldn’t be certain. After setting the `reserved-memory` parameter to approximately 15% of the total memory on the cache instance, we no longer had this problem.

Amazon Kinesis Queue Library

We packaged TDist into a library responsible for collecting span data at every service and publishing it to our tracing message bus, Amazon Kinesis. During early testing, we had configured the AWS Kinesis stream to have only a single shard, limiting it to 1000 writes per second.

When we deployed a few services using TDist, we began seeing unusual growth in JVM heap sizes under bursty traffic. When traffic reached baseline levels, the heap shrunk to reasonable sizes.

The default configuration for the Kinesis Queue Producer, which made the assumption that the rate at which it receives data is less than the rate at which it can submit data to the Kinesis queue. As such, it makes use of an unbounded in-memory queue to submit data to Kinesis. We decided to drop data when we are unable to keep up, and changed to use a bounded queue. This resulted in predictable performance under load or network blips at the expense of losing data under load spikes.

Now we have predictable performance, but during periods of high traffic, the Kinesis publishers fall behind and fill up the local queues, having the unfortunate effect of dropping spans. This can be solved by increasing the rate at which we publish to Kinesis. The two options are to increase the number of publishers and thus the number of calls we make to the Kinesis API, or to increase the number of spans published per API call. Because the span payload is small, most of the time spent on the Kinesis API call is network time. We intend to alleviate this problem by using batch Kinesis operations to submit multiple spans at once..

Metrics and Monitoring

When tracing was first rolled out to Knewton, it was hard to get an idea of how our client applications were doing. While the server and client side metrics were reported to our Graphite based metrics system, we had a difficult time correlating those with metrics for Kinesis, which are reported with different granularity, units, and retention periods using Amazon CloudWatch. Knewton contributed to the cloudwatch-to-graphite project to get consolidated metrics in Graphite.

Once this was done, we were able to focus on the metrics that gave us most insight into how tracing was performing: the rate at which we are emitting spans from applications to Kinesis, and how quickly the Collector was able to ship these off to Redis. These rates determine the time it takes for a trace to appear in the Zipkin web interface

Organizational hurdles

User education

While we had stealthily instrumented the Thrift protocol to pass tracing data around, we found that developers were still looking at service logs instead of at tracing data while debugging. We evangelize tracing when developers run into problems across services, and gave a presentation on how developers can use tracing to pinpoint performance problems.

Successes

TDist allows Knewton to see internal dependencies between services clearly. Now, a service owner can see precisely which services depends on their service, and which functions they  depend upon. This allows us to optimize service APIs to match usage patterns.

The week after we launched distributed tracing, we noticed that a particular external API call that resulted in hundreds of calls to one internal service. We were able to add a new endpoint to the internal service to eliminate these large number of calls and speed up this endpoint.

Conclusion

TDist proved that we could pass data across services in an noninvasive way by modifying the Thrift protocol. Since then, we’ve built the TDist ecosystem and are using it in production. While we faced problems getting TDist into the hands of developers, it is incredibly useful when debugging problems across services.

Distributed Tracing: Design and Architecture

The previous blog post talked about why Knewton needed a distributed tracing system and the value it can add to a company. This section will go into more technical detail as to how we implemented our distributed tracing solution.

General Architecture and Tracing Data Management

Our solution has two main parts: the tracing library that all services integrate with, and a place to  store and visualize the tracing data. We chose Zipkin, a scalable open-source tracing framework developed at Twitter, for storing and visualizing the tracing data. Zipkin is usually paired with Finagle, but as mentioned in Part I, we ruled it out due to complications with our existing infrastructure. Knewton built the tracing library, called TDist, from the ground up, starting as a company hack day experiment.

TDist-architecture-tracing-data-management

Tracing data model

For our solution, we chose to match the data model used in Zipkin, which in turn borrows heavily from Dapper. A trace tree is made up of a set of spans.  Spans represent a particular call from client start through server receive, server send, and, ultimately, client receive.  For example, the call-and-response between ServiceA and ServiceB would count as a single span: .

TDist-RPC-span-knewton

Each span tracks three identifiers:

  • Trace ID: Every span in a trace will share this ID.
  • Span ID: The ID for a particular span. The Span ID may or may not be the same as the Trace ID.
  • Parent Span ID: An optional ID present only on child spans. The root span does not have a Parent Span ID.

The diagram below shows how these IDs are applied to calls through the tree.  Notice that the Trace ID is consistent throughout the tree.

TDist-trace-tree-with-IDs-knewton

For more details, read the Dapper paper.

TDist

TDist is a Java library that Knewton developed. All of our services use it to enable tracing. TDist currently supports Thrift, HTTP, and Kafka, and it can also trace direct method invocations with the help of Guice annotations.

Each thread servicing or making a request to another service gets assigned a Span that is propagated and updated by the library in the background. Upon receipt of a request (or right before an outgoing request is made), the tracing data are added to an internal queue, and the name of the thread handling the request is changed to include the Trace ID by a DataManager. Worker threads then consume from that queue, and the tracing data get published to our tracing message bus. Java ThreadLocals makes it easy to globally store and access information assigned to a particular thread, and that is  the approach we used in the DataManager.

Often, threads offload the actual work to other threads, which then either do other remote calls or report back to the parent thread. Because of this, we also implemented thread factories as well as executors, which know how to retrieve the tracing data from the parent thread and assign it to the child thread so that the child thread can also be tracked.

Zipkin

Once the tracing data arrive at the tracing message bus through TDist, the Zipkin infrastructure handles the rest of the process. Multiple instances of collectors,consuming from the message bus, store each record in the tracing data store. A separate set of query and web services, part of the Zipkin source code, in turn query the database for traces. We decided to join the query and the web service to keep things simpler, and also because this combined service is internal and has predictable traffic patterns. However, the collector is decoupled from the query and web service because the more Knewton services integrated with the collector, the more tracing data itwould have to process.

Zipkin UI

Out of the box, Zipkin provides a simple UI to view traces across all services. While it’s easy to view logs for a particular Trace ID across all services, the Zipkin UI provides a summarized view of the duration of each call without having to look through hundreds of log statements. It’s also a useful way of identifying  the biggest or slowest traces over a given period of time. Finding these outliers allowed us to flag cases where we were making redundant calls to other services that were slowing down our overall SLA for certain call chains. Here’s a screenshot of a trace appearing in the Zipkin UI:

TDist-zipkin-UI-knewton

Of course, the UI doesn’t come without drawbacks. While it’s easy to look at distinct individual traces, we found the Zipkin UI lacking for examining aggregate data. For example, there’s currently no way to get aggregate timing information or aggregate data on most called endpoints, services etc.

Throughout the development process and rolling out of the Zipkin infrastructure, we made several open-source contributions to Zipkin, thanks to its active and growing community.

Splunk

As mentioned above, the thread name of the current thread servicing a request is also changed, and the trace ID is appended to it. Because of this we can query for logs across all of the trace-enabled services for a particular call. This makes debugging a lot easier, and it’s proven to be very useful in post mortem analysis, log aggregations, debugging for isolated problems, and explaining uncommon platform behavior.

Thrift

Thrift is a cross-language RPC framework for building up scalable services. The user can define a service and data model spec in Thrift, and Thrift will compile the spec into many different languages. Users can then implement the generated service interfaces in the desired language. Thrift also automatically generates the client code and data structures for the services defined by the user.

Thrift is the most widely used RPC method between services at Knewton. Most of our services talk to each other through this framework, so supporting it while still maintaining backwards compatibility was critical for the success of this project. More precisely, we wanted services that were not tracing-enabled to be able to talk to tracing-enabled services.

When we started looking into adding tracing support to Thrift, we experimented with two different approaches. The first approach involved a modified Thrift compiler, and the second involved modified serialization protocols and server processors. Both methods had their advantages and disadvantages.

Custom Compiler

In this approach, we experimented with modifying the C++ Thrift compiler to generate additional service interfaces that could pass along the tracing data to the user. Modified thrift compilers are not uncommon; perhaps the most famous example is Scrooge. One advantage of a modified compiler was that clients would have to swap out fewer class implementations in their code, since tracing was supported right in the generated code. Clients could also get a reference of the tracing data from the service interface.

Although we didn’t benchmark, we also think that this approach would have been marginally faster, since there are fewer classes delegating to tracing implementations. However, we would have had to recompile all of our Thrift code and deviate from the open-source version, making it harder to upgrade in the future. We also soon realized that allowing the user access to the tracing data might not be desirable or safe, and data management might be better left to TDist for consistency.

Custom Protocols and Server Processors

We ended up using this approach in production. Not having to maintain a custom compiler lowered our development cost significantly. The biggest disadvantage to customizing protocols and server processors was that we had to upgrade to Thrift 0.9.0 (from 0.7.0) to take advantage of some features that would make it easier to plug in our tracing components to the custom Thrift processors and protocols. The upgrade required a lot of coordination across the organization. Thankfully, the newer version of Thrift was backwards-compatible with the older version, and we could work on TDist while Knewton services were being updated to the newer version. However, we still had to release all Knewton services before we could start integrating them with our distributed tracing solution.

Upgrading Thrift

Upgrading libraries when using a dependency framework is relatively easy, but for an RPC framework like Thrift and a service-oriented architecture with a deep call dependency chain, it gets a little more complicated. A typical server will have server and client code, with the server code often depending on other client libraries. Because of this, upgrades needed to start from the leaf services and move up the tree to avoid introducing wire incompatibilities since the outgoing services might not know if the destination service will be able to detect the tracing data coming through the wire.

TDist-upgrading-thrift-knewton

Another hurdle was that certain services, such as the Cassandra client library Astyanax depended on third-party libraries that in turn depended on the Thrift 0.7.0. For Astyanax, we had to shade the JARs using Maven and change package names so that they didn’t collide with the newer Thrift library. All of this had to happen quickly and without downtime. And because we didn’t want other teams at Knewton incurring the cost of this upgrade, the distributed tracing team had to implement and roll out all the changes.

Tracing Thrift Components: How It Works

Our Thrift solution consisted of custom, backwards-compatible protocols and custom server processors that extract tracing data and set them before routing them to the appropriate RPC call. Our protocols essentially write the tracing data at the beginning of each message. When the RPC call reaches the server, the processor will identify and note whether the incoming call has tracing data so it can respond appropriately. Calls with tracing data get responses with tracing data, and requests from non-integrated services that don’t carry tracing data get responses without tracing data. This makes the tracing protocols backwards-compatible since the server outgoing protocols don’t write tracing data if instructed not to by the processor servicing the request. A tracing protocol can detect whether the payload contains tracing data based from the first few bytes. Thrift appends a protocol ID to the beginning, and if the reading protocol sees that the first few bytes do not indicate the presence of tracing data the bytes are put back on the buffer and the payload is reread as a non-tracing payload. When reading a message, the protocols will extract the tracing data and set them to a ThreadLocal for the thread servicing the incoming RPC call using the DataManager. If that thread ever makes additional calls to other services downstream, the tracing data will be picked up from the DataManager automatically by TDist and will get appended to the outgoing message.

TDist-tracing-thrift-components-knewton

Here’s a diagram showing how the payload is modified to add tracing data:

TDist-tracing-payload-knewton

Kafka Tracing Support

When we were adding tracing support to Kafka, we wanted to keep the Kafka servers, also referred to as brokers, as a black box. In other words, we wanted to pass the data through the brokers without them necessarily knowing and therefore not having to modify the Kafka broker code at all. Similar to our approach with RPC services, we upgraded the consumers before upgrading the producers. The consumers are backwards-compatible and can detect when a payload contains tracing data, deserializing the content in the manner of the Thrift protocols described above. To achieve this, we require clients to wrap their serializers/deserializers in tracing equivalents that delegate reading and writing of the non-tracing payload to the wrapped ones.

TDist-kafka-tracing-support-knewton

TDist-kafka-tracing-payload-knewton

HTTP Tracing Support

As some of Knewton’s internal infrastructure and all public facing endpoints are HTTP-based, we needed a simple way of instrumenting HTTP calls to carry tracing information.

This was quite simple, because HTTP supports putting arbitrary data in headers. According to section 5 of rfc2047, the only guideline for adding custom headers is to prefix them with a `X-`.

We elected to continue the Zipkin tradition and use the following headers to propagate tracing information:

  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId

Services at Knewton primarily use the Jetty HTTP Server and the Apache HTTP Client. Both of these projects allow for easy header manipulation.

Jetty services requests by routing them to a Servlet. As part of this routing, Jetty allows the request and response to pass through a series of Filters. We felt this was the ideal place to deal with tracing data. When any incoming request comes with tracing data headers, we construct span data from it and submit it to the DataManager.

With the Apache HTTP Client, we use an HttpRequestInterceptor and HttpResponseInterceptor, which were designed to interact with header contents and modify them. These interceptors readtracing data from headers and set them using the DataManager and vice versa.

Guice

For those unfamiliar with Guice, it’s a dependency management framework developed at Google. To make the TDist integrations with our existing services easier and less error-prone, we relied on Guice and implemented several modules that our clients simply had to install. Guice handles dependency injection during object instantiation and makes it easy when swapping implementations of interfaces. If throughout this article you have been  thinking that integrating with TDist sounds complicated, a lot of the time all our clients needed to do was install additional Guice modules that would bind our tracing implementations to existing Thrift interfaces. This also meant that our clients never had to instantiate any of our tracing-enabled constructs. Whenever a TDist client forgets to bind something, Guice would notify our clients at compile time. We put a lot of thought into how we laid out our Guice module hierarchies so that TDist didn’t collide with our clients, and we were very careful whenever we had to expose elements to the outside world.

The Tracing Message Bus

The tracing message bus is where all our client services place tracing data prior to its being consumed by the Zipkin collector and persisted. Our two options were Kafka and Kinesis, and we ended up choosing Kinesis. We were considering Kafka because Knewton, has had a stable Kafka deployment for several years. At the time, our Kafka cluster, which we’ve been using as our student event bus, was ingesting over 300 messages per second in production. Our initial estimates for putting us in the range of over 400,000 tracing messages per second with only a partial integration. The idea of straining production systems with instrumentation data made us nervous. Kinesis seemed like an attractive alternative that would be isolated from our Kafka servers, which were only handling production, non-instrumentation data. At the time of implementation, Kinesis was a new AWS service and none of us were familiar with it. Its price, throughput capabilities, and the lack of maintenance on our end sealed the deal for us. Overall, we’ve been satisfied with its performance and stability. It’s not as fast as Kafka, but the nature of our data made it acceptable to have an SLA of even a few minutes from publication to ingestion. Since we deployed the tracing message bus to production, we were also able to easily scale up the number of Kinesis shards without incurring any downtime.

The Tracing Data Store

The tracing data store is where all our tracing data ends up. The data stay there for a configurable time and are queried by the Zipkin query service to display on the UI. Zipkin supports a lot of data stores out of the box, including Cassandra, Redis, MongoDB, Postgres and MySQL. We experimented with Cassandra and DynamoDB, mainly because of the institutional knowledge we have at Knewton, but ended up choosing Amazon’s Elasticache Redis. The most important reasons behind our decision were

  • Time to production, given that we didn’t have to roll out and maintain a new cluster
  • cost
  • easier integration with Zipkin with less code
  • support for TTLs on the data.

Conclusion

Our tracing solution at Knewton has been in all environments for a few months now. So far it has proven to be invaluable. We have only started to scratch the surface with what we can do with the tracing and timing data we’re collecting. We had a lot of fun implementing and rolling out tracing at Knewton, and we have come to understand the value of this data.

Distributed Tracing: Motivation

Knewton’s technology solutions comprise more than 50 different services. Such a complex system can be difficult to debug, which is why we set out to integrate distributed tracing into our architecture. A distributed tracing system tracks requests by assigning a unique identifier for the request that is maintained through all service layers, from initial reception through response to clients. Without a distributed tracing system, understanding issues occurring deep in the stack becomes extremely difficult.

Let’s walk through a typical debugging scenario without distributed tracing. Here is a simplified view of the architecture involved in receiving a student event and generating a subsequent recommendation:

distributed-tracing-simplified-flow-knewton

  1. Student completes some work. The results are posted to the HTTP server.
  2. The HTTP server reposts the event to the distributed queue for durability.
  3. The student event processor converts the information from client language to internal Knewton language.
  4. The student event processor posts the translated event to a distributed queue.
  5. The recommendation service reads the event, generates a recommendation, and returns it to the client.

Let’s say the client reports that an event was sent for User A, but no recommendation was received for what User A should work on next. The problem could exist at any of the five layers described above. The identifiers used within each service may be different, so to find where the problem occurred, we’d have to query the logs of each service in serial. Debugging becomes even more complicated in a non-linear dependency chain.

In the world of distributed tracing, debugging would be reduced to two steps: find the incoming event, noting the trace ID assigned to it, and searching across all services for logs associated with that trace ID.

The distributed tracing system provides latency data for each step in the transaction. Before distributed tracing we were unable to calculate end-to-end time for a single transaction, but we could visualize the connections between our services. The Grafana graph below shows 95% latency for various steps in recommendation processing.

grafana-graph-recommended-processing-latency-knewton

To get the most out of a distributed tracing system, we identified key requirements:

Integration:

  • Adding tracing support to a service should require minimal or no code changes.
  • Adding tracing support to a service should not increase latency, nor should it affect service uptime or reliability.
  • The solution must be able to support our interservice communication protocols: Thrift, HTTP and Kafka.
  • The solution must provide a way for an external system to input the trace ID.  Current full-platform tests tell us only that an issue occurred, but have no indication as to where.  Smoke test alerts could include the trace ID, which would make debugging much quicker.
  • The trace ID and information should be accessible to a service for any purpose that the service finds useful, such as logging intermediary actions or including in error logs.

Data:

  • A solution that traces all events.  Some tracing solutions trace only a portion of traffic, and you never know when a particular event will require investigation.
  • A solution that will display an event from end to end, through each service that interacts with it.
  • Tracing information must include unique identification, system, action, timing and sequence.
  • System time across services cannot be guaranteed, so the solution must implement tracking of a logical order so that events can be displayed in the order in which they occurred.
  • Tracing data will not contain any Personally Identifiable Information or information proprietary to customers or Knewton.

Availability:

  • The solution must function in all environments: development, quality assurance, production.  Retention policy may be different for different environments.
  • Tracing reports must be available for immediate debugging and investigation.
  • Tracing data will be available for real-time debugging for one week.  All tracing data will be retained offline for historical analysis.

We analyzed the following distributed tracing solutions for their ability to satisfy our requirements:

Most of these products did not support all of the protocols we require. Kafka was most often missing, with Thrift a close second. And it would not be possible to enhance for the proprietary products to fit our protocol needs. Brave was a particularly compelling solution, but ultimately we decided it was too invasive.

In the end we decided to use Zipkin without Finagle. Finagle is a great product, but did not support Thrift 7 and reverting to an older version of Thrift would have been a large effort in the wrong direction. In the end, we upgraded to Thrift 9, but this was wire compatible between server and client so it was much easier to roll out than a switch to Scrooge.

Our next blog post will explain how we were able to integrate distributed tracing compatible with Zipkin and Finagle into our code while meeting all of the above requirements.

Rolling Out the Mesos Slave Roller

A few months ago, Knewton started running most services via Docker containers, deployed to an Apache Mesos cluster with a Marathon scheduler. This new infrastructure makes it easy to deploy and manage services but adds complexity to maintaining them.

Applications running on a Mesos cluster run on a group of slave nodes. These slave nodes often need to be upgraded or reconfigured. Although it is usually possible to make modifications to a Mesos slave in place, the Infrastructure Team at Knewton prefers to replace slaves entirely, following the principles of immutable infrastructure. Mesos makes it simple to add and remove slaves and we run them on AWS EC2, so replacing a slave is really easy.

Moreover, immutable Mesos slaves have some nice properties:

  • Configuration management code can be much simpler as we don’t need to uninstall old versions or configurations
  • We can be sure that the state of configuration management at the time of the slave’s launch corresponds to the configuration of the Mesos slave.
  • Rollbacks are simpler in case of failures.
  • Replacing a Mesos slave is a good stress test. If we could not stand to lose one slave, we would be in trouble.

Still, relaunching all of the nodes in a Mesos slave cluster has to be done carefully to prevent outages. Marathon will relaunch tasks that were running on a slave that is being replaced, but if all of the remaining tasks of a service are on that one slave, there will be an outage.

When Marathon launches a task, it will launch it on any slave that fits the constraints of that application. For Knewton, the most useful set of constraints includes CPU, memory, and balanced distribution across availability zones. Since it is possible that any service task might be launched on any slave, changes need to be applied to the entire cluster at the same time.

The First Attempt

Shortly after switching to Mesos, we discovered that we needed to upgrade Docker. For the upgrade, we chose to bring up a new set of Mesos slaves, then start terminating the old slave instances one by one. We determined that would be safe because Marathon would relaunch tasks on the new slaves as the old ones were terminating.

The replacement process was successful, but not without its challenges:

It takes a while: Terminating an EC2 instance, deregistering a slave from its master cluster, and relaunching all of the service tasks took about four minutes. At the time, we had up to 55 slaves per environment. With the time it takes to launch new nodes, the process took around four hours.

Don’t trust your eyes: One health check failing can be an indication of a bigger problem. One bad service task may not cause an outage or integration test failures, but it can be an early indication that an upgrade needs to be rolled back. Having a human watch web interfaces for green dots or failure messages is ineffective and error-prone.

Validation: Marathon allows you to set a constraint that will prevent two tasks of the same application from launching on the same Mesos slave. That property seems appealing, but when you use a small set of large EC2 instances, you may have more tasks of an application than instances to run them on — in which case, the tasks won’t launch.

For that reason, we don’t enforce that constraint. So sometimes all tasks of some service will be running on a single slave. When that happens, taking that slave down may cause an overall service outage. Once again, having a human assert that is not the case takes time and is error prone.

Maintaining balance: Our Mesos slave clusters consist of EC2 instances that are part of an Auto Scaling Group. The ASG AZRebalance process keeps the instances in that ASG balanced across Availability Zones. The ASG maintains balance when launching new instances, capacity permitting. Terminating an instance can also trigger AZRebalance, launching a new instance and terminating an existing one. If you have services that have two tasks, terminating an instance and triggering AZRebalance may cause an outage. The AZRebalance process can be turned off during an upgrade. An ASG will still maintain balance when launching instances (unless there is only one task, in which case an outage must be acceptable), but once the original set of nodes has been removed, instances may no longer be evenly distributed across availability zones. When AZRebalance is turned back on, some instances may need to be terminated. For example if instances can be launched across three availability zones, one instance may be terminated. (See appendix for details.)

Given our first experience, we decided to automate the process of replacing slaves and built the Mesos Slave Roller.

Mesos Slave Roller

The Mesos Slave Roller (MSR) is an automated tool that replaces all the slaves in a cluster. It ensures that all components stay healthy and permits its user to stop and start at will.

mesos-slave-roller

The Mesos Slave Roller takes as input the environment to configure and the number of slaves that there should be when it’s done.

It compares this desired end-configuration to the existing state of the Mesos slave ASG. It then generates a list of actions that will produce the desired configuration:

  1. Scale up the slave ASG by the number of instances in the desired end-configuration.
  2. For each of the original Mesos slaves:
    1. Trigger a termination event
    2. Log the termination in a checkpoint file
    3. Wait for Mesos to deregister that slave
    4. Wait for Marathon to report that all services are healthy

Updating minimum and maximum instance counts for the ASG is a separate process. If a checkpoint file were to accidentally be deleted, rerunning the Mesos Slave Roller could result in a costly and potentially dangerous state with many more instances running than expected.

Scaling and terminating is done by directly invoking the autoscaling API. To scale, we just update the desired count for the ASG. To terminate, we use the terminate-instance-in-autoscaling-group function and tell it to decrement the capacity (desired count).

Health Checks: While running, the Mesos Slave Roller queries the autoscaling, Mesos, and Marathon APIs to make sure all parts of the system are functioning as expected. The following checks are performed:

  1. Checks the Mesos /slaves endpoint to confirm that the appropriate amount of Mesos slaves have been registered after increasing the size of the ASG.
  2. Checks the Mesos /slaves endpoint to confirm that the correct slave disconnected.
  3. Checks the Marathon /v2/apps endpoint to see that all apps are healthy. Our services are interconnected, so it is necessary to check that everything still works, not just the apps that had tasks on the terminated slave.

We consider an app healthy if there are no deployments in progress and the number of tasks reporting healthy is equal to the number of desired instances. If a given app has no health check, then the number of tasks running must be equal to the number of desired instances.

Checkpoints: All of the Mesos Slave Roller‘s operations have some timeout associated with them. If they take longer than expected, the Mesos Slave Roller will pause and wait for a human to kick it off again. If some error is thrown or if an upgrade goes poorly and a human stops it, the Mesos Slave Roller can be resumed from the checkpoint file that it has created. When it resumes, it will use the checkpoint file to determine the remaining work it has to do.

Auto-balancing Availability Zones: To keep things simple and to minimize surprises, we turn off the AZBalance process before starting the Mesos Slave Roller. When replacement is complete, we turn AZBalance back on. We limit our ASG to three AZs so the potential extra balancing operation is safe as this entire process rests on being able to take down one node at a time.

Planned Enhancements

The Mesos Slave Roller has proven to be useful for maintaining our infrastructure. There are several enhancements we’d like to make:

Offer Draining: Mesos 0.25 introduced a useful suite of maintenance primitives, including the ability to drain Mesos slaves of offers. That would allow the Mesos Slave Roller to tell a slave to evict all of its applications instead of relying on termination. If we can shut down service tasks gracefully, we can minimize the impact of losing a task.

Tagging Mesos Slaves and moving tasks: When a Mesos slave is terminated and Marathon receives a notification that a task died, it redeploys that task on a healthy Mesos slave. Some tasks may relaunch on “old” Mesos slaves a few times before finally winding up on a new one. This process works, but leads to more churn than necessary.

An alternate approach would involve some sort of indicator on each slave that shows when it was launched. Ideally the indicator could just alternate between green and blue, but it may need to be more complicated. With an indicator, we can ask Marathon to relaunch apps on a new slave. This approach has the added benefit that, once all tasks are moved to new slaves, we can terminate all of the old slaves without paying attention to when the termination process finishes. Given that waiting for deregistration takes most of the four minutes per node, this approach should cut the time required to replace a cluster by more than half.

PagerDuty Integration: When the Mesos Slave Roller fails, it will pause and wait for a human to restart it. For now, a human has to periodically check to see that whether it is running. Improving error handling to trigger a PagerDuty alert will make the Mesos Slave Roller into a truly hands-off tool.

Appendix

Calculating how many moves need to be made:

Balanced state:  The difference in the number of instances between any two availability zones is at most 1.

Claim: Because the ASG is balanced before scaling and after scaling, the difference in the number of instances in any AZ after removing the original set can be at most 2.

Let:

  • A_x \leftarrow the number of instances in the AZ called x at the start of this process
  • D_x \leftarrow  number of instances added to the AZ called x

We can say that for any pair of AZs i, j that are in some ASG:

Before scale: \left\vert A_i - A_j \right\vert \le 1 — Property of ASG balancing

After scale: \left\vert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \right\vert  \le 1 — Property of ASG balancing

Proof:

\left\vert D_i - D_j \right\vert = \left\vert D_i - D_j + A_i - A_i + A_j - A_j \right\vert

                = \left\vert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right )  + \left ( A_j - A_i \right ) \right\vert

which, by the triangle inequality and the statements above means that

\left\vert  D_i - D_j \right\vert \le \left\vert \left ( A_i  + D_i \right ) - \left ( A_j + D_j \right ) \right\vert  + \left\vert \left ( A_j  -A_i \right ) \right\vert \rightarrow

\left\vert D_i - D_j \right\vert \le 2

Because of those properties, the number of final instances in each AZ can be one of three numbers: x - 1 (low), x (mid), and x + 1 (high) for some value x. In the case that there are occurrences of both the high and low values, some instances will have to move. The number of moves necessary is equal to the lesser of the number of highs and the number of lows. A move terminates an instance in one of the AZs with a high count and launches one in an AZ with low count. Once the counts remaining are only mids and lows or only mids and highs, the ASG is balanced.

In the worst case, half of the counts will be highs, and half of the counts will be lows. In that case, there are \tfrac {n}{2} moves where n is the number of AZs. If the number of AZs is odd, the amount of moves is \tfrac {n}{2} - 1 because the last node can not have a complementary low or high.

Why Knewton Is Trying Kotlin

Kotlin-logo

Kotlin is a statically typed general purpose JVM language that provides flawless interoperability with Java. It is open source and developed by Jetbrains,creators of IntelliJ IDEA. It is concise, reducing boilerplate, and offers great null safety. Version 1.0 is planned for the end of 2015. Kotlin is evolving quickly. We found switching to Kotlin to be painless.

At Knewton we have been happily hacking on Kotlin for months. We’ve collected our experiences and put together an overview of why we decided to try it, highlighting some of Kotlin’s key selling points.

Ease of Adoption

We expected some overhead and a learning curve, but Kotlin surprised us with its variety of tools and ease of adoption, especially for a language that is still in beta. IntelliJ provides excellent built-in support for Kotlin that is on par with Java. Continue reading

The Nine Circles of Python Dependency Hell

“Dependency hell” is a term for the frustration that arises from problems with transitive (indirect) dependencies. Dependency hell in Python often happens because pip does not have a dependency resolver and because all dependencies are shared across a project.1 In this post, I’ll share a few of the strategies that I use to deal with some commonly-encountered problems.

These strategies assume that you’re using a dependency management setup similar to what we use at Knewton, which includes using pip, virtualenv, and good practices for your requirements.txt and install_requires. Some items are specific to organizations that use both internal and external Python libraries, but many of these items will apply to any Python project.

Detecting dependency hell

Even if your project only has a few first-level dependencies, it could have many more transitive dependencies. This means that figuring out that you even have a version conflict is non-trivial. If you just run pip install -r requirements.txt, pip will happily ignore any conflicting versions of libraries. In the best case, your project will work fine and you won’t even notice that this has happened. If you’re unlucky, though, you’ll get a mysterious runtime error because the wrong version of a library is installed.

In this case, my_app depends on foo_client and bar_client. Let’s assume that bar_client uses a feature that was introduced after requests 2.3.1. If pip installs requests==2.3.1 (from foo_client), bar_client will break because the feature it needs is missing! Note that foo_client and bar_client can each build fine independently.

In this case, my_app depends on foo_client and bar_client. Let’s assume that bar_client uses a feature that was introduced after requests 2.3.1. If pip installs requests==2.3.1 (from foo_client), bar_client will break because the feature it needs is missing!
Note that foo_client and bar_client can each build fine independently.

Continue reading

How Knewton Cutover the Core of its Infrastructure from Kafka 0.7 to Kafka 0.8

Kafka has been a key component of the Knewton architecture for several years now. Knewton has 17 Kafka topics consumed by 33 services. So when it came time to upgrade from Kafka 0.7 to Kafka 0.8 it was important to consider the cutover plan carefully. The needs of each producer/consumer pair were unique, so it was important to identify constraints by which to evaluate the plans. The rollout plans and the considerations by which they were evaluated are the focus of this blog post.

Producer vs. Consumer Cutover

There are two main approaches that one can take when doing a cutover from Kafka 0.7 to 0.8. The first option, call this the “producer cutover,” is to let the producer service cut over from producing to 0.7 to producing to 0.8, while having consumer services consume from both 0.7 and 0.8 simultaneously. The second approach, call this the “consumer cutover,” is to let the consumer services cut over from consuming from 0.7 to consuming from 0.8 while having the producer service produce to both 0.7 and 0.8 simultaneously.

Screen Shot 2015-09-28 at 12.17.11 PMScreen Shot 2015-09-28 at 12.16.48 PM

Rollout risk, consumer tolerance and rollback options differ for each approach and should be considered before choosing a cutover approach.

Rollout Risk

When evaluating rollout risk, consider how many consumers are involved. If there are many consumers, a rollout plan that switches all consumers at one time may be riskier than an approach that switches a single consumer at a time.

Consumer Tolerance

In order to determine the appropriate cutover method, it must first be made clear what conditions the consumer services tolerate.

  • How many duplicate messages are acceptable during a cutover?
  • How much message loss is acceptable during a cutover?
  • How much message delay is acceptable during a cutover?

Rollback

Unless message loss is not a big concern, it is important to have a rollback plan in case there are complications. It is not obvious how to rollback the cutover procedure without losing messages, or alternatively, re-consuming a large number of duplicates. How to plan a rollback depends completely on which cutover approach is used.

Inconsistent Partitioning

Another aspect of the consumer service to consider is if inconsistent partitioning is acceptable, i.e, is it acceptable to have two different consumer service instances receive messages for the same key for a short period of time during a cutover? For example, this could be a problem if it leads to two different consumer threads writing to the same index in a data store.

Producer Cutover Details

The steps for a producer cutover will most likely look like this:

  1. Deploy consumer service that consumes from both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Deploy producer service that produces to the 0.8 topic instead of the 0.7 topic.
  3. After the cutover is completed, cleanup by deploying consumer service that only consumes from the 0.8 topic.

A producer cutover has high rollout risk because it is all or nothing. All consumers will switch to consuming from 0.8 at the same time. If the topic to be cutover has few consumers, the risk of consumers failing during the cutover may be acceptable, and a producer cutover might be a good option.

If successful, consumer tolerance is not a factor, and no messages will be duplicated, lost or delayed.

If, for some reason, messages are not going through the system properly after the cutover, the system must be rolled back to a previous configuration that is known to function properly.

To ensure that no messages are lost during that rollback, the messages produced to 0.8 that failed to be processed properly must somehow be replayed to 0.7. This means that a producer cutover approach must have a reliable replay process that copies messages from 0.8 to 0.7 if message loss is not acceptable.

Any rollback scenario will be more complicated if there are many consumers. If the cutover only fails for some consumers, and some are successful, production must still revert to 0.7. Replaying messages would in this scenario cause message duplication for the consumers that had a successful cutover.

Consumer Cutover Details

The consumer cutover approach is recommended by the Kafka team, which provides the KafkaMigrationTool for this purpose.

In the words of the Kafka team, a consumer cutover using the KafkaMigrationTool would look like:

  • Setup the migration tool to consume messages from the 0.7 cluster and re-publish them to the 0.8 cluster.”
  • “Move the 0.7 consumers consuming from the 0.7 cluster to 0.8.”
  • “Move the 0.7 producers to 0.8.”

Rollout risk is mitigated with the consumer cutover because each consumer service may move from 0.7 to 0.8 individually.

The problem with the approach, as described on the migration page, is that there is a very real risk of some message loss, or alternatively, some message duplication. The page has no discussion of consumer tolerance, including risk of message loss and message duplication, and this is important to consider. Assuming that the consumer is configured with auto.offset.reset=largest, some messages will be lost if the 0.7 consumers are stopped before the 0.8 consumers are started. If 0.7 consumers are stopped after the 0.8 consumers are started, there is a possibility of significant message duplication. As most readers would already know, the default Kafka consumer behavior of “at least once delivery,” also does not guarantee no duplicate messages, but for a cutover, there are things that can be done to eliminate the otherwise guaranteed duplicates.

To avoid the pitfalls mentioned above, the concept of tagging and filtering can be introduced. A tag is an additional piece of information the producer adds to each message, and the consumer then applies a filter based on this tag. There are a few different ways this tagging can be done. The suggestion made here is to filter based on a timestamp. The producer adds an additional key to the message to say the time at which it was put onto the queue. Based on some cutover time X, 0.7 consumers would only process messages with timestamps before X, and 0.8 consumers would only process messages with timestamps after X.

The benefit of the tag-and-filter approach is that after the filtering consumer service has been deployed, it is possible to verify that the 0.8 consumer is up and running correctly, although filtering out messages that should only be processed by the 0.7 consumer.

A consumer cutover process based on timestamp filtering would look like this:

  1. Deploy producer service that produces to both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Determine the cutover time X.
  3. Deploy consumer service that consumes with a time filter from both the 0.7 topic and the 0.8 topic. The 0.7 consumers only process messages with timestamps before X and the 0.8 consumers only process messages with timestamps after X.
  4. Wait for the cutover time.
  5. Deploy consumer service version that only consumes from the 0.8 topic.
  6. Deploy producer service version that only produces to the 0.8 topic.

Using the KafkaMigration tool to accomplish production to both 0.7 and 0.8 is possible, but it adds lag to the message delivery and only replaces one of the steps listed in the tag-and-filter approach. If many consumers are to be cutover, not necessarily all around the same time, this tool may have to run for a long time. A tool that runs for a long period of time and provides critical functionality should probably be treated the same way as other critical parts of the system, which typically means that it requires monitoring and alerting. The alternative — creating a producer service version that produces to both 0.7 and 0.8 — might require less work.

If for some unforeseen reason messages are still not being processed properly by the 0.8 consumer after the cutover, a rollback plan should be executed. Ideally, as the 0.7 consumer starts to filter out messages, it should also stop committing offsets. A rollback would then only entail re-deploying the old consumer service version that consumes only from 0.7, and message consumption would resume from the time of the cutover. Due to bug KAFKA-919, stopping the offsets from being committed might require some extra work (discussed more later).

If the cutover is successful, but issues with the 0.8 setup (e.g., performance issues) are discovered days later, a “non-immediate” rollback plan should be executed. Unless several days worth of message duplicates are acceptable, a different approach than the “immediate” rollback plan is required. For this scenario, a reverse cutover is recommended. Simply configure the consumer service to perform a reverse timestamp-based cutover with 0.7 consumers now only processing messages with timestamps after some reverse-cutover time Y, and 0.8 consumers only processing messages with timestamps before Y.

Inconsistent Partitioning

When consumer tolerance was previously discussed, the issue of having two different consumer service instances receiving messages for the same key was mentioned. It would be difficult (if even possible) to make 0.7 and 0.8 message partitioning match perfectly. A perfect matching here means that with a double producer and double consumer setup, each message, produced to both 0.7 and 0.8, will be routed to the same consumer service instance. Without this guarantee, two messages with the same key, one created just before the cutover time and one just after, might go to two different consumer service instances. As previously mentioned, this could be a problem if, for example, it leads to two different consumer threads writing to the same index in a data store.

If inconsistent partitioning is an issue, in order to prevent two threads from processing messages with the same key at the same time, the only option might be to let the 0.7 message queue “drain” before starting to process the 0.8 messages. This cutover method could be accomplished by either a producer or a consumer cutover, the additional key step being a pause before starting the 0.8 consumer service. Pausing the consumer service will obviously cause some message delay.

KAFKA-919

Ticket KAFKA-919 describes a major bug in the 0.7 client where “disabling of auto commit is ignored during consumer group rebalancing.” In order to be able to do an “immediate” rollback, the 0.7 consumer must stop committing offsets at the cutover time. This bug causes offsets to be committed during a consumer rebalance, even if consumers have set the auto-commit property to false.

The easiest workaround for this problem is preventing a consumer rebalance from taking place by scaling down the consumer service to have only one consumer in the consumer group. In this rollback scenario, the one consumer service instance is restarted with a configuration to only consume from 0.7 (because the 0.8 setup is not working, for some unexpected reason). Restarting all consumers in a consumer group usually triggers a consumer rebalance, unless there is only one consumer.

Conclusion

For most topic cutovers, Knewton employed a time-based tag-and-filter approach. This approach was deemed the least risky for the requirements of those producers and consumers, as well as the easiest to reason about and rollback. Hopefully the considerations outlined above will help you to choose the best cutover plan for your system.

Written by Christofer Hedbrandh and Sarah Haskins

Best Practices for Python Dependency Management

Dependency management is like your city’s sewage system. When it’s working well, it’s easy to forget that it even exists. The only time you’ll remember it is when you experience the agony induced by its failure.

Here’s what we want to accomplish with dependency management at Knewton:

  • Builds should be stable across environments. If a project builds on my machine, it should build on others’ machines and on our build server.
  • Builds should be stable over time. If a project builds now, it shouldn’t break in the future.1
  • Anyone at Knewton should be able to easily download, build, and make changes to any Knewton project.
  • We should be able to have many different projects with large dependency trees without running into dependency hell.

The items below reflect how we do Python dependency management at Knewton. You may not need everything in this list, so items are introduced in order of increasing complexity.

Easily install your dependencies with pip

When you want to use a Python library from your code, you’ll need to download the library, put it somewhere on your computer, and possibly build any external routines (e.g., C, C++, Fortran!?) that the library uses. It’s possible to do this all by hand, but there’s a much better way: pip.2 Pip is a Python tool that specializes in installing Python packages. For example, just run pip install numpy to install numpy and its dependencies. Pip also helps you to keep your version control repositories small by giving you a reproducible way to install packages without needing to include them in your source code repo.

Not only does pip let you install normal source packages, but it can also install packages from source control repos, wheels, and legacy binary distribution formats.

The instructions for installing Python from The Hitchhiker’s Guide to Python will also tell you how to install pip.3 Pip’s user guide is a good way to get started with using pip, and the pip install documentation is helpful if you need to dive deeper.

Pin your requirements with a requirements.txt file

It’s easy to get a Python project off the ground by just using pip to install dependent packages as you go. This works fine as long as you’re the only one working on the project, but as soon as someone else wants to run your code, they’ll need to go through the process of figuring which dependencies the project needs and installing them all by hand. Worse yet, if they install a different version of a dependency than you used, they could end up with some very mysterious errors.

To prevent this, you can define a requirements.txt file that records all of your project’s dependencies, versions included. This way, others can run pip install -r requirements.txt and all the project’s dependencies will be installed automatically! Placing this file into version control alongside the source code makes it easy for others to use and edit it. In order to ensure complete reproducibility, your requirements.txt file should include all of your project’s transitive (indirect) dependencies, not just your direct dependencies. Note that pip does not use requirements.txt when your project is installed as a dependency by others — see below for more on this.

SAMPLE FILE

requests==2.3.0
six==1.4.1

The pip user guide has a good section on requirements files.

Isolate your Python environments with virtualenvs

As a result of how Python paths work, pip installs all packages globally by default. This may be confusing if you’re used to Maven or npm, which install packages into your project directory. This may seem like an irrelevant detail, but it becomes very frustrating once you have two different projects that need to use different versions of the same library. Python requires some extra tooling in order to install separate dependencies per-project.

project_1 and project_2 depend on different versions of the requests library. This is bad because only one version of requests can be installed at a time.

project_1 and project_2 depend on different versions of the requests library. This is bad because only one version of requests can be installed at a time.

The solution for this problem is to use virtual environments. A virtual environment consists of a separate copy of Python, along with tools and installed packages. Creating a virtualenv for each project isolates dependencies for different projects. Once you have made a virtualenv for your project, you can install all of that project’s dependencies into the virtualenv instead of into your global Python environment. This makes your setup look more like something you would create with Maven.

Now you can install a different version of requests into each virtualenv, eliminating the conflict.

Now you can install a different version of requests into each virtualenv, eliminating the conflict.

I try to keep the number of packages I install to a minimum, both in my global Python environment, and in each virtualenv. I’ll be doing a follow-up post on how to handle virtualenvs with large numbers of packages installed.

A good virtualenv tutorial is A non-magical introduction to Pip and Virtualenv for Python beginners. The Python Packaging Guide provides a high-level overview that ties together pip and virtualenvs.

Build and rebuild virtualenvs easily with tox

Now that you’re using virtualenvs for all your projects, you’ll want an easy way to build the virtualenv and install all the dependencies from your requirements.txt file. An automatic way to set up virtualenvs is important for getting new users started with your project, and is also useful for enabling you to quickly and easily rebuild broken virtualenvs.

Tox is a Python tool for managing virtualenvs. It lets you quickly and easily build virtualenvs and automate running additional build steps like unit tests, documentation generation, and linting. When I download a new Python project at Knewton, I can just run tox, and it’ll build a new virtualenv, install all the dependencies, and run the unit tests. This really reduces setup friction, making it easy to contribute to any Python project at Knewton.

A tox.ini file at Knewton might look something like this:

[tox]
envlist=py27                         # We use only Python 2.7
indexserver =
     # We host our own PyPI (see below)
     default = https://python.internal.knewton.com/simple

[testenv]
deps =
     -rrequirements.txt              # Pinned requirements (yes, no space)
commands=
     pipconflictchecker              # Check for any version conflicts
     py.test . {posargs}             # Run unit tests

Get started with tox at its home page.

Indicate transitive dependencies using install_requires

At some point, you may want to package your Python project with sdist or as a wheel, so that others can depend on it by installing it with pip. Dependency management gets a bit more complicated at this point, because pip actually doesn’t look at your requirements.txt file when installing your packaged project.

Instead, pip looks at the install_requires field in setup.py, so you should be sure to fill this out in order to make a project that others can easily install. In contrast to requirements.txt, this field should list only your direct dependencies. Although requirements in requirements.txt should generally be pinned to exact versions, requirements in install_requires should permit the largest possible ranges. If you’d like to understand these differences, “The Package Dependency Blues” does a great job of explaining requirements.txt and install_requires.4

The way tox handles requirements.txt and install_requires can be a bit confusing. First, tox installs requirements from the deps section of tox.ini. Then tox runs python setup.py install, which will install dependencies from your install_requires. Since your requirements.txt file should contain a superset of the packages in your install_requires, this second step should not install any requirements if you’ve filled out your deps section correctly.

Of course, now you have two different lists of requirements to maintain. If only there were a simple way to do so! Pip-compile, from pip-tools, is the most promising tool for keeping your requirements.txt and install_requires in sync. It’s not yet fully mature, but it’s very helpful for projects with many transitive dependencies.

Specify which versions of Python tools you want to support

If you’re using pip, virtualenv, and tox, then anyone with those tools should be able to build your project, right? Unfortunately, the answer is, “almost.” If someone is running a different version of pip, virtualenv, or tox, their build may work differently than yours. As an example, tox 1.x passes all environment variables through to the commands it’s running, but tox 2.x runs its tasks in an environment with only a whitelist of environment variables. This means that, if you had a script that tried to read the $EDITOR environment variable, it might work fine when built with tox 1.x, but fail with tox 2.x.

At Knewton, we take the approach of restricting the allowed versions of these tools. We have a script called “Python Doctor” that will check your versions of Python, pip, virtualenv, and tox to ensure that they’re within our band of accepted ranges.

For an open source project, this is a little more complicated because you can’t restrict the versions of the tools running on your contributors’ workstations. In this case, it’s a good idea to mention the versions of these tools with which your project can be built.5 Note that this only applies to tools that are installed in your global Python environment, which will not appear in your requirements.txt or install_requires. For example, tox or pip would not generally appear in a requirements.txt file.

Example README snippet:

To build this project, run `tox -r`. This project has been tested with tox >=1.8,<2. If you want to make your own virtualenv instead, we recommend using virtualenv >=13.

Control your packages with a PyPI server

By default, pip will install packages from the python.org pypi server. If you work at a place with proprietary code, you may wish to run your own PyPI server. This will allow you to install your own packages as easily as those from the main PyPI server.

It’s actually much easier to set this up than you might think: your PyPI server can be as simple as an HTTP server serving a folder that contains sdist’ed tarballs of your Python project!

By hosting your own PyPI server, you can make it easy to maintain forked versions of external libraries.

You can also use a PyPI server to encourage consistent builds and reduce version conflicts by limiting the ability to add new libraries to your organization’s PyPI server.

Learn more about setting up a PyPI server here.

Examples

I’ve added to Github two Python project templates that illustrate how to tie all of this together:

Conclusion

This is our strategy, but you’ll probably need to modify it to suit your own circumstances. Additionally, the Python community has been growing quickly recently, so it’s likely that some of these practices will be replaced in the next few years. If you’re reading this in 2018, hopefully there will be some easier ways to manage Python dependencies!

Notes

  1. If you’re used to other dependency management systems, this may sound trivial. With Python, it’s not!
  2. “Pip” stands for “pip installs packages.” Easy_install was formerly used for this, but nowadays pip is superior.
  3. Pip is now included with Python 2 versions starting with 2.7.9, as well as Python 3 versions starting with 3.4.
  4. A nagging aside: make sure to follow semantic versioning to make it easier for other projects to restrict the version of your project in their install_requires.
  5. If you want to take this to the next level, you can specify your build tools programmatically too! Make a file called requirements-meta.txt that contains pinned versions of your build tools like tox. Then you’ll have a two-step build process:
    1. Install your per-project build system. To do this, use your global tox or virtualenvwrapper to make a virtualenv with this pinned version of tox in it.
    2. Use your per-project build system to build your project. To do this, run the tox that you just installed to run the project’s primary builds. If you understood this, great job!

Latent Skill Embedding

In this post, we explore a probabilistic model of student learning and assessment that can be used to recommend personalized lesson sequences. A student using Knewton-powered products currently receives content recommendations one step at a time. The model presented in this blog post is motivated by the idea that such a student could receive additional context and motivation by also seeing the most likely trajectory, or sequence of recommendations, leading to her goals, given her current state.

Problem

Students using Knewton-powered adaptive learning products are working in an online environment, acquiring knowledge that enables them to pass specific assignments and achieve their learning goals. These students receive recommendations from Knewton about what to work on, in a sequence that is determined by the work the student has previously done.

The goal of this blog post is to explore a new way of evaluating pathways through content based on their ability to improve student learning. We leverage existing log data (entries that look like Student A completed Lesson B and Student C passed assessment D) to compare the performance of different lesson sequences. Moreover, our method can be leveraged to generate displayable lesson sequences that maximize educational gains while meeting practical time constraints (i.e., the student has at most n days to maximize her probability of passing goal assessments). The additional context these sequences of recommendations provide could enhance students’ motivation, and consequently their overall performance.

General Approach

We use historical student-module interactions to learn a joint probabilistic embedding of the latent skills of students, skill requirements of assessments, and skill gains from lessons (modulated by prerequisite knowledge). Through offline evaluations on log data, we have shown that this model can successfully distinguish between lesson sequences that lead students to mastery or failure. In this post, we will outline how the model can be used to recommend lesson sequences in real time.

Model

Probabilistic embeddings are used for a variety of tasks, including music playlist prediction,1 feedback propagation in MOOCs,2 visualization of high-dimensional data,3 and language modeling for natural language processing.4 We are interested in embeddings that perform nonlinear dimensionality reduction on high-dimensional data, and also can be used to learn useful relationships between objects that are not explicitly expressed in the data. We will use a probabilistic embedding to capture the dynamics of student learning and testing, and in the process we will describe a method to recommend personalized lesson sequences. Our approach resembles a proposed sparse factor analysis method.5

In our model, students, lessons, and assessments are jointly embedded in a low-dimensional semantic space that we call the “latent skill space.” Students have trajectories through the latent skill space, while assessments and lessons have fixed locations. Formally, a student is represented as a set of d latent skill levels \vec{s}_t \in \mathbb{R}_+^d that can change over time; a lesson module is represented as a vector of skill gains \vec{\ell} \in \mathbb{R}_+^d and a set of prerequisite skill requirements \vec{q} \in \mathbb{R}_+^d; an assessment module is represented as a set of skill requirements \vec{a} \in \mathbb{R}_+^d.

Students interact with lessons and assessments in the following way. First, a student can be tested on an assessment module with a pass-fail result R \in \{0, 1\}, where the likelihood of passing is high when a student has skill levels that exceed all of the assessment requirements and vice versa. Second, a student can work on lesson modules to improve skill levels over time. To fully realize the skill gains associated with completing a lesson module, a student must satisfy prerequisites, though fulfilling some of the prerequisites to some extent will result in relatively small gains. Time is discretized such that at every timestep t \in \mathbb{N}, a student completes a lesson and may complete zero or many assessments. The evolution of student knowledge can be formalized as an input-output hidden Markov model.

There are two equations that characterize the dynamics of the latent skill space: the “assessment result likelihood,” and the “learning update.” Both are discussed in the following two sections.

Modeling Assessment Results

For student \vec{s}, assessment \vec{a}, and pass-fail result R,

    \[ R \sim \mathrm{Bernoulli}(\phi(\Delta(\vec{s},\vec{a}))) \]

where \phi is the sigmoid function, and

    \[ \Delta(\vec{s},\vec{a}) = \frac{\vec{s} \cdot \vec{a}}{\|\vec{a}\|} - \|\vec{a}\| + \gamma_s + \gamma_a \]

where \gamma_s, \gamma_a \in \mathbb{R}. A pass result is indicated by R=1, and a fail by R=0. The term \frac{\vec{s} \cdot \vec{a}}{\|\vec{a}\|} can be rewritten as \|\vec{s}\|\mathrm{cos}(\theta), where \theta is the angle between \vec{s} and \vec{a}; it can be interpreted as “relevant skill”. The term \|\vec{a}\| can be interpreted as assessment difficulty. The bias term \gamma_s is a student-specific term that captures a student’s general (assessment-invariant and time-invariant) ability to pass; it can be interpreted as a measure of how often the student guesses correct answers. The bias term \gamma_a is a module-specific term that captures an assessment’s general (student-invariant and time-invariant) difficulty. These bias terms are analogous to the bias terms used for modeling song popularity in playlists.1

Modeling Student Learning from Lessons

For student \vec{s} who worked on a lesson with skill gains \vec{\ell} and prerequisites \vec{q} at time t+1, the updated student state is

    \[ \vec{s}_{t+1} \sim \mathcal{N}\left(\vec{s}_t + \vec{\ell} \cdot \phi(\Delta(\vec{s}_t,\vec{q})),\mathbf{\sigma^2\mathbf{I}_d}\right) \]

where \Delta(\vec{s}_t,\vec{q}) = \frac{\vec{s}_t \cdot \vec{q}}{\|\vec{q}\|} - \|\vec{q}\| and \sigma is a constant. The intuition behind this equation is that the skill gain from a lesson should be weighted according to how well a student satisfies the lesson prerequisites. A student can compensate for lack of prerequisites in one skill through excess strength in another skill, but the extent to which this trade-off is possible depends on the lesson prerequisites. The same principle applies to satisfying assessment skill requirements. Figure 0 illustrates the vector field of skill gains possible for different students under fixed lesson prerequisites. Without prerequisites, the vector field is uniform.

Figure 0

The vector field of skill gains for a lesson with skill gains (0.5, 1) and prerequisites (0.7, 0.3). Contours are drawn for varying update magnitudes. A student can compensate for lack of prerequisites in one skill through excess strength in another skill, but the extent to which this trade-off is possible depends on the lesson prerequisites.

Parameter Estimation

How do we take a bunch of logged student-module interactions and turn them into an embedding? We compute maximum a posteriori (MAP) estimates of model parameters \Theta by maximizing the following objective function:

L(\Theta) = \sum\limits_{\mathcal{A}} \log{(Pr(R \mid \vec{s}_t, \vec{a}, \gamma_s, \gamma_a))} + \sum\limits_{\mathcal{L}} \log{(Pr(\vec{s}_{t+1} \mid \vec{s}_t, \vec{\ell}, \vec{q}))} - \beta \cdot \lambda(\Theta)

where \mathcal{A} is the set of assessment interactions, \mathcal{L} is the set of lesson interactions, \lambda(\Theta) is a regularization term that penalizes the L_2 norms of embedding parameters to reduce overfitting, and \beta is a regularization parameter. Non-negativity constraints on embedding parameters are enforced. We solve the optimization problem using the L-BFGS-B algorithm, which extends the popular L-BFGS algorithm to handle box constraints. We randomly initialize parameters and run the iterative optimization until the relative difference between consecutive objective function evaluations is less than 10^{-3}. Averaging validation accuracy over multiple runs during cross-validation reduces sensitivity to the random initializations (since the objective function is non-convex).

Toy Examples

To illustrate how an embedding can capture the underlying geometry of different scenarios, we conducted a series of experiments on small, synthetically-generated interaction histories.

Legend

Figure 1

The key observation here is that the model recovered positive skill gains for L1, and “correctly” arranged Alice and A1 in the latent space. Initially, Alice fails A1, so her skill level is behind the requirements of A1. After completing L1, Alice passes A1, indicating that her skill level has probably improved past the requirements of A1. Note that this scenario could have been explained with only one latent skill.

Figure 2

A two-dimensional embedding, where an intransitivity in assessment results requires more than one latent skill to explain. The key observation here is that the assessments are embedded on two different axes, meaning they require two completely independent skills. This makes sense, since student results on A1 are uncorrelated with results on A2. Fogell fails both assessments, so his skill levels are behind the requirements for A1 and A2. McLovin passes both assessments, so his skill levels are beyond the requirements for A1 and A2. Evan and Seth are each able to pass one assessment but not the other. Since the assessments have independent requirements, this implies that Evan and Seth have independent skill sets (i.e. Evan has enough of skill 2 to pass A2 but not enough of skill 1 to pass A1, and Seth has enough of skill 1 to pass A1 but not enough of skill 2 to pass A2).

Figure 3

We replicate the setting in Figure 2, then add two new students Slater and Michaels, and two new lesson modules L1 and L2. Slater is initially identical to Evan, while Michaels is initially identical to Seth. Slater reads lesson L1, then passes assessments A1 and A2. Michaels reads lesson L2, then passes assessments A1 and A2. The key observation here is that the skill gain vectors recovered for the two lesson modules are orthogonal, meaning they help students satisfy completely independent skill requirements. This makes sense, since initially Slater was lacking in Skill 1 while Michaels was lacking in Skill 2, but after completing their lessons they passed their assessments, showing that they gained from their respective lessons what they were lacking initially.

Figure 4

We replicate the setting in Figure 2, then add a new assessment module A3 and a new lesson module L1. All students initially fail assessment A3, then read lesson L1, after which McLovin passes A3 while everyone else still fails A3. The key observation here is that McLovin is the only student who initially satisfies the prerequisites for L1, so he is the only student who realizes significant gains.

 

Application

We have some evidence that an embedding can make good sequence recommendations in an offline evaluation. How can we use an embedding to make sequence recommendations in an online setting? Formally, we must address the following problem: Given a student’s current skill levels and a set of assessments the student wants to pass, what is the optimal sequence of lessons for the student? We can formulate the problem two ways: specify a minimum pass likelihood for the goal assessments and find the shortest path to mastery, or specify a time constraint and find the path that leads to the highest pass likelihood for the goal assessments while not exceeding the prescribed length.

In general, both problems can be tackled by using a trained Latent Skill Embedding model to specify a Markov Decision Process, in which the possible states are given by the student embedding, the set of possible actions corresponds to the set of lesson modules that the student can work on, the transition function is the learning update (Equation 2), and the reward function is the likelihood of passing all goal assessments (Equation 1) discounted by the expected time taken to complete the lesson sequence. Various methods for finding the optimal policy for an MDP with a continuous state space, discrete action space, and probabilistic transitions have been explored in the literature.6

For an embedding model without lesson prerequisites, the order in which lessons are completed is irrelevant since sums are commutative. Time-constrained lesson recommendation thus reduces to the 0-1 knapsack problem. In this case, each lesson contributes skill gains, but costs the student an expected amount of time,which can be predicted using matrix factorization. The 0-1 knapsack problem can be solved efficiently using dynamic programming.

In practice, content recommendations should intersperse assessments in the lesson sequence to keep students engaged and make sure they are learning. We are currently working on an algorithm that incorporates this idea into the sequence optimization problem.

Implementation

A Python implementation of the Latent Skill Embedding model is available at https://github.com/Knewton/lentil. The IPython notebooks used to perform data exploration, model evaluations, sensitivity analyses, and synthetic experiments are available at https://github.com/rddy/lentil. The data sets from Knewton used for these experiments are not available to the public. The project will be maintained at http://siddharth.io/lentil.

Authors

Siddharth Reddy is a junior at Cornell University, studying computer science and applied mathematics. He’s interested in machine learning, adaptive education, and startups.

This work was a collaboration between Siddharth Reddy, Igor Labutov, and Thorsten Joachims at Cornell University, in conjunction with the data science team at Knewton. The project was presented at the Machine Learning for Education workshop at ICML 2015 (see http://dsp.rice.edu/ML4Ed_ICML2015 for details).

References

  1. Shuo Chen, Joshua L Moore, Douglas Turnbull, and Thorsten Joachims.

Playlist prediction via metric embedding. In Proceedings of the 18th ACM

SIGKDD international conference on Knowledge discovery and data mining,

pages 714–722. ACM, 2012.

  1. Chris Piech, Jonathan Huang, Andy Nguyen, Mike Phulsuksombati, Mehran

Sahami, and Leonidas Guibas. Learning program embeddings to propagate

feedback on student code. arXiv preprint arXiv:1505.05969, 2015.

  1. Laurens Van der Maaten and Geoffrey Hinton. Visualizing data using t-sne.

Journal of Machine Learning Research, 9(2579-2605):85, 2008.

  1. Tomas Mikolov, Wen-tau Yih, and Geoffrey Zweig. Linguistic regularities

in continuous space word representations. In HLT-NAACL, pages 746–751,

2013.

  1. Andrew S Lan, Christoph Studer, and Richard G Baraniuk. Time-varying

learning and content analytics via sparse factor analysis. In Proceedings of

the 20th ACM SIGKDD international conference on Knowledge discovery

and data mining, pages 452–461. ACM, 2014.

  1. Marco Wiering and Martijn van Otterlo. Reinforcement Learning: State-of-the-art,

volume 12. Springer Science & Business Media, 2012.

Testing for Failure

12:20am. I was jerked awake by the ringing of my phone. There were problems in production.

I logged on, desperately trying to clear the fog of sleep from my brain. There were already a handful of alerts for a variety of services. I ran some quick tests, and quickly figured out that this was an outage.

After a bit more investigation, I was able to narrow down the cause to a specific service. After a short while, the platform was fixed.  I made some notes while everything was fresh in my head, and did my best to get back to sleep.

In the morning, several engineers from different teams worked together to investigate the problem further. It all boiled down to an interaction with a server that had gone down. In theory, Knewton’s architecture should have been resilient to any single server failing. As it turns out, theory and practice weren’t the same.

The Cost of Outages

Failure is part of engineering. But system failures like the one described above are very costly.

First and foremost there is the cost to users. For students using Knewton-powered products, this cost can be significant. When Facebook is down, you can’t look at your friend’s selfies. But when Knewton is down, you can’t do your homework, or finish your take-home test, or study for the upcoming exam. We take these potential consequences seriously.

Second, there is the cost to employees, and to the company they work for. Waking people up in the middle of the night, or otherwise interrupting their personal lives, is a real cost. It creates unhappy employees. If it happens too frequently, then engineers may start ignoring the problems, or leave the company — or both.

Third, system failures can lead to compounding problems. When an alarm wakes me up in the middle of the night, my brain is only functioning at a fraction of its normal capacity. It is much easier for me to make a mistake at 3 a.m. than at 3 p.m. If I make the wrong judgement call while dealing with a production issue, I could actually make things worse — turning a minor issue into a major one.

Fourth, failures hurts a company’s reputation. No one wants to use flaky products. If Gmail was down for an hour every week, would you continue to use it? What if it dropped 5% of all incoming and outgoing emails? Users don’t want the best product in the world 90% of the time. They’d rather use a product that is 90% as good and works 100% of the time.

How Do You Avoid Failure?

The first way to avoid costly outages is to learn from our mistakes. At Knewton, we hold a  post-mortem with clearly owned action items every time we have a production outage.

Second, we learn from other companies. Even though our product is unique, designing for failure isn’t a problem unique to Knewton. There is a lot of work being done at other companies to proactively test for reliability. While it is always tempting to reinvent the wheel, it’s always better to start by learning from others.

Here are two techniques that Knewton uses to improve reliability that we heard about elsewhere:

1. Introducing Failure

First, we are introducing failures into our platform, using Netflix’s wonderful Chaos Monkey tool. (Netflix, if you’re reading, THANK YOU for open sourcing this wonderful tool!) For each of our stacks, Chaos Monkey terminates one instance at random, once per workday, during business hours. This allows us to discover the weaknesses of our platform on our own terms, before they impact our users.

2. Mitigating Risk in Production

One of the challenges of effective testing is that you can’t truly duplicate your production environment. Production stresses services in unique ways that are difficult or impossible to reproduce in test environments. Moving a new version of a service into production is always the real, final test. If you fail this test, then you impact your users.

So really the question isn’t if you test in production, it’s how you test in production. Knewton has been using two related techniques to minimize the risk of deploying new services to production.

Shadow Deployment

A shadow deployment sits in parallel to the real service, processing duplicate copies of production messages, but without performing writes. We can monitor the performance of the shadow stack, but it will not directly affect our users. The shadow deployment allows us to test the performance of a new version with real production load, without the risk of impacting our users with bugs or regressions. The downsides, of course, are that a shadow service places additional load on dependent services (possibly indirectly affecting users), and requires work to create a custom read-only shadow configuration. Additionally, the difference in configuration means that we are not truly testing the same thing. In fact, if the service is write-limited, the shadow may give us greatly exaggerated performance measurements.

Canary Deployment

A canary deployment is a small subset (usually just one) of the boxes for a stack that run the new version of a service. The remaining boxes still run the current version. If something goes wrong with the new version, the damage is limited to only a fraction of our users. Unlike a shadow, this does not require any special configuration.

What’s Next?

What constitutes effective failure testing changes constantly, as new services are introduced and our platform evolves. Creating comprehensive failure testing plans for each new service or major service change that we deploy is key. What are the different ways that this service could fail? What happens if this service fails? How will that affect the connected services? What happens if one of the connected services fails or becomes temporarily unavailable?

Additionally, we are in the process of auditing our outage history and categorizing it into different failure modes. This audit will allow us to design new tools to introduce common failure modes and make sure that we are now resilient to them across our whole platform.