Finagle: A Protocol-Agnostic RPC System

Friday, 19 August 2011

Finagle is a protocol-agnostic, asynchronous RPC system for the JVM that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language.

Rendering even the simplest web page on twitter.com requires the collaboration of dozens of network services speaking many different protocols. For example, in order to render the home page, the application issues requests to the Social Graph Service, Memcached, databases, and many other network services. Each of these speaks a different protocol: Thrift, Memcached, MySQL, and so on. Additionally, many of these services speak to other services — they are both servers and clients. The Social Graph Service, for instance, provides a Thrift interface but consumes from a cluster of MySQL databases.

In such systems, a frequent cause of outages is poor interaction between components in the presence of failures; common failures include crashed hosts and extreme latency variance. These failures can cascade through the system by causing work queues to back up, TCP connections to churn, or memory and file descriptors to become exhausted. In the worst case, the user sees a Fail Whale.

Challenges of building a stable distributed system

Sophisticated network servers and clients have many moving parts: failure detectors, load-balancers, failover strategies, and so on. These parts need to work together in a delicate balance to be resilient to the varieties of failure that occur in a large production system.

This is especially difficult because each protocol has its own implementation of failure detectors, load-balancers, and so on. For example, the implementation of back-pressure strategies for Thrift differs from that for HTTP. Ensuring that heterogeneous systems converge to a stable state during an incident is extremely challenging.

Our approach

We set out to develop a single implementation of the basic components of network servers and clients that could be used for all of our protocols. Finagle is a protocol-agnostic, asynchronous Remote Procedure Call (RPC) system for the Java Virtual Machine (JVM) that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language. Finagle supports a wide variety of request/response-oriented RPC protocols and many classes of streaming protocols.

Finagle provides a robust implementation of:

  • connection pools, with throttling to avoid TCP connection churn;
  • failure detectors, to identify slow or crashed hosts;
  • failover strategies, to direct traffic away from unhealthy hosts;
  • load-balancers, including “least-connections” and other strategies; and
  • back-pressure techniques, to defend servers against abusive clients and dogpiling.

Additionally, Finagle makes it easier to build and deploy a service that

  • publishes standard statistics, logs, and exception reports;
  • supports distributed tracing (a la Dapper) across protocols;
  • optionally uses ZooKeeper for cluster management; and
  • supports common sharding strategies.

We believe our work has paid off — we can now write and deploy a network service with much greater ease and safety.

Finagle at Twitter

Today, Finagle is deployed in production at Twitter in several front- and back-end serving systems, including our URL crawler and HTTP Proxy. We plan to continue deploying Finagle more widely.

Figure: A Finagle-based architecture (under development)

The diagram illustrates a future architecture that uses Finagle pervasively. For example, the User Service is a Finagle server that uses a Finagle memcached client, and speaks to a Finagle Kestrel service.

How Finagle works

Finagle is flexible and easy to use because it is designed around a few simple, composable primitives: Futures, Services, and Filters.

Future objects

In Finagle, Future objects are the unifying abstraction for all asynchronous computation. A Future represents a computation that may not yet have completed and that can either succeed or fail. The two most basic ways to use a Future are to:

  • block and wait for the computation to return
  • register a callback to be invoked when the computation eventually succeeds or fails

Future callbacks

In cases where execution should continue asynchronously upon completion of a computation, you can specify a success and a failure callback. Callbacks are registered via the onSuccess and onFailure methods:

val request: HttpRequest =
  new DefaultHttpRequest(HTTP_1_1, GET, "/")
val responseFuture: Future[HttpResponse] = client(request)

// The success callback receives the completed response, not the Future itself.
responseFuture onSuccess { response =>
  println(response)
} onFailure { exception =>
  println(exception)
}
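The two basic ways of consuming a Future, blocking and registering a callback, can be sketched as follows. To keep the example runnable without Finagle on the classpath, it uses the Scala standard library's scala.concurrent.Future as a stand-in. That API differs from Finagle's Future (for instance, it uses onComplete and an implicit ExecutionContext rather than onSuccess and onFailure), and fetchGreeting is a hypothetical placeholder for a call such as client(request).

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureBasics {
  // Hypothetical stand-in for a network call such as client(request).
  def fetchGreeting(name: String): Future[String] =
    Future { s"hello, $name" }

  // 1. Block the calling thread until the result arrives (or the wait times out).
  def blockingGreeting(name: String): String =
    Await.result(fetchGreeting(name), 2.seconds)

  // 2. Register a callback; the calling thread is free to continue.
  //    The Promise only exists so a caller can observe what the callback saw.
  def callbackGreeting(name: String): Future[String] = {
    val seen = Promise[String]()
    fetchGreeting(name).onComplete {
      case Success(greeting)  => seen.success(s"callback got: $greeting")
      case Failure(exception) => seen.failure(exception)
    }
    seen.future
  }
}
```

Blocking is convenient in tests and scripts, but in a server it ties up a thread per outstanding request, which is why the callback and composition styles are usually preferred.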

Composing Futures

Futures can be combined and transformed in interesting ways, leading to the kind of compositional behavior commonly seen in functional programming languages. For instance, you can convert a Future[String] to a Future[Int] by using map:

val stringFuture: Future[String] = Future("1")
val intFuture: Future[Int] = stringFuture map { string =>
  string.toInt
}

Similarly, you can use flatMap to easily pipeline a sequence of Futures:

val authenticatedUser: Future[User] =
    User.authenticate(email, password)

val lookupTweets: Future[Seq[Tweet]] =
    authenticatedUser flatMap { user =>
      Tweet.findAllByUser(user)
    }

In this example, User.authenticate() is performed asynchronously; Tweet.findAllByUser() is invoked on its eventual result. The same pipeline can be expressed in Scala with a for comprehension:

for {
   user <- User.authenticate(email, password)
   tweets <- Tweet.findAllByUser(user)
} yield tweets

Errors and exceptions propagate automatically when Futures are pipelined using flatMap or a for comprehension. In the above example, if User.authenticate() asynchronously raises an exception, the subsequent call to Tweet.findAllByUser() never happens. Instead, the result of the pipelined expression is still of the type Future[Seq[Tweet]], but it contains the exceptional value rather than tweets. You can respond to the exception using the onFailure callback or other compositional techniques.
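The short-circuiting behavior described above can be demonstrated with a small, self-contained sketch. It again uses scala.concurrent.Future from the standard library as a stand-in for Finagle's Future, and the User, Tweet, authenticate, and findAllByUser definitions are hypothetical placeholders modeled on the example, not real Twitter code. The recover call stands in for the "other compositional techniques" mentioned above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

object PipelineErrors {
  final case class User(email: String)
  final case class Tweet(text: String)

  // Counts lookups so we can observe that the second stage is skipped on failure.
  val lookups = new AtomicInteger(0)

  // Hypothetical authenticator: exactly one password is accepted.
  def authenticate(email: String, password: String): Future[User] =
    if (password == "secret") Future.successful(User(email))
    else Future.failed(new IllegalArgumentException("bad credentials"))

  def findAllByUser(user: User): Future[Seq[Tweet]] = {
    lookups.incrementAndGet()
    Future.successful(Seq(Tweet(s"first tweet by ${user.email}")))
  }

  // If authenticate fails, findAllByUser is never invoked and the
  // resulting Future carries the exception instead of tweets.
  def tweetsFor(email: String, password: String): Future[Seq[Tweet]] =
    for {
      user   <- authenticate(email, password)
      tweets <- findAllByUser(user)
    } yield tweets

  // Convert a failed pipeline into a fallback value.
  def tweetsOrEmpty(email: String, password: String): Future[Seq[Tweet]] =
    tweetsFor(email, password).recover {
      case _: IllegalArgumentException => Seq.empty[Tweet]
    }
}
```

Calling tweetsOrEmpty with a wrong password yields an empty sequence and leaves the lookup counter at zero, which confirms that the second stage never ran.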

A nice property of Futures, as compared to other asynchronous programming techniques (such as continuation-passing style), is that you can easily write clear and robust asynchronous code, even with more sophisticated operations such as scatter/gather:

// Scatter: issue the lookups. Gather: combine them into a single Future.
val severalFutures: Seq[Future[Int]] =
  Seq(Tweet.find(1), Tweet.find(2), …)
val combinedFuture: Future[Seq[Int]] =
  Future.collect(severalFutures)
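A runnable approximation of this scatter/gather pattern is sketched below. It assumes the standard library's Future.sequence as an analog of Future.collect, and findLength is a hypothetical lookup invented for the example. The gathered Future preserves the order of the inputs and fails if any individual lookup fails.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ScatterGather {
  // Hypothetical lookup; a real one would issue a network request.
  def findLength(id: Int): Future[Int] =
    if (id < 0) Future.failed(new NoSuchElementException(s"no tweet $id"))
    else Future { id * 10 }

  // Scatter: start one lookup per id.
  // Gather: combine the individual Futures into one Future of all results.
  def gather(ids: Seq[Int]): Future[Seq[Int]] =
    Future.sequence(ids.map(findLength))
}
```

Because the combined result is itself a Future, it can be mapped, pipelined, or given callbacks exactly like any single asynchronous result.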

Service objects

A Service is a function that receives a request and returns a Future object as a response. Note that both clients and servers are represented as Service objects.
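The idea that clients and servers share one abstraction can be illustrated with a deliberately simplified model. The Service trait below is a sketch written for this example on top of scala.concurrent.Future; it is not Finagle's actual class, and echoServer and clientFor are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ServiceModel {
  // Simplified model: a Service is just an asynchronous function.
  trait Service[-Req, +Rep] extends (Req => Future[Rep])

  // A "server-side" Service: it implements the behavior itself.
  val echoServer: Service[String, String] = new Service[String, String] {
    def apply(request: String): Future[String] =
      Future.successful(s"echo: $request")
  }

  // A "client-side" Service with the same type: it forwards to another Service.
  // In a real system this hop would cross the network.
  def clientFor(remote: Service[String, String]): Service[String, String] =
    new Service[String, String] {
      def apply(request: String): Future[String] = remote(request)
    }
}
```

Since both sides have the type Service[String, String], code that wraps or composes a Service does not need to know whether it is talking to local logic or to a remote host.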

To create a Server, you extend the abstract Service class and listen on a port. Here is a simple HTTP server listening on port 10000:

val service = new Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest) =
    Future(new DefaultHttpResponse(HTTP_1_1, OK))
}

val address = new InetSocketAddress(10000)

val server: Server[HttpRequest, HttpResponse] = ServerBuilder()
  .name("MyWebServer")
  .codec(Http())
  .bindTo(address)
  .build(service)

Building an HTTP client is even easier:

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http())
  .hosts(address)
  .build()

// Issue a request, get a response:
val request: HttpRequest =
  new DefaultHttpRequest(HTTP_1_1, GET, "/")

client(request) onSuccess { response =>
  println("Received response: " + response)
}

Filter objects

Filters are a useful way to isolate distinct phases of your application into a pipeline. For example, you may need to handle exceptions, authorization, and so forth before your Service responds to a request.

A Filter wraps a Service and, potentially, converts the input and output types of the Service to other types. In other words, a Filter is a Service transformer. Here is a filter that uses an asynchronous authenticator service to ensure that an HTTP request has valid OAuth credentials:

class RequireAuthentication(a: Authenticator) extends Filter[...] {
  def apply(
    request: Request,
    continue: Service[AuthenticatedRequest, HttpResponse]
  ) = {
    // Authenticate asynchronously, then either continue or fail the request.
    a.authenticate(request) flatMap {
      case AuthResult(OK, passport) =>
        continue(AuthenticatedRequest(request, passport))
      case AuthResult(Error(code), _) =>
        Future.exception(new RequestUnauthenticated(code))
    }
  }
}

A Filter then decorates a Service, as in this example:

val baseService = new Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest) =
    Future(new DefaultHttpResponse(HTTP_1_1, OK))
}

val authorize = new RequireAuthentication(…)
val handleExceptions = new HandleExceptions(...)

val decoratedService: Service[HttpRequest, HttpResponse] =
  handleExceptions andThen authorize andThen baseService
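To make the "Service transformer" idea concrete, here is a minimal, self-contained model of Filters and their composition. The Service and Filter traits are simplified sketches built on scala.concurrent.Future, not Finagle's real definitions, and Request, AuthenticatedRequest, requireToken, and the string responses are hypothetical stand-ins for the HTTP and OAuth types used above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FilterModel {
  trait Service[-Req, +Rep] extends (Req => Future[Rep])

  // A Filter receives the outer request type and may call the next
  // Service with a different (inner) request type.
  trait Filter[ReqIn, RepOut, ReqOut, RepIn] { self =>
    def apply(request: ReqIn, next: Service[ReqOut, RepIn]): Future[RepOut]

    // Filter andThen Service yields a Service with the outer types.
    def andThen(service: Service[ReqOut, RepIn]): Service[ReqIn, RepOut] =
      new Service[ReqIn, RepOut] {
        def apply(request: ReqIn): Future[RepOut] = self.apply(request, service)
      }

    // Filter andThen Filter yields a combined Filter.
    def andThen[Req2, Rep2](
      nextFilter: Filter[ReqOut, RepIn, Req2, Rep2]
    ): Filter[ReqIn, RepOut, Req2, Rep2] =
      new Filter[ReqIn, RepOut, Req2, Rep2] {
        def apply(request: ReqIn, next: Service[Req2, Rep2]): Future[RepOut] =
          self.apply(request, nextFilter.andThen(next))
      }
  }

  final case class Request(path: String, token: Option[String])
  final case class AuthenticatedRequest(path: String, user: String)

  // Type-changing filter: rejects requests without a token, otherwise
  // hands an AuthenticatedRequest to the next Service.
  val requireToken = new Filter[Request, String, AuthenticatedRequest, String] {
    def apply(request: Request, next: Service[AuthenticatedRequest, String]): Future[String] =
      request.token match {
        case Some(user) => next(AuthenticatedRequest(request.path, user))
        case None       => Future.failed(new SecurityException("missing token"))
      }
  }

  // Type-preserving filter: converts authentication failures into a response.
  val handleExceptions = new Filter[Request, String, Request, String] {
    def apply(request: Request, next: Service[Request, String]): Future[String] =
      next(request).recover { case e: SecurityException => s"401 ${e.getMessage}" }
  }

  val base = new Service[AuthenticatedRequest, String] {
    def apply(request: AuthenticatedRequest): Future[String] =
      Future.successful(s"200 ${request.path} for ${request.user}")
  }

  val decorated: Service[Request, String] =
    handleExceptions andThen requireToken andThen base
}
```

In this model the base service only ever sees AuthenticatedRequest values, so the compiler rejects any wiring that would let an unauthenticated request reach it.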

Finagle is an open source project, available under the Apache License, Version 2.0. Source code and documentation are available on GitHub.

Acknowledgements

Finagle was originally conceived by Marius Eriksen and Nick Kallen. Other key contributors are Arya Asemanfar, David Helder, Evan Meagher, Gary McCue, Glen Sanford, Grant Monroe, Ian Ownbey, Jake Donham, James Waldrop, Jeremy Cloud, Johan Oskarsson, Justin Zhu, Raghavendra Prabhu, Robey Pointer, Ryan King, Sam Whitlock, Steve Jenson, Wanli Yang, Wilhelm Bierbaum, William Morgan, Abhi Khune, and Srini Rajagopal.