scala - Spark df.collect() results in random inconsistent output


Without going into too much detail, I am wondering whether anyone has come across similar strange behavior.

I am running all of this in the Scala spark-shell, Spark 1.4.1.

I have one Spark DataFrame called "data". It consists of Parquet files read into Spark using the SQLContext. I apply a couple of transformations to the data set, including filters, groupBy's, sorts, counts, ... nothing fancy, and all of it deterministic, with no randomness whatsoever. This means creating derived DataFrames subset_1 and subset_2. At the end I run the following type of calculation:

data.join(subset_1,"key_a").withColumnRenamed("count","count_1").join(subset_2,"key_a").filter("feature 'inactive'").groupBy($"key_a",$"key_b").count.withColumnRenamed("count","count_2").groupBy($"key_a").count.withColumnRenamed("count","count_3").groupBy($"count_3").count.collect()

This computation runs "fine" from a syntax point of view. However, in different runs of the same query, I get different results. For example:

res82: Array[org.apache.spark.sql.Row] = Array([31,3], [32,2], [34,1], [35,1], [38,1], [42,1], [44,1], [52,1], [61,2], [81,1], [1,4933], [2,2361], [3,924], [4,441], [5,220], [6,130], [7,80], [8,59], [9,36], [10,24], [11,13], [12,12], [13,7], [14,7], [15,11], [16,6], [17,4], [18,6], [19,3], [20,5], [21,6], [22,3], [24,1], [25,1], [26,2], [27,2], [28,1], [29,1], [30,3])

and

res81: Array[org.apache.spark.sql.Row] = Array([32,3], [35,3], [43,1], [46,2], [52,1], [122,1], [145,1], [165,1], [1,3515], [2,1887], [3,836], [4,381], [5,238], [6,136], [7,84], [8,51], [9,39], [10,28], [11,28], [12,13], [13,7], [14,13], [15,8], [16,10], [17,8], [18,6], [19,4], [20,2], [21,4], [22,3], [23,4], [24,1], [25,2], [26,1], [28,3], [29,1], [30,2])

Again: same data, same code, no randomness included in what I do, and still: random results.

Any thoughts are highly appreciated.

OK, the "problem" I encountered is related to the following:

Spark sort by key and then group by to get ordered iterable?

Basically, one has to be careful with combinations of sort and groupBy.

Example: let's say you have a DataFrame df with columns person, status and date. Let's say you'd like to get the latest status of each person. One might think of doing something like:

df.sort($"date").groupBy($"person").agg(sql.functions.last($"status"))

Unfortunately, if you .collect the result and try this many times, you'll realize that the outcomes might be different (I guess they would be identical if the data underlying df consisted of only one partition). This is because the .sort is done locally on the partitions, and the way the partitions are put together during the groupBy is by no means guaranteed to maintain the "global order" of $"date" within the groups.
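The effect can be illustrated without Spark at all. The following is a minimal sketch in plain Scala collections, not the code from the question: the Event case class, the sample rows, and the helper names lastSeen and latest are all hypothetical. It treats two sorted sequences as "partitions", concatenates them in two different orders, and compares taking the last row per group with explicitly picking the row that has the greatest date.

```scala
// Hypothetical record type mirroring the person / status / date columns.
case class Event(person: String, status: String, date: Int)

object LatestStatus {
  // Two "partitions", each sorted by date on its own (made-up sample data).
  val p1 = Seq(Event("ann", "active", 1), Event("ann", "inactive", 4))
  val p2 = Seq(Event("ann", "pending", 2), Event("bob", "active", 3))

  // The same rows, put together in two different partition orders.
  val orderA = p1 ++ p2
  val orderB = p2 ++ p1

  // Order-dependent: takes whichever row happens to come last in each group.
  def lastSeen(rows: Seq[Event]): Map[String, String] =
    rows.groupBy(_.person).map { case (p, evs) => p -> evs.last.status }

  // Order-independent: explicitly picks the row with the greatest date.
  // (If two rows of one person shared the same date, a tie-breaker would
  // still be needed; the sample data has no such ties.)
  def latest(rows: Seq[Event]): Map[String, String] =
    rows.groupBy(_.person).map { case (p, evs) => p -> evs.maxBy(_.date).status }

  def main(args: Array[String]): Unit = {
    println(lastSeen(orderA) == lastSeen(orderB)) // results depend on row order
    println(latest(orderA) == latest(orderB))     // results do not
  }
}
```

In DataFrame terms, the analogous idea would be to aggregate the maximum date per person and join that back to df, instead of relying on last after a sort. That is a suggestion under the assumption that dates are unique per person, not something the linked question prescribes.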

