Distributed Deep learning with Keras & Spark

Overview

Elephas: Distributed Deep Learning with Keras & Spark

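Elephas is an extension of Keras, which allows you to run distributed deep learning models at scale with Spark. Elephas currently supports a number of applications, including data-parallel training of deep learning models, distributed inference and evaluation, distributed hyper-parameter optimization, and distributed training of ensemble models.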

Schematically, elephas works as follows.

[Figure: schematic of how Elephas works]

Introduction

Elephas brings deep learning with Keras to Spark. Elephas intends to keep the simplicity and high usability of Keras, thereby allowing for fast prototyping of distributed models, which can be run on massive data sets. For an introductory example, see the following iPython notebook.

ἐλέφας is Greek for ivory, and Elephas is an accompanying project to κέρας, meaning horn. If this seems weird to mention, like a bad dream, you can confirm that it actually is one in the Keras documentation. Elephas also means elephant, as in stuffed yellow elephant.

Elephas implements a class of data-parallel algorithms on top of Keras, using Spark's RDDs and data frames. Keras models are initialized on the driver, then serialized and shipped to the workers, along with data and broadcast model parameters. Spark workers deserialize the model, train on their chunk of data, and send their gradients back to the driver. The "master" model on the driver is updated by an optimizer, which takes gradients either synchronously or asynchronously.
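The data-parallel scheme described above can be illustrated without Spark or Keras. The following is a minimal sketch, not Elephas' actual implementation: it assumes a one-parameter linear model with a hand-written gradient, and the helper names (partition, worker_gradient, driver_step) are made up for this illustration. Each "worker" computes a gradient on its chunk of data, and the "driver" averages the gradients and updates the master weight synchronously.

```python
# Toy synchronous data-parallel training in plain Python (no Spark, no Keras).
# All names here are illustrative and not part of the Elephas API.

def partition(data, num_workers):
    """Split the data into num_workers chunks of roughly equal size."""
    return [data[i::num_workers] for i in range(num_workers)]

def worker_gradient(weight, chunk):
    """Mean gradient of the squared error (weight * x - y)^2 over one chunk."""
    return sum(2 * (weight * x - y) * x for x, y in chunk) / len(chunk)

def driver_step(weight, chunks, learning_rate):
    """Collect one gradient per worker, average them, update the master weight."""
    gradients = [worker_gradient(weight, chunk) for chunk in chunks]
    return weight - learning_rate * sum(gradients) / len(gradients)

# Data generated by y = 3 * x, so the optimal weight is 3.
data = [(x, 3.0 * x) for x in [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0]]
chunks = partition(data, num_workers=4)

weight = 0.0
for _ in range(200):
    weight = driver_step(weight, chunks, learning_rate=0.01)

print(round(weight, 3))
```

In the asynchronous case the driver would apply each worker's update as soon as it arrives instead of waiting for all of them, which this sketch does not model.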

Getting started

Install elephas from PyPI with the following command. Spark will be installed through pyspark for you.

pip install elephas

That's it, you should now be able to run Elephas examples.

Basic Spark integration

After installing Elephas, you can train a model as follows. First, create a local pyspark context:

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
sc = SparkContext(conf=conf)

Next, you define and compile a Keras model

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())

and create an RDD from numpy arrays (or however you want to create an RDD)

from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)
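Conceptually, the RDD used for training is a distributed collection of (features, label) pairs. The sketch below mimics that shape in plain Python, under the assumption that to_simple_rdd essentially zips the two arrays and lets Spark distribute the pairs. The helpers make_pairs and split_into_partitions are hypothetical, and how Spark actually slices the data is an implementation detail not shown here.

```python
# Plain-Python stand-in for the shape of the training data.
# Not the Elephas implementation; helper names are illustrative.

def make_pairs(features, labels):
    """Pair each feature row with its label."""
    if len(features) != len(labels):
        raise ValueError("features and labels must have the same length")
    return list(zip(features, labels))

def split_into_partitions(pairs, num_partitions):
    """Cut the pairs into contiguous slices of nearly equal size."""
    size, remainder = divmod(len(pairs), num_partitions)
    partitions, start = [], 0
    for index in range(num_partitions):
        end = start + size + (1 if index < remainder else 0)
        partitions.append(pairs[start:end])
        start = end
    return partitions

features = [[0.0, 0.1], [0.2, 0.3], [0.4, 0.5], [0.6, 0.7], [0.8, 0.9]]
labels = [0, 1, 0, 1, 1]

pairs = make_pairs(features, labels)
partitions = split_into_partitions(pairs, num_partitions=2)
print([len(p) for p in partitions])
```

Each worker only ever sees its own partition, which is why more workers mean less data per worker.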

The basic model in Elephas is the SparkModel. You initialize a SparkModel by passing in a compiled Keras model, an update frequency and a parallelization mode. After that you can simply fit the model on your RDD. Elephas fit has the same options as a Keras model, so you can pass epochs, batch_size etc. as you're used to from tensorflow.keras.

from elephas.spark_model import SparkModel

spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)

Your script can now be run using spark-submit

spark-submit --driver-memory 1G ./your_script.py

Increasing the driver memory even further may be necessary, as the set of parameters in a network may be very large and collecting them on the driver eats up a lot of resources. See the examples folder for a few working examples.
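To get a feeling for how much memory the parameters need, you can count them by hand. The sketch below does this for the three Dense layers of the example model above, assuming 32-bit floats. The driver_bytes function is a hypothetical back-of-the-envelope estimate that assumes the driver holds one master copy plus one update per worker at the same time, which is a simplification.

```python
# Back-of-the-envelope parameter count for the example model (784 -> 128 -> 128 -> 10).

def dense_params(inputs, units):
    """A Dense layer has one weight per input/unit pair plus one bias per unit."""
    return inputs * units + units

layer_shapes = [(784, 128), (128, 128), (128, 10)]
total_params = sum(dense_params(inputs, units) for inputs, units in layer_shapes)

BYTES_PER_FLOAT32 = 4
bytes_per_copy = total_params * BYTES_PER_FLOAT32

def driver_bytes(num_workers):
    """Assumed worst case: master weights plus one full update per worker."""
    return bytes_per_copy * (num_workers + 1)

print(total_params)
print(round(bytes_per_copy / 1024 ** 2, 2), "MiB per copy")
print(round(driver_bytes(8) / 1024 ** 2, 2), "MiB with 8 workers")
```

This toy model is tiny, but the same arithmetic applied to a network with hundreds of millions of parameters quickly reaches several gigabytes.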

Distributed Inference / Evaluation

The SparkModel can also be used for distributed inference (prediction) and evaluation. Similar to the fit method, the predict and evaluate methods conform to the Keras Model API.

from elephas.spark_model import SparkModel

# create/train the model, similar to the previous section (Basic Spark Integration)
model = ...
spark_model = SparkModel(model, ...)
spark_model.fit(...)

x_test, y_test = ... # load test data

predictions = spark_model.predict(x_test) # perform inference
evaluation = spark_model.evaluate(x_test, y_test) # perform evaluation/scoring

The paradigm is identical to the data parallelism in training, as the model is serialized and shipped to the workers and used to evaluate a chunk of the testing data. The predict method will take either a numpy array or an RDD.
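The chunk-wise evaluation described above can be sketched in plain Python. This is an illustration of the idea only, not the Elephas code: predict_in_chunks and toy_model are hypothetical names, and the loop over chunks stands in for work that Spark would hand to separate workers.

```python
# Illustrative chunk-wise prediction; in Spark each chunk would go to a worker.

def predict_in_chunks(predict_fn, samples, num_workers):
    """Split samples into chunks, predict each chunk, and keep the original order."""
    chunk_size = max(1, -(-len(samples) // num_workers))  # ceiling division
    chunks = [samples[i:i + chunk_size] for i in range(0, len(samples), chunk_size)]
    results = []
    for chunk in chunks:
        results.extend(predict_fn(chunk))
    return results

def toy_model(batch):
    """Stand-in for a trained model's predict method."""
    return [2 * x + 1 for x in batch]

samples = list(range(10))
predictions = predict_in_chunks(toy_model, samples, num_workers=3)
print(predictions)
```

Because every chunk is scored with the same model parameters, the result is the same as predicting on the full data set at once.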

Spark MLlib integration

Following up on the last example, to use Spark's MLlib library with Elephas, you create an RDD of LabeledPoints for supervised training as follows

from elephas.utils.rdd_utils import to_labeled_point
lp_rdd = to_labeled_point(sc, x_train, y_train, categorical=True)

Training on a given LabeledPoint RDD is very similar to what we've seen already

from elephas.spark_model import SparkMLlibModel
spark_model = SparkMLlibModel(model, frequency='batch', mode='hogwild')
spark_model.train(lp_rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1, 
                  categorical=True, nb_classes=nb_classes)
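An MLlib LabeledPoint stores a single numeric label per sample, while Keras models trained with categorical cross-entropy expect one-hot encoded labels. The sketch below illustrates the conversion that the categorical flag implies, assuming that one-hot rows are mapped to their class index. It uses plain tuples instead of LabeledPoint objects, and both helper names are hypothetical.

```python
# Illustrative only: mimics the label handling, not the Elephas or MLlib code.

def one_hot_to_index(one_hot_row):
    """Return the position of the largest entry, i.e. the class index."""
    return max(range(len(one_hot_row)), key=lambda i: one_hot_row[i])

def to_label_feature_pairs(features, labels, categorical=False):
    """Build (label, features) pairs with a single float label per sample."""
    pairs = []
    for x, y in zip(features, labels):
        label = one_hot_to_index(y) if categorical else y
        pairs.append((float(label), list(x)))
    return pairs

features = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
one_hot_labels = [[1, 0, 0], [0, 0, 1], [0, 1, 0]]

pairs = to_label_feature_pairs(features, one_hot_labels, categorical=True)
print(pairs)
```

During training the class indices have to be expanded back to one-hot vectors, which is why the number of classes is passed alongside the categorical flag.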

Spark ML integration

To train a model with a SparkML estimator on a data frame, use the following syntax.

from elephas.ml_model import ElephasEstimator
from elephas.ml.adapter import to_data_frame

df = to_data_frame(sc, x_train, y_train, categorical=True)
test_df = to_data_frame(sc, x_test, y_test, categorical=True)

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size, frequency='batch', mode='asynchronous',
                             categorical=True, nb_classes=nb_classes)
fitted_model = estimator.fit(df)

Fitting an estimator results in a SparkML transformer, which we can use for predictions and other evaluations by calling the transform method on it.

from pyspark.mllib.evaluation import MulticlassMetrics

prediction = fitted_model.transform(test_df)
pnl = prediction.select("label", "prediction")
pnl.show(100)

# MulticlassMetrics expects an RDD of (prediction, label) pairs
prediction_and_label = pnl.rdd.map(lambda row: (row.prediction, row.label))
metrics = MulticlassMetrics(prediction_and_label)
print(metrics.precision())
print(metrics.recall())
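If you want to sanity-check the numbers reported by Spark, precision and recall for a single class are easy to compute by hand. The following is a small sketch in plain Python, assuming the pairs are ordered as (prediction, label), which is the order MulticlassMetrics documents. The function name and the sample data are made up for illustration.

```python
# Per-class precision and recall from (prediction, label) pairs.

def precision_recall(pairs, target):
    """Return (precision, recall) for one class label."""
    true_positives = sum(1 for pred, label in pairs if pred == target and label == target)
    predicted = sum(1 for pred, _ in pairs if pred == target)
    actual = sum(1 for _, label in pairs if label == target)
    precision = true_positives / predicted if predicted else 0.0
    recall = true_positives / actual if actual else 0.0
    return precision, recall

# Made-up sample of (prediction, label) pairs
pairs = [(0, 0), (0, 1), (1, 1), (1, 1), (2, 2), (0, 2)]

for target in (0, 1, 2):
    print(target, precision_recall(pairs, target))
```

Swapping the order of prediction and label swaps precision and recall for every class, so it is worth double-checking which column comes first.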

If the model uses a custom activation function, layer, or loss function, it needs to be supplied using the set_custom_objects method:

from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Sequential

def custom_activation(x):
    ...

class CustomLayer(Layer):
    ...

model = Sequential()
model.add(CustomLayer(...))

estimator = ElephasEstimator(model, epochs=epochs, batch_size=batch_size)
estimator.set_custom_objects({'custom_activation': custom_activation, 'CustomLayer': CustomLayer})
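Custom objects have to be registered because a serialized model only stores the names of its layers and functions, not their code. The toy sketch below shows the principle with a made-up JSON config and a name registry. It is an assumption-laden illustration of name-based deserialization in general, not the Keras or Elephas code, and serialize, deserialize, and BUILTIN_ACTIVATIONS are hypothetical.

```python
import json
import math

# Names that the (toy) framework can resolve on its own.
BUILTIN_ACTIVATIONS = {'relu': lambda x: max(0.0, x), 'tanh': math.tanh}

def serialize(activation_name):
    """Store only the name of the activation, as a serialized config would."""
    return json.dumps({'activation': activation_name})

def deserialize(config_json, custom_objects=None):
    """Resolve the stored name using built-ins plus user-supplied objects."""
    name = json.loads(config_json)['activation']
    registry = dict(BUILTIN_ACTIVATIONS)
    registry.update(custom_objects or {})
    if name not in registry:
        raise ValueError('Unknown activation function: ' + name)
    return registry[name]

def custom_activation(x):
    return 0.5 * x

config = serialize('custom_activation')
restored = deserialize(config, custom_objects={'custom_activation': custom_activation})
print(restored(2.0))
```

Without the custom_objects mapping, a worker that receives the serialized model has no way of turning the name back into a function.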

Distributed hyper-parameter optimization

Hyper-parameter optimization with elephas is based on hyperas, a convenience wrapper for hyperopt and keras. Each Spark worker executes a number of trials, the results get collected, and the best model is returned. As the distributed mode in hyperopt (using MongoDB) is somewhat difficult to configure and error-prone at the time of writing, we chose to implement parallelization ourselves. Right now, the only available optimization algorithm is random search.

The first part of this example is more or less directly taken from the hyperas documentation. We define data and model as functions; hyper-parameter ranges are defined through double braces. See the hyperas documentation for more on how this works.

from hyperopt import STATUS_OK
from hyperas.distributions import choice, uniform

def data():
    from tensorflow.keras.datasets import mnist
    from tensorflow.keras.utils import to_categorical
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = x_train.reshape(60000, 784)
    x_test = x_test.reshape(10000, 784)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    nb_classes = 10
    y_train = to_categorical(y_train, nb_classes)
    y_test = to_categorical(y_test, nb_classes)
    return x_train, y_train, x_test, y_test


def model(x_train, y_train, x_test, y_test):
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Activation
    from tensorflow.keras.optimizers import RMSprop

    model = Sequential()
    model.add(Dense(512, input_shape=(784,)))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense({{choice([256, 512, 1024])}}))
    model.add(Activation('relu'))
    model.add(Dropout({{uniform(0, 1)}}))
    model.add(Dense(10))
    model.add(Activation('softmax'))

    rms = RMSprop()
    # Track accuracy so that evaluate() returns both loss and accuracy
    model.compile(loss='categorical_crossentropy', optimizer=rms, metrics=['accuracy'])

    model.fit(x_train, y_train,
              batch_size={{choice([64, 128])}},
              epochs=1,
              verbose=2,
              validation_data=(x_test, y_test))
    score, acc = model.evaluate(x_test, y_test, verbose=0)
    print('Test accuracy:', acc)
    return {'loss': -acc, 'status': STATUS_OK, 'model': model.to_yaml()}
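The double-brace expressions are not valid Python on their own, so the source of the model function has to be rewritten before it runs. The following rough sketch shows the general idea with a regular expression. It is not how hyperas is actually implemented, and extract_search_space and fill_template are hypothetical helpers.

```python
import re

# Matches anything written between double braces, e.g. {{uniform(0, 1)}}
TEMPLATE = re.compile(r"\{\{(.+?)\}\}")

def extract_search_space(source):
    """Return the expressions written between double braces, in order."""
    return TEMPLATE.findall(source)

def fill_template(source, values):
    """Replace each double-brace expression with the next concrete value."""
    remaining = iter(values)
    return TEMPLATE.sub(lambda match: repr(next(remaining)), source)

source = (
    "model.add(Dropout({{uniform(0, 1)}}))\n"
    "model.add(Dense({{choice([256, 512, 1024])}}))"
)

print(extract_search_space(source))
print(fill_template(source, [0.25, 512]))
```

Each trial corresponds to one way of filling in the template, with the values drawn from the distributions named inside the braces.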

Once the basic setup is defined, running the minimization is done in just a few lines of code:

from elephas.hyperparam import HyperParamModel
from pyspark import SparkContext, SparkConf

# Create Spark context
conf = SparkConf().setAppName('Elephas_Hyperparameter_Optimization').setMaster('local[8]')
sc = SparkContext(conf=conf)

# Define hyper-parameter model and run optimization
hyperparam_model = HyperParamModel(sc)
hyperparam_model.minimize(model=model, data=data, max_evals=5)
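The scheme of running independent trials per worker and keeping the best result can be sketched in a few lines of plain Python. This is an illustration under simplifying assumptions, not the Elephas implementation: the objective is a made-up function instead of a trained network, the workers run one after another, and run_trials, sample_params, and objective are hypothetical names.

```python
import random

def sample_params(rng):
    """Draw one random configuration, mirroring the ranges in the model above."""
    return {'dropout': rng.uniform(0, 1), 'units': rng.choice([256, 512, 1024])}

def objective(params):
    """Toy loss standing in for the negative accuracy of a trained model."""
    return (params['dropout'] - 0.3) ** 2 + abs(params['units'] - 512) / 1024

def run_trials(worker_seed, max_evals):
    """What a single worker does: evaluate max_evals random configurations."""
    rng = random.Random(worker_seed)
    trials = []
    for _ in range(max_evals):
        params = sample_params(rng)
        trials.append({'loss': objective(params), 'params': params})
    return trials

# Collect the trials of four "workers" and pick the overall best one.
all_trials = []
for worker_seed in range(4):
    all_trials.extend(run_trials(worker_seed, max_evals=5))

best = min(all_trials, key=lambda trial: trial['loss'])
print(len(all_trials), best)
```

Since random search trials do not depend on each other, they parallelize trivially, which is what makes this approach a good fit for Spark.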

Distributed training of ensemble models

Building on the last section, it is possible to train ensemble models with elephas by running hyper-parameter optimization on large search spaces and defining a voting classifier on the top-n performing models. With data and model defined as above, this is as simple as running

result = hyperparam_model.best_ensemble(nb_ensemble_models=10, model=model, data=data, max_evals=5)

In this example an ensemble of 10 models is built, based on optimization of at most 5 runs on each of the Spark workers.
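A voting classifier over the top-n models can be sketched as follows. This is a minimal illustration that assumes simple majority voting on predicted class labels. How Elephas combines the models internally may differ, and top_n, majority_vote, and the threshold classifiers are made up for this example.

```python
from collections import Counter

def top_n(trials, n):
    """Keep the n trials with the lowest loss."""
    return sorted(trials, key=lambda trial: trial['loss'])[:n]

def majority_vote(models, sample):
    """Return the class label predicted by most models."""
    votes = Counter(model(sample) for model in models)
    return votes.most_common(1)[0][0]

def threshold_classifier(threshold):
    """Toy binary classifier standing in for a trained network."""
    return lambda x: int(x > threshold)

trials = [
    {'loss': 0.10, 'model': threshold_classifier(0.5)},
    {'loss': 0.30, 'model': threshold_classifier(0.9)},
    {'loss': 0.12, 'model': threshold_classifier(0.4)},
    {'loss': 0.15, 'model': threshold_classifier(0.6)},
]

ensemble = [trial['model'] for trial in top_n(trials, 3)]
print(majority_vote(ensemble, 0.55), majority_vote(ensemble, 0.45))
```

Using an odd number of models avoids ties in the binary case. With more classes, a tie-breaking rule is needed.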

Discussion

Premature parallelization may not be the root of all evil, but it is not always the best idea either. Keep in mind that more workers mean less data per worker, and parallelizing a model is no substitute for actual learning. So, if your data fits comfortably into memory and you're happy with the training speed of the model, consider just using keras.

One exception to this rule may be that you're already working within the Spark ecosystem and want to leverage what's there. The above SparkML example shows how to use evaluation modules from Spark, and maybe you wish to further process the outcome of an elephas model down the road. In this case, we recommend using elephas as a simple wrapper by setting num_workers=1.

Note that right now elephas restricts itself to data-parallel algorithms for two reasons. First, Spark simply makes it very easy to distribute data. Second, neither Spark nor Theano makes it particularly easy to split up the actual model into parts, which makes model parallelism practically impossible to realize.

Having said all that, we hope you learn to appreciate elephas as a playground for data-parallel deep-learning algorithms that is pretty easy to set up and use.

Maintainers / Contributions

This great project was started by Max Pumperla and is currently maintained by Daniel Cahall (https://github.com/danielenricocahall). If you have any questions, please feel free to open an issue or email the maintainer. If you want to contribute, feel free to submit a PR or start a conversation about how we can go about implementing something.

Issues
  • Elephas slice_X error

    Elephas encounters this error with the latest Theano. Can you help me fix this?

    Traceback (most recent call last):
      File "/home/zeus/workspace/./KerasOnSparkElephas.py", line 28, in <module>
        from elephas.spark_model import SparkModel
      File "/home/zeus/anaconda2/lib/python2.7/site-packages/elephas/spark_model.py", line 18, in <module>
        from keras.models import model_from_yaml, slice_X
    ImportError: cannot import name slice_X
    
    opened by ducminhnguyen 24
  • Massive dataset + data_generator

    Hello guys,

    Before I begin, I want to thank you for this amazing tool. It's truly awesome and enables true distributed deep learning out of the box. Now for some questions.

    1. From what I've seen, I need to load the dataset into memory before I send it to elephas for distributed processing. When the dataset is massive, as in multiple times my RAM, how can I use an HDF5 file so that each worker can load parts of the dataset from disk and do the processing? Please let me know.
    2. Is there any support for the keras "data_generator" functionality that enables real-time data augmentation?

    Thank you for your support.

    opened by AntreasAntoniou 20
  • Faster async SGD

    What do you think of https://github.com/DoctorTeeth/fred?

    opened by bhack 20
  • Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

    Could you please help me? I am giving my model and the error that I am seeing here.

    model.compile(loss=lossFunc, optimizer=gradDiscent, metrics=['accuracy']);
    ##############################START: DISTRIBUTED MODEL################
    from pyspark import SparkContext, SparkConf
    #Create spark context
    conf = SparkConf().setAppName('NSL-KDD-DISTRIBUTED').setMaster('local[8]');
    sc = SparkContext(conf=conf);
    
    from elephas.utils.rdd_utils import to_simple_rdd
    #Build RDD (Resilient Distributed Dataset) from numpy features and labels
    rdd = to_simple_rdd(sc, trainX, trainY);
    
    from elephas.spark_model import SparkModel
    from elephas import optimizers as elephas_optimizers
    #Initialize SparkModel from Keras model and Spark Context
    elphOptimizer = elephas_optimizers.Adagrad();
    sparkModel = SparkModel(sc, model, optimizer=elphOptimizer, frequency='epoch', model='asynchronous', num_workers=1);
    #Train Spark Model
    sparkModel.train(rdd, nb_epoch=epochs, batch_size=batchSize, verbose=2);
    
    #Evaluate Spark Model
    score = sparkModel.master_network.evaluate(testX, testY, verbose=2);
    print(score);
    

    ####################ERROR########################
    Traceback (most recent call last):
      File "C:\PythonWorks\mine\dm-dist.py", line 230, in <module>
        sparkModel.train(rdd, nb_epoch=epochs, batch_size=batchSize, verbose=2);
      File "C:\Miniconda3\lib\site-packages\elephas\spark_model.py", line 194, in train
        self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
      File "C:\Miniconda3\lib\site-packages\elephas\spark_model.py", line 205, in _train
        self.start_server()
      File "C:\Miniconda3\lib\site-packages\elephas\spark_model.py", line 125, in start_server
        self.server.start()
      File "C:\Miniconda3\lib\multiprocessing\process.py", line 105, in start
        self._popen = self._Popen(self)
      File "C:\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
        return _default_context.get_context().Process._Popen(process_obj)
      File "C:\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
        return Popen(process_obj)
      File "C:\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
        reduction.dump(process_obj, to_child)
      File "C:\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
        ForkingPickler(file, protocol).dump(obj)
      File "C:\Miniconda3\lib\site-packages\pyspark\context.py", line 306, in __getnewargs__
        "It appears that you are attempting to reference SparkContext from a broadcast "
    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

    opened by mohaimenz 18
  • [BUG] ElephasTransformer:get_config has error

    Hi @danielenricocahall I think there's a bug in this line https://github.com/maxpumperla/elephas/blob/master/elephas/ml_model.py#L171

    getattr(self, 'weights', []) most likely returns a Broadcast object, so it needs .value to extract the actual content.

    Also, each weight is already an np.ndarray, so I think the .numpy() call is weird?

    This is what I ended up using, but I'm not sure if it's a perfect fix: 'weights': [weight.tolist() for weight in self.weights.value],

    A follow-up question: do we need to broadcast the weights before initializing ElephasTransformer?

    opened by OscarDPan 16
  • upgrade to modern keras

    My attempt at upgrading the version of keras used. Associated to: https://github.com/maxpumperla/elephas/issues/22

    opened by sinjax 15
  • Custom layers

    It seems that custom layers are not recognized in the train phase of spark model, specifically when loaded from yaml (model_from_yaml). I have tried with a custom activation function and it didn't work, the get_from_module function raised an exception "Invalid activation function: ...".

    opened by lenlen 12
  • Not working with regression problems

    Hello and thank you for this package!

    It seems that this package does not work when the problem is regression rather than classification. After getting errors on my own data when using this package, I realized that it always gives an error for regression problems. As an example, I slightly modified https://github.com/maxpumperla/elephas/blob/master/examples/ml_pipeline_otto.py to make it a regression problem from line 61 onwards:

    model = Sequential()
    model.add(Dense(512, input_shape=(input_dim,)))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(512))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(512))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('linear'))
    
    model.compile(optimizer='adam', loss='mean_absolute_error')
    
    sgd = optimizers.SGD(lr=0.01)
    sgd_conf = optimizers.serialize(sgd)
    
    # Initialize Elephas Spark ML Estimator
    estimator = ElephasEstimator()
    estimator.set_keras_model_config(model.to_yaml())
    estimator.set_optimizer_config(sgd_conf)
    estimator.set_mode("synchronous")
    estimator.set_loss("mean_absolute_error")
    estimator.set_metrics(['mae'])
    estimator.setFeaturesCol("scaled_features")
    estimator.setLabelCol("index_category")
    estimator.set_epochs(10)
    estimator.set_batch_size(128)
    estimator.set_num_workers(1)
    estimator.set_verbosity(0)
    estimator.set_validation_split(0.15)
    estimator.set_categorical_labels(False)
    # estimator.set_nb_classes(nb_classes)
    
    # Fitting a model returns a Transformer
    pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
    fitted_pipeline = pipeline.fit(train_df)
    
    # Evaluate Spark model
    prediction = fitted_pipeline.transform(train_df)
    pnl = prediction.select("index_category", "prediction")
    pnl.show(2)
    

    Unfortunately, it gives an error:

    19/04/10 05:32:34 WARN TaskSetManager: Lost task 0.0 in stage 459.0 (TID 6450, local[0], executor 103): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/lib/spark/python/pyspark/worker.py", line 372, in main
        process()
      File "/usr/lib/spark/python/pyspark/worker.py", line 367, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 390, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/usr/lib/spark/python/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "/usr/lib/spark/python/pyspark/sql/session.py", line 730, in prepare
        verify_func(obj)
      File "/usr/lib/spark/python/pyspark/sql/types.py", line 1389, in verify
        verify_value(obj)
      File "/usr/lib/spark/python/pyspark/sql/types.py", line 1368, in verify_struct
        "length of fields (%d)" % (len(obj), len(verifiers))))
    ValueError: Length of object (7) does not match with length of fields (5)
    
            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
            at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
            at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
            at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:121)
            at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
            at scala.Option.foreach(Option.scala:257)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
            at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
            at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
            at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
            at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
           at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
            at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
            at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
            at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
            at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
            at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
            at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
            at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at py4j.Gateway.invoke(Gateway.java:282)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/lib/spark/python/pyspark/worker.py", line 372, in main
        process()
      File "/usr/lib/spark/python/pyspark/worker.py", line 367, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 390, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/usr/lib/spark/python/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "/usr/lib/spark/python/pyspark/sql/session.py", line 730, in prepare
        verify_func(obj)
      File "/usr/lib/spark/python/pyspark/sql/types.py", line 1389, in verify
        verify_value(obj)
      File "/usr/lib/spark/python/pyspark/sql/types.py", line 1368, in verify_struct
        "length of fields (%d)" % (len(obj), len(verifiers))))
    ValueError: Length of object (7) does not match with length of fields (5)
    
    opened by mostafam 12
  • pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

    When I run it on a Spark cluster, I get the error pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects. How can I solve it?

    opened by gzgywh 12
  • TypeError: can't pickle _thread.lock objects

    This is similar to the issue here, but I think it is different: https://github.com/maxpumperla/elephas/issues/82

    It happens with the MNIST example, not the Jupyter notebook version but the code below:

    
    import sys
    import os
    os.environ['JAVA_HOME'] = os.getenv("JAVA_HOME")
    print(os.getenv("JAVA_HOME"))
    
    import findspark
    findspark.init()
    
    #import pyspark
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, StandardScaler, VectorAssembler
    from pyspark.ml import Pipeline
    from pyspark.sql.functions import rand
    from pyspark.mllib.evaluation import MulticlassMetrics
    
    # Keras / Deep Learning
    from keras.models import Sequential
    from keras.layers.core import Dense, Dropout, Activation
    from keras import optimizers, regularizers
    from keras.optimizers import Adam
    
    
    if __name__ == '__main__':
    
    	from pyspark import SparkContext, SparkConf
    	conf = SparkConf().setAppName('MNIST').setMaster('local[8]')
    
    
    	from keras.datasets import mnist
    	from keras.utils import np_utils
    
    	(x_train, y_train), (x_test, y_test) = mnist.load_data()
    	x_train = x_train.reshape(60000, 784)
    	x_test = x_test.reshape(10000, 784)
    	x_train = x_train.astype('float32')
    	x_test = x_test.astype('float32')
    	x_train /= 255
    	x_test /= 255
    	nb_classes = 10
    	y_train = np_utils.to_categorical(y_train, nb_classes)
    
    
    	from keras.models import Sequential
    	from keras.layers.core import Dense, Dropout, Activation
    	from keras.optimizers import SGD
    
    	model = Sequential()
    	model.add(Dense(128, input_dim=784))
    	model.add(Activation('relu'))
    	model.add(Dropout(0.2))
    	model.add(Dense(128))
    	model.add(Activation('relu'))
    	model.add(Dropout(0.2))
    	model.add(Dense(10))
    	model.add(Activation('softmax'))
    	model.compile(loss='categorical_crossentropy', optimizer=SGD())
    
    	from elephas.utils.rdd_utils import to_simple_rdd
    	rdd = to_simple_rdd(sc, x_train, y_train)
    
    	from elephas.spark_model import SparkModel
    
    	spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    	spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
    
    

    I get an analogous error:

    
    C:\Java\Java8
    Using TensorFlow backend.
    19/10/09 15:27:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    WARNING
    2019-10-09 15:27:12.161649: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
    >>> Fit model
    Traceback (most recent call last):
      File "pyspark.py", line 105, in <module>
        spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
      File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 151, in fit
        self._fit(rdd, epochs, batch_size, verbose, validation_split)
      File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 163, in _fit
        self.start_server()
      File "C:\Users\\Anaconda3\lib\site-packages\elephas\spark_model.py", line 118, in start_server
        self.parameter_server.start()
      File "C:\Users\\Anaconda3\lib\site-packages\elephas\parameter\server.py", line 85, in start
        self.server.start()
      File "C:\Users\\Anaconda3\lib\multiprocessing\process.py", line 105, in start
        self._popen = self._Popen(self)
      File "C:\Users\\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
        return _default_context.get_context().Process._Popen(process_obj)
      File "C:\Users\\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
        return Popen(process_obj)
      File "C:\Users\\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
        reduction.dump(process_obj, to_child)
      File "C:\Users\\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
        ForkingPickler(file, protocol).dump(obj)
    TypeError: can't pickle _thread.lock objects
    C:\Java\Java8
    SUCCESS: The process with PID 8104 (child process of PID 11096) has been terminated.
    SUCCESS: The process with PID 11096 (child process of PID 14268) has been terminated.
    SUCCESS: The process with PID 14268 (child process of PID 3964) has been terminated.
    

    And afterwards:

    
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "C:\Users\\Anaconda3\lib\multiprocessing\spawn.py", line 105, in spawn_main
        exitcode = _main(fd)
      File "C:\Users\\Anaconda3\lib\multiprocessing\spawn.py", line 115, in _main
        self = reduction.pickle.load(from_parent)
    EOFError: Ran out of input
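
The `TypeError: can't pickle _thread.lock objects` above is raised by `multiprocessing`'s spawn start method (used on Windows, as `popen_spawn_win32.py` in the traceback shows), which pickles the process object before handing it to the child. The following is a minimal, standard-library-only sketch of that failure mode and one common workaround. The names `Holder`, `PicklableHolder`, and `can_pickle` are hypothetical illustrations, not part of Elephas, and this is not a statement of how Elephas itself handles the problem.

```python
import pickle
import threading


class Holder:
    """Hypothetical object that keeps a lock as an attribute."""

    def __init__(self):
        self.lock = threading.Lock()


class PicklableHolder:
    """Hypothetical variant that drops the lock when pickled and recreates it afterwards."""

    def __init__(self, value):
        self.value = value
        self._lock = threading.Lock()

    def __getstate__(self):
        # Copy the instance state and leave out the unpicklable lock.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes and create a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False if pickling raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


if __name__ == "__main__":
    print("Holder picklable:", can_pickle(Holder()))
    print("PicklableHolder picklable:", can_pickle(PicklableHolder(1)))
```

Any object reachable from the process that is started (for example a model or session that holds a lock internally) triggers the same error under spawn, which is why the failure shows up only on platforms that do not fork.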
    
    opened by datistiquo 11
  • Elephas installation Problem

    Hello, I face a problem when installing Elephas using pip install elephas. pip downloads pyspark in all of its versions < 3.2 and reports that it is trying to find the best version that meets the requirements. It installed Keras and TensorFlow, but after that the process kept running without making any progress. What should I do? I'm in great need of this library. Note: the attached picture shows what is happening; I tried many times, and a number of pyspark versions are cached in memory. What are the requirements before installing Elephas? I'm a beginner, so I need details about the Python version, Spark version, etc. Many thanks for your help. Sama

    opened by samasalam 6
  • Issues on installing version 3.0.0

    When installing version 3.0.0 using pip, the following error message is shown:

    ERROR: Could not find a version that satisfies the requirement h5py==3.3.0 (from elephas) (from versions: 2.2.1, 2.3.0b1, 2.3.0, 2.3.1, 2.4.0b1, 2.4.0, 2.5.0, 2.6.0, 2.7.0rc2, 2.7.0, 2.7.1, 2.8.0rc1, 2.8.0, 2.9.0rc1, 2.9.0, 2.10.0, 3.0.0rc1, 3.0.0, 3.1.0)
    ERROR: No matching distribution found for h5py==3.3.0

    opened by aabbcc002 4
  • CNN in elephas

    Hi all. Can we write CNNs with Elephas? I mean, can we add 2D convolutional layers to the Keras model and load 2D images? Can we also load pretrained models such as VGG for transfer learning?

    opened by Asif-Ejaz 0
  • Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

    Hi. I've been trying for a week to resolve errors in the Spark_ML_Pipeline.py example, but I couldn't find a solution. I just ran the original code.

    environments : local jupyter notebook, python=3.7, keras=2.2.5, tf=2.3.0, spark=2.4.8

    The error is below :'( Help me....

    dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                                 train_data=train_data,
                                 test_data=test_data,
                                 label='label_index')
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 245, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479)
    	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
    	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:123)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.EOFException
    	at java.io.DataInputStream.readInt(DataInputStream.java:392)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
    	... 26 more
    
    Driver stacktrace:
    	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    	at scala.Option.foreach(Option.scala:257)
    	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
    	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
    	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479)
    	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
    	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
    	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2107)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:123)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	... 1 more
    Caused by: java.io.EOFException
    	at java.io.DataInputStream.readInt(DataInputStream.java:392)
    	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
    	... 26 more
    
    opened by sparrow422 0
  • Does support LSTM or GRU models?

    Hi,

    Does this package support LSTM or GRU models for training on spark?

    Thanks for your great work :)

    opened by ghorbani-mohammad 3
  • Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

    Hi, I'm trying to use elephas for my deep learning models on spark but so far I couldn't even get anything to work on 3 different machines and on multiple notebooks.

    • "ml_pipeline_otto.py" crashes on the load_data_frame function, more specifically on return sqlContext.createDataFrame(data, ['features', 'category']) with the error : Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.

    • "mnist_mlp_spark.py" crashes on the spark_model.fit method with the error : TypeError: can't pickle _thread.RLock objects

    • "My Own Pipeline" crashes right after fitting (it actually trains it) the model with this error : Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

    I'm running tensorflow 2.1.0, pyspark 3.0.2, jdk-8u281 and python 3.7 and elephas 1.4.2

    opened by diogoribeiro09 6
  • Load Dataset for Image Classification

    In the examples the mnist dataset from keras is used, but it is already loaded as numpy.ndarray. I would like to load my RGB image dataset into a Spark dataframe. In Pyspark there is the method:

    spark.read.format("image").option("dropInvalid", True).load(path)

    which allows you to load all the images contained in the path into a dataframe. In the Dataframe there is a row for each image, and each row contains the binary format of the corresponding image. You can convert the binary format to RGB matrices with numpy's methods, but how do you save a Tensor in each row, and then give the Dataframe as input to a convolutional network in Keras?

    Is there any other way to not provide 3 matrices (RGB) for each image, and just provide a large vector of pixels?
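
As one possible starting point for the conversion asked about above, here is a minimal NumPy sketch that turns the raw bytes of one image row into an RGB array and into a single flat pixel vector. It assumes the layout documented for Spark's image data source (fields `height`, `width`, `nChannels`, and `data`, with `data` stored row-wise in BGR order for 3-channel images). The function names are hypothetical, and how the resulting arrays are fed into Elephas is left open.

```python
import numpy as np


def image_bytes_to_rgb(height, width, n_channels, data):
    """Decode raw image bytes (assumed row-wise, BGR for 3 channels) into an H x W x C uint8 array."""
    arr = np.frombuffer(data, dtype=np.uint8).reshape(height, width, n_channels)
    if n_channels == 3:
        # Assumption: 3-channel data is stored as BGR, so reverse the channel axis to get RGB.
        arr = arr[..., ::-1]
    return arr


def flatten_pixels(rgb):
    """Scale pixel values to [0, 1] and flatten the image into one long float32 vector."""
    return rgb.astype("float32").ravel() / 255.0


if __name__ == "__main__":
    # A tiny synthetic 2 x 2 image with 3 channels stands in for one DataFrame row.
    raw = bytes(range(12))
    rgb = image_bytes_to_rgb(2, 2, 3, raw)
    print(rgb.shape, flatten_pixels(rgb).shape)
```

With a real DataFrame, the four arguments would come from the `image` struct of each row (for example `row.image.height` and `row.image.data`). A flat vector suits dense layers, while the H x W x C array is the shape a convolutional layer expects.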

    opened by cicciolado 0
  • datasize < batch size?

    Hi @maxpumperla, would you mind explaining why you added this if-statement in the first place? (cc @danielenricocahall)

    https://github.com/maxpumperla/elephas/blob/master/elephas/worker.py#L107 https://github.com/maxpumperla/elephas/blob/master/elephas/worker.py#L116

    Does it matter if my batch size is bigger than the available training data?

    opened by OscarDPan 5
  • Object detection, segmentation using elephas

    Hi Elephas team, awesome project and great effort. I managed to test it with some simple models for classification problems. I wonder whether there is any way to use Elephas with popular networks such as YOLOv4, EfficientNet, etc. for object detection and segmentation problems. Could you point me to documentation or examples for them? Big thanks. I would also like to ask why there is a difference in accuracy between using an RDD and using a DataFrame; a friend of mine tested it and mentioned those differences to me. Thanks a lot.

    opened by riasto1 4
  • A problem about LSTM learning

    Dear sir, when I use the following LSTM architecture with keras/elephas:

        conf = SparkConf().setAppName('RNN_LSTM_Elephas_App').setMaster('spark://master:7077')
        # one driver and four workers
        sc = SparkContext(conf=conf)

        rdd = to_simple_rdd(sc, train_X, train_y)

        model = Sequential()
        model.add(LSTM(32, input_shape=(train_X.shape[1], train_X.shape[2]), activation='tanh', kernel_initializer='lecun_uniform'))
        model.add(Dense(1))
        model.compile(loss="mse", optimizer='adam')
        sparkRNN_model = SparkModel(model, frequency='epoch', mode='asynchronous')
        sparkRNN_model.fit(rdd, epochs=150, batch_size=32, verbose=2, validation_split=0.1)

    The predicted results are all the same value: array([[33.390114], [33.390114], [33.390114], [33.390114], ..., [33.390114]])

    By the way, when I use setMaster('local[*]'), the predictions take different values, so I have no idea what causes this problem.

    Rick

    opened by LinRick 1
Releases(3.0.0)
  • 3.0.0(Aug 17, 2021)

    • Update to support TensorFlow up to the latest version (2.6.0), which required converting YAML to JSON (https://github.com/tensorflow/tensorflow/releases/tag/v2.4.3).
    • Remove hyperparameter optimization feature, as Hyperas is archived and causing some compatibility issues
  • 2.1.0(Apr 23, 2021)

  • 2.0.0(Apr 23, 2021)

  • 1.4.1(Feb 6, 2021)

    • Add support to predict probabilities in classification (https://github.com/maxpumperla/elephas/pull/177)
    • Fix gradient computation in synchronous mode (https://github.com/maxpumperla/elephas/pull/176)
    • Performance improvement for distributed predict (https://github.com/maxpumperla/elephas/pull/176)
  • 1.3.1(Jan 31, 2021)

  • 1.2.1(Jan 25, 2021)

  • 1.2.0(Jan 24, 2021)

  • 1.1.0(Jan 19, 2021)

    • Add support for parallel prediction/evaluation in SparkModel
    • Added backwards compatibility for ElephasEstimator with setFeaturesCol, setLabelCol, and setOutputCol, along with a deprecation warning
    • Fixed a bug with the socket server in hogwild mode (it no longer behaves identically to asynchronous mode)
    • Added typehints, as we're now only supporting Python 3
  • 1.0.0(Jan 14, 2021)

    • Dropped support for Python 2.7
    • Added support for Tensorflow 2.0.x - 2.1.x
    • Added/fixed capability to train with socket client/server

    Originally from: https://github.com/danielenricocahall/elephas/releases/tag/1.0.0

Owner
Max Pumperla
Data Science Professor, Data Scientist & Engineer. DL4J core developer, Hyperopt maintainer, Keras contributor. Author of "Deep Learning and the Game of Go"
Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow

eXtreme Gradient Boosting Community | Documentation | Resources | Contributors | Release Notes XGBoost is an optimized distributed gradient boosting l

Distributed (Deep) Machine Learning Community 20.6k Feb 13, 2021
Keras udrl - Keras implementation of Upside Down Reinforcement Learning

keras_udrl Keras implementation of Upside Down Reinforcement Learning This is me

Eder Santana 7 Jan 24, 2022
This is an implementation of Google's Yogi-Optimizer in Keras (tf.keras)

Yogi-Optimizer_Keras This is an implementation of Google's Yogi-Optimizer in Keras (tf.keras) The NeurIPS-Paper can be found here: http://papers.nips.c

null 12 Dec 15, 2021
PArallel Distributed Deep LEarning: Machine Learning Framework from Industrial Practice (『飞桨』核心框架,深度学习&机器学习高性能单机、分布式训练和跨平台部署)

English | 简体中文 Welcome to the PaddlePaddle GitHub. PaddlePaddle, as the only independent R&D deep learning platform in China, has been officially open

null 17.5k Jan 30, 2022
Lightweight, Portable, Flexible Distributed/Mobile Deep Learning with Dynamic, Mutation-aware Dataflow Dep Scheduler; for Python, R, Julia, Scala, Go, Javascript and more

Apache MXNet (incubating) for Deep Learning Apache MXNet is a deep learning framework designed for both efficiency and flexibility. It allows you to m

The Apache Software Foundation 19.8k Feb 5, 2022
Lightweight, Portable, Flexible Distributed/Mobile Deep Learning with Dynamic, Mutation-aware Dataflow Dep Scheduler; for Python, R, Julia, Scala, Go, Javascript and more

Apache MXNet (incubating) for Deep Learning Apache MXNet is a deep learning framework designed for both efficiency and flexibility. It allows you to m

The Apache Software Foundation 19.3k Feb 12, 2021
Lightweight, Portable, Flexible Distributed/Mobile Deep Learning with Dynamic, Mutation-aware Dataflow Dep Scheduler; for Python, R, Julia, Scala, Go, Javascript and more

Apache MXNet (incubating) for Deep Learning Master Docs License Apache MXNet (incubating) is a deep learning framework designed for both efficiency an

ROCm Software Platform 30 Dec 23, 2020
DeepSpeed is a deep learning optimization library that makes distributed training easy, efficient, and effective.

DeepSpeed is a deep learning optimization library that makes distributed training easy, efficient, and effective.

Microsoft 6.2k Feb 3, 2022
QKeras: a quantization deep learning library for Tensorflow Keras

QKeras github.com/google/qkeras QKeras 0.8 highlights: Automatic quantization using QKeras; Stochastic behavior (including stochastic rounding) is disa

Google 357 Feb 1, 2022
MMdnn is a set of tools to help users inter-operate among different deep learning frameworks. E.g. model conversion and visualization. Convert models between Caffe, Keras, MXNet, Tensorflow, CNTK, PyTorch Onnx and CoreML.

MMdnn MMdnn is a comprehensive and cross-framework tool to convert, visualize and diagnose deep learning (DL) models. The "MM" stands for model manage

Microsoft 5.5k Feb 4, 2022
Advanced Deep Learning with TensorFlow 2 and Keras (Updated for 2nd Edition)

Advanced Deep Learning with TensorFlow 2 and Keras (Updated for 2nd Edition)

Packt 1.2k Jan 31, 2022
Keras like implementation of Deep Learning architectures from scratch using numpy.

Mini-Keras Keras like implementation of Deep Learning architectures from scratch using numpy. How to contribute? The project contains implementations

MANU S PILLAI 5 Oct 10, 2021
Realtime Face Anti Spoofing with Face Detector based on Deep Learning using Tensorflow/Keras and OpenCV

Realtime Face Anti-Spoofing Detection Realtime Face Anti Spoofing Detection with Face Detector to detect real and fake faces Please star this repo

Prem Kumar 49 Jan 30, 2022
This source code is implemented using keras library based on "Automatic ocular artifacts removal in EEG using deep learning"

CSP_Deep_EEG This source code is implemented using keras library based on "Automatic ocular artifacts removal in EEG using deep learning" {https://www

Seyed Mahdi Roostaiyan 1 Jan 7, 2022
Vision Deep-Learning using Tensorflow, Keras.

Welcome! I am a computer vision deep learning developer working in Korea. This is my blog, and you can see everything I've studied here. https://www.n

kimminjun 5 Jan 10, 2022
A deep learning network built with TensorFlow and Keras to classify gender and estimate age.

Convolutional Neural Network (CNN). This repository contains a source code of a deep learning network built with TensorFlow and Keras to classify gend

Pawel Dziemiach 1 Dec 18, 2021