Drinking from the enterprise stream

Monday, 7 July 2014

Two years ago, we released Hosebird Client (HBC), a Java HTTP library for consuming our Streaming API. We’re pleased to announce v2.2.0 of HBC, released late last week, which adds support for the Gnip streaming APIs, including the PowerTrack and Decahose products.

Now it’s easier than ever to transition from the Public API to consuming even higher volumes of data from Gnip: all it requires is a few small configuration changes and a different endpoint. If you haven’t already done so, revisit the original post to familiarize yourself with the client.

Why use HBC?

HBC implements reconnection logic, backing off the stream when necessary, along with other best practices. It’s robust enough to handle the largest volumes we can pass it without falling over, and we use it at Twitter. We’ve already done the heavy lifting so you can concentrate on deriving value from the data more quickly.
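
To give a sense of what "backing off" involves, here is a minimal sketch of an exponential backoff schedule. It is not HBC's actual reconnection code; the class name `BackoffSketch`, the 250 ms starting delay and the 16 second cap are assumptions chosen purely for illustration.

```java
/** Illustrative only: not HBC's reconnection code. */
final class BackoffSketch {
    private final long initialMillis;
    private final long maxMillis;
    private int attempts = 0;

    BackoffSketch(long initialMillis, long maxMillis) {
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
    }

    /** Returns the delay before the next reconnect attempt, doubling each time up to the cap. */
    long nextDelayMillis() {
        // Clamp the shift so the doubling cannot overflow for long-running failures.
        int shift = Math.min(attempts, 20);
        long delay = Math.min(maxMillis, initialMillis << shift);
        attempts++;
        return delay;
    }

    /** Call after a successful connection so the next failure starts from the initial delay. */
    void reset() {
        attempts = 0;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(250, 16_000);
        for (int i = 0; i < 8; i++) {
            System.out.println("attempt " + i + " -> wait " + backoff.nextDelayMillis() + " ms");
        }
    }
}
```

With HBC you do not need to write anything like this yourself; the sketch only shows the kind of bookkeeping the client takes off your hands.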

Streaming best practices

As always, we recommend a few best practices when connecting to streaming APIs, including:

  • Handle the incoming payloads on a separate thread (pass messages across threads with a Queue)
  • Log errors to a rolling file
  • Keep authentication data safe (no password/token literals, no config files in git!)
  • Cleanly handle reconnects/disconnects from the stream (free with HBC)
  • Handle a heartbeat mechanism to keep the connection alive (also free with HBC)
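
The first practice in the list, handing payloads to a separate thread through a queue, can be sketched with the standard library alone. The class and method names below (`QueueHandoffSketch`, `startWorker`) are hypothetical, and the loop in `main` merely stands in for the thread that reads from the stream.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Illustrative sketch: a streaming thread hands payloads to a worker via a bounded queue. */
final class QueueHandoffSketch {

    /** Starts a daemon worker that passes each queued payload to the handler until interrupted. */
    static Thread startWorker(BlockingQueue<String> queue, Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(queue.take()); // blocks until a payload is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and exit
            }
        }, "payload-worker");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        Thread worker = startWorker(queue, payload -> System.out.println("handled " + payload));

        // Stand-in for the streaming thread: it only enqueues and never parses.
        for (int i = 0; i < 3; i++) {
            queue.put("{\"id\":" + i + "}");
        }

        Thread.sleep(200);   // give the worker a moment to drain the queue
        worker.interrupt();  // ask the worker to stop
        worker.join();
    }
}
```

Keeping the queue bounded means a slow worker shows up as back-pressure on the producer rather than as unbounded memory growth.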

Migrating to an enterprise stream

Let’s say you have been using the public streaming API (t.co/streaming) and are ready to move up to an enterprise-level data package from Gnip. Connecting to these new streaming options requires just a few lines of code, as demonstrated in the example below. The main difference is the switch to a BasicAuth authentication object to accommodate Gnip authentication.

The first step for using Hosebird is to set up the client using the ClientBuilder API:

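    import java.util.concurrent.LinkedBlockingQueue;

    import com.twitter.hbc.ClientBuilder;
    import com.twitter.hbc.core.Client;
    import com.twitter.hbc.core.Constants;
    import com.twitter.hbc.core.endpoint.RealTimeEnterpriseStreamingEndpoint;
    import com.twitter.hbc.core.processor.LineStringProcessor;
    import com.twitter.hbc.httpclient.auth.BasicAuth;

    // Gnip credentials (placeholders; load real values from a secure source)
    String account = "my_gnip_account_name";
    String label = "my_gnip_stream_label";
    String product = "my_gnip_product";
    String userName = "my_gnip_user_name";
    String password = "my_gnip_password";

    // Bounded queue that receives raw messages from the stream
    LinkedBlockingQueue<String> downstream = new LinkedBlockingQueue<String>(10000);

    // NEW RealTimeEnterpriseStreamingEndpoint wraps Gnip endpoints
    RealTimeEnterpriseStreamingEndpoint endpoint = new RealTimeEnterpriseStreamingEndpoint(account, product, label);

    // NEW BasicAuth enables Gnip authentication
    BasicAuth auth = new BasicAuth(userName, password);

    // NEW LineStringProcessor to handle Gnip formatted streaming HTTP
    LineStringProcessor processor = new LineStringProcessor(downstream);

    Client hostBirdClient = new ClientBuilder()
            .name("Connection Name")
            .hosts(Constants.ENTERPRISE_STREAM_HOST) // Declared in HBC Constants
            .endpoint(endpoint)
            .authentication(auth)
            .processor(processor)
            .build();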
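
The credential strings above are placeholders. In keeping with the advice to avoid password literals, here is one way the user name and password might be loaded from the environment instead. The class `GnipCredentialsSketch` and the variable names `GNIP_USER_NAME` and `GNIP_PASSWORD` are hypothetical and not part of HBC.

```java
import java.util.Map;

/** Illustrative sketch: reads Gnip credentials from environment variables instead of literals. */
final class GnipCredentialsSketch {
    final String userName;
    final String password;

    private GnipCredentialsSketch(String userName, String password) {
        this.userName = userName;
        this.password = password;
    }

    /** Builds credentials from the given environment map, failing fast if a value is missing. */
    static GnipCredentialsSketch fromEnvironment(Map<String, String> env) {
        return new GnipCredentialsSketch(
                require(env, "GNIP_USER_NAME"),   // hypothetical variable name
                require(env, "GNIP_PASSWORD"));   // hypothetical variable name
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            GnipCredentialsSketch credentials = fromEnvironment(System.getenv());
            System.out.println("Loaded credentials for " + credentials.userName);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The resulting values would then be passed to `new BasicAuth(credentials.userName, credentials.password)` in place of the literals.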

After we have created a Client, we can connect and process messages (just like we did before with the Public API):

    hostBirdClient.connect();
    while (!hostBirdClient.isDone()) {
      String message;
      try {
        message = downstream.take();
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop consuming
        Thread.currentThread().interrupt();
        break;
      }
      // Here is where you could put the payload on a queue for another thread to process
      System.out.println(message);
    }
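
One caveat with the loop above: `take()` blocks indefinitely, so if the stream ends while the queue is empty, the `isDone()` check is never reached again. A possible variation, sketched here with only the standard library, polls with a timeout instead. The class `DrainLoopSketch`, the 100 ms timeout and the `BooleanSupplier` standing in for `hostBirdClient.isDone()` are assumptions for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Illustrative sketch: a consume loop that rechecks the "done" condition periodically. */
final class DrainLoopSketch {

    /** Handles messages until the source is done and the queue is empty; returns the count handled. */
    static int drain(BlockingQueue<String> queue, BooleanSupplier isDone, Consumer<String> handler) {
        int handled = 0;
        try {
            while (!isDone.getAsBoolean() || !queue.isEmpty()) {
                // poll() returns null on timeout, letting the loop re-evaluate isDone
                String message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handler.accept(message);
                    handled++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop consuming
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> downstream = new LinkedBlockingQueue<>(10000);
        downstream.add("first");
        downstream.add("second");
        // The source is already "done" here, so the loop drains what is left and returns.
        int handled = drain(downstream, () -> true, System.out::println);
        System.out.println(handled + " messages handled");
    }
}
```

With a real client, the second argument could be a method reference such as `hostBirdClient::isDone`.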

Notes

One thing to keep in mind: HBC-Twitter4J models haven’t been updated yet to accommodate the Gnip data format, so parsing the payloads is currently left to the user. In addition to supporting Gnip’s enterprise endpoints, we have refactored, updated and polished Hosebird with this release. We look forward to your comments in our forums and, as always, pull requests are more than welcome. Happy streaming!

Update: On July 25, 2014, we replaced the samples to fix a typo in the code block and to remove the “.eventMessageQueue()” method call, which is not required in this instance.