Infrastructure

Streaming logging pipeline of Home timeline prediction system

Monday, 22 June 2020

In the default configuration, a Twitter user's home timeline will show the top Tweets first. Top Tweets are ones you are likely to care about most, and we choose them based on accounts you interact with most, Tweets you engage with, and much more. The system that picks the top Tweets uses a dynamic algorithm known as a machine learning model to predict what Tweets you will be most interested in. The model learns how to make these predictions in a process called training, where it analyzes large amounts of data, and forms an understanding of user interests, based on their previous behaviour.

However, since user interest is constantly changing, this model must be updated regularly in a process called refreshing the model. In a recent effort, we have successfully reduced the time to refresh this model from 7 days to about 1 day via a redesign of our data logging pipeline.

This makes the model more responsive and adaptive to changes in your interests, providing top Tweets that change as quickly as your interests do. Additionally, this change has helped the internal systems that back the Home Timeline ranking to be more agile, robust, and resilient.


Background

It will be helpful to have some background information about how this prediction system works. The lifecycle of the prediction system consists of the following stages (as shown in Figure 1):

  • Data Collection: Modern prediction systems rely heavily on large amounts of data known as features and labels; this data is collected from both server logs and user engagements (e.g., Favorites and Retweets).
  • Joining the Data: Normally, the two kinds of data, features and labels, are stored separately, so we need to join them to make them available for model training. Each Tweet, together with all its features, receives a label that is either positive (the user engaged with the Tweet) or negative (the user did not engage with it).
  • Downsampling: Features and their labels make up an “example.” Typically, positive examples are far fewer than negative examples. This creates a strong skew towards the negative examples, leading to poor model performance (e.g., less accurate suggestions). To avoid this skew and improve model performance, “downsampling” removes some of the negative examples (and sometimes also positive examples, if there are too many) so that the numbers of positive and negative examples are roughly equal.
  • Training the Model: After the data is collected and prepared, it is fed to the machine learning model trainer. The model trainer reads historical data and generates a new mathematical model that can make predictions on new, unseen data. It is worth mentioning that the raw training dataset is kept for less than 7 days, whether or not a model has been trained on it.
  • Refreshing the Model: After a new model is trained, it is evaluated against historical data that is not part of the training data. If the model performance is sufficiently good, we replace the existing model with the new one; otherwise the existing model stays unchanged.
  • Prediction: The newly refreshed model is used to rank the most relevant Tweets and place them at the top of users’ Home Timelines.
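
The downsampling stage above can be illustrated with a minimal Python sketch. It assumes each example is a (features, label) pair with label 1 for positive and 0 for negative, and it simply samples negatives down to the number of positives. The function name and data layout are hypothetical, chosen for illustration, and are not taken from the production system.

```python
import random
from typing import Dict, List, Tuple

Example = Tuple[Dict[str, float], int]  # (features, label); label is 1 or 0


def downsample_negatives(examples: List[Example], rng: random.Random) -> List[Example]:
    """Return a roughly balanced dataset by randomly dropping surplus negatives."""
    positives = [e for e in examples if e[1] == 1]
    negatives = [e for e in examples if e[1] == 0]
    if len(negatives) > len(positives):
        # Keep only as many negatives as there are positives.
        negatives = rng.sample(negatives, len(positives))
    balanced = positives + negatives
    rng.shuffle(balanced)  # avoid ordering all positives before negatives
    return balanced


# Two engaged Tweets and eight unengaged ones (made-up feature values).
raw = [({"score": float(i)}, 1 if i < 2 else 0) for i in range(10)]
balanced = downsample_negatives(raw, random.Random(42))
```

With the made-up input above, the result holds two positive and two negative examples. A real pipeline would more likely sample with a fixed keep rate per record, since a stream does not know the final class counts in advance.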

The existing system

The Home Timeline’s existing prediction system, or more specifically its data collection and preparation logging pipeline, does its work with offline batch processing. As shown in Figure 2, server logs (the features) and user engagements (the labels) are first logged to offline storage as the "Raw Dataset". The features and labels are joined to become examples, which are then downsampled. Finally, the examples are split into multiple example datasets based on engagement type (Favorites and Replies, for example), and these datasets are ready to be fed to the model trainers.


Currently, the entire prediction system suffers from two main problems: latency and data quality.

Latency

In terms of latency, it takes 4-6 days from the time the data is logged until the newly trained model is refreshed in the prediction system (shown in Figure 3 below). This slow turnaround time hurts the user experience.


This latency is rooted in several components:

  • Data Availability: the training process reads daily-partitioned training data, so it has to wait until the end of the day to collect all the data (1-2 days).
  • Data Pre-processing: training features and labels are stored separately, and preprocessing is needed to join them (1-2 days).
  • Model Training: model training takes almost a whole day (1 day).
  • Validation Data: the latest daily data is reserved for validation, so the model is actually trained on data from one day before the latest data (1 day).
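
As a quick check, the per-stage ranges listed above add up to the 4-6 day end-to-end figure. The short Python sketch below only restates that arithmetic; the dictionary and function names are illustrative.

```python
from typing import Dict, Tuple

# (minimum days, maximum days) for each stage, as listed above.
STAGE_LATENCY_DAYS: Dict[str, Tuple[int, int]] = {
    "data availability": (1, 2),
    "data pre-processing": (1, 2),
    "model training": (1, 1),
    "validation data": (1, 1),
}


def total_latency_days(stages: Dict[str, Tuple[int, int]]) -> Tuple[int, int]:
    """Sum the best-case and worst-case latency over all stages."""
    best = sum(low for low, _ in stages.values())
    worst = sum(high for _, high in stages.values())
    return best, worst


best_case, worst_case = total_latency_days(STAGE_LATENCY_DAYS)
print(f"end-to-end refresh latency: {best_case}-{worst_case} days")
```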

Why does a stale model hurt though? This might be intuitive: Twitter is well known for its ever-changing environment -- user behaviors evolve quickly; trends are dynamic and inconstant; special and emergent events occur unexpectedly at any time. A stale model that is trained from old data, even a couple of days old, can be irrelevant. The freshness of the model is especially vital to a responsive, adaptive and successful prediction system that eventually leads to user satisfaction. 

Table 1 also provides some practical evidence. Each column in Table 1 shows the model performance on day T if we were to use, for prediction, the model trained on T-1 (i.e., one day ago), T-2 (i.e., two days ago), T-3, or T-4, respectively. We can see that if we could update the model in a more timely fashion, we would make a considerable gain in prediction model performance.

Table 1. Training Data Latency vs. Model Performance

Model trained on             | T-4   | T-3   | T-2   | T-1
RCE (Relative Cross Entropy) | 38.81 | 38.82 | 38.84 | 39.12
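
The post does not define RCE. The sketch below assumes the commonly used definition, in which RCE is the percentage improvement in cross entropy over a naive baseline that always predicts the average engagement rate, so that higher values are better. The function names and sample numbers are illustrative and are not the data behind Table 1.

```python
import math
from typing import Sequence


def cross_entropy(labels: Sequence[int], predictions: Sequence[float]) -> float:
    """Average binary cross entropy; predictions are clipped away from 0 and 1."""
    eps = 1e-15
    total = 0.0
    for y, p in zip(labels, predictions):
        p = min(max(p, eps), 1.0 - eps)
        total -= y * math.log(p) + (1 - y) * math.log(1.0 - p)
    return total / len(labels)


def relative_cross_entropy(labels: Sequence[int], predictions: Sequence[float]) -> float:
    """Assumed definition: 100 * (1 - CE(model) / CE(naive baseline))."""
    base_rate = sum(labels) / len(labels)
    ce_naive = cross_entropy(labels, [base_rate] * len(labels))
    ce_model = cross_entropy(labels, predictions)
    return (1.0 - ce_model / ce_naive) * 100.0


labels = [1, 0, 0, 0]
good_model = relative_cross_entropy(labels, [0.9, 0.1, 0.1, 0.1])  # better than baseline
bad_model = relative_cross_entropy(labels, [0.1, 0.9, 0.9, 0.9])   # worse than baseline
```

Under this definition, a model that does no better than the baseline scores 0, and a model that is worse than the baseline scores below 0.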

 

Data quality

In terms of data quality, there are two sub-issues: data collection and join windows.

Data collection

There are discrepancies between the data logged and the data actually used for prediction. The existing pipeline directly logs only part of the data, since the complete raw data is too large. Some large feature groups are reproduced offline and joined with the directly logged features. As an unfortunate result, there is a lot of room for errors during the join, and these errors are extremely difficult to debug (this is what Google’s paper calls “pipeline jungles”).

Join windows

The main logic during training data collection is joining the Tweet features with their engagement labels. Ideally, for each served Tweet the data collecting system waits for user engagements for some time and decides whether a Tweet is a positive example or a negative example. The waiting time is called the join window. 

There are typically two types of join windows, namely Tumbling Windows and Sliding Windows (see Figure 4). A tumbling window is aligned to the epoch, which makes join windows uneven: data in the first half of the window has a longer join window than data in the second half (see Figure 5 for an illustration). Our existing system uses a tumbling window, but what we really need is a sliding window that is created for each record.


The extreme case occurs during the last hour of the day, when the served data has only minutes to match potential labels and usually fails to do so. We also have a “UTC hour” feature, which, together with the smaller number of labels during the last hour, leads to a steep drop in the ranking scores (see Figure 6 below).
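
To make the uneven join window concrete, here is a minimal Python sketch that computes how long a served record can wait for labels under each window type. It assumes a one-day window and Unix timestamps in seconds. The function names and the window width are assumptions for illustration, not settings from the production system.

```python
DAY_SECONDS = 24 * 60 * 60


def tumbling_join_window(served_at: int, width: int = DAY_SECONDS) -> int:
    """Seconds until the epoch-aligned window that contains served_at closes."""
    window_end = (served_at // width + 1) * width
    return window_end - served_at


def sliding_join_window(served_at: int, width: int = DAY_SECONDS) -> int:
    """A per-record sliding window always gives the record the full width."""
    return width


served_early = 0                    # served at 00:00:00 UTC
served_late = DAY_SECONDS - 600     # served at 23:50:00 UTC

for served_at in (served_early, served_late):
    print(
        served_at,
        tumbling_join_window(served_at),
        sliding_join_window(served_at),
    )
```

Under these assumptions, a Tweet served at 23:50 UTC has only 600 seconds to collect labels in the tumbling window, while the sliding window gives it the same full day as any other record.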


To solve the problems mentioned above, we recently built and shipped a new streaming logging pipeline for the Home Timeline prediction system.


New architecture

At a high level, our solution is to adopt the so-called Kappa streaming architecture and move from offline batch processing to a streaming-first logging pipeline. That is, all the training data is collected and prepared in real time as it is produced. We chose Apache Kafka and the KafkaStreams processing library as our stream processing engine to build the core part of the pipeline. Kafka and its ecosystem are not new to Twitter: a previous blog post discusses the move toward Apache Kafka from the in-house EventBus (which builds on top of Apache DistributedLog), and Twitter also integrates KafkaStreams into Finatra, its core web server stack.

A simplified architecture of the proposed logging pipeline is shown in Figure 7. The pipeline consists of multiple sequential steps, described below:

  1. There are two main paths in the architecture: the features stream(s) and the engagement labels stream(s). Tweet features are published to Kafka by the prediction service right after Tweets are served to users. To make the entire architecture more scalable, two streams are created: the Served Keys Topic and the Served Features Topic. While the Served Features Topic contains all the features, the Served Keys Topic contains only the Tweet/User pair and other metadata that deterministically identifies a training example. Similarly, user engagements (Likes, Retweets, etc.) are sent to the label topic.
  2. The Served Keys Topic first LeftJoins with labels. The resulting stream is heavily downsampled, especially the negative examples, to make sure we have a balanced number of positive and negative examples. In the diagram, the width of the arrows roughly indicates the data volume of each stream.
  3. The Served Features Topic InnerJoins with labels. Although there might be thousands of features for each Tweet, the InnerJoin in this step is scalable since we have heavily downsampled the data in the previous step.
  4. The output of Step 3 is further divided into multiple streams representing the various engagement types. This step also involves additional, finer downsampling.
  5. The output streams are copied to distributed storage (e.g., HDFS) for training.
  6. Currently, we rely on daily batch training with warmstarting for the training process. 
  7. The trained model is updated in the prediction service to serve the ranked Tweets, concluding the entire workflow.
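
Steps 1 to 4 above can be sketched with plain in-memory Python collections standing in for Kafka topics. This illustrates the data flow only, not KafkaStreams code. All function names, the (tweet_id, user_id) key, and the sample data are hypothetical, and the sketch assumes that the InnerJoin in step 3 runs against the downsampled output of step 2.

```python
import random
from typing import Dict, List, Set, Tuple

Key = Tuple[int, int]  # hypothetical example key: (tweet_id, user_id)


def left_join_keys_with_labels(served_keys: List[Key], labels: Dict[Key, Set[str]]):
    """Step 2a: every served key survives; unengaged keys get an empty label set."""
    return {key: labels.get(key, set()) for key in served_keys}


def downsample(labeled: Dict[Key, Set[str]], negative_keep_rate: float, rng: random.Random):
    """Step 2b: keep all positives and only a fraction of the negatives."""
    return {
        key: engagements
        for key, engagements in labeled.items()
        if engagements or rng.random() < negative_keep_rate
    }


def inner_join_features(sampled: Dict[Key, Set[str]], served_features: Dict[Key, dict]):
    """Step 3: attach the heavy feature payload only to the surviving keys."""
    return {
        key: (served_features[key], engagements)
        for key, engagements in sampled.items()
        if key in served_features
    }


def split_by_engagement(examples: Dict[Key, tuple], engagement_types: List[str]):
    """Step 4: one labeled stream per engagement type."""
    return {
        etype: [
            (key, features, int(etype in engagements))
            for key, (features, engagements) in examples.items()
        ]
        for etype in engagement_types
    }


served_keys = [(1, 10), (2, 10), (3, 10)]
served_features = {key: {"feature_a": float(key[0])} for key in served_keys}
labels = {(1, 10): {"favorite"}, (3, 10): {"favorite", "reply"}}

# A keep rate of 0.0 drops every negative, which makes the example deterministic.
sampled = downsample(left_join_keys_with_labels(served_keys, labels), 0.0, random.Random(0))
examples = inner_join_features(sampled, served_features)
streams = split_by_engagement(examples, ["favorite", "reply"])
```

The point of the split is visible in `inner_join_features`: the feature payload is looked up only for keys that survived the cheap key-level join and downsampling.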

Although at a high level the system is not too complicated, we encountered several engineering challenges during the implementation. For example, the default join function in KafkaStreams does not serve our exact needs, so we invested in building our own join library. Also, the volume of traffic we handle on the Twitter Home Timeline puts a strain on both the KafkaStreams-backed joins and the producing and consuming on the Kafka cluster.

We will discuss how we tackle these issues in the following sections.


A KafkaStreams join variant

At its core, the processing logic of the logging pipeline is to join served Tweet features with engagement labels. KafkaStreams comes with a default join functionality where: 

  • It uses RocksDB (a performant key-value store, https://rocksdb.org/) as local storage to back up the state, i.e., the temporary data, when the RAM cannot hold the entire state.
  • It uses sliding joining windows to avoid the skewness of the tumbling joining windows discussed above.
  • It is fault-tolerant by default, meaning that after a restart or crash the application restores the state that the entire system had beforehand.
  • It has two flavors: LeftJoin and InnerJoin. The LeftJoin checks the right side store, and if there are no matching records it immediately emits the left side record.

For our use case, we will keep using the local RocksDB state store and the sliding joining window. However, we have some special requirements:

  • We don’t require strict fault tolerance: restoring a huge amount of historical data could make restarts difficult to manage. Although this kind of restoration is desirable in general, it may not be needed in our use case of logging training examples, because the machine learning models that we use assume that the examples are independent and identically distributed (i.i.d.). A moderate loss of data is often tolerable.
  • We require customized behavior of the LeftJoin: in our case, the left side is served features while the right side is engagement labels. Labels usually arrive later than the features, and we would like to wait a sufficient amount of time before emitting the unmatched left side records. We used the KafkaStreams Processor API to implement these custom requirements. Table 2 shows an example of the behavior of our customized joins (InnerJoin behaves the same as the default KafkaStreams implementation):
Table 2. A KafkaStreams Join Variant Example

Timestamps    | Left | Right | InnerJoin        | LeftJoin
1             | null |       |                  |
2             |      | null  |                  |
3             | A    |       |                  |
4             |      | a     | (A, a)           |
5             | B    |       | (B, a)           |
6             |      | b     | [(A, b), (B, b)] |
7             | A    |       | [(A, a), (A, b)] |
Window Expire |      |       |                  | [(A, [a, b]), (B, [a, b]), (A, [a, b])]
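
The behavior in Table 2 can be reproduced with a small Python simulation. This is a simplified sketch rather than the KafkaStreams Processor API implementation. It assumes that all records share one join key, that they all fall into a single window which expires after the last event, and that null-valued records are ignored. The function and variable names are hypothetical.

```python
from typing import List, Optional, Tuple

# (timestamp, side, value); a value of None models a null record.
Event = Tuple[int, str, Optional[str]]


def simulate_joins(events: List[Event]):
    """Return (inner-join output per arrival, left-join output at window expiry)."""
    left_buffer: List[str] = []
    right_buffer: List[str] = []
    inner_output = []
    for timestamp, side, value in events:
        if value is None:
            continue  # null records never match anything
        if side == "left":
            left_buffer.append(value)
            matches = [(value, right) for right in right_buffer]
        else:
            right_buffer.append(value)
            matches = [(left, value) for left in left_buffer]
        if matches:
            # InnerJoin emits as soon as a record finds partners on the other side.
            inner_output.append((timestamp, matches))
    # The customized LeftJoin waits for window expiry, then emits every left
    # record with all labels seen in the window (possibly an empty list).
    left_output = [(left, list(right_buffer)) for left in left_buffer]
    return inner_output, left_output


table_2_events: List[Event] = [
    (1, "left", None),
    (2, "right", None),
    (3, "left", "A"),
    (4, "right", "a"),
    (5, "left", "B"),
    (6, "right", "b"),
    (7, "left", "A"),
]
inner_output, left_output = simulate_joins(table_2_events)
```

The key difference from the default LeftJoin is visible in the last step: an unmatched left record is held until the window expires instead of being emitted immediately, which gives late labels time to arrive.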

Taming the large-scale traffic

Twitter’s Home Timeline evaluates candidate Tweets from billions of Tweets to select, organize and deliver the best timeline to users on a daily basis. An interesting challenge of building the new streaming logging pipeline is to gracefully handle the monstrous traffic. 

One critical strategy is to split the streams and downsample, as briefly mentioned in the description of Figure 7. Here we want to emphasize the advantage of downsampling aggressively in the early stage of the pipeline: it significantly reduces the burden on the later InnerJoins that deal with heavy record payloads (all features). In contrast, directly joining all Tweet features with all labels would often cause problems, since the memory footprint of such heavy joins is too large. We found that the split joins make the topology significantly more scalable and resilient (see Figure 8).


Splitting the streams is not the only knob we tuned. Over the course of the development, we gradually figured out the best configuration for the Kafka cluster as well as the Kafka services to make sure everything works as expected.

The results

At the beginning of this post we mentioned two main problems with the existing pipeline: latency and data quality. We will go over them and showcase the results one by one.

From the latency perspective:

  • We drastically shortened the data preparation latency from 2-4 days to 4-6 hours and the end-to-end model refresh latency from 4-6 days to 1 day. The reduced latency also saves engineers a great deal of time that they previously spent waiting for results each time they explored a new idea. Tasks like feature engineering and ad-hoc exploration of new models benefit from the fast iteration that the new pipeline brings.
  • The new model performs better than the old model in both model training and real-world prediction after the model is refreshed. Users come to Twitter for the most up-to-date information, and since things change fast on Twitter, newer data and models are preferred.

From the data quality perspective:

  • Because of its ability to absorb the full set of data (the features), the new pipeline eliminates the long-standing discrepancy in the data and removes the room for errors in the training data. This also relieves engineers of recurring debugging work.
  • As a result of using KafkaStreams’ native sliding joining window, we stopped seeing drops in the ranking scores caused by the unevenly distributed joining window. Figure 9 shows smooth ranking scores produced by the streaming pipeline.

In the future, we plan to first train the model on an hourly basis using warmstarting (i.e., initializing a model with weights from a previously trained model). We will also explore streaming training to replace the batch training. So please stay tuned.
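
Warmstarting can be sketched in a few lines of Python. The example below trains a tiny logistic regression with stochastic gradient descent and lets a second training run start from the weights of the first. It is a toy illustration under assumed names and made-up data, not the Home Timeline model or its trainer.

```python
import math
from typing import List, Optional, Sequence, Tuple

Example = Tuple[Sequence[float], int]  # (feature vector, label in {0, 1})


def train_logistic(
    examples: List[Example],
    init_weights: Optional[Sequence[float]] = None,
    epochs: int = 5,
    learning_rate: float = 0.1,
) -> List[float]:
    """Plain SGD for logistic regression; init_weights enables a warm start."""
    num_features = len(examples[0][0])
    # Cold start from zeros, or warm start from previously trained weights.
    weights = list(init_weights) if init_weights is not None else [0.0] * num_features
    for _ in range(epochs):
        for features, label in examples:
            z = sum(w * x for w, x in zip(weights, features))
            prediction = 1.0 / (1.0 + math.exp(-z))
            for i in range(num_features):
                weights[i] -= learning_rate * (prediction - label) * features[i]
    return weights


# Made-up data for two consecutive training periods.
period_1 = [([1.0, 0.0], 1), ([0.0, 1.0], 0)]
period_2 = [([1.0, 1.0], 1), ([0.0, 1.0], 0)]

cold_weights = train_logistic(period_1)
warm_weights = train_logistic(period_2, init_weights=cold_weights)
```

Because the second run starts from `cold_weights`, it only needs to adapt to the new period's data instead of relearning everything from scratch, which is what makes more frequent refreshes practical.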

Acknowledgement

Home Timeline’s Streaming Logging Pipeline was developed by Peilin Yang, Ikuhiro Ihara, Prasang Upadhyaya, Yan Xia and Siyang Dai. We would like to thank Arvind Thiagarajan, Xiao-hu Yan, Wenzhe Shi and the rest of Timelines Quality for their contributions to the project and this blog, and give a special shoutout to leadership for supporting this project: Xun Tang, Shuang Song, Xiaobing Xue and Sandeep Pandey. During the development, Twitter’s Messaging team provided tremendous help with Kafka, and we would like to thank Ming Liu, Yiming Zang, Jordan Bull, Kai Huang and Julio Ng for their dedicated support.
