Distributed Keras
Distributed Deep Learning with Apache Spark and Keras.
Introduction
Distributed Keras is a distributed deep learning framework built on top of Apache Spark and Keras. We designed the framework so that a developer can implement a new distributed optimizer with ease, leaving more time to focus on research. Several distributed methods are supported, including, but not restricted to, the training of ensemble models and data parallel models.
As noted above, most methods are implemented as data parallel models. Data parallelism, as described in [3], is a learning paradigm in which multiple replicas of a model are used to optimize a single objective. Using data parallelism, we are able to significantly reduce the training time of the model. Depending on the parametrization, we are also able to achieve better model performance than a more common training approach (e.g., the SingleTrainer implementation) while spending less time on training.
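To make the idea concrete, here is a minimal sketch of synchronous data parallelism in plain Python. It is an illustration only, not the dist-keras implementation: it assumes a one-parameter linear model y = w * x with a squared-error loss, and the names `gradient` and `data_parallel_step` are hypothetical.

```python
def gradient(w, shard):
    """Mean gradient of the squared error (w*x - y)^2 over one data shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def data_parallel_step(w, shards, learning_rate):
    """One synchronous step: every replica computes a gradient on its own
    shard, and the averaged gradient updates the shared parameter."""
    grads = [gradient(w, shard) for shard in shards]  # one entry per replica
    return w - learning_rate * sum(grads) / len(grads)


# Toy data generated by y = 3x, split across two replicas.
data = [(x, 3.0 * x) for x in (1.0, 2.0, 3.0, 4.0)]
shards = [data[:2], data[2:]]

w = 0.0
for _ in range(200):
    w = data_parallel_step(w, shards, learning_rate=0.01)
```

In a real cluster each shard would live in a different Spark partition and the gradients would be computed concurrently; the loop over shards here only stands in for that.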
Attention: We recommend reading the workflow notebook. It includes a complete description of the problem, how to use the framework, how to preprocess your data with Apache Spark, and a performance evaluation of all included distributed optimizers.
Installation
This section guides you through installing Distributed Keras. We assume that an Apache Spark installation is already available.
pip
When you only require the framework, just use pip to install dist-keras.
pip install git+https://github.com/JoeriHermans/dist-keras.git
git
Using this approach, you will be able to easily execute the examples.
git clone https://github.com/JoeriHermans/dist-keras
In order to install possible missing dependencies, and to compile the dist-keras modules, we need to run pip.
cd dist-keras
pip install -e .
General notes
.bashrc
Make sure the following variables are set in your .bashrc. Depending on your system configuration, it is possible that the following configuration doesn't have to be applied.
# Example of a .bashrc configuration.
export SPARK_HOME=/usr/lib/spark
export PYTHONPATH="$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
Running an example
TODO
Spark 2.0
If you want to run the examples using Apache Spark 2.0.0 or higher, you will need to remove the line containing sqlContext = SQLContext(sc). We need to do this because in Spark 2.0+, the SQLContext and the Hive context are merged into the Spark session.
Algorithms
Single Trainer
A single trainer is, in all simplicity, a trainer which uses a single Spark thread to train a model. This trainer is usually used as a baseline for new distributed optimizers.
SingleTrainer(keras_model, num_epoch=1, batch_size=1000, features_col="features", label_col="label")
EASGD
The distinctive idea of EASGD is to allow the local workers to perform more exploration (small rho) and the master to perform exploitation. This approach differs from other settings explored in the literature, and focuses on how fast the center variable converges [1].
We want to note the basic version of EASGD is a synchronous algorithm, i.e., once a worker is done processing a batch of the data, it will wait until all other workers have submitted their variables (this includes the weight parameterization, iteration number, and worker id) to the parameter server before starting the next data batch.
EASGD(keras_model, num_workers=2, rho=5.0, learning_rate=0.01, batch_size=1000, features_col="features", label_col="label")
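The synchronous update can be sketched as follows. This is a minimal illustration of the update rule from [1] on scalar parameters, not the dist-keras code: the function name `easgd_round` and the toy objective are assumptions, while `rho` and `learning_rate` mirror the constructor arguments above.

```python
def easgd_round(workers, center, grads, learning_rate, rho):
    """One synchronous EASGD round over scalar parameters.

    workers: local parameters, one per worker
    center:  the center variable held by the parameter server
    grads:   local gradients, one per worker, evaluated at `workers`
    """
    # Elastic force pulling each worker toward the center variable.
    elastic = [rho * (x - center) for x in workers]
    new_workers = [x - learning_rate * (g + e)
                   for x, g, e in zip(workers, grads, elastic)]
    # The center moves toward the workers by the sum of the same forces.
    new_center = center + learning_rate * sum(elastic)
    return new_workers, new_center


# Toy objective f(x) = (x - 2)^2 on every worker, so the gradient is 2(x - 2).
workers, center = [0.0, 4.0, 10.0], 0.0
for _ in range(500):
    grads = [2.0 * (x - 2.0) for x in workers]
    workers, center = easgd_round(workers, center, grads,
                                  learning_rate=0.05, rho=1.0)
```

A small rho weakens the pull toward the center, which lets the workers explore further from it; a large rho keeps them close. The round only completes once every worker has contributed, which is what makes this variant synchronous.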
Asynchronous EASGD (currently recommended)
In this section we propose the asynchronous version of EASGD. Instead of waiting on the synchronization of other trainers, this method communicates the elastic difference (as described in the paper) with the parameter server. The only synchronization mechanism that has been implemented ensures that no race conditions occur when updating the center variable.
AsynchronousEASGD(keras_model, num_workers=2, rho=5.0, learning_rate=0.01, batch_size=1000, features_col="features", label_col="label", communication_window=5)
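As a rough sketch of the mechanism, the example below uses Python threads in place of Spark workers. It assumes the elastic difference is E = learning_rate * rho * (x - center), following [1]; the class `CenterVariable`, its `exchange` method, and the toy objective are hypothetical and not part of dist-keras.

```python
import threading


class CenterVariable:
    """Parameter-server state: a scalar center variable guarded by a lock."""

    def __init__(self, value):
        self.value = value
        self._lock = threading.Lock()

    def exchange(self, local_value, alpha):
        """Apply the elastic difference E = alpha * (local - center).

        The read and the update of the center happen under one lock, so
        concurrent workers cannot interleave in between. Returns E.
        """
        with self._lock:
            elastic = alpha * (local_value - self.value)
            self.value += elastic
        return elastic


def worker(center, start, steps, learning_rate, rho, communication_window):
    x = start
    for t in range(steps):
        if t % communication_window == 0:
            # No waiting on other workers: exchange and continue.
            x -= center.exchange(x, alpha=learning_rate * rho)
        x -= learning_rate * 2.0 * (x - 2.0)  # gradient of (x - 2)^2


center = CenterVariable(0.0)
threads = [threading.Thread(target=worker,
                            args=(center, start, 2000, 0.05, 1.0, 5))
           for start in (0.0, 4.0, 10.0)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

The lock is the only point of coordination: a worker never blocks on the progress of another worker, only on a center update that is already in flight.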
Asynchronous EAMSGD
Asynchronous EAMSGD is a variant of asynchronous EASGD. It is based on Nesterov's momentum scheme, where the update of the local worker is modified to incorporate a momentum term.
AsynchronousEAMSGD(keras_model, worker_optimizer, loss, num_workers=2, batch_size=32,
features_col="features", label_col="label", communication_window=10,
rho=5.0, learning_rate=0.01, momentum=0.9, master_port=5000, num_epoch=1)
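The momentum-modified local update can be sketched for a single worker as below. This follows the Nesterov-style rule described in [1] on a scalar toy problem; the helper `eamsgd_local_step` and the objective are assumptions made for illustration, and the parameter values simply echo the defaults shown above.

```python
def eamsgd_local_step(x, v, grad_fn, learning_rate, momentum):
    """Nesterov-style local update: the gradient is evaluated at the
    look-ahead point x + momentum * v rather than at x itself."""
    v = momentum * v - learning_rate * grad_fn(x + momentum * v)
    return x + v, v


def grad(x):
    return 2.0 * (x - 2.0)  # gradient of the toy objective (x - 2)^2


x, v, center = 10.0, 0.0, 0.0
learning_rate, momentum, rho, communication_window = 0.01, 0.9, 5.0, 10
alpha = learning_rate * rho  # weight of the elastic difference

for t in range(3000):
    if t % communication_window == 0:
        elastic = alpha * (x - center)
        x -= elastic       # worker is pulled toward the center
        center += elastic  # center is pulled toward the worker
    x, v = eamsgd_local_step(x, v, grad, learning_rate, momentum)
```

Only the local step differs from asynchronous EASGD; the exchange with the center variable every `communication_window` steps is unchanged.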
DOWNPOUR
An asynchronous stochastic gradient descent procedure that supports a large number of model replicas and leverages adaptive learning rates. This implementation is based on the pseudocode provided by [1].
DOWNPOUR(keras_model, learning_rate=0.01, num_workers=2, batch_size=1000, features_col="features", label_col="label", communication_window=5)
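A simplified sketch of the worker loop is shown below, assuming the push-then-pull pattern of the pseudocode in [1]: each worker accumulates its local updates and, every `communication_window` steps, pushes the accumulated update to the center variable and pulls a fresh copy. The `Worker` class and `downpour_step` function are hypothetical, a fixed learning rate replaces the adaptive one, and a round-robin loop stands in for truly asynchronous workers.

```python
class Worker:
    def __init__(self, center):
        self.x = center      # local replica of the parameters
        self.delta = 0.0     # updates accumulated since the last push


def downpour_step(worker, center, t, grad_fn, learning_rate,
                  communication_window):
    """One local step; returns the (possibly updated) center variable."""
    if t % communication_window == 0:
        center += worker.delta   # push the accumulated update
        worker.x = center        # pull the current center variable
        worker.delta = 0.0
    step = -learning_rate * grad_fn(worker.x)
    worker.x += step
    worker.delta += step
    return center


def grad(x):
    return 2.0 * (x - 2.0)  # gradient of the toy objective (x - 2)^2


center = 10.0
workers = [Worker(center), Worker(center)]
for t in range(1000):
    for w in workers:  # round-robin stands in for asynchronous arrival
        center = downpour_step(w, center, t, grad, learning_rate=0.01,
                               communication_window=5)
```

A larger `communication_window` reduces traffic to the parameter server at the cost of workers computing gradients on staler parameters.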
Utility classes
Transformers
A transformer is a utility class which takes a collection of columns (or just a single column), and produces an additional column which is added to the resulting DataFrame. An example of such a Transformer is our LabelIndexTransformer.
Predictors
Predictors are utility classes which add a prediction column to the DataFrame, given a specified Keras model and its input features.
Known issues
Synchronous algorithms
- It is possible, depending on your batch_size and the size of your dataset, that there will be an unequal amount of batches distributed over the different worker partitions. Imagine having n - 1 workers with k batches, and a worker with k - 1 batches. In this case, the n - 1 workers all wish to process batch k, however, the other worker (which has only k - 1 batches) has already finished its process. This results in endless "waiting" behaviour, i.e., the n - 1 workers are waiting for the last worker to publish its gradient. But this worker can't send its gradient of batch k to the parameter server since it does not have a batch k.
Possible solutions:
a. Make sure that every partition has an equal amount of batches. For example, using a custom partitioner.
b. Modify the optimizer in such a way that it does allow for timeouts from a worker.
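The imbalance described above can be detected before training with a small helper. The sketch below is plain Python and assumes that only full batches are processed; both function names are hypothetical, and trimming surplus rows is just one way to realize solution (a), not something dist-keras is known to do.

```python
def batches_per_partition(partition_sizes, batch_size):
    """Number of full batches each worker partition can produce."""
    return [size // batch_size for size in partition_sizes]


def balanced_partition_sizes(partition_sizes, batch_size):
    """Trim every partition to the smallest batch count (solution a),
    dropping the surplus rows so that no worker waits on a missing batch."""
    fewest = min(batches_per_partition(partition_sizes, batch_size))
    return [fewest * batch_size for _ in partition_sizes]


sizes = [4000, 4000, 3000]            # rows per worker partition
counts = batches_per_partition(sizes, batch_size=1000)
is_balanced = len(set(counts)) == 1
trimmed = balanced_partition_sizes(sizes, batch_size=1000)
```

With these sizes, two workers hold four batches and one holds three, which is exactly the situation in which a synchronous optimizer would wait forever on the fourth batch of the last worker.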
References
- Zhang, S., Choromanska, A. E., & LeCun, Y. (2015). Deep learning with elastic averaging SGD. In Advances in Neural Information Processing Systems (pp. 685-693). [1]
- Moritz, P., Nishihara, R., Stoica, I., & Jordan, M. I. (2015). SparkNet: Training Deep Networks in Spark. arXiv preprint arXiv:1511.06051. [2]
- Dean, J., Corrado, G., Monga, R., Chen, K., Devin, M., Mao, M., ... & Ng, A. Y. (2012). Large scale distributed deep networks. In Advances in Neural Information Processing Systems (pp. 1223-1231). [3]
- Pumperla, M. (2015). Elephas. Github Repository https://github.com/maxpumperla/elephas/. [4]