How to avoid one Spark Streaming window blocking another window when both run native Python code


I'm running Spark Streaming with two different windows (one window for training a model with sklearn, the other for predicting values based on that model), and I'm wondering how I can avoid one window (the "slow" training window) blocking the "fast" prediction window while it trains the model.
A simplified version of the code looks as follows:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from sklearn.svm import SVR
import numpy as np

import Custom_ModelContainer

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

stream = ssc.socketTextStream("localhost", 7000)

### window 1 ###
### predict data based on the model computed in window 2 ###

def predict(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc...

        # regular python code
        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        pred = Custom_ModelContainer.getModel().predict(X)

        # send prediction to GUI

    except Exception as e:
        print(e)

predictionStream = stream.window(60, 60)
predictionStream.foreachRDD(predict)

### window 2 ###
### fit a new model ###

def trainModel(time, rdd):
    try:
        # ... rdd conversion to df, feature extraction etc...

        X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
        y = np.array(df.map(lambda lp: lp.label).collect())

        # train test split etc...

        model = SVR().fit(X_train, y_train)
        Custom_ModelContainer.setModel(model)

    except Exception as e:
        print(e)

modelTrainingStream = stream.window(600, 600)
modelTrainingStream.foreachRDD(trainModel)

(Note: Custom_ModelContainer is a class I wrote to save and retrieve the trained model.)
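Since the two windows' callbacks may touch the model from different threads, a minimal sketch of what such a container could look like is shown below (hypothetical names; the real Custom_ModelContainer may differ) — guarding the shared model with a lock is a reasonable precaution:

```python
import threading

class ModelContainer(object):
    """Holds the latest trained model; safe to read/write from multiple threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self._model = None

    def setModel(self, model):
        # replace the current model atomically
        with self._lock:
            self._model = model

    def getModel(self):
        # return whatever model was stored last (None before first training)
        with self._lock:
            return self._model

# module-level singleton, mirroring how Custom_ModelContainer is used above
container = ModelContainer()
```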

My setup works in principle, except that every time a new model is trained in the second window (which takes about a minute), the first window does not compute any predictions until the model training has finished. Actually, I guess that makes sense, since model fitting and predictions are both computed on the master node (in a non-distributed setting, due to sklearn).

So my question is the following: would it be possible to train the model on a single worker node (instead of the master node)? If so, how could I achieve this, and would it resolve my issue?

If not, any other suggestion on how I could make such a setup work without delaying computations in window 1?

Any help is greatly appreciated.

EDIT: I guess the more general question would be: how can I run two different tasks on two different workers in parallel?

Disclaimer: this is only a set of ideas. None of these has been tested in practice.


A couple of things you can try:

  1. Don't collect to predict. scikit-learn models are typically serializable, so the prediction process can be handled on the cluster:

    def predict(time, rdd):
        ...
        model = Custom_ModelContainer.getModel()
        pred = (df.rdd.map(lambda lp: lp.features.toArray())
            .mapPartitions(lambda iter: model.predict(np.array(list(iter)))))
        ...

     It should not only parallelize predictions but also, if raw data is not passed to the GUI, reduce the amount of data that has to be collected.

  2. Try to collect and send data asynchronously. PySpark doesn't provide a collectAsync method, but you can try to achieve something similar with concurrent.futures:

    from pyspark.rdd import RDD
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    def submit_to_gui(*args):
        ...

    def submit_if_success(f):
        if not f.exception():
            executor.submit(submit_to_gui, f.result())

     and continue as in 1.:

    def predict(time, rdd):
        ...
        f = executor.submit(pred.collect)
        f.add_done_callback(submit_if_success)
        ...
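The executor pattern from point 2 can be exercised without Spark at all. A stand-alone sketch, where slow_collect and submit_to_gui are hypothetical stand-ins for pred.collect and the real GUI call:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
results = []

def slow_collect():
    # stand-in for pred.collect(); in the real job this is the blocking Spark action
    return [1.0, 2.0, 3.0]

def submit_to_gui(data):
    # stand-in for pushing predictions to the GUI
    results.append(data)

def submit_if_success(f):
    # only forward the result if the collect did not raise
    if not f.exception():
        submit_to_gui(f.result())

f = executor.submit(slow_collect)
f.add_done_callback(submit_if_success)
executor.shutdown(wait=True)  # in streaming code you would keep the executor alive
```

Note the callback here calls submit_to_gui directly; re-submitting it to the executor (as in the snippet above) works the same way as long as the executor has not been shut down.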
  3. If you want to use a local scikit-learn model, try to collect and fit using futures as above. You can also try to collect only once, especially if the data is not cached:

    def collect_and_train(df):
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect()))
        ...
        return SVR().fit(X_train, y_train)

    def set_if_success(f):
        if not f.exception():
            Custom_ModelContainer.setModel(f.result())

    def trainModel(time, rdd):
        ...
        f = executor.submit(collect_and_train, df)
        f.add_done_callback(set_if_success)
        ...
  4. Move the training process to the cluster, either using an already existing solution like spark-sklearn or a custom approach:

    • naive solution - prepare your data, coalesce(1) and train a single model using mapPartitions.
    • distributed solution - create and validate a separate model per partition using mapPartitions, collect the models and use them as an ensemble, for example by taking the average or median prediction.
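The ensemble variant of point 4 can be illustrated without Spark or scikit-learn. In this sketch each per-partition "model" is just a constant mean predictor, a hypothetical stand-in for an SVR fitted inside mapPartitions:

```python
# Dependency-free sketch of the per-partition ensemble idea.

def fit_partition(labels):
    # stand-in for SVR().fit(...) on one partition's data
    mean = sum(labels) / float(len(labels))
    return lambda x: mean  # constant predictor

def ensemble_predict(models, x):
    # combine the individual models by averaging their predictions
    preds = [m(x) for m in models]
    return sum(preds) / float(len(preds))

partitions = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]   # two "partitions" of labels
models = [fit_partition(p) for p in partitions]   # "collect" the fitted models
prediction = ensemble_predict(models, x=0.0)      # average of 2.0 and 4.0 -> 3.0
```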
  5. Throw away scikit-learn and use a model which can be trained and maintained in a distributed, streaming environment (for example StreamingLinearRegressionWithSGD).

     Your current approach makes Spark obsolete. If you can train the model locally, there is a good chance you can perform all the other tasks much faster on the local machine as well. Otherwise your program will simply fail on collect.

