Netty at Twitter with Finagle

Thursday, 13 February 2014

Finagle is our fault-tolerant, protocol-agnostic RPC framework built atop Netty. Twitter’s core services are built on Finagle, from backends serving user profile information, Tweets, and timelines to front-end API endpoints handling HTTP requests.

Part of scaling Twitter was the shift from a monolithic Ruby on Rails application to a service-oriented architecture. To build out this new architecture, we needed a performant, fault-tolerant, protocol-agnostic, asynchronous RPC framework. Within a service-oriented architecture, services spend most of their time waiting for responses from other upstream services. Using an asynchronous library allows services to process requests concurrently and take full advantage of the hardware. While Finagle could have been built directly on top of NIO, Netty had already solved many of the problems we would have encountered and provided a clean and clear API.

Twitter is built atop several open source protocols: primarily HTTP, Thrift, Memcached, MySQL, and Redis. Our network stack would need to be flexible enough that it could speak any of these protocols and extensible enough that we could easily add more. Netty isn’t tied to any particular protocol. Adding to it is as simple as creating the appropriate event handlers. This extensibility has led to many community-driven protocol implementations, including SPDY, PostgreSQL, WebSockets, IRC, and AWS.

Netty’s connection management and protocol agnosticism provided an excellent base from which Finagle could be built. However, we had a few other, higher-level requirements that Netty couldn’t satisfy out of the box. Clients need to connect to and load balance across a cluster of servers. All services need to export metrics (request rates, latencies, etc.) that provide valuable insight for debugging service behavior. With a service-oriented architecture, a single request may pass through dozens of services, making it nearly impossible to debug performance issues without a tracing framework. Finagle was built to solve these problems. In the end, Finagle relies on Netty for IO multiplexing and provides a transaction-oriented framework on top of Netty’s connection-oriented model.

How Finagle Works

Finagle emphasizes modularity by stacking independent components together. Each component can be swapped in or out depending on the provided configuration. For example, tracers all implement the same interface. Thus, a tracer can be created to send tracing data to a local file, hold it in memory and expose a read endpoint, or write out to the network.

At the bottom of a Finagle stack is a Transport. A Transport represents a stream of objects that can be asynchronously read from and written to. Transports are implemented as Netty ChannelHandlers and inserted into the end of a ChannelPipeline. Finagle’s ChannelHandlerTransport manages Netty interest ops to propagate back pressure. When Finagle indicates that the service is ready to read, Netty reads data off the wire and runs it through the ChannelPipeline, where it is interpreted by a codec and then sent to the Finagle Transport. From there, Finagle sends the message through its own stack.
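To make the idea of an asynchronously readable and writable stream concrete, here is a minimal sketch of a Transport-like interface. It uses the standard library's scala.concurrent.Future as a stand-in for Finagle's own Future, and QueueTransport is a hypothetical in-memory loopback, not Finagle's ChannelHandlerTransport. A read that arrives before any data simply stays pending until something is written.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

object TransportSketch {
  // A stream of objects that can be written to and read from asynchronously.
  trait Transport[In, Out] {
    def write(msg: In): Future[Unit]
    def read(): Future[Out]
  }

  // Hypothetical in-memory loopback: whatever is written can be read back.
  final class QueueTransport[A] extends Transport[A, A] {
    private val buffered = mutable.Queue.empty[A]
    private val waiters = mutable.Queue.empty[Promise[A]]

    def write(msg: A): Future[Unit] = {
      // Hand the message to a pending reader if there is one, else buffer it.
      val waiter = synchronized {
        if (waiters.nonEmpty) Some(waiters.dequeue())
        else { buffered.enqueue(msg); None }
      }
      waiter.foreach(_.success(msg))
      Future.unit
    }

    def read(): Future[A] = synchronized {
      // Return buffered data immediately, else register interest in the next write.
      if (buffered.nonEmpty) Future.successful(buffered.dequeue())
      else { val p = Promise[A](); waiters.enqueue(p); p.future }
    }
  }
}
```

The reader, not the writer, decides when the next message is consumed, which is the property the back pressure mechanism above relies on.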

For client connections, Finagle maintains a pool of transports across which it balances load. Depending on the semantics of the provided connection pool, Finagle either requests a new connection from Netty or reuses an existing one if it’s idle. When a new connection is requested, a Netty ChannelPipeline is created based on the client’s codec. Extra ChannelHandlers are added to the ChannelPipeline for stats, logging, and SSL. The connection is then handed to a channel transport which Finagle can write to and read from. If all connections are busy, requests are queued according to configurable queueing policies.

On the server side Netty manages the codec, stats, timeouts, and logging via a provided ChannelPipelineFactory. The last ChannelHandler in a server’s ChannelPipeline is a Finagle bridge. The bridge will watch for new incoming connections and create a new Transport for each one. The Transport wraps the new channel before it’s handed to a server implementation. Messages are then read out of the ChannelPipeline and sent to the implemented server instance.

[Figure: Netty at Twitter with Finagle]

1) Finagle Client which is layered on top of the Finagle Transport. This Transport abstracts Netty away from the user
2) The actual ChannelPipeline of Netty that contains all the ChannelHandler implementations that do the actual work
3) Finagle Server which is created for each connection and provided a transport to read from and write to
4) ChannelHandlers implementing protocol encoding/decoding logic, connection level stats, SSL handling

Bridging Netty and Finagle

Finagle clients use ChannelConnector to bridge Finagle and Netty. ChannelConnector is a function that takes a SocketAddress and returns a Future of a Transport. When a new connection is requested of Netty, Finagle uses a ChannelConnector to request a new Channel and create a new Transport with that Channel. The connection is established asynchronously, fulfilling the Future with the new Transport on success or with a failure if the connection cannot be established. A Finagle client can then dispatch requests over the Transport.

Finagle servers bind to an interface and port via a Listener. When a new connection is established, the Listener creates a Transport and passes it to a provided function. From there, the Transport is handed to a Dispatcher which dispatches requests from the Transport to the Service according to a given policy.
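The dispatch step can be sketched as a loop over the Transport. The example below assumes the simplest possible policy (serial: one request at a time) and uses scala.concurrent.Future in place of Finagle's Future. The names SerialDispatch, loop, and ScriptedTransport are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SerialDispatch {
  type Service[Req, Rep] = Req => Future[Rep]

  trait Transport[In, Out] {
    def write(msg: In): Future[Unit]
    def read(): Future[Out]
  }

  // Read one request, apply the service, write the response, then repeat.
  // The returned future fails when the transport can no longer be read.
  def loop[Req, Rep](trans: Transport[Rep, Req], service: Service[Req, Rep]): Future[Unit] =
    trans.read()
      .flatMap(req => service(req))
      .flatMap(rep => trans.write(rep))
      .flatMap(_ => loop(trans, service))

  // Hypothetical test double: replays fixed requests and records responses.
  final class ScriptedTransport(requests: List[String]) extends Transport[String, String] {
    private var remaining = requests
    val written = ListBuffer.empty[String]

    def read(): Future[String] = synchronized {
      remaining match {
        case head :: tail =>
          remaining = tail
          Future.successful(head)
        case Nil =>
          Future.failed(new java.io.EOFException("connection closed"))
      }
    }

    def write(msg: String): Future[Unit] = synchronized {
      written += msg
      Future.unit
    }
  }
}
```

Because the next read is only issued after the previous response has been written, this policy never has more than one request in flight per connection. Other policies, such as pipelining, would change only the loop.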

Finagle’s Abstraction

Finagle’s core concept is a simple function (functional programming is the key here) from Request to Future of Response.

type Service[Req, Rep] = Req => Future[Rep]

A future is a container used to hold the result of an asynchronous operation such as a network RPC, timeout, or disk I/O operation. A future is either empty—the result is not yet available; succeeded—the producer has completed and has populated the future with the result of the operation; or failed—the producer failed, and the future contains the resulting exception.
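The three states can be observed directly. This is a small sketch using the standard library's scala.concurrent.Future and Promise as stand-ins for Finagle's Future, so that it runs without dependencies. FutureStates and describe are hypothetical names.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

object FutureStates {
  // Report which of the three states a future is currently in.
  def describe[A](f: Future[A]): String = f.value match {
    case None             => "empty"
    case Some(Success(_)) => "succeeded"
    case Some(Failure(_)) => "failed"
  }

  // A promise is the producer side; its future is what consumers hold.
  val pending: Promise[Int] = Promise[Int]()
  val succeeded: Future[Int] = Future.successful(42)
  val failed: Future[Int] = Future.failed[Int](new RuntimeException("connection refused"))
}
```

Once the producer calls pending.success(...) or pending.failure(...), the future moves out of the empty state and stays in its final state.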

This simplicity allows for very powerful composition. Service is a symmetric API representing both the client and the server. Servers implement the service interface. The server can be used concretely for testing or Finagle can expose it on a network interface. Clients are provided an implemented service that is either virtual or a concrete representation of a remote server.

For example, we can create a simple HTTP server by implementing a service that takes an HttpReq and returns a Future[HttpRep] representing an eventual response:

val s: Service[HttpReq, HttpRep] = new Service[HttpReq, HttpRep] { 
  def apply(req: HttpReq): Future[HttpRep] =
    Future.value(HttpRep(Status.OK, req.body))
}
Http.serve(":80", s)

A client is then provided with a symmetric representation of that service:

val client: Service[HttpReq, HttpRep] = Http.newService("twitter.com:80")
val f: Future[HttpRep] = client(HttpReq("/"))
f map { rep => transformResponse(rep) }

This example exposes the server on port 80 of all interfaces and consumes from twitter.com port 80. However, we can also choose not to expose the server and instead call the service s directly:

s(HttpReq("/")) map { rep => transformResponse(rep) }

Here the client code behaves the same way but doesn’t require a network connection. This makes testing clients and servers very simple and straightforward.
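As a runnable illustration of that point, the sketch below models a service as a plain function and calls it in-process. It assumes scala.concurrent.Future rather than Finagle's Future, and the HttpReq and HttpRep case classes and their fields are simplified stand-ins invented for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object InProcessService {
  // Simplified stand-ins for the request and response types; the fields are assumptions.
  final case class HttpReq(path: String, body: String = "")
  final case class HttpRep(status: Int, body: String)

  // A service is a function from a request to a future response.
  type Service[Req, Rep] = Req => Future[Rep]

  // An echo service, mirroring the server defined above.
  val echo: Service[HttpReq, HttpRep] =
    req => Future.successful(HttpRep(200, req.body))

  // Client code depends only on the Service type, so it cannot tell whether
  // the service is local or backed by a network connection.
  def fetchBody(svc: Service[HttpReq, HttpRep], path: String, body: String): String =
    Await.result(svc(HttpReq(path, body)), 1.second).body
}
```

A unit test can pass echo, or any stub of the same type, to fetchBody without opening a socket.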

Clients and servers provide application-specific functionality. However, there is a need for application-agnostic functionality as well. Timeouts, authentication, and statistics are a few examples. Filters provide an abstraction for implementing application-agnostic functionality.

A filter receives a request and the service with which it is composed.

type Filter[Req, Rep] = (Req, Service[Req, Rep]) => Future[Rep]

Filters can be chained together before being applied to a service.

recordHandletime andThen
traceRequest andThen
collectJvmStats andThen
myService

This allows for clean abstractions of logic and good separation of concerns. Internally, Finagle heavily uses filters. Filters enhance modularity and reusability. They’ve proved valuable for testing as they can be unit tested in isolation with minimal mocking.
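To show how chaining behaves, here is a minimal sketch of the two-parameter filter type with an andThen combinator. It is built on scala.concurrent.Future and is not Finagle's actual Filter class. The wrap filter and the echo service are hypothetical and chosen so that the order of composition is visible in the result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterChain {
  type Service[Req, Rep] = Req => Future[Rep]

  // A filter sees the request and the next service in the chain.
  trait Filter[Req, Rep] { self =>
    def apply(req: Req, next: Service[Req, Rep]): Future[Rep]

    // Composing with another filter yields a filter.
    def andThen(other: Filter[Req, Rep]): Filter[Req, Rep] =
      new Filter[Req, Rep] {
        def apply(req: Req, next: Service[Req, Rep]): Future[Rep] =
          self.apply(req, r => other.apply(r, next))
      }

    // Composing with a service yields a service.
    def andThen(svc: Service[Req, Rep]): Service[Req, Rep] =
      req => self.apply(req, svc)
  }

  // Hypothetical filter that decorates the response on the way back out.
  def wrap(open: String, close: String): Filter[String, String] =
    new Filter[String, String] {
      def apply(req: String, next: Service[String, String]): Future[String] =
        next(req).map(rep => open + rep + close)
    }

  val echo: Service[String, String] = req => Future.successful(req)

  // The first filter in the chain is the outermost one.
  val chained: Service[String, String] =
    wrap("[", "]") andThen wrap("(", ")") andThen echo

  def run(req: String): String = Await.result(chained(req), 1.second)
}
```

Calling FilterChain.run("hi") returns "[(hi)]": the request passes through the filters left to right, and the response passes back through them right to left.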

Filters can also modify both the data and type of requests and responses. The figure below shows a request making its way through a filter chain into a service and back out.

[Figure: Netty at Twitter with Finagle]

We might use type modification for implementing authentication.

// Finagle orders Filter's type parameters as [ReqIn, RepOut, ReqOut, RepIn].
val auth: Filter[HttpReq, HttpRes, AuthHttpReq, HttpRes] =
  { (req, svc) => authReq(req) flatMap { authed => svc(authed) } }

val authedService: Service[AuthHttpReq, HttpRes] = ...
val service: Service[HttpReq, HttpRes] =
  auth andThen authedService

Here we have a service that requires an AuthHttpReq. To satisfy the requirement, a filter is created that can receive an HttpReq and authenticate it. The filter is then composed with the service, yielding a new service that can take an HttpReq and produce an HttpRes. This allows us to test the authenticating filter in isolation from the service.
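A self-contained sketch of such a type-changing filter follows. It assumes scala.concurrent.Future and a four-parameter Filter trait whose parameters are ordered request-in, response-out, request-out, response-in. The token table, the case class fields, and the 401 response are all hypothetical details added for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AuthFilterSketch {
  type Service[Req, Rep] = Req => Future[Rep]

  final case class HttpReq(token: String, path: String)
  final case class AuthHttpReq(userId: Long, path: String)
  final case class HttpRes(status: Int, body: String)

  // Accepts ReqIn from the caller and hands ReqOut to the wrapped service.
  trait Filter[ReqIn, RepOut, ReqOut, RepIn] { self =>
    def apply(req: ReqIn, next: Service[ReqOut, RepIn]): Future[RepOut]

    def andThen(svc: Service[ReqOut, RepIn]): Service[ReqIn, RepOut] =
      req => self.apply(req, svc)
  }

  // Hypothetical credential check: a fixed token-to-user table.
  private val users = Map("secret" -> 7L)

  val auth: Filter[HttpReq, HttpRes, AuthHttpReq, HttpRes] =
    new Filter[HttpReq, HttpRes, AuthHttpReq, HttpRes] {
      def apply(req: HttpReq, next: Service[AuthHttpReq, HttpRes]): Future[HttpRes] =
        users.get(req.token) match {
          case Some(id) => next(AuthHttpReq(id, req.path))
          // Rejected requests never reach the wrapped service.
          case None     => Future.successful(HttpRes(401, "unauthorized"))
        }
    }

  val authedService: Service[AuthHttpReq, HttpRes] =
    req => Future.successful(HttpRes(200, s"user ${req.userId} requested ${req.path}"))

  val service: Service[HttpReq, HttpRes] = auth andThen authedService

  def call(req: HttpReq): HttpRes = Await.result(service(req), 1.second)
}
```

The compiler enforces the requirement: authedService cannot be called with a raw HttpReq, only through the filter.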

Failure Management

At scale, failure becomes common rather than exceptional; hardware fails, networks become congested, network links fail. Libraries capable of extremely high throughput and extremely low latency are meaningless if the systems they run on or communicate with fail. To that end, Finagle is set up to manage failures in a principled way. It trades some throughput and latency for better failure management.

Finagle can balance load across a cluster of hosts implicitly using latency as a heuristic. A Finagle client locally tracks load on every host it knows about. It does so by counting the number of outstanding requests being dispatched to a single host. Given that, Finagle will dispatch new requests to hosts with the lowest load and implicitly the lowest latency.
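The counting scheme can be sketched in a few lines. This is an illustration of the least-outstanding-requests idea only, not Finagle's load balancer. It uses scala.concurrent.Future, and Node and pick are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try
import scala.util.control.NonFatal

object LeastLoaded {
  type Service[Req, Rep] = Req => Future[Rep]

  // One backend plus a count of requests dispatched but not yet answered.
  final class Node[Req, Rep](val name: String, underlying: Service[Req, Rep]) {
    val outstanding = new AtomicInteger(0)

    def apply(req: Req): Future[Rep] = {
      outstanding.incrementAndGet()
      val rep: Future[Rep] =
        try underlying(req)
        catch { case NonFatal(e) => Future.failed[Rep](e) }
      // Decrement before the caller sees the result, on success or failure.
      rep.transform { (result: Try[Rep]) =>
        outstanding.decrementAndGet()
        result
      }
    }
  }

  // Pick the node with the fewest outstanding requests.
  def pick[Req, Rep](nodes: Seq[Node[Req, Rep]]): Node[Req, Rep] =
    nodes.minBy(_.outstanding.get())
}
```

A slow host accumulates outstanding requests and is picked less often, which is how the count acts as a proxy for latency.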

Failed requests will cause Finagle to close the connection to the failing host and remove it from the load balancer. In the background, Finagle will continuously try to reconnect. The host will be re-added to the load balancer only after Finagle can re-establish a connection. Service owners are then free to shut down individual hosts without negatively impacting downstream clients. Clients also keep per-connection health heuristics and remove the connection if it’s deemed unhealthy.

Composing Services

Finagle’s service-as-a-function philosophy allows for simple but expressive code. For example, a user’s request for their home timeline touches several services. The core of these are the authentication service, timeline service, and Tweet service. These relationships can be expressed succinctly.

val timelineSvc = Thrift.newIface[TimelineService](...) // #1 
val tweetSvc = Thrift.newIface[TweetService](...)
val authSvc = Thrift.newIface[AuthService](...)
 
val authFilter = Filter.mk[Req, Res, AuthReq, Res] { (req, svc) => // #2
  authSvc.authenticate(req) flatMap svc(_)
}
 
val apiService = Service.mk[AuthReq, Res] { req =>
  timelineSvc(req.userId) flatMap { tl =>
    val tweets = tl map tweetSvc.getById(_)
    Future.collect(tweets) map tweetsToJson(_)
  }
} // #3
Http.serve(":80", authFilter andThen apiService) // #4
 
// #1 Create a client for each service
// #2 Create new Filter to authenticate incoming requests
// #3 Create a service to convert an authenticated timeline request to a json response 
// #4 Start a new HTTP server on port 80 using the authenticating filter and our service

Here we create clients for the timeline service, Tweet service, and authentication service. A filter is created for authenticating raw requests. Finally, our service is implemented, combined with the auth filter and exposed on port 80.

When a request is received, the auth filter will attempt to authenticate it. A failure will be returned immediately without ever affecting the core service. Upon successful authentication, the AuthReq will be sent to the API service. The service will use the attached userId to look up the user’s timeline via the timeline service. A list of Tweet IDs is returned and then iterated over. Each ID is used to request the associated Tweet. Finally, the list of Tweet requests is collected and converted into a JSON response.
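The same data flow can be run locally with in-memory stand-ins. The sketch below assumes scala.concurrent.Future, with Future.sequence playing the role of Future.collect. The backing maps, the Long and String types, and the deliberately naive tweetsToJson (no escaping) are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimelineSketch {
  type Service[Req, Rep] = Req => Future[Rep]

  // Hypothetical in-memory backends standing in for remote services.
  private val timelines = Map(1L -> Seq(10L, 11L))
  private val tweets = Map(10L -> "hello", 11L -> "world")

  val timelineSvc: Service[Long, Seq[Long]] =
    userId => Future.successful(timelines.getOrElse(userId, Seq.empty))

  val tweetSvc: Service[Long, String] =
    id => Future.successful(tweets(id))

  // Naive JSON rendering for illustration; it does not escape its input.
  def tweetsToJson(ts: Seq[String]): String =
    ts.map(t => "\"" + t + "\"").mkString("[", ",", "]")

  // Look up the timeline, fan out one request per Tweet ID, then gather.
  val apiService: Service[Long, String] = userId =>
    timelineSvc(userId).flatMap { ids =>
      Future.sequence(ids.map(tweetSvc)).map(tweetsToJson)
    }

  def render(userId: Long): String = Await.result(apiService(userId), 1.second)
}
```

The per-Tweet requests are all issued before any of them is awaited, so with real remote services they would proceed concurrently.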

As you can see, the flow of data is defined and we leave the concurrency to Finagle. We don’t have to manage thread pools or worry about race conditions. The code is clear and safe.

Conclusion

We’ve been working closely with the Netty committers to improve on parts of Netty that both Finagle and the wider community can benefit from. Recently the internal structure of Finagle has been updated to be more modular, paving the way for an upgrade to Netty 4.

Finagle has yielded excellent results. We’ve managed to dramatically increase the amount of traffic we can serve while reducing latencies and hardware requirements. For example, after moving our API endpoints from the Ruby stack onto Finagle, we saw p99 latencies drop from hundreds of milliseconds to tens. Our new stack has enabled us to reach new records in throughput and as of this writing our record tweets per second is 143,199.

Finagle was born out of a need to set Twitter up to scale out to the entire globe at a time when we were struggling with site stability for our users. Using Netty as a base, we could quickly design and build Finagle to manage our scaling challenges. Finagle and Netty handle every request Twitter sees.

Thanks

This post will also appear as a case study in the Netty in Action book by Norman Maurer.

Your Server as a Function by Marius Eriksen provides more insight into Finagle’s philosophy.

Many thanks to Trustin Lee and Norman Maurer for their work on Netty. Thanks to Marius Eriksen, Evan Meagher, Moses Nakamura, Steve Gury, Ruben Oanta, and Brian Degenhardt for their insights.