
Distributed learning in Torch

Monday, 25 January 2016

We recently released Autograd for Torch, which greatly simplified our workflow when experimenting with complex deep learning architectures. The Twitter Cortex team is continuously investing in better tooling for manipulating our large datasets, and distributing training processes across machines in our cluster.

Today we’re open-sourcing four components of our training pipeline, so the community using Torch and/or Autograd can simplify their workflows when it comes to parallelizing training, and manipulating large, distributed datasets.

1. Distributed learning (torch-distlearn)

This library provides common distributed learning algorithms built in Torch with the help of the torch-ipc library. We’re starting by releasing two distributed learning algorithms:

  • AllReduceSGD: one of our favorite ways of parallelizing training of deep neural nets. We really love AllReduceSGD because it’s synchronous and scales nicely, which means no headache when parallelizing. Just turn it on, get a big speedup and be happy! There’s no extra meta-parameter tuning.
  • AllReduceEA: an implementation of elastic averaging, as proposed in Zhang et al. [1]. Elastic averaging requires more domain knowledge / meta-parameter tuning, but can yield better speedups, as it relies on an asynchronous reduce.

AllReduceSGD spreads the computation of gradients across N processes. Each process computes gradients for 1/N of the overall batch. Processes are organized in a binary tree, so each reduce travels up the tree and back down. Assuming the user launches N processes, each consuming 1/N of the dataset (which is easy using torch-dataset, with Dataset(path, {partition=n, partitions=N})), each process would run code that looks like this:

local allReduceSGD = require 'distlearn.AllReduceSGD'(tree)
-- Ensure params are the same on all nodes:
allReduceSGD.synchronizeParameters(params)
for _ = 1,epochs do
   for _ = 1,steps do
      -- Compute your gradients as normal
      local grads = computeYourGrads(...)
      -- Sum and normalize them (this is synchronous):
      allReduceSGD.sumAndNormalizeGradients(grads)
      -- Do your SGD as normal
      sgdStep(params, grads)
   end
   -- Before validating we should make sure all nodes have
   -- the exact same parameter values (sometimes one process sees less
   -- data, which results in slightly different updates):
   allReduceSGD.synchronizeParameters(params)
   -- Validate...
end
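To make the "up and down the tree" idea concrete, here is a toy, single-process model of a tree all-reduce in plain Lua. It only illustrates the data flow: the function name `sumAndNormalize`, the implicit heap-style tree layout (node i has parent floor(i/2)), and normalizing by the number of processes are all assumptions made for this sketch, not the torch-distlearn implementation, which communicates between real processes through torch-ipc.

```lua
-- Toy model: grads[i] is the gradient vector computed by simulated process i.
-- Assumption: nodes form an implicit binary tree where node i's parent is
-- floor(i / 2), and "normalize" means dividing the sum by the process count.
local function addInto(dst, src)
   for k = 1, #dst do dst[k] = dst[k] + src[k] end
end

local function sumAndNormalize(grads)
   local n = #grads
   -- Work on copies so the callers' gradients are left untouched.
   local partial = {}
   for i = 1, n do
      partial[i] = {}
      for k = 1, #grads[i] do partial[i][k] = grads[i][k] end
   end
   -- Up pass: children have larger indices than their parent, so walking
   -- from n down to 2 folds every subtree into its parent before the
   -- parent itself is folded upwards.
   for i = n, 2, -1 do
      addInto(partial[math.floor(i / 2)], partial[i])
   end
   -- The root now holds the full sum; turn it into a mean.
   local mean = {}
   for k = 1, #partial[1] do mean[k] = partial[1][k] / n end
   -- Down pass: each node receives the result from its parent.
   -- (In this toy model every node ends up sharing the same table.)
   local result = { mean }
   for i = 2, n do result[i] = result[math.floor(i / 2)] end
   return result
end

local grads = { {1, 2}, {3, 4}, {5, 6}, {7, 8} }
local reduced = sumAndNormalize(grads)
```

After the call, every simulated process holds the same averaged gradient, which is why each one can then apply an identical SGD step.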

AllReduceEA is an asynchronous method for distributing SGD. This method is equally easy to use, but requires additional meta-parameters:

-- Use a tau of 10 and an alpha of 0.2 (these are meta-parameters
-- that typically need tuning):
local allReduceEA = require 'distlearn.AllReduceEA'(tree, 10, 0.2)
-- Ensure params are the same on all nodes:
allReduceEA.synchronizeParameters(params)
for _ = 1,epochs do
   for _ = 1,steps do
      -- Compute your gradients as normal
      local grads = computeYourGrads(...)
      -- Do your SGD as normal
      sgdStep(params, grads)
      -- Average the params:
      allReduceEA.averageParameters(params)
   end
   -- Make sure the centers haven't drifted too far due to
   -- floating point precision error build up
   allReduceEA.synchronizeCenter(params)
   -- Validate...
end
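For intuition about what tau and alpha control, the following is a minimal sketch of the elastic averaging update described by Zhang et al., written in plain Lua over scalar parameters. It assumes tau is the number of local steps between averaging rounds and alpha is the strength of the pull between each worker and the shared center. The function `elasticAverage` is a hypothetical name for this illustration, and the internals of AllReduceEA may well differ.

```lua
-- One elastic averaging round. Each worker moves a fraction alpha of the
-- way towards the center, and the center moves by the sum of those same
-- displacements in the opposite direction.
local function elasticAverage(workers, center, alpha)
   local pull = 0
   local updated = {}
   for i, x in ipairs(workers) do
      local diff = alpha * (x - center)
      updated[i] = x - diff
      pull = pull + diff
   end
   return updated, center + pull
end

-- Illustrative values only (not tuned for any real model).
local tau, alpha = 10, 0.25
local workers, center = { 4, 4 }, 0
for step = 1, 20 do
   -- (each worker would take its own local SGD step here)
   if step % tau == 0 then
      workers, center = elasticAverage(workers, center, alpha)
   end
end
```

A larger alpha keeps workers closer to the center, while a larger tau lets them explore independently for longer between rounds, which is where the extra tuning effort comes from.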

2. Datasets (torch-dataset)

This library provides many convenient ways for you to feed data into your model. It makes the acts of sampling, reading, and processing distinct so you can effectively mix and match solutions to fit your particular problem constraints. When training classifiers, having control over sampling is critical to achieve good performance. The abstractions provided in this package let us seamlessly experiment with various sampling procedures, without having to preprocess our data. Partitioning is also supported and plays nicely with sampling, such that you can easily distribute a job to process your dataset, or train a model on it.
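As one illustration of why control over sampling matters for classifiers, here is a minimal sketch of a label-uniform sampler in plain Lua: it first picks a label uniformly at random, then picks an item within that label, so rare classes are seen as often as common ones. The function `labelUniformSampler` and its interface are assumptions made for this example and are not the torch-dataset API.

```lua
-- labels[i] is the class label of item i in the index.
-- Returns a function that yields item indices with every label equally
-- likely, regardless of how many items carry that label.
local function labelUniformSampler(labels, random)
   random = random or math.random
   local byLabel, names = {}, {}
   for index, label in ipairs(labels) do
      if not byLabel[label] then
         byLabel[label] = {}
         names[#names + 1] = label
      end
      local bucket = byLabel[label]
      bucket[#bucket + 1] = index
   end
   return function()
      local bucket = byLabel[names[random(#names)]]
      return bucket[random(#bucket)]
   end
end

-- A heavily imbalanced toy index: 99 cats and a single dog.
local labels = {}
for i = 1, 99 do labels[i] = 'cat' end
labels[100] = 'dog'

local sample = labelUniformSampler(labels)
local dogs = 0
for _ = 1, 2000 do
   if labels[sample()] == 'dog' then dogs = dogs + 1 end
end
```

With plain uniform sampling the dog would appear in roughly 1% of draws; here it appears in roughly half of them, without any change to the underlying data.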

All datasets are represented by an index, referencing raw data. We currently support four types of indexes:

  • a CSV index, referencing HTTP data
  • a simple directory of data
  • SlowFS: S3, HDFS or any other slow FS
  • a straight torch Tensor

The CSV index is particularly interesting to us, as it allows arbitrarily distributed datasets and removes the need to pre-fetch the data before processing it. It is particularly powerful for media datasets (images, videos, audio), where we usually store the list of records in an index.csv file and the actual records on a CDN for very fast retrieval during training. Index CSV files can live on the local file system or out on a SlowFS. Index files are typically small and fast to manipulate (even for very large datasets). In practice, this means that you can run a new training or processing script on an empty machine with zero upfront overhead, as the actual data gets downloaded as you request it.

Just use a path like slowfs:///user/somebody/path/to/index.csv to load an index directly off of a SlowFS. Here is a very simple example index.csv:

filename,label1,label2
https://your.super.fast.cdn.com/somedir/2baf2949f9416ca0311.jpg,XX
https://your.super.fast.cdn.com/somedir/2baf2949f9416adcjd2.jpg,XX,XY
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdcd1.jpg,AA
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdgb3.jpg,AA,BB
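To show how little information such an index carries, here is a rough sketch that parses an index of this shape into Lua tables, one per record, each with a URL and a variable number of labels. The `parseIndex` function is hypothetical and deliberately naive (no quoting or escaping support); torch-dataset ships its own index handling, which this does not try to reproduce.

```lua
-- Parse a simple CSV index: first line is a header, each following line
-- is "url,label[,label...]". Blank lines are ignored.
local function parseIndex(text)
   local records = {}
   local isHeader = true
   for line in text:gmatch('[^\r\n]+') do
      if isHeader then
         isHeader = false -- skip "filename,label1,label2"
      else
         local fields = {}
         for field in line:gmatch('[^,]+') do
            fields[#fields + 1] = field
         end
         local record = { url = fields[1], labels = {} }
         for i = 2, #fields do
            record.labels[#record.labels + 1] = fields[i]
         end
         records[#records + 1] = record
      end
   end
   return records
end

local records = parseIndex([[
filename,label1,label2
https://your.super.fast.cdn.com/somedir/2baf2949f9416ca0311.jpg,XX
https://your.super.fast.cdn.com/somedir/2baf2949f9416adcjd2.jpg,XX,XY
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdcd1.jpg,AA
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdgb3.jpg,AA,BB
]])
```

Because each record is just a URL plus labels, the whole index stays small even when the referenced media is very large.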

And here is an example of CIFAR-10, hosted on Amazon S3. In this case, the common URL prefix has been factored out of the index and is provided in this meta file.

Given a csv index like this one, you would consume it like this:

local Dataset = require 'dataset.Dataset'
local dataset = Dataset('http://d3jod65ytittfm.cloudfront.net/dataset/cifar10/training.csv')
local getBatch, numBatches = dataset.sampledBatcher({
   -- request batches of 10 elements:
   batchSize = 10,
   -- input dimensions for each single element in the batch:
   inputDims = { 3, 32, 32 },
   -- how to sample from the csv index
   -- (could be lots of other things, like uniform, label-uniform...):
   samplerKind = 'linear',
   -- each element fetched triggers a call to this processor, which
   -- is called in a separate lua env/thread (all processors run in
   -- parallel):
   processor = function(fetched, opt, decoded)
      -- the goal of the processor is to transform the raw asset
      -- into a decoded tensor of the right size. Here any user
      -- codec could be used (thrift, json, jpg, mp4, ...):
      local image = require 'image'
      local bytes = torch.ByteTensor( torch.ByteStorage():string(fetched) )
      local pixels = image.decompressPNG(bytes)
      decoded:copy(pixels)
      return true
   end,
})
-- now we can simply request batches like this:
for i = 1,numBatches do
   local batch = getBatch()
   print(batch.input, batch.target)
   -- batch.input is a 10x3x32x32 tensor
   -- while batch.target is a 10-dim tensor
end

 

Any other type of data can be supported by simply defining another type of processor. The dataset object can also be created with a partition ID such that the index is automatically sharded, and each process only sees a subset of it.
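The sharding idea can be sketched in a few lines of plain Lua. This example assumes a round-robin split, where partition p of P takes every P-th item starting at item p. That scheme and the `partitionIndex` name are assumptions chosen for illustration, and torch-dataset may shard its index differently.

```lua
-- Return the subset of `items` that belongs to one partition.
-- partition is 1-based and must lie in [1, partitions].
local function partitionIndex(items, partition, partitions)
   assert(partition >= 1 and partition <= partitions, 'partition out of range')
   local shard = {}
   for i = partition, #items, partitions do
      shard[#shard + 1] = items[i]
   end
   return shard
end

-- Ten items split across four simulated processes.
local items = {}
for i = 1, 10 do items[i] = i end

local shards = {}
for p = 1, 4 do
   shards[p] = partitionIndex(items, p, 4)
end
```

Every item lands in exactly one shard, so N processes that each load their own partition together cover the dataset once without coordinating with one another.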

3. Thrift serialization (torch-thrift)

This library provides a codec for the Thrift format. It supports very fast deserialization of arbitrary Thrift binary data to Lua native types. It also includes serialization of Lua native types back into Thrift binary based on a provided schema. We like Thrift a lot at Twitter, and use it to represent the data records in most of our structured datasets (think Tweet + engagement data + image + user info, etc.).

Example: assume that our dataset above is instead a dataset of Thrift records, each containing a binary image, and metadata about the image. Everything would remain the same, except for the dataset processor, which would look like this:

-- let's assume a thrift record that's defined like this:
-- struct MediaRecord {
--   1: required string jpeg           // the raw jpeg
--   2: optional string caption        // a user caption
--   3: optional list<i32> resolution  // original resolution
-- }
-- the processor would now be:
processor = function(fetched, opt, decoded)
   -- fetched is now a thrift record
   local t = require 'libthrift'.codec()
   local record = t:read(fetched)
   -- the record is now a regular lua table, that respects
   -- the nested structure of the thrift definition
   local imageData = record[1]
   local image = require 'image'
   local bytes = torch.ByteTensor( torch.ByteStorage():string(imageData) )
   -- field 1 holds a raw JPEG, so decode it with the JPEG decoder
   local pixels = image.decompressJPG(bytes)
   local caption = record[2] or '<no caption>'
   local resolution = record[3] or {}
   -- return decoded image + free-form metadata:
   local metadata = {
      caption = caption,
      originalResolution = resolution,
   }
   decoded:copy(pixels)
   return true, metadata
end

4. Interprocess communication (torch-ipc)

This library provides a set of primitives that extend Torch for high performance parallel computation across thread and process boundaries. These primitives are the backbone of our distributed training stack (work queues, client-server connections, grouping processes in a tree, and so on).

Stitching it all together

To get started, first install Torch by following the instructions on torch.ch. Next, go ahead and install our five packages with luarocks, Lua’s package manager:

luarocks install autograd
luarocks install thrift
luarocks install dataset
luarocks install ipc
luarocks install distlearn

At this point, you should be able to run all the code snippets above, and most importantly the examples we provide:

git clone git@github.com:twitter/torch-distlearn.git
cd torch-distlearn/examples
th mnist.lua    # single process
./mnist.sh      # 4 processes, distributed using all reduce SGD
./mnist-ea.sh   # 4 processes, distributed using elastic averaging

We recommend you start with this MNIST example, or the CIFAR-10 example in the same directory, which both demonstrate how to:

  • use torch-autograd to define your loss function,
  • use torch-dataset to abstract data fetching, pre-processing, and partitioning,
  • use torch-distlearn to parallelize gradients over multiple processes

We hope you enjoy these tools, and give us some feedback. We accept pull-requests and issues on GitHub. Happy sharding!
