Scalding 0.9: Get it while it’s hot!

Thursday, 3 April 2014

It’s been just over two years since we open sourced Scalding, and today we are very excited to release version 0.9. Scalding at Twitter powers everything from internal- and external-facing dashboards to custom relevance and ad targeting algorithms, including many graph algorithms such as PageRank, approximate user cosine similarity and many more.

A wide range of new features has been added to Scalding since the last release:

Joins
An area of particular activity and impact has been joins. The Fields API already supported left and right joins over multiple streams, and with 0.9 we bring this functionality to the Typed API. In 0.9, joins followed by reductions followed by more joins are automatically planned as a single map-reduce job, potentially reducing the number of steps in your pipelines.

 case class UserName(id: Long, handle: String)
 case class UserFavs(byUser: Long, favs: List[Long])
 case class UserTweets(byUser: Long, tweets: List[Long])
 
 def users: TypedSource[UserName]
 def favs: TypedSource[UserFavs]
 def tweets: TypedSource[UserTweets]
 
 def output: TypedSink[(UserName, UserFavs, UserTweets)]
 
 // Do a three-way join in one map-reduce step, with type safety
 users.groupBy(_.id)
   .join(favs.groupBy(_.byUser))
   .join(tweets.groupBy(_.byUser))
   .map { case (uid, ((user, favs), tweets)) =>
    (user, favs, tweets)
   }   
   .write(output)

This includes custom co-grouping, not just left and right joins. To handle skewed data there is a new count-min-sketch based algorithm to solve the curse of the last reducer, and a critical bug-fix for skewed joins in the Fields API.
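
To give a feel for how a count-min sketch can help with skew, here is a minimal, self-contained sketch in plain Scala. It is not Scalding's implementation: the class `CountMinSketch`, the object `SkewDemo`, and the chosen depth and width are all hypothetical names and values picked for illustration. The idea is that a small fixed-size table can estimate how often each join key occurs, so unusually heavy keys can be spotted and given special treatment.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical, minimal count-min sketch: `depth` hash rows of `width` counters.
final class CountMinSketch(depth: Int, width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  // Column for `key` in a given row; each row uses a different hash seed.
  private def bucket(key: Long, row: Int): Int = {
    val h = MurmurHash3.stringHash(key.toString, row)
    ((h % width) + width) % width // keep the index non-negative
  }

  def add(key: Long, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(key, row)) += count

  // Collisions only ever inflate a counter, so the minimum over rows
  // is an upper bound on the true count and never an undercount.
  def estimate(key: Long): Long =
    (0 until depth).map(row => table(row)(bucket(key, row))).min
}

object SkewDemo {
  // One hot key (42) among a hundred keys that each appear once.
  val keys: Seq[Long] = Seq.fill(1000)(42L) ++ (100L to 199L)

  val sketch: CountMinSketch = {
    val cms = new CountMinSketch(depth = 4, width = 256)
    keys.foreach(k => cms.add(k))
    cms
  }

  // Assumed policy for this illustration: a key is "heavy" above a threshold.
  def isHeavy(key: Long, threshold: Long = 500L): Boolean =
    sketch.estimate(key) >= threshold
}
```

Because the estimate never undercounts, `SkewDemo.isHeavy(42L)` is guaranteed to be true here. Rare keys may occasionally be overestimated, which is the price paid for the fixed memory footprint.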

Input/output
In addition to joins, we’ve added support for new input/output formats:

  • Parquet is a columnar storage format which we open sourced in collaboration with Cloudera. Parquet can dramatically accelerate map-reduce jobs that read only a subset of the columns in a dataset, and can similarly reduce storage cost with more efficient serialization.
  • Avro is an Apache project to standardize serialization with self-describing IDLs. eBay contributed the scalding-avro module to make it easy to work with Apache Avro serialized data.
  • TemplateTap support eases partitioned writes of data, where the output path depends on the value of the data.

Hadoop counters
We’re also adding support for incrementing Hadoop counters inside map and reduce functions. And for cases where you need to share a medium-sized data file across all your tasks, support for Hadoop’s distributed cache was added in this release cycle.

Typed API
The typed API saw many improvements. When doing data cubing, partial aggregation should happen before key expansion, and sumByLocalKeys enables this. The type system now enforces constraints on sorting and joining that previously would have surfaced only as run-time exceptions. Reducing a data set to a single value now returns a ValuePipe. Just as a TypedPipe is analogous to a program that produces a distributed list, a ValuePipe is like a program that produces a single value, which we might then use to filter or transform some TypedPipe.
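
The benefit of aggregating before key expansion can be seen in a small model written in plain Scala, with no Scalding dependency. Everything here (`CubeSketch`, `sumByKey`, `expand`, the sample events) is a hypothetical stand-in for illustration; in a real job the local summing step would be the role played by sumByLocalKeys.

```scala
object CubeSketch {
  type Key = (String, String)                     // (country, device)
  type CubeKey = (Option[String], Option[String]) // None means "all values"

  // Sum the counts per key; stands in for partial aggregation on one task.
  def sumByKey[K](records: Seq[(K, Long)]): Map[K, Long] =
    records.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Expand one record into all four rollups of its two-dimensional key.
  def expand(kv: (Key, Long)): Seq[(CubeKey, Long)] = {
    val ((country, device), n) = kv
    for {
      c <- Seq(Some(country), None)
      d <- Seq(Some(device), None)
    } yield ((c, d), n)
  }

  val events: Seq[(Key, Long)] =
    Seq.fill(3)((("US", "ios"), 1L)) ++ Seq.fill(2)((("US", "web"), 1L))

  // Expanding first turns 5 records into 20 before any summing happens.
  val naive: Seq[(CubeKey, Long)] = events.flatMap(expand)

  // Aggregating first collapses 5 records to 2, which expand to only 8.
  val preAggregated: Seq[(CubeKey, Long)] =
    sumByKey(events).toSeq.flatMap(expand)
}
```

Both routes give the same totals once everything is summed by cube key, but the pre-aggregated route sends fewer records through the expansion and the shuffle that follows it.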

Matrix API
When it comes to linear algebra, Scalding 0.9 introduced a new Matrix API which will replace the former one in our next major release. Due to the associative nature of matrix multiplication we can choose to compute (AB)C or A(BC). One of those orders might create a much smaller intermediate product than the other. The new API includes a dynamic programming optimization of the order of multiplication chains of matrices to minimize realized size along with several other optimizations. We have seen some considerable speedups of matrix operations with this API. In addition to the new optimizing API, we added some functions to efficiently compute all-pair inner-products (A A^T) using DISCO and DIMSUM. These algorithms excel for cases of vectors highly skewed in their support, which is to say most vectors have few non-zero elements, but some are almost completely dense.
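
The multiplication-order idea can be illustrated with the textbook matrix-chain dynamic program, sketched here in plain Scala. This is a simplification and not the code in the new Matrix API: it assumes dense matrices and minimizes scalar multiplications, whereas the text above describes minimizing realized size. The names `MatrixChain` and `optimalOrder` are hypothetical.

```scala
object MatrixChain {
  // Matrix i has shape dims(i) x dims(i + 1), so n matrices need n + 1 entries.
  // Returns the minimal number of scalar multiplications and a parenthesization.
  def optimalOrder(dims: Vector[Long]): (Long, String) = {
    val n = dims.length - 1
    require(n >= 1, "need at least one matrix")
    val cost = Array.fill(n, n)(0L)  // cost(i)(j): best cost for matrices i..j
    val split = Array.fill(n, n)(0)  // split(i)(j): where to cut that range

    for (len <- 2 to n; i <- 0 to n - len) {
      val j = i + len - 1
      cost(i)(j) = Long.MaxValue
      for (k <- i until j) {
        val c = cost(i)(k) + cost(k + 1)(j) + dims(i) * dims(k + 1) * dims(j + 1)
        if (c < cost(i)(j)) { cost(i)(j) = c; split(i)(j) = k }
      }
    }

    // Name the matrices A, B, C, ... and print the chosen grouping.
    def render(i: Int, j: Int): String =
      if (i == j) ('A' + i).toChar.toString
      else s"(${render(i, split(i)(j))}${render(split(i)(j) + 1, j)})"

    (cost(0)(n - 1), render(0, n - 1))
  }
}
```

For shapes 10x1000, 1000x5 and 5x1000, `MatrixChain.optimalOrder(Vector(10L, 1000L, 5L, 1000L))` picks (AB)C at 100,000 multiplications, where A(BC) would cost 15,000,000.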

Upgrading and Acknowledgements
Some APIs were deprecated, some were removed entirely, and some added more constraints. We have some sed rules to aid in porting. All of these changes fix significant warts. For instance, in the Fields API sum now takes a type parameter, and works for any Semigroup or Monoid. Several changes improve the design to aid in using Scalding more as a library and less as a framework.
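
To show why a sum that works for any Semigroup is useful, here is a tiny stand-alone sketch in plain Scala. The `Semigroup` trait below is a hypothetical minimal version written for this example, not the type class Scalding actually uses, and `SumDemo.sum` is not the Fields API method.

```scala
// Hypothetical minimal Semigroup: an associative way to combine two values.
trait Semigroup[T] { def plus(a: T, b: T): T }

object Semigroup {
  implicit val longSum: Semigroup[Long] = new Semigroup[Long] {
    def plus(a: Long, b: Long): Long = a + b
  }

  // Maps combine by merging keys and adding values that share a key.
  implicit def mapMerge[K, V](implicit sv: Semigroup[V]): Semigroup[Map[K, V]] =
    new Semigroup[Map[K, V]] {
      def plus(a: Map[K, V], b: Map[K, V]): Map[K, V] =
        b.foldLeft(a) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).map(sv.plus(_, v)).getOrElse(v))
        }
    }
}

object SumDemo {
  // Sums a non-empty collection using whichever Semigroup exists for T.
  def sum[T](xs: Seq[T])(implicit sg: Semigroup[T]): T =
    xs.reduce(sg.plus(_, _))

  val total: Long = sum(Seq(1L, 2L, 3L))
  val merged: Map[String, Long] =
    sum(Seq(Map("a" -> 1L), Map("a" -> 2L, "b" -> 5L)))
}
```

The same `sum` call adds numbers and merges maps, because the type parameter selects how values are combined.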

This latest release is our biggest to date, spanning over 800 commits from 57 contributors. It is available today in Maven Central. We hope Scalding is as useful to you as it is to us and the growing community. Follow us @scalding, join us on IRC (#scalding) or via the mailing list.