System Design Documents at Knewton: RFCs

The team is in agreement: the Flimflamulator service is a mess of tech debt and something needs to be done. Other software teams at the company write code that depends on this service, however. They’ve built workarounds, and any changes will propagate into their systems. Flimflamulator provides some customer-facing functionality; the Product team will want to weigh in too.

How do you make sure you’re not creating new problems? What if someone has already thought through solutions to this? There are so many stakeholders that a design review meeting would be chaos. How do you collect feedback from everyone?

At Knewton we use the RFC process. You might call it design documents or architecture review, but the goal is the same.

What is an RFC?

RFC stands for Request for Comment; at its most basic level an RFC is a way to collect feedback about an idea. A document explaining the idea is shared with peers and stakeholders who provide feedback and ultimately sign off (or don’t) on the idea. RFCs can be product proposals, design documents, process changes, or any other thing you’d like someone’s opinion on.

RFC does not stand for Request for Consensus. People are rarely equally informed or equally affected by issues, and some team members naturally hold more sway than others. RFCs are not decided by majority vote, and that’s OK.

Knewton’s process for this is loosely based on the RFC process used by the Internet Engineering Task Force to discuss and set standards for internet protocols.

When to Write an RFC?

Write an RFC when you’re considering something that will be hard to change later: typically public interfaces, data schemas, frameworks, platforms, or human processes.

How to Write an RFC?

Knewton’s RFC template explains what an RFC should cover. We use Google Docs to share and discuss documents because it has great commenting tools.

RFC Ownership

Each RFC must have at least one owner, who should be listed at the top of the RFC along with the RFC’s status and any relevant dates (refer to the template for examples). The owner is responsible for:

  • doing research on prior art
  • identifying the right audience of stakeholders and peers
  • publishing the RFC to that audience
  • addressing all questions, concerns, and suggestions transparently and expediently
  • documenting any dissenting opinions

The “Right” RFC Audience

“An RFC is like a wedding proposal; if you’re not sure your stakeholders are going to say yes, it’s too early to write it,” as my colleague Rémi likes to say.

Not all RFCs require involvement of the entire tech team. In fact, most don’t.

The RFC owner should identify key stakeholders and solicit their opinions first. These are the people affected by what you’re proposing, subject matter experts, or people who can make the proposal happen/not happen. An informal chat before you start writing can save lots of time.

Once you have a doc, share it with a small audience who can give you rapid feedback. Your manager is often a good place to start. Focus on quick iteration and tell people when you expect to receive their feedback.  Early in the process, 24-hour turnaround is a reasonable request. Be proactive in soliciting feedback. You’ll probably get a lot of comments on your first draft, and an in-person review can be useful to speed things along.

As major issues get worked out and details solidified, expand the audience in a few rounds: more stakeholders, your whole team, tech-wide. It should be like rolling a snowball down a hill. Allow up to 5 business days for the final audience to sign off. This will be a judgment call based on the size of the audience and urgency of the proposal. At Knewton it’s customary to share your RFC with the entire tech team as an FYI even if it isn’t relevant to everyone.

How to Address Comments

It’s unlikely everyone will be in perfect agreement about your proposal, and you’ll probably receive some feedback that you disagree with. Disagreement is OK.

A few things to keep in mind when addressing dissenting comments:

  • What would it take to change your mind? If you think incorporating the dissenting feedback would cause problems, ask the commenter to provide a solution to those problems.
  • Does the commenter have skin in the game? Are they a subject matter expert? You don’t have to address every comment if it’s not relevant to your proposal.
  • Close out comments you won’t incorporate by briefly but respectfully saying you disagree, and mark the comment resolved. If the commenter feels strongly, they’ll let you know.
  • Long comment threads can be a time sink. Resolve them with a real-time conversation.

RFC Statuses

RFC documents should list their current status in the document header (see RFC template).

Proposed means open for comment, actively reviewed and publicly debated.

Accepted means closed for comment, all comments “resolved,” dissenting opinions documented, and work to implement the proposal has been scheduled. Doc status should be updated to “Accepted on [date].”

Cancelled means the RFC owner decided to not proceed and cancel the RFC. Doc status should be updated to “Cancelled on [date].”

Summing It All Up

  1. Decide on problem to solve
  2. Identify and talk to key stakeholders
  3. Write RFC
  4. Solicit feedback
    1. Primary stakeholder or mentor (no more than 1 day per iteration)
    2. Team or wider group of stakeholders (2-3 days)
    3. Full audience (5 business days)
  5. Resolve comments
  6. Close RFC
    1. Update doc to indicate it has been closed

API Infrastructure at Knewton: What’s in an Edge Service?

The Knewton API gives students and teachers access to personalized learning recommendations and analytics in real time. In this post, we will pull back the covers of our API to explain how we handle user requests. You will first learn how to build an edge service with Netflix Zuul, the framework we chose for its simplicity and flexibility. Then, we’ll dive into the Knewton edge service to show you how it improves API simplicity, flexibility, and performance.

What’s in an Edge Service

An edge service is a component which is exposed to the public internet. It acts as a gateway to all other services, which we will refer to as platform services. For example, consider an Nginx reverse proxy in front of some web resource servers. Here, Nginx acts as an edge service by routing public HTTP requests to the appropriate platform service.

An example edge service: Nginx as a reverse proxy for two resource servers. Requests for images are routed to one server, while requests for text are routed to another.

Beyond Routing: Edge Service Architecture

The reverse proxy pattern makes sense if every one of your resource servers is hardened to receive raw internet traffic. But in the typical service-oriented architecture, implementing a simple reverse proxy would duplicate a substantial amount of security and reliability code across platform services. For example, we do not want to reimplement user authentication in each platform service. Instead, we factor these responsibilities directly into the edge service. It validates and authenticates incoming requests; it enforces rate limits to protect the platform from undue load, and it routes requests to the appropriate upstream services. Factoring these high-level features into the edge service is a huge win for platform simplicity.

Building this service was a substantial engineering task. We wanted to leverage our experience developing and deploying JVM-based services by creating our edge service following the same pattern. This made Nginx and Apache’s Lua and PHP-based scripting capabilities unattractive. We would have had to rewrite our standard libraries and debug code in a new paradigm. Instead, we built our edge service on top of Netflix Zuul.

The edge service is Knewton’s interface to the public internet. It is registered directly with an AWS Elastic Load Balancer (ELB), and is responsible for sanitizing and routing requests for the Knewton Platform. To maintain high availability, our edge service runs as a cluster. On startup, edge service nodes register themselves with the load balancer, which then distributes requests across the cluster.

Netflix Zuul

Zuul is a framework created by Netflix to handle their own API and website traffic. The framework is structured around filters, a concept borrowed from the Java Servlet Filter pattern. A Zuul edge service is made up of a series of such filters, each performing some action on an HTTP request and / or response before passing control along to the next filter. For example, a filter might add authentication information to request headers, write an error response to an invalid request, or forward a request to an upstream service for further processing. In this section, we will walk through an example Zuul filter to show you the simplicity of the pattern.

Zuul Filters

We will consider three categories of Zuul filters: pre-filters, route-filters, and post-filters. (Zuul also supports error-filters and static-filters.) Pre-filters run before the edge service routes a request; route-filters forward requests to upstream services; and post-filters run after the proxied service returns its response.

The edge service consists of a series of Zuul filters which work together to write a response for a given request. The route-filters make requests to platform services to retrieve data and update state.

Filters are defined by three pieces of logic:

  • Filter execution order
  • Conditional filter execution
  • Execution logic

Let’s dive into our example. We’ve written this filter in Java, but Zuul is compatible with all JVM languages.

Filter Execution Order

Zuul filters run in the same order for every request. This enables successive filters to make assumptions about the validations run and to access accumulated state. For example, we might store the request’s user ID in one filter and use it to apply rate limits in the next. Zuul filter ordering is defined with two methods: filterType, which specifies whether a filter is a pre-filter, route-filter or post-filter, and filterOrder, which orders filters of the same type.

// Defining the execution order for a Zuul filter in Java

@Override
public String filterType() {
   // run this filter before routing
   return "pre";
}

@Override
public int filterOrder() {
   // run this filter first among pre-filters
   return 0;
}

Conditional Filter Execution

Filters may be run conditionally for each request. This gives the designer significant flexibility. For example, a rate limit filter may not run for an authenticated user. This conditional logic is factored into the shouldFilter method.

// Configuring a Zuul filter to run on every request

@Override
public boolean shouldFilter() {
   // Always run this filter
   return true;
}
// Configuring a Zuul filter to run on requests from unauthenticated users only
@Override
public boolean shouldFilter() {
   RequestContext context = RequestContext.getCurrentContext();
   return !(boolean)context.getOrDefault("IS_AUTHENTICATED", false);
}

Execution Logic

The code to execute in a filter is defined in the runFilter method. Here, we make use of the static RequestContext object to store state for later filters, or to write the response.

// Defining Zuul filter logic to record and check rate limits.

@Override
public ZuulFilterResult runFilter() {
   RequestContext context = RequestContext.getCurrentContext();
   boolean isOverRate = rateLimiter.recordRate(context.getRequest());
   if (isOverRate) {
       context.set("IS_RATE_LIMITED", true);
   }
   return null;
}

All together, this gives us an example filter:

// A simple Zuul filter that runs for unauthenticated users and records
// whether a rate limit has been exceeded.
public class RateLimitFilter extends ZuulFilter {

   @Override
   public String filterType() {
       // run this filter before routing
       return "pre";
   }

   @Override
   public int filterOrder() {
       return 0;
   }

   @Override
   public ZuulFilterResult runFilter() {
       // records the request with the rate limiter and checks if the
       // current rate is above the configured rate limit
       RequestContext context = RequestContext.getCurrentContext();
       boolean isOverRate = rateLimiter.recordRate(context.getRequest());
       if (isOverRate) {
           context.set("IS_RATE_LIMITED", true);
       }
       return null;
   }

   @Override
   public boolean shouldFilter() {
       // should only run if the user is not authenticated
       RequestContext context = RequestContext.getCurrentContext();
       return !(boolean)context.getOrDefault("IS_AUTHENTICATED", false);
   }
}

In this way, we build up a modular set of request-processing functionality. This pattern makes it easy for multiple engineers to contribute new features. At Knewton, engineers outside of the API team have committed code for edge service features.

For more information about the design and lifecycle of a Zuul service, see this Netflix blog post.

Zuul and the Knewton Edge Service

Zuul is an opinionated but barebones framework. While adding functionality is simple, you will have to implement the filters yourself. Now we will explain the most important filters we wrote for the Knewton edge service. You will learn how to reject bad requests, reverse-proxy good ones, and reduce interservice traffic.

Pre-filters

Pre-filters in our edge service validate, rate-limit, and authenticate incoming requests. This enables us to reject bad requests as early in the pipeline as possible.

Rate Limiting

Our edge service is responsible for protecting the Platform from bursts of requests. When the rate of requests from a given IP or user exceeds a specified threshold, the edge service responds with 429: Too Many Requests. This threshold helps to prevent harm to the platform from Denial of Service attacks and other excessive request load.

Rate limits are enforced in a pre-filter so that these excess requests will not place any load on platform services. This pre-filter tracks the number of requests made during a one-minute window. If this number exceeds the rate limit, the filter skips further request processing and immediately writes a “429” response.

Logic in the rate limiting Zuul prefilter. For a simple rate limiting implementation with Redis, see the Redis documentation.

Rates are stored in memory on each edge service node to provide the lowest latency possible to each request. The rates recorded by each node are reconciled asynchronously using a shared Redis cache. This means that, no matter which node handles a given request, all nodes will eventually acknowledge it. In practice, this reconciliation happens quickly; convergence occurs within a constant factor of AWS’s region-internal network latency.

A request to an endpoint is rejected when more than the configured limit of requests have been made within a minute. Edge service nodes coordinate request counts through a shared Redis cache.
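
The post does not include the rate limiter itself, but the pattern is straightforward to sketch. The following is an illustration of the idea rather than Knewton’s implementation: local counters answer each check with in-memory latency, and a background task reconciles them through a shared Redis cache, here using the Jedis client. The key scheme, flush cadence, and expiry are assumptions.

// A minimal sketch of per-minute rate tracking with asynchronous Redis reconciliation.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class WindowedRateLimiter {
    private final JedisPool redis;              // shared cache that all edge nodes talk to
    private final long limitPerMinute;
    // local view of the current window: key = "<client>:<minute>", value = requests seen so far
    private final ConcurrentHashMap<String, AtomicLong> localCounts = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, AtomicLong> unsyncedDeltas = new ConcurrentHashMap<>();

    public WindowedRateLimiter(JedisPool redis, long limitPerMinute) {
        this.redis = redis;
        this.limitPerMinute = limitPerMinute;
        // asynchronously push local deltas to Redis and pull the cluster-wide total back
        Executors.newSingleThreadScheduledExecutor()
                 .scheduleAtFixedRate(this::reconcile, 1, 1, TimeUnit.SECONDS);
    }

    // Returns true if this client is over its limit for the current one-minute window.
    public boolean isOverRate(String clientId) {
        String key = clientId + ":" + (System.currentTimeMillis() / 60_000);
        unsyncedDeltas.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
        long seen = localCounts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
        return seen > limitPerMinute;
    }

    private void reconcile() {
        try (Jedis jedis = redis.getResource()) {
            for (Map.Entry<String, AtomicLong> entry : unsyncedDeltas.entrySet()) {
                long delta = entry.getValue().getAndSet(0);
                // add our unsynced requests to the shared count; expire keys with the window
                long clusterTotal = jedis.incrBy(entry.getKey(), delta);
                jedis.expire(entry.getKey(), 120);
                // adopt the cluster-wide total so every node converges on the same count
                localCounts.computeIfAbsent(entry.getKey(), k -> new AtomicLong()).set(clusterTotal);
            }
        }
    }
}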

Surge Queuing

Load on the Knewton API is not constant. Even within the broad patterns of school days and lunch breaks, request rates vary. Rather than passing these bursts on to platform services, our edge service smooths traffic, so that the platform sees smaller peaks. To accomplish this, our rate limiter follows the same Leaky Bucket pattern used in Nginx. Requests exceeding the rate limit get added to a fixed-length queue. The queued requests are processed at the rate limit, ensuring that the platform does not see a surge. If the queue is full when the rate limit is exceeded, these excess requests are rejected with a 429 error. This approach has the added benefit of simplifying Knewton API clients, because requests can be made in reasonable bursts without being rejected. Read more about how we implement request queueing in our next blog post.

A request surge queue sits in front of the rate limiter to smooth out bursts of requests.
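
The leaky-bucket idea can be sketched with a bounded queue drained at a fixed rate. This is an illustrative sketch, not the edge service’s actual implementation; the capacity and release rate are arbitrary.

// A leaky-bucket surge queue: buffer bursts, release at a steady rate, reject when full.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SurgeQueue {
    private final BlockingQueue<Runnable> pending;
    private final ScheduledExecutorService drain = Executors.newSingleThreadScheduledExecutor();

    public SurgeQueue(int capacity, int releasesPerSecond) {
        this.pending = new ArrayBlockingQueue<>(capacity);
        long intervalMicros = 1_000_000L / releasesPerSecond;
        // "leak" at most one queued request per interval, no matter how bursty arrivals are
        drain.scheduleAtFixedRate(() -> {
            Runnable next = pending.poll();
            if (next != null) {
                try {
                    next.run();                 // hand the request on to the routing stage
                } catch (RuntimeException e) {
                    // a failed forward must not kill the drain thread
                }
            }
        }, intervalMicros, intervalMicros, TimeUnit.MICROSECONDS);
    }

    // Returns false when the bucket is full; the edge service then responds with a 429.
    public boolean enqueue(Runnable forwardRequest) {
        return pending.offer(forwardRequest);
    }
}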

Authentication

At Knewton, we use OAuth to authenticate users. Each API request must contain a valid OAuth token. The edge service rejects requests which do not contain these tokens. This obviates the need for end-user authentication in upstream platform services and reduces interservice traffic by rejecting unauthenticated requests immediately at the edge.
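
A hedged sketch of what such an authentication pre-filter might look like, following the filter conventions shown earlier (imports omitted, as in the examples above). The tokenValidator field and the response body are hypothetical stand-ins for the real OAuth validation logic.

// Sketch of an authentication pre-filter: invalid requests are answered immediately
// and never reach platform services. The tokenValidator is hypothetical.
public class AuthenticationFilter extends ZuulFilter {

   @Override
   public String filterType() {
       return "pre";
   }

   @Override
   public int filterOrder() {
       // run after basic request validation
       return 1;
   }

   @Override
   public boolean shouldFilter() {
       return true;
   }

   @Override
   public ZuulFilterResult runFilter() {
       RequestContext context = RequestContext.getCurrentContext();
       String header = context.getRequest().getHeader("Authorization");

       if (header == null || !tokenValidator.isValid(header)) {
           // short-circuit: do not route, write a 401 straight back to the caller
           context.setSendZuulResponse(false);
           context.setResponseStatusCode(401);
           context.setResponseBody("{\"error\": \"invalid or missing OAuth token\"}");
           return null;
       }
       // later filters key off this flag
       context.set("IS_AUTHENTICATED", true);
       return null;
   }
}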

Route-filters

The edge service is responsible for forwarding requests to the appropriate upstream microservice. Once a request has passed the gauntlet of Pre-filters, it is passed on to the Route-filters. Since Knewton services use Eureka for service discovery, we chose to use Netflix’s Ribbon HTTP client to make these upstream requests. Ribbon allows our edge service to automatically discover upstream services, load-balance traffic between them, and retry failed requests across instances of a service, all with minimal configuration.

// Creates a Ribbon client with Eureka service discovery, zone-affinity filtering,
// and availability-based load balancing
AbstractLoadBalancerAwareClient setupRibbonClient(String upstreamName) {
    ServerList<DiscoveryEnabledServer> serverList =
            new DiscoveryEnabledNIWSServerList(upstreamName);
    IRule rule = new AvailabilityFilteringRule();
    ServerListFilter<DiscoveryEnabledServer> filter = new ZoneAffinityServerListFilter();
    ZoneAwareLoadBalancer<DiscoveryEnabledServer> loadBalancer =
            LoadBalancerBuilder.<DiscoveryEnabledServer>newBuilder()
                               .withDynamicServerList(serverList)
                               .withRule(rule)
                               .withServerListFilter(filter)
                               .buildDynamicServerListLoadBalancer();
    AbstractLoadBalancerAwareClient client =
            (AbstractLoadBalancerAwareClient) ClientFactory.getNamedClient(upstreamName);
    client.setLoadBalancer(loadBalancer);
    return client;
}

The Knewton edge service is not limited to endpoint-based routing. Our next blog post will cover specialized routing we have implemented to support shadow services and server-side API mocking.

Post-filters

After receiving the upstream responses, the post-filters record metrics and logs, and write a response back to the user.

Response Writing

The post-filter stage is responsible for writing a response to the user. It is worth emphasizing that the edge service must always write back some response. If an unhandled exception or upstream timeout were to prevent this, the user would be left waiting. This means catching exceptions and handling errors in all situations, so that the edge service always returns a timely answer, even if that answer is an error code (i.e., a 5xx response).
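
A sketch of a response-writing post-filter along these lines, again following the filter conventions above (imports omitted); the fallback status and body are illustrative, not Knewton’s actual error handling.

// Sketch of a response-writing post-filter: a response is always written, even on failure.
public class ResponseWriterFilter extends ZuulFilter {

   @Override
   public String filterType() {
       return "post";
   }

   @Override
   public int filterOrder() {
       // run last among the post-filters
       return 100;
   }

   @Override
   public boolean shouldFilter() {
       return true;
   }

   @Override
   public ZuulFilterResult runFilter() {
       RequestContext context = RequestContext.getCurrentContext();
       HttpServletResponse response = context.getResponse();
       try {
           response.setStatus(context.getResponseStatusCode());
           String body = context.getResponseBody() != null
                   ? context.getResponseBody()
                   : "{\"error\": \"upstream service unavailable\"}";
           response.getWriter().write(body);
       } catch (Exception e) {
           // never leave the caller hanging: fall back to a bare 502
           response.setStatus(502);
       }
       return null;
   }
}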

Metrics and Logging

Recording the right metrics and logs helps to ensure quality of service of the Knewton platform. The edge service encapsulates the entire lifecycle of every API request: it records the entire duration of each request and handles every failure. This uniquely positions the edge service to report on end-user experience. Our post-filters publish metrics and logs to Graphite relays and Splunk indexers. The resulting dashboards, alerts, and ad-hoc queries give us all the information we need to investigate problems, optimize performance, and guide future development work. Watch for our upcoming blog post on API monitoring for more details.
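
As an illustration of the kind of instrumentation involved, here is a small sketch using the Dropwizard Metrics library with its Graphite reporter. The metric names, reporting interval, and host are assumptions, and Knewton’s actual pipeline may differ.

// Sketch: record per-request latency and status codes and ship them to a Graphite relay.
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;

public class EdgeMetrics {
    private final MetricRegistry registry = new MetricRegistry();

    public EdgeMetrics(String graphiteHost) {
        // the reporter pushes everything in the registry to Graphite every 30 seconds
        Graphite graphite = new Graphite(new InetSocketAddress(graphiteHost, 2003));
        GraphiteReporter.forRegistry(registry).build(graphite).start(30, TimeUnit.SECONDS);
    }

    // Called from a pre-filter; the returned context is stopped in the post-filter.
    public Timer.Context startRequestTimer(String endpoint) {
        return registry.timer("edge.requests." + endpoint).time();
    }

    // Called from the post-filter once the response has been written.
    public void recordResponse(Timer.Context timer, int statusCode) {
        timer.stop();                                              // records the request duration
        registry.counter("edge.responses." + statusCode).inc();    // counts status codes
    }
}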

Conclusion

The Knewton edge service validates and rate limits API requests, authenticates users, routes requests to upstream microservices, and records end-to-end metrics. Building this logic into the edge service simplifies our platform and provides performance and reliability guarantees to our users. We built our edge service on top of Zuul because its filter-based design is simple and extensible, its Java construction was compatible with our architecture, and because it has been proven in production at Netflix.

In this blog post, you learned how to build a Zuul edge service and how this design can improve your API. A follow-up post will describe custom logic we built to support surge queuing, shadow services, and API mocking in the Zuul framework.

Thanks to all the contributors on the Knewton API Platform team: Paul Sastrasinh, Rob Murcek, Daniel Singer, Bennett Wineholt, Robert Schultheis, Aditya Subramaniam, Stephanie Killian

Distributed Tracing: Observations in Production

Previous blog posts have explained Knewton’s motivation for implementing distributed tracing, and the architecture we put together for it. At Knewton, the major consumers of tracing are ~80 engineers working on ~50 services. A team consisting of three engineers handled designing and deploying the tracing infrastructure detailed in the previous sections.  This section highlights our experience and challenges running Zipkin at Knewton over the past few months.

Technical Hurdles

We elected to use Redis on Amazon Elasticache as our storage for spans, which are detailed in the previous section. This caused us a few problems:

Redis was inexplicably slow

Our initial rollout had an early version of the Zipkin Collector that wrote to a cache.r3.2xlarge (58.2 GB RAM, 8 vCPU) Redis Elasticache instance, which could not keep up with the traffic we were sending it. It could only handle ~250 requests per second. This seemed very slow to us. To set expectations, we dug into how to benchmark Redis, and stumbled upon the venerable redis-benchmark, which informed us that it could handle at least 120,000 requests per second with the 120-byte payloads that we were transmitting.

Looking at Amazon’s Cloudwatch metrics during the initial rollout, we quickly realized that, while the test was using multiple connections to Redis, the Zipkin code was using only a single connection. We modified the code to use multiple connections so we could make ~3000 requests per second and still keep up with incoming traffic.
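
The fix boiled down to writing spans through a pool of connections rather than one shared connection. A rough sketch of the idea using the Jedis client follows; the collector’s real client, key layout, and pool size differ.

// Sketch: write spans to Redis over many pooled connections instead of a single one.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class SpanWriter {
    private final JedisPool pool;
    private final ExecutorService workers = Executors.newFixedThreadPool(16);

    public SpanWriter(String redisHost) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(16);                  // one connection per worker thread
        this.pool = new JedisPool(config, redisHost, 6379);
    }

    public void write(String traceId, byte[] serializedSpan) {
        // each worker borrows its own connection, so writes proceed in parallel
        workers.submit(() -> {
            try (Jedis jedis = pool.getResource()) {
                jedis.rpush(("trace:" + traceId).getBytes(), serializedSpan);
            }
        });
    }
}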

Redis lost all data stored on it every few days

After we got Redis running, we found that Redis would restart every few days, when the amount of data that we stored in Redis approached the memory allocation of the Elasticache instance.

It turned out that the memory listed on Amazon’s Elasticache documentation refers to the total memory on the instance that runs Redis, and not to the memory allocated to Redis. As we continued to load data into Elasticache Redis, the underlying instance would start swapping heavily, and the Redis process would be restarted. We speculated that the process was being OOM killed, but because we don’t have access to Linux logs on Elasticache instances, we couldn’t be certain. After setting the `reserved-memory` parameter to approximately 15% of the total memory on the cache instance, we no longer had this problem.

Amazon Kinesis Queue Library

We packaged TDist into a library responsible for collecting span data at every service and publishing it to our tracing message bus, Amazon Kinesis. During early testing, we had configured the AWS Kinesis stream to have only a single shard, limiting it to 1000 writes per second.

When we deployed a few services using TDist, we began seeing unusual growth in JVM heap sizes under bursty traffic. When traffic reached baseline levels, the heap shrunk to reasonable sizes.

The default configuration of the Kinesis queue producer assumes that the rate at which it receives data is less than the rate at which it can submit data to the Kinesis queue. As such, it uses an unbounded in-memory queue to submit data to Kinesis. We decided to drop data when we are unable to keep up, and changed the producer to use a bounded queue. This resulted in predictable performance under load or network blips, at the expense of losing data during load spikes.

Now we have predictable performance, but during periods of high traffic, the Kinesis publishers fall behind and fill up the local queues, which has the unfortunate effect of dropping spans. This can be solved by increasing the rate at which we publish to Kinesis. The two options are to increase the number of publishers, and thus the number of calls we make to the Kinesis API, or to increase the number of spans published per API call. Because the span payload is small, most of the time spent on a Kinesis API call is network time. We intend to alleviate this problem by using batch Kinesis operations to submit multiple spans at once.
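
A sketch of both ideas together, a bounded local queue that drops spans when full plus batched PutRecords calls, using the AWS SDK for Java; the queue capacity, batching loop, and partition keys are illustrative assumptions.

// Sketch: drop spans instead of buffering without bound, and publish to Kinesis in batches.
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;

public class BoundedSpanPublisher implements Runnable {
    private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(10_000);
    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
    private final String streamName;

    public BoundedSpanPublisher(String streamName) {
        this.streamName = streamName;
    }

    // Non-blocking: returns false (and drops the span) if the local queue is full.
    public boolean submit(byte[] serializedSpan) {
        return queue.offer(serializedSpan);
    }

    @Override
    public void run() {
        List<PutRecordsRequestEntry> batch = new ArrayList<>();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // block for the first span, then drain whatever else is already waiting
                batch.add(toEntry(queue.take()));
                while (batch.size() < 500) {          // PutRecords accepts up to 500 records
                    byte[] next = queue.poll();
                    if (next == null) break;
                    batch.add(toEntry(next));
                }
                kinesis.putRecords(new PutRecordsRequest()
                        .withStreamName(streamName)
                        .withRecords(batch));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                batch.clear();
            }
        }
    }

    private static PutRecordsRequestEntry toEntry(byte[] span) {
        return new PutRecordsRequestEntry()
                .withPartitionKey(String.valueOf(span.hashCode()))
                .withData(ByteBuffer.wrap(span));
    }
}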

Metrics and Monitoring

When tracing was first rolled out to Knewton, it was hard to get an idea of how our client applications were doing. While the server and client side metrics were reported to our Graphite based metrics system, we had a difficult time correlating those with metrics for Kinesis, which are reported with different granularity, units, and retention periods using Amazon CloudWatch. Knewton contributed to the cloudwatch-to-graphite project to get consolidated metrics in Graphite.

Once this was done, we were able to focus on the metrics that gave us the most insight into how tracing was performing: the rate at which we are emitting spans from applications to Kinesis, and how quickly the Collector was able to ship these off to Redis. These rates determine the time it takes for a trace to appear in the Zipkin web interface.

Organizational hurdles

User education

While we had stealthily instrumented the Thrift protocol to pass tracing data around, we found that developers were still looking at service logs instead of at tracing data while debugging. We evangelize tracing when developers run into problems across services, and we gave a presentation on how developers can use tracing to pinpoint performance problems.

Successes

TDist allows Knewton to see internal dependencies between services clearly. Now, a service owner can see precisely which services depend on their service, and which functions they depend upon. This allows us to optimize service APIs to match usage patterns.

The week after we launched distributed tracing, we noticed that a particular external API call resulted in hundreds of calls to one internal service. We were able to add a new endpoint to the internal service to eliminate this large number of calls and speed up this endpoint.

Conclusion

TDist proved that we could pass data across services in a noninvasive way by modifying the Thrift protocol. Since then, we’ve built the TDist ecosystem and are using it in production. While we faced problems getting TDist into the hands of developers, it is incredibly useful when debugging problems across services.

Distributed Tracing: Design and Architecture

The previous blog post talked about why Knewton needed a distributed tracing system and the value it can add to a company. This section will go into more technical detail as to how we implemented our distributed tracing solution.

General Architecture and Tracing Data Management

Our solution has two main parts: the tracing library that all services integrate with, and a place to  store and visualize the tracing data. We chose Zipkin, a scalable open-source tracing framework developed at Twitter, for storing and visualizing the tracing data. Zipkin is usually paired with Finagle, but as mentioned in Part I, we ruled it out due to complications with our existing infrastructure. Knewton built the tracing library, called TDist, from the ground up, starting as a company hack day experiment.

TDist-architecture-tracing-data-management

Tracing data model

For our solution, we chose to match the data model used in Zipkin, which in turn borrows heavily from Dapper. A trace tree is made up of a set of spans. Spans represent a particular call from client start through server receive, server send, and, ultimately, client receive. For example, the call-and-response between ServiceA and ServiceB would count as a single span:

TDist-RPC-span-knewton

Each span tracks three identifiers:

  • Trace ID: Every span in a trace will share this ID.
  • Span ID: The ID for a particular span. The Span ID may or may not be the same as the Trace ID.
  • Parent Span ID: An optional ID present only on child spans. The root span does not have a Parent Span ID.

The diagram below shows how these IDs are applied to calls through the tree.  Notice that the Trace ID is consistent throughout the tree.

TDist-trace-tree-with-IDs-knewton

For more details, read the Dapper paper.
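
A minimal sketch of the span identifiers described above; Zipkin’s real span model also carries timestamps, annotations, and service names.

// Sketch of the three identifiers every span carries.
public class Span {
    private final long traceId;        // shared by every span in the trace
    private final long spanId;         // unique to this client/server exchange
    private final Long parentSpanId;   // null for the root span

    public Span(long traceId, long spanId, Long parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    // A child span keeps the trace ID and points back at its parent.
    public Span newChild(long childSpanId) {
        return new Span(traceId, childSpanId, spanId);
    }

    public long getTraceId() { return traceId; }
    public long getSpanId() { return spanId; }
    public Long getParentSpanId() { return parentSpanId; }
}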

TDist

TDist is a Java library that Knewton developed. All of our services use it to enable tracing. TDist currently supports Thrift, HTTP, and Kafka, and it can also trace direct method invocations with the help of Guice annotations.

Each thread servicing or making a request to another service gets assigned a Span that is propagated and updated by the library in the background. Upon receipt of a request (or right before an outgoing request is made), the tracing data are added to an internal queue, and the name of the thread handling the request is changed to include the Trace ID by a DataManager. Worker threads then consume from that queue, and the tracing data get published to our tracing message bus. Java ThreadLocals make it easy to globally store and access information assigned to a particular thread, and that is the approach we used in the DataManager.

Often, threads offload the actual work to other threads, which then either do other remote calls or report back to the parent thread. Because of this, we also implemented thread factories as well as executors, which know how to retrieve the tracing data from the parent thread and assign it to the child thread so that the child thread can also be tracked.
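
A rough sketch of how the ThreadLocal bookkeeping and span propagation to worker threads might look; this is illustrative rather than TDist’s actual code, and it reuses the Span sketch above.

// Sketch of ThreadLocal span bookkeeping plus propagation to worker threads.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class DataManager {
    private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();

    public static void setSpan(Span span) {
        CURRENT_SPAN.set(span);
        // surfacing the trace ID in the thread name makes it show up in service logs
        Thread.currentThread().setName(
                Thread.currentThread().getName() + " [trace:" + span.getTraceId() + "]");
    }

    public static Span getSpan() {
        return CURRENT_SPAN.get();
    }

    public static void clearSpan() {
        CURRENT_SPAN.remove();
    }

    // Wraps a task so the worker thread inherits the submitting thread's span.
    public static Callable<Void> propagating(Runnable task) {
        Span parentSpan = getSpan();            // captured on the parent thread
        return () -> {
            CURRENT_SPAN.set(parentSpan);       // restored on the worker thread
            try {
                task.run();
            } finally {
                CURRENT_SPAN.remove();
            }
            return null;
        };
    }

    public static Future<Void> submitTraced(ExecutorService executor, Runnable task) {
        return executor.submit(propagating(task));
    }
}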

Zipkin

Once the tracing data arrive at the tracing message bus through TDist, the Zipkin infrastructure handles the rest of the process. Multiple instances of collectors, consuming from the message bus, store each record in the tracing data store. A separate set of query and web services, part of the Zipkin source code, in turn query the database for traces. We decided to join the query and the web service to keep things simpler, and also because this combined service is internal and has predictable traffic patterns. However, the collector is decoupled from the query and web service because the more Knewton services integrated with the collector, the more tracing data it would have to process.

Zipkin UI

Out of the box, Zipkin provides a simple UI to view traces across all services. While it’s easy to view logs for a particular Trace ID across all services, the Zipkin UI provides a summarized view of the duration of each call without having to look through hundreds of log statements. It’s also a useful way of identifying  the biggest or slowest traces over a given period of time. Finding these outliers allowed us to flag cases where we were making redundant calls to other services that were slowing down our overall SLA for certain call chains. Here’s a screenshot of a trace appearing in the Zipkin UI:

TDist-zipkin-UI-knewton

Of course, the UI doesn’t come without drawbacks. While it’s easy to look at distinct individual traces, we found the Zipkin UI lacking for examining aggregate data. For example, there’s currently no way to get aggregate timing information or aggregate data on most called endpoints, services etc.

Throughout the development process and rolling out of the Zipkin infrastructure, we made several open-source contributions to Zipkin, thanks to its active and growing community.

Splunk

As mentioned above, the thread name of the current thread servicing a request is also changed, and the trace ID is appended to it. Because of this we can query for logs across all of the trace-enabled services for a particular call. This makes debugging a lot easier, and it’s proven to be very useful in post mortem analysis, log aggregations, debugging for isolated problems, and explaining uncommon platform behavior.

Thrift

Thrift is a cross-language RPC framework for building up scalable services. The user can define a service and data model spec in Thrift, and Thrift will compile the spec into many different languages. Users can then implement the generated service interfaces in the desired language. Thrift also automatically generates the client code and data structures for the services defined by the user.

Thrift is the most widely used RPC method between services at Knewton. Most of our services talk to each other through this framework, so supporting it while still maintaining backwards compatibility was critical for the success of this project. More precisely, we wanted services that were not tracing-enabled to be able to talk to tracing-enabled services.

When we started looking into adding tracing support to Thrift, we experimented with two different approaches. The first approach involved a modified Thrift compiler, and the second involved modified serialization protocols and server processors. Both methods had their advantages and disadvantages.

Custom Compiler

In this approach, we experimented with modifying the C++ Thrift compiler to generate additional service interfaces that could pass along the tracing data to the user. Modified thrift compilers are not uncommon; perhaps the most famous example is Scrooge. One advantage of a modified compiler was that clients would have to swap out fewer class implementations in their code, since tracing was supported right in the generated code. Clients could also get a reference of the tracing data from the service interface.

Although we didn’t benchmark, we also think that this approach would have been marginally faster, since there are fewer classes delegating to tracing implementations. However, we would have had to recompile all of our Thrift code and deviate from the open-source version, making it harder to upgrade in the future. We also soon realized that allowing the user access to the tracing data might not be desirable or safe, and data management might be better left to TDist for consistency.

Custom Protocols and Server Processors

We ended up using this approach in production. Not having to maintain a custom compiler lowered our development cost significantly. The biggest disadvantage to customizing protocols and server processors was that we had to upgrade to Thrift 0.9.0 (from 0.7.0) to take advantage of some features that would make it easier to plug in our tracing components to the custom Thrift processors and protocols. The upgrade required a lot of coordination across the organization. Thankfully, the newer version of Thrift was backwards-compatible with the older version, and we could work on TDist while Knewton services were being updated to the newer version. However, we still had to release all Knewton services before we could start integrating them with our distributed tracing solution.

Upgrading Thrift

Upgrading libraries when using a dependency framework is relatively easy, but for an RPC framework like Thrift and a service-oriented architecture with a deep call dependency chain, it gets a little more complicated. A typical server will have server and client code, with the server code often depending on other client libraries. Because of this, upgrades needed to start from the leaf services and move up the tree to avoid introducing wire incompatibilities since the outgoing services might not know if the destination service will be able to detect the tracing data coming through the wire.

TDist-upgrading-thrift-knewton

Another hurdle was that certain services, such as those using the Cassandra client library Astyanax, depended on third-party libraries that in turn depended on Thrift 0.7.0. For Astyanax, we had to shade the JARs using Maven and change package names so that they didn’t collide with the newer Thrift library. All of this had to happen quickly and without downtime. And because we didn’t want other teams at Knewton incurring the cost of this upgrade, the distributed tracing team had to implement and roll out all the changes.

Tracing Thrift Components: How It Works

Our Thrift solution consisted of custom, backwards-compatible protocols and custom server processors that extract tracing data and set them before routing them to the appropriate RPC call. Our protocols essentially write the tracing data at the beginning of each message. When the RPC call reaches the server, the processor identifies whether the incoming call carries tracing data so it can respond appropriately: calls with tracing data get responses with tracing data, and requests from non-integrated services that don’t carry tracing data get responses without tracing data. This makes the tracing protocols backwards-compatible, since the server’s outgoing protocols don’t write tracing data if instructed not to by the processor servicing the request.

A tracing protocol can detect whether the payload contains tracing data based on the first few bytes. Thrift appends a protocol ID to the beginning, and if the reading protocol sees that the first few bytes do not indicate the presence of tracing data, the bytes are put back on the buffer and the payload is reread as a non-tracing payload.

When reading a message, the protocols extract the tracing data and set them on a ThreadLocal for the thread servicing the incoming RPC call, using the DataManager. If that thread ever makes additional calls to other services downstream, the tracing data will be picked up from the DataManager automatically by TDist and appended to the outgoing message.

TDist-tracing-thrift-components-knewton

Here’s a diagram showing how the payload is modified to add tracing data:

TDist-tracing-payload-knewton

Kafka Tracing Support

When we were adding tracing support to Kafka, we wanted to keep the Kafka servers, also referred to as brokers, as a black box. In other words, we wanted to pass the data through the brokers without them necessarily knowing and therefore not having to modify the Kafka broker code at all. Similar to our approach with RPC services, we upgraded the consumers before upgrading the producers. The consumers are backwards-compatible and can detect when a payload contains tracing data, deserializing the content in the manner of the Thrift protocols described above. To achieve this, we require clients to wrap their serializers/deserializers in tracing equivalents that delegate reading and writing of the non-tracing payload to the wrapped ones.

TDist-kafka-tracing-support-knewton

TDist-kafka-tracing-payload-knewton
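
A sketch of the wrapping idea, shown here with the newer org.apache.kafka.common.serialization.Serializer interface for illustration (Knewton’s integration predates this API); the magic marker, framing layout, and the DataManager/Span types from the earlier sketches are assumptions.

// Sketch of a tracing serializer wrapper: prepend a marker plus the trace IDs to the
// wrapped payload. A matching deserializer checks for the marker and falls back to plain
// deserialization when it is absent, keeping non-tracing producers and consumers compatible.
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class TracingSerializer implements Serializer<byte[]> {
    static final int MAGIC = 0x7D157D15;         // arbitrary marker chosen for this sketch
    private final Serializer<byte[]> delegate;

    public TracingSerializer(Serializer<byte[]> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        byte[] payload = delegate.serialize(topic, data);
        Span span = DataManager.getSpan();       // current span, per the earlier sketch
        if (span == null) {
            return payload;                      // no tracing context: pass through unchanged
        }
        ByteBuffer framed = ByteBuffer.allocate(4 + 3 * 8 + payload.length);
        framed.putInt(MAGIC)
              .putLong(span.getTraceId())
              .putLong(span.getSpanId())
              .putLong(span.getParentSpanId() == null ? 0L : span.getParentSpanId());
        framed.put(payload);
        return framed.array();
    }

    @Override
    public void close() {
        delegate.close();
    }
}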

HTTP Tracing Support

As some of Knewton’s internal infrastructure and all public facing endpoints are HTTP-based, we needed a simple way of instrumenting HTTP calls to carry tracing information.

This was quite simple, because HTTP supports putting arbitrary data in headers. According to section 5 of RFC 2047, the only guideline for adding custom headers is to prefix them with `X-`.

We elected to continue the Zipkin tradition and use the following headers to propagate tracing information:

  • X-B3-TraceId
  • X-B3-SpanId
  • X-B3-ParentSpanId

Services at Knewton primarily use the Jetty HTTP Server and the Apache HTTP Client. Both of these projects allow for easy header manipulation.

Jetty services requests by routing them to a Servlet. As part of this routing, Jetty allows the request and response to pass through a series of Filters. We felt this was the ideal place to deal with tracing data. When any incoming request comes with tracing data headers, we construct span data from it and submit it to the DataManager.
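
A sketch of such a filter using the standard servlet Filter API; the header parsing and the DataManager/Span types from the earlier sketches are illustrative assumptions.

// Sketch of a servlet filter that turns X-B3-* headers into a span for the DataManager.
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;

public class TracingServletFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) { }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest http = (HttpServletRequest) request;
        String traceId = http.getHeader("X-B3-TraceId");
        String spanId = http.getHeader("X-B3-SpanId");
        String parentId = http.getHeader("X-B3-ParentSpanId");

        if (traceId != null && spanId != null) {
            // make the caller's span the current span for the thread handling this request
            DataManager.setSpan(new Span(
                    Long.parseLong(traceId, 16),
                    Long.parseLong(spanId, 16),
                    parentId == null ? null : Long.parseLong(parentId, 16)));
        }
        try {
            chain.doFilter(request, response);
        } finally {
            // avoid leaking tracing state into the next request served by this pooled thread
            DataManager.clearSpan();
        }
    }

    @Override
    public void destroy() { }
}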

With the Apache HTTP Client, we use an HttpRequestInterceptor and an HttpResponseInterceptor, which were designed to interact with header contents and modify them. These interceptors read tracing data from headers and set them using the DataManager, and vice versa.

Guice

For those unfamiliar with Guice, it’s a dependency management framework developed at Google. To make the TDist integrations with our existing services easier and less error-prone, we relied on Guice and implemented several modules that our clients simply had to install. Guice handles dependency injection during object instantiation and makes it easy to swap implementations of interfaces. If throughout this article you have been thinking that integrating with TDist sounds complicated, a lot of the time all our clients needed to do was install additional Guice modules that would bind our tracing implementations to existing Thrift interfaces. This also meant that our clients never had to instantiate any of our tracing-enabled constructs. Whenever a TDist client forgets to bind something, Guice notifies them with an error as soon as the injector is created. We put a lot of thought into how we laid out our Guice module hierarchies so that TDist didn’t collide with our clients, and we were very careful whenever we had to expose elements to the outside world.
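
A tiny sketch of the pattern with hypothetical binding targets; the interface and implementation names are made up for illustration.

// Sketch: installing one extra Guice module swaps a tracing-aware implementation in
// for the plain one, while client code keeps injecting the same interface it always did.
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

interface ProtocolFactory { }                                   // hypothetical client-facing interface
class TracingProtocolFactory implements ProtocolFactory { }     // hypothetical tracing implementation

public class TracingClientModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(ProtocolFactory.class).to(TracingProtocolFactory.class);
    }

    public static void main(String[] args) {
        // a client "installs" the module simply by adding it when building its injector
        Injector injector = Guice.createInjector(new TracingClientModule());
        ProtocolFactory factory = injector.getInstance(ProtocolFactory.class);
        System.out.println(factory.getClass().getSimpleName());  // TracingProtocolFactory
    }
}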

The Tracing Message Bus

The tracing message bus is where all our client services place tracing data prior to its being consumed by the Zipkin collector and persisted. Our two options were Kafka and Kinesis, and we ended up choosing Kinesis. We considered Kafka because Knewton has had a stable Kafka deployment for several years. At the time, our Kafka cluster, which we’ve been using as our student event bus, was ingesting over 300 messages per second in production. Our initial estimates put us in the range of over 400,000 tracing messages per second with only a partial integration. The idea of straining production systems with instrumentation data made us nervous. Kinesis seemed like an attractive alternative that would be isolated from our Kafka servers, which were only handling production, non-instrumentation data. At the time of implementation, Kinesis was a new AWS service and none of us were familiar with it. Its price, throughput capabilities, and the lack of maintenance on our end sealed the deal for us. Overall, we’ve been satisfied with its performance and stability. It’s not as fast as Kafka, but the nature of our data made it acceptable to have an SLA of even a few minutes from publication to ingestion. Since we deployed the tracing message bus to production, we have also been able to easily scale up the number of Kinesis shards without incurring any downtime.

The Tracing Data Store

The tracing data store is where all our tracing data ends up. The data stay there for a configurable time and are queried by the Zipkin query service to display on the UI. Zipkin supports a lot of data stores out of the box, including Cassandra, Redis, MongoDB, Postgres and MySQL. We experimented with Cassandra and DynamoDB, mainly because of the institutional knowledge we have at Knewton, but ended up choosing Amazon’s Elasticache Redis. The most important reasons behind our decision were:

  • Time to production, given that we didn’t have to roll out and maintain a new cluster
  • Cost
  • Easier integration with Zipkin, with less code
  • Support for TTLs on the data

Conclusion

Our tracing solution at Knewton has been in all environments for a few months now. So far it has proven to be invaluable. We have only started to scratch the surface with what we can do with the tracing and timing data we’re collecting. We had a lot of fun implementing and rolling out tracing at Knewton, and we have come to understand the value of this data.

Distributed Tracing: Motivation

Knewton’s technology solutions comprise more than 50 different services. Such a complex system can be difficult to debug, which is why we set out to integrate distributed tracing into our architecture. A distributed tracing system tracks requests by assigning a unique identifier for the request that is maintained through all service layers, from initial reception through response to clients. Without a distributed tracing system, understanding issues occurring deep in the stack becomes extremely difficult.

Let’s walk through a typical debugging scenario without distributed tracing. Here is a simplified view of the architecture involved in receiving a student event and generating a subsequent recommendation:

distributed-tracing-simplified-flow-knewton

  1. Student completes some work. The results are posted to the HTTP server.
  2. The HTTP server reposts the event to the distributed queue for durability.
  3. The student event processor converts the information from client language to internal Knewton language.
  4. The student event processor posts the translated event to a distributed queue.
  5. The recommendation service reads the event, generates a recommendation, and returns it to the client.

Let’s say the client reports that an event was sent for User A, but no recommendation was received for what User A should work on next. The problem could exist at any of the five layers described above. The identifiers used within each service may be different, so to find where the problem occurred, we’d have to query the logs of each service in serial. Debugging becomes even more complicated in a non-linear dependency chain.

In the world of distributed tracing, debugging would be reduced to two steps: find the incoming event and note the trace ID assigned to it, then search across all services for logs associated with that trace ID.

The distributed tracing system provides latency data for each step in the transaction. Before distributed tracing we were unable to calculate end-to-end time for a single transaction, but we could visualize the connections between our services. The Grafana graph below shows 95% latency for various steps in recommendation processing.

grafana-graph-recommended-processing-latency-knewton

To get the most out of a distributed tracing system, we identified key requirements:

Integration:

  • Adding tracing support to a service should require minimal or no code changes.
  • Adding tracing support to a service should not increase latency, nor should it affect service uptime or reliability.
  • The solution must be able to support our interservice communication protocols: Thrift, HTTP and Kafka.
  • The solution must provide a way for an external system to input the trace ID.  Current full-platform tests tell us only that an issue occurred, but have no indication as to where.  Smoke test alerts could include the trace ID, which would make debugging much quicker.
  • The trace ID and information should be accessible to a service for any purpose that the service finds useful, such as logging intermediary actions or including in error logs.

Data:

  • A solution that traces all events.  Some tracing solutions trace only a portion of traffic, and you never know when a particular event will require investigation.
  • A solution that will display an event from end to end, through each service that interacts with it.
  • Tracing information must include unique identification, system, action, timing and sequence.
  • System time across services cannot be guaranteed, so the solution must implement tracking of a logical order so that events can be displayed in the order in which they occurred.
  • Tracing data will not contain any Personally Identifiable Information or information proprietary to customers or Knewton.

Availability:

  • The solution must function in all environments: development, quality assurance, production.  Retention policy may be different for different environments.
  • Tracing reports must be available for immediate debugging and investigation.
  • Tracing data will be available for real-time debugging for one week.  All tracing data will be retained offline for historical analysis.

We analyzed the following distributed tracing solutions for their ability to satisfy our requirements:

Most of these products did not support all of the protocols we require. Kafka was most often missing, with Thrift a close second. And it would not have been possible to enhance the proprietary products to fit our protocol needs. Brave was a particularly compelling solution, but ultimately we decided it was too invasive.

In the end we decided to use Zipkin without Finagle. Finagle is a great product, but it did not support Thrift 0.7, and reverting to an older version of Thrift would have been a large effort in the wrong direction. Ultimately, we upgraded to Thrift 0.9, which was wire-compatible between server and client, so it was much easier to roll out than a switch to Scrooge.

Our next blog post will explain how we were able to integrate distributed tracing compatible with Zipkin and Finagle into our code while meeting all of the above requirements.

Rolling Out the Mesos Slave Roller

A few months ago, Knewton started running most services via Docker containers, deployed to an Apache Mesos cluster with a Marathon scheduler. This new infrastructure makes it easy to deploy and manage services but adds complexity to maintaining them.

Applications running on a Mesos cluster run on a group of slave nodes. These slave nodes often need to be upgraded or reconfigured. Although it is usually possible to make modifications to a Mesos slave in place, the Infrastructure Team at Knewton prefers to replace slaves entirely, following the principles of immutable infrastructure. Mesos makes it simple to add and remove slaves and we run them on AWS EC2, so replacing a slave is really easy.

Moreover, immutable Mesos slaves have some nice properties:

  • Configuration management code can be much simpler, as we don’t need to uninstall old versions or configurations.
  • We can be sure that the state of configuration management at the time of the slave’s launch corresponds to the configuration of the Mesos slave.
  • Rollbacks are simpler in case of failures.
  • Replacing a Mesos slave is a good stress test. If we could not stand to lose one slave, we would be in trouble.

Still, relaunching all of the nodes in a Mesos slave cluster has to be done carefully to prevent outages. Marathon will relaunch tasks that were running on a slave that is being replaced, but if all of the remaining tasks of a service are on that one slave, there will be an outage.

When Marathon launches a task, it will launch it on any slave that fits the constraints of that application. For Knewton, the most useful set of constraints includes CPU, memory, and balanced distribution across availability zones. Since it is possible that any service task might be launched on any slave, changes need to be applied to the entire cluster at the same time.

The First Attempt

Shortly after switching to Mesos, we discovered that we needed to upgrade Docker. For the upgrade, we chose to bring up a new set of Mesos slaves, then start terminating the old slave instances one by one. We determined that would be safe because Marathon would relaunch tasks on the new slaves as the old ones were terminating.

The replacement process was successful, but not without its challenges:

It takes a while: Terminating an EC2 instance, deregistering a slave from its master cluster, and relaunching all of the service tasks took about four minutes. At the time, we had up to 55 slaves per environment. With the time it takes to launch new nodes, the process took around four hours.

Don’t trust your eyes: One health check failing can be an indication of a bigger problem. One bad service task may not cause an outage or integration test failures, but it can be an early indication that an upgrade needs to be rolled back. Having a human watch web interfaces for green dots or failure messages is ineffective and error-prone.

Validation: Marathon allows you to set a constraint that will prevent two tasks of the same application from launching on the same Mesos slave. That property seems appealing, but when you use a small set of large EC2 instances, you may have more tasks of an application than instances to run them on — in which case, the tasks won’t launch.

For that reason, we don’t enforce that constraint. So sometimes all tasks of some service will be running on a single slave. When that happens, taking that slave down may cause an overall service outage. Once again, having a human assert that is not the case takes time and is error prone.

Maintaining balance: Our Mesos slave clusters consist of EC2 instances that are part of an Auto Scaling Group. The ASG AZRebalance process keeps the instances in that ASG balanced across Availability Zones. The ASG maintains balance when launching new instances, capacity permitting. Terminating an instance can also trigger AZRebalance, launching a new instance and terminating an existing one. If you have services that have two tasks, terminating an instance and triggering AZRebalance may cause an outage. The AZRebalance process can be turned off during an upgrade. An ASG will still maintain balance when launching instances (unless there is only one task, in which case an outage must be acceptable), but once the original set of nodes has been removed, instances may no longer be evenly distributed across availability zones. When AZRebalance is turned back on, some instances may need to be terminated. For example if instances can be launched across three availability zones, one instance may be terminated. (See appendix for details.)

Given our first experience, we decided to automate the process of replacing slaves and built the Mesos Slave Roller.

Mesos Slave Roller

The Mesos Slave Roller (MSR) is an automated tool that replaces all the slaves in a cluster. It ensures that all components stay healthy and permits its user to stop and start at will.

mesos-slave-roller

The Mesos Slave Roller takes as input the environment to configure and the number of slaves that should be running when it’s done.

It compares this desired end-configuration to the existing state of the Mesos slave ASG. It then generates a list of actions that will produce the desired configuration:

  1. Scale up the slave ASG by the number of instances in the desired end-configuration.
  2. For each of the original Mesos slaves:
    1. Trigger a termination event
    2. Log the termination in a checkpoint file
    3. Wait for Mesos to deregister that slave
    4. Wait for Marathon to report that all services are healthy

Updating minimum and maximum instance counts for the ASG is a separate process. If a checkpoint file were to accidentally be deleted, rerunning the Mesos Slave Roller could result in a costly and potentially dangerous state with many more instances running than expected.

Scaling and terminating is done by directly invoking the autoscaling API. To scale, we just update the desired count for the ASG. To terminate, we use the terminate-instance-in-autoscaling-group function and tell it to decrement the capacity (desired count).
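
The roller itself may well use a different SDK, but the two calls can be sketched with the AWS SDK for Java as follows; the class and method layout here is illustrative.

// Sketch of the two Auto Scaling calls the roller relies on.
import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.autoscaling.model.SetDesiredCapacityRequest;
import com.amazonaws.services.autoscaling.model.TerminateInstanceInAutoScalingGroupRequest;

public class SlaveAsgOperations {
    private final AmazonAutoScaling autoscaling = AmazonAutoScalingClientBuilder.defaultClient();

    // Step 1: grow the slave ASG so new slaves come up alongside the old ones.
    public void scaleTo(String asgName, int desiredCount) {
        autoscaling.setDesiredCapacity(new SetDesiredCapacityRequest()
                .withAutoScalingGroupName(asgName)
                .withDesiredCapacity(desiredCount));
    }

    // Step 2: terminate one old slave and shrink the desired count along with it.
    public void terminateSlave(String instanceId) {
        autoscaling.terminateInstanceInAutoScalingGroup(
                new TerminateInstanceInAutoScalingGroupRequest()
                        .withInstanceId(instanceId)
                        .withShouldDecrementDesiredCapacity(true));
    }
}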

Health Checks: While running, the Mesos Slave Roller queries the autoscaling, Mesos, and Marathon APIs to make sure all parts of the system are functioning as expected. The following checks are performed:

  1. Checks the Mesos /slaves endpoint to confirm that the appropriate number of Mesos slaves have been registered after increasing the size of the ASG.
  2. Checks the Mesos /slaves endpoint to confirm that the correct slave disconnected.
  3. Checks the Marathon /v2/apps endpoint to see that all apps are healthy. Our services are interconnected, so it is necessary to check that everything still works, not just the apps that had tasks on the terminated slave.

We consider an app healthy if there are no deployments in progress and the number of tasks reporting healthy is equal to the number of desired instances. If a given app has no health check, then the number of tasks running must be equal to the number of desired instances.
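
That predicate can be sketched directly; the field names mirror Marathon’s /v2/apps JSON, but the surrounding code is illustrative.

// Sketch of the health predicate applied to each app returned by Marathon's /v2/apps endpoint.
public class MarathonAppHealth {

    public static boolean isHealthy(int deploymentsInProgress,
                                    int desiredInstances,
                                    int tasksHealthy,
                                    int tasksRunning,
                                    boolean hasHealthCheck) {
        if (deploymentsInProgress > 0) {
            return false;                      // still converging; don't touch the next slave
        }
        if (hasHealthCheck) {
            return tasksHealthy == desiredInstances;
        }
        // apps without a health check: fall back to counting running tasks
        return tasksRunning == desiredInstances;
    }
}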

Checkpoints: All of the Mesos Slave Roller’s operations have some timeout associated with them. If they take longer than expected, the Mesos Slave Roller will pause and wait for a human to kick it off again. If some error is thrown or if an upgrade goes poorly and a human stops it, the Mesos Slave Roller can be resumed from the checkpoint file that it has created. When it resumes, it will use the checkpoint file to determine the remaining work it has to do.

Auto-balancing Availability Zones: To keep things simple and to minimize surprises, we turn off the AZRebalance process before starting the Mesos Slave Roller. When replacement is complete, we turn AZRebalance back on. We limit our ASG to three AZs, so the potential extra balancing operation is safe; this entire process rests on being able to take down one node at a time.
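
Suspending and resuming AZRebalance is a single API call in each direction; here is a minimal sketch with boto3 (the ASG name is a placeholder):

import boto3

autoscaling = boto3.client("autoscaling")
ASG_NAME = "mesos-slave-asg"  # placeholder name

# Suspend only the AZRebalance process before the roll begins...
autoscaling.suspend_processes(
    AutoScalingGroupName=ASG_NAME,
    ScalingProcesses=["AZRebalance"],
)

# ...and resume it once every original slave has been replaced.
autoscaling.resume_processes(
    AutoScalingGroupName=ASG_NAME,
    ScalingProcesses=["AZRebalance"],
)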

Planned Enhancements

The Mesos Slave Roller has proven to be useful for maintaining our infrastructure. There are several enhancements we’d like to make:

Offer Draining: Mesos 0.25 introduced a useful suite of maintenance primitives, including the ability to drain Mesos slaves of offers. That would allow the Mesos Slave Roller to tell a slave to evict all of its applications instead of relying on termination. If we can shut down service tasks gracefully, we can minimize the impact of losing a task.

Tagging Mesos Slaves and moving tasks: When a Mesos slave is terminated and Marathon receives a notification that a task died, it redeploys that task on a healthy Mesos slave. Some tasks may relaunch on “old” Mesos slaves a few times before finally winding up on a new one. This process works, but leads to more churn than necessary.

An alternate approach would involve some sort of indicator on each slave that shows when it was launched. Ideally the indicator could just alternate between green and blue, but it may need to be more complicated. With an indicator, we can ask Marathon to relaunch apps on a new slave. This approach has the added benefit that, once all tasks are moved to new slaves, we can terminate all of the old slaves without paying attention to when the termination process finishes. Given that waiting for deregistration takes most of the four minutes per node, this approach should cut the time required to replace a cluster by more than half.

PagerDuty Integration: When the Mesos Slave Roller fails, it will pause and wait for a human to restart it. For now, a human has to periodically check whether it is still running. Improving error handling to trigger a PagerDuty alert will make the Mesos Slave Roller a truly hands-off tool.

Appendix

Calculating how many moves need to be made:

Balanced state:  The difference in the number of instances between any two availability zones is at most 1.

Claim: Because the ASG is balanced before scaling and after scaling, the difference in the number of instances between any two AZs after removing the original set can be at most 2.

Let:

  • A_x \leftarrow the number of instances in the AZ called x at the start of this process
  • D_x \leftarrow the number of instances added to the AZ called x

We can say that for any pair of AZs i, j that are in some ASG:

Before scale: \left\vert A_i - A_j \right\vert \le 1 — Property of ASG balancing

After scale: \left\vert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right ) \right\vert  \le 1 — Property of ASG balancing

Proof:

\left\vert D_i - D_j \right\vert = \left\vert D_i - D_j + A_i - A_i + A_j - A_j \right\vert

                = \left\vert \left ( A_i + D_i \right ) - \left ( A_j + D_j \right )  + \left ( A_j - A_i \right ) \right\vert

which, by the triangle inequality and the statements above, means that

\left\vert  D_i - D_j \right\vert \le \left\vert \left ( A_i  + D_i \right ) - \left ( A_j + D_j \right ) \right\vert  + \left\vert \left ( A_j  -A_i \right ) \right\vert \rightarrow

\left\vert D_i - D_j \right\vert \le 2

Because of those properties, the number of final instances in each AZ can be one of three numbers: x - 1 (low), x (mid), and x + 1 (high) for some value x. In the case that there are occurrences of both the high and low values, some instances will have to move. The number of moves necessary is equal to the lesser of the number of highs and the number of lows. A move terminates an instance in one of the AZs with a high count and launches one in an AZ with low count. Once the counts remaining are only mids and lows or only mids and highs, the ASG is balanced.

In the worst case, half of the AZ counts will be highs and half will be lows, requiring \tfrac {n}{2} moves, where n is the number of AZs. If the number of AZs is odd, at most \lfloor \tfrac {n}{2} \rfloor moves are needed, because the last count cannot have a complementary low or high.
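
To make the counting concrete, here is a small sketch (not part of the tool) that computes the number of moves from the per-AZ instance counts left after the original nodes are removed:

def rebalance_moves(counts_per_az):
    # Per the claim above, the counts can only span three values:
    # low (x - 1), mid (x), and high (x + 1) for some x.
    lo, hi = min(counts_per_az), max(counts_per_az)
    if hi - lo <= 1:
        return 0  # already balanced
    highs = sum(1 for c in counts_per_az if c == hi)
    lows = sum(1 for c in counts_per_az if c == lo)
    # Each move terminates an instance in a high AZ and launches one in a
    # low AZ, turning both counts into mids.
    return min(highs, lows)

print(rebalance_moves([4, 2, 3]))  # three AZs, worst case: 1 move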

Why Knewton Is Trying Kotlin


Kotlin is a statically typed, general-purpose JVM language that provides seamless interoperability with Java. It is open source and developed by JetBrains, the creators of IntelliJ IDEA. It is concise, reducing boilerplate, and offers strong null safety. Version 1.0 is planned for the end of 2015, and the language is evolving quickly. We found switching to Kotlin to be painless.

At Knewton we have been happily hacking on Kotlin for months. We’ve collected our experiences and put together an overview of why we decided to try it, highlighting some of Kotlin’s key selling points.

Ease of Adoption

We expected some overhead and a learning curve, but Kotlin surprised us with its variety of tools and ease of adoption, especially for a language that is still in beta. IntelliJ provides excellent built-in support for Kotlin that is on par with Java.

The Nine Circles of Python Dependency Hell

“Dependency hell” is a term for the frustration that arises from problems with transitive (indirect) dependencies. Dependency hell in Python often happens because pip does not have a dependency resolver and because all dependencies are shared across a project.1 In this post, I’ll share a few of the strategies that I use to deal with some commonly-encountered problems.

These strategies assume that you’re using a dependency management setup similar to what we use at Knewton, which includes using pip, virtualenv, and good practices for your requirements.txt and install_requires. Some items are specific to organizations that use both internal and external Python libraries, but many of these items will apply to any Python project.

Detecting dependency hell

Even if your project only has a few first-level dependencies, it could have many more transitive dependencies. This means that figuring out that you even have a version conflict is non-trivial. If you just run pip install -r requirements.txt, pip will happily ignore any conflicting versions of libraries. In the best case, your project will work fine and you won’t even notice that this has happened. If you’re unlucky, though, you’ll get a mysterious runtime error because the wrong version of a library is installed.

In this case, my_app depends on foo_client and bar_client. Let’s assume that bar_client uses a feature that was introduced after requests 2.3.1. If pip installs requests==2.3.1 (from foo_client), bar_client will break because the feature it needs is missing! Note that foo_client and bar_client can each build fine independently.

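
One lightweight way to surface a conflict like this is to ask setuptools to check your pinned requirements against what is actually installed; here is a minimal sketch (it assumes requirements.txt contains only plain requirement lines, with no -r or -e entries):

import pkg_resources

with open("requirements.txt") as f:
    pinned = [line.strip() for line in f
              if line.strip() and not line.startswith("#")]

try:
    # Resolves the pinned set against each installed package's declared
    # requirements, including transitive ones.
    pkg_resources.require(pinned)
except (pkg_resources.VersionConflict, pkg_resources.DistributionNotFound) as err:
    print("Dependency hell detected: {}".format(err))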


How Knewton Cutover the Core of its Infrastructure from Kafka 0.7 to Kafka 0.8

Kafka has been a key component of the Knewton architecture for several years now. Knewton has 17 Kafka topics consumed by 33 services. So when it came time to upgrade from Kafka 0.7 to Kafka 0.8, it was important to consider the cutover plan carefully. The needs of each producer/consumer pair were unique, so it was important to identify constraints by which to evaluate the plans. The rollout plans and the considerations by which they were evaluated are the focus of this blog post.

Producer vs. Consumer Cutover

There are two main approaches that one can take when doing a cutover from Kafka 0.7 to 0.8. The first option, call this the “producer cutover,” is to let the producer service cut over from producing to 0.7 to producing to 0.8, while having consumer services consume from both 0.7 and 0.8 simultaneously. The second approach, call this the “consumer cutover,” is to let the consumer services cut over from consuming from 0.7 to consuming from 0.8 while having the producer service produce to both 0.7 and 0.8 simultaneously.


Rollout risk, consumer tolerance and rollback options differ for each approach and should be considered before choosing a cutover approach.

Rollout Risk

When evaluating rollout risk, consider how many consumers are involved. If there are many consumers, a rollout plan that switches all consumers at one time may be riskier than an approach that switches a single consumer at a time.

Consumer Tolerance

To determine the appropriate cutover method, you must first establish what the consumer services can tolerate:

  • How many duplicate messages are acceptable during a cutover?
  • How much message loss is acceptable during a cutover?
  • How much message delay is acceptable during a cutover?

Rollback

Unless message loss is acceptable, it is important to have a rollback plan in case there are complications. It is not obvious how to roll back the cutover procedure without losing messages or, alternatively, re-consuming a large number of duplicates. How to plan a rollback depends entirely on which cutover approach is used.

Inconsistent Partitioning

Another aspect of the consumer service to consider is whether inconsistent partitioning is acceptable, i.e., whether it is acceptable to have two different consumer service instances receive messages for the same key for a short period of time during a cutover. For example, this could be a problem if it leads to two different consumer threads writing to the same index in a data store.

Producer Cutover Details

The steps for a producer cutover will most likely look like this:

  1. Deploy consumer service that consumes from both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Deploy producer service that produces to the 0.8 topic instead of the 0.7 topic.
  3. After the cutover is completed, clean up by deploying a consumer service version that only consumes from the 0.8 topic.

A producer cutover has high rollout risk because it is all or nothing. All consumers will switch to consuming from 0.8 at the same time. If the topic to be cutover has few consumers, the risk of consumers failing during the cutover may be acceptable, and a producer cutover might be a good option.

If successful, consumer tolerance is not a factor, and no messages will be duplicated, lost or delayed.

If, for some reason, messages are not flowing through the system properly after the cutover, the system must be rolled back to a previous configuration that is known to work.

To ensure that no messages are lost during that rollback, the messages produced to 0.8 that failed to be processed properly must somehow be replayed to 0.7. This means that a producer cutover approach must have a reliable replay process that copies messages from 0.8 to 0.7 if message loss is not acceptable.

Any rollback scenario will be more complicated if there are many consumers. If the cutover only fails for some consumers, and some are successful, production must still revert to 0.7. Replaying messages would in this scenario cause message duplication for the consumers that had a successful cutover.

Consumer Cutover Details

The consumer cutover approach is recommended by the Kafka team, which provides the KafkaMigrationTool for this purpose.

In the words of the Kafka team, a consumer cutover using the KafkaMigrationTool would look like:

  • “Setup the migration tool to consume messages from the 0.7 cluster and re-publish them to the 0.8 cluster.”
  • “Move the 0.7 consumers consuming from the 0.7 cluster to 0.8.”
  • “Move the 0.7 producers to 0.8.”

Rollout risk is mitigated with the consumer cutover because each consumer service may move from 0.7 to 0.8 individually.

The problem with the approach, as described on the migration page, is that there is a very real risk of some message loss or, alternatively, some message duplication. The page has no discussion of consumer tolerance, including the risk of message loss and message duplication, which is important to consider. Assuming that the consumer is configured with auto.offset.reset=largest, some messages will be lost if the 0.7 consumers are stopped before the 0.8 consumers are started. If the 0.7 consumers are stopped after the 0.8 consumers are started, there is a possibility of significant message duplication. The default Kafka consumer behavior of at-least-once delivery does not guarantee the absence of duplicate messages in general, but for a cutover there are things that can be done to eliminate the otherwise guaranteed duplicates.

To avoid the pitfalls mentioned above, the concept of tagging and filtering can be introduced. A tag is an additional piece of information the producer adds to each message, and the consumer then applies a filter based on this tag. There are a few different ways this tagging can be done. The suggestion made here is to filter based on a timestamp. The producer adds an additional key to the message to say the time at which it was put onto the queue. Based on some cutover time X, 0.7 consumers would only process messages with timestamps before X, and 0.8 consumers would only process messages with timestamps after X.
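
As a concrete sketch of the filter, suppose the producer adds a produced_at epoch-millisecond field to each JSON message (the field name and message format here are our own, purely for illustration):

import json

CUTOVER_TS_MS = 1443420000000  # the agreed-upon cutover time X (example value)

def should_process(raw_message, consuming_from_0_8):
    """Return True if this consumer should process the message.

    0.7 consumers call this with consuming_from_0_8=False and keep only
    messages tagged before the cutover; 0.8 consumers keep the rest.
    """
    produced_at = json.loads(raw_message)["produced_at"]  # tag added by the producer
    if consuming_from_0_8:
        return produced_at >= CUTOVER_TS_MS
    return produced_at < CUTOVER_TS_MS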

The benefit of the tag-and-filter approach is that, once the filtering consumer service has been deployed, it is possible to verify that the 0.8 consumer is up and running correctly, even though it is filtering out messages that should only be processed by the 0.7 consumer.

A consumer cutover process based on timestamp filtering would look like this:

  1. Deploy producer service that produces to both the 0.7 topic and the 0.8 topic that is to be cut over.
  2. Determine the cutover time X.
  3. Deploy consumer service that consumes with a time filter from both the 0.7 topic and the 0.8 topic. The 0.7 consumers only process messages with timestamps before X and the 0.8 consumers only process messages with timestamps after X.
  4. Wait for the cutover time.
  5. Deploy consumer service version that only consumes from the 0.8 topic.
  6. Deploy producer service version that only produces to the 0.8 topic.

Using the KafkaMigrationTool to accomplish production to both 0.7 and 0.8 is possible, but it adds lag to message delivery and only replaces one of the steps listed in the tag-and-filter approach. If many consumers are to be cut over, not necessarily all around the same time, the tool may have to run for a long time. A tool that runs for a long period of time and provides critical functionality should probably be treated the same way as other critical parts of the system, which typically means that it requires monitoring and alerting. The alternative, creating a producer service version that produces to both 0.7 and 0.8, might require less work.

If for some unforeseen reason messages are still not being processed properly by the 0.8 consumer after the cutover, a rollback plan should be executed. Ideally, as the 0.7 consumer starts to filter out messages, it should also stop committing offsets. A rollback would then only entail re-deploying the old consumer service version that consumes only from 0.7, and message consumption would resume from the time of the cutover. Due to bug KAFKA-919, stopping the offsets from being committed might require some extra work (discussed more later).

If the cutover is successful, but issues with the 0.8 setup (e.g., performance issues) are discovered days later, a “non-immediate” rollback plan should be executed. Unless several days’ worth of message duplicates are acceptable, a different approach than the “immediate” rollback plan is required. For this scenario, a reverse cutover is recommended: simply configure the consumer service to perform a reverse timestamp-based cutover, with 0.7 consumers now only processing messages with timestamps after some reverse-cutover time Y, and 0.8 consumers only processing messages with timestamps before Y.

Inconsistent Partitioning

When consumer tolerance was previously discussed, the issue of having two different consumer service instances receiving messages for the same key was mentioned. It would be difficult (if even possible) to make 0.7 and 0.8 message partitioning match perfectly. A perfect matching here means that with a double producer and double consumer setup, each message, produced to both 0.7 and 0.8, will be routed to the same consumer service instance. Without this guarantee, two messages with the same key, one created just before the cutover time and one just after, might go to two different consumer service instances. As previously mentioned, this could be a problem if, for example, it leads to two different consumer threads writing to the same index in a data store.

If inconsistent partitioning is an issue, in order to prevent two threads from processing messages with the same key at the same time, the only option might be to let the 0.7 message queue “drain” before starting to process the 0.8 messages. This cutover method could be accomplished by either a producer or a consumer cutover, the additional key step being a pause before starting the 0.8 consumer service. Pausing the consumer service will obviously cause some message delay.

KAFKA-919

Ticket KAFKA-919 describes a major bug in the 0.7 client where “disabling of auto commit is ignored during consumer group rebalancing.” In order to be able to do an “immediate” rollback, the 0.7 consumer must stop committing offsets at the cutover time. This bug causes offsets to be committed during a consumer rebalance, even if consumers have set the auto-commit property to false.

The easiest workaround for this problem is preventing a consumer rebalance from taking place by scaling down the consumer service to have only one consumer in the consumer group. In this rollback scenario, the one consumer service instance is restarted with a configuration to only consume from 0.7 (because the 0.8 setup is not working, for some unexpected reason). Restarting all consumers in a consumer group usually triggers a consumer rebalance, unless there is only one consumer.

Conclusion

For most topic cutovers, Knewton employed a time-based tag-and-filter approach. This approach was deemed the least risky for the requirements of those producers and consumers, as well as the easiest to reason about and roll back. Hopefully the considerations outlined above will help you choose the best cutover plan for your system.

Written by Christofer Hedbrandh and Sarah Haskins

Best Practices for Python Dependency Management

Dependency management is like your city’s sewage system. When it’s working well, it’s easy to forget that it even exists. The only time you’ll remember it is when you experience the agony induced by its failure.

Here’s what we want to accomplish with dependency management at Knewton:

  • Builds should be stable across environments. If a project builds on my machine, it should build on others’ machines and on our build server.
  • Builds should be stable over time. If a project builds now, it shouldn’t break in the future.1
  • Anyone at Knewton should be able to easily download, build, and make changes to any Knewton project.
  • We should be able to have many different projects with large dependency trees without running into dependency hell.

The items below reflect how we do Python dependency management at Knewton. You may not need everything in this list, so items are introduced in order of increasing complexity.

Easily install your dependencies with pip

When you want to use a Python library from your code, you’ll need to download the library, put it somewhere on your computer, and possibly build any external routines (e.g., C, C++, Fortran!?) that the library uses. It’s possible to do this all by hand, but there’s a much better way: pip.2 Pip is a Python tool that specializes in installing Python packages. For example, just run pip install numpy to install numpy and its dependencies. Pip also helps you to keep your version control repositories small by giving you a reproducible way to install packages without needing to include them in your source code repo.

Not only does pip let you install normal source packages, but it can also install packages from source control repos, wheels, and legacy binary distribution formats.

The instructions for installing Python from The Hitchhiker’s Guide to Python will also tell you how to install pip.3 Pip’s user guide is a good way to get started with using pip, and the pip install documentation is helpful if you need to dive deeper.

Pin your requirements with a requirements.txt file

It’s easy to get a Python project off the ground by just using pip to install dependent packages as you go. This works fine as long as you’re the only one working on the project, but as soon as someone else wants to run your code, they’ll need to go through the process of figuring out which dependencies the project needs and installing them all by hand. Worse yet, if they install a different version of a dependency than you used, they could end up with some very mysterious errors.

To prevent this, you can define a requirements.txt file that records all of your project’s dependencies, versions included. This way, others can run pip install -r requirements.txt and all the project’s dependencies will be installed automatically! Placing this file into version control alongside the source code makes it easy for others to use and edit it. In order to ensure complete reproducibility, your requirements.txt file should include all of your project’s transitive (indirect) dependencies, not just your direct dependencies. Note that pip does not use requirements.txt when your project is installed as a dependency by others — see below for more on this.

Sample requirements.txt file:

requests==2.3.0
six==1.4.1

The pip user guide has a good section on requirements files.

Isolate your Python environments with virtualenvs

As a result of how Python paths work, pip installs all packages globally by default. This may be confusing if you’re used to Maven or npm, which install packages into your project directory. This may seem like an irrelevant detail, but it becomes very frustrating once you have two different projects that need to use different versions of the same library. Python requires some extra tooling in order to install separate dependencies per-project.

project_1 and project_2 depend on different versions of the requests library. This is bad because only one version of requests can be installed at a time.


The solution for this problem is to use virtual environments. A virtual environment consists of a separate copy of Python, along with tools and installed packages. Creating a virtualenv for each project isolates dependencies for different projects. Once you have made a virtualenv for your project, you can install all of that project’s dependencies into the virtualenv instead of into your global Python environment. This makes your setup look more like something you would create with Maven.

Now you can install a different version of requests into each virtualenv, eliminating the conflict.


I try to keep the number of packages I install to a minimum, both in my global Python environment, and in each virtualenv. I’ll be doing a follow-up post on how to handle virtualenvs with large numbers of packages installed.

A good virtualenv tutorial is A non-magical introduction to Pip and Virtualenv for Python beginners. The Python Packaging Guide provides a high-level overview that ties together pip and virtualenvs.

Build and rebuild virtualenvs easily with tox

Now that you’re using virtualenvs for all your projects, you’ll want an easy way to build the virtualenv and install all the dependencies from your requirements.txt file. An automatic way to set up virtualenvs is important for getting new users started with your project, and is also useful for enabling you to quickly and easily rebuild broken virtualenvs.

Tox is a Python tool for managing virtualenvs. It lets you quickly and easily build virtualenvs and automate running additional build steps like unit tests, documentation generation, and linting. When I download a new Python project at Knewton, I can just run tox, and it’ll build a new virtualenv, install all the dependencies, and run the unit tests. This really reduces setup friction, making it easy to contribute to any Python project at Knewton.

A tox.ini file at Knewton might look something like this:

[tox]
envlist=py27                         # We use only Python 2.7
indexserver =
     # We host our own PyPI (see below)
     default = https://python.internal.knewton.com/simple

[testenv]
deps =
     -rrequirements.txt              # Pinned requirements (yes, no space)
commands=
     pipconflictchecker              # Check for any version conflicts
     py.test . {posargs}             # Run unit tests

Get started with tox at its home page.

Indicate transitive dependencies using install_requires

At some point, you may want to package your Python project with sdist or as a wheel, so that others can depend on it by installing it with pip. Dependency management gets a bit more complicated at this point, because pip actually doesn’t look at your requirements.txt file when installing your packaged project.

Instead, pip looks at the install_requires field in setup.py, so you should be sure to fill this out in order to make a project that others can easily install. In contrast to requirements.txt, this field should list only your direct dependencies. Although requirements in requirements.txt should generally be pinned to exact versions, requirements in install_requires should permit the largest possible ranges. If you’d like to understand these differences, “The Package Dependency Blues” does a great job of explaining requirements.txt and install_requires.4
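
As a minimal illustration (the package name and version ranges are invented for this example), a setup.py would declare only direct dependencies, with ranges as wide as you can support:

# setup.py -- a minimal sketch; the name and ranges are illustrative only
from setuptools import setup, find_packages

setup(
    name="my_app",
    version="1.0.0",
    packages=find_packages(),
    install_requires=[
        # Direct dependencies only, with permissive ranges.
        "requests>=2.3,<3.0",
        "six>=1.4",
    ],
)

The corresponding requirements.txt would still pin exact versions (requests==2.3.0 and six==1.4.1, as in the sample above), along with any transitive dependencies.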

The way tox handles requirements.txt and install_requires can be a bit confusing. First, tox installs requirements from the deps section of tox.ini. Then tox runs python setup.py install, which will install dependencies from your install_requires. Since your requirements.txt file should contain a superset of the packages in your install_requires, this second step should not install any requirements if you’ve filled out your deps section correctly.

Of course, now you have two different lists of requirements to maintain. If only there were a simple way to do so! Pip-compile, from pip-tools, is the most promising tool for keeping your requirements.txt and install_requires in sync. It’s not yet fully mature, but it’s very helpful for projects with many transitive dependencies.

Specify which versions of Python tools you want to support

If you’re using pip, virtualenv, and tox, then anyone with those tools should be able to build your project, right? Unfortunately, the answer is, “almost.” If someone is running a different version of pip, virtualenv, or tox, their build may work differently than yours. As an example, tox 1.x passes all environment variables through to the commands it’s running, but tox 2.x runs its tasks in an environment with only a whitelist of environment variables. This means that, if you had a script that tried to read the $EDITOR environment variable, it might work fine when built with tox 1.x, but fail with tox 2.x.

At Knewton, we take the approach of restricting the allowed versions of these tools. We have a script called “Python Doctor” that will check your versions of Python, pip, virtualenv, and tox to ensure that they’re within our accepted ranges.

For an open source project, this is a little more complicated because you can’t restrict the versions of the tools running on your contributors’ workstations. In this case, it’s a good idea to mention the versions of these tools with which your project can be built.5 Note that this only applies to tools that are installed in your global Python environment, which will not appear in your requirements.txt or install_requires. For example, tox or pip would not generally appear in a requirements.txt file.

Example README snippet:

To build this project, run `tox -r`. This project has been tested with tox >=1.8,<2. If you want to make your own virtualenv instead, we recommend using virtualenv >=13.

Control your packages with a PyPI server

By default, pip will install packages from the python.org PyPI server. If you work at a place with proprietary code, you may wish to run your own PyPI server. This will allow you to install your own packages as easily as those from the main PyPI server.

It’s actually much easier to set this up than you might think: your PyPI server can be as simple as an HTTP server serving a folder that contains sdist’ed tarballs of your Python project!

By hosting your own PyPI server, you can make it easy to maintain forked versions of external libraries.

You can also use a PyPI server to encourage consistent builds and reduce version conflicts by limiting the ability to add new libraries to your organization’s PyPI server.

Learn more about setting up a PyPI server here.

Examples

I’ve added to Github two Python project templates that illustrate how to tie all of this together:

Conclusion

This is our strategy, but you’ll probably need to modify it to suit your own circumstances. Additionally, the Python community has been growing quickly recently, so it’s likely that some of these practices will be replaced in the next few years. If you’re reading this in 2018, hopefully there will be some easier ways to manage Python dependencies!

Notes

  1. If you’re used to other dependency management systems, this may sound trivial. With Python, it’s not!
  2. “Pip” stands for “pip installs packages.” Easy_install was formerly used for this, but nowadays pip is superior.
  3. Pip is now included with Python 2 versions starting with 2.7.9, as well as Python 3 versions starting with 3.4.
  4. A nagging aside: make sure to follow semantic versioning to make it easier for other projects to restrict the version of your project in their install_requires.
  5. If you want to take this to the next level, you can specify your build tools programmatically too! Make a file called requirements-meta.txt that contains pinned versions of your build tools like tox. Then you’ll have a two-step build process:
    1. Install your per-project build system. To do this, use your global tox or virtualenvwrapper to make a virtualenv with this pinned version of tox in it.
    2. Use your per-project build system to build your project. To do this, run the tox that you just installed to run the project’s primary builds. If you understood this, great job!