Scala - Spark df.collect() returns random, inconsistent output
Without going into too much detail, I'm wondering whether anyone has come across similar strange behavior.
I'm running all of this in the Scala spark-shell, Spark 1.4.1.
I have one Spark DataFrame called "data", read from Parquet files using sqlContext. I apply a couple of transformations to this data set, including filters, groupBy's, sorts, counts, ... nothing fancy, and all of it deterministic, with no randomness whatsoever. This means creating the derived DataFrames subset_1 and subset_2. At the end I'm running the following type of calculation:
data.join(subset_1,"key_a").withColumnRenamed("count","count_1").join(subset_2,"key_a").filter("feature 'inactive'").groupBy($"key_a",$"key_b").count.withColumnRenamed("count","count_2").groupBy($"key_a").count.withColumnRenamed("count","count_3").groupBy($"count_3").count.collect()
This computation runs "fine" from a syntax point of view. However, different runs of the query give different results. For example:
res82: Array[org.apache.spark.sql.Row] = Array([31,3], [32,2], [34,1], [35,1], [38,1], [42,1], [44,1], [52,1], [61,2], [81,1], [1,4933], [2,2361], [3,924], [4,441], [5,220], [6,130], [7,80], [8,59], [9,36], [10,24], [11,13], [12,12], [13,7], [14,7], [15,11], [16,6], [17,4], [18,6], [19,3], [20,5], [21,6], [22,3], [24,1], [25,1], [26,2], [27,2], [28,1], [29,1], [30,3])
and
res81: Array[org.apache.spark.sql.Row] = Array([32,3], [35,3], [43,1], [46,2], [52,1], [122,1], [145,1], [165,1], [1,3515], [2,1887], [3,836], [4,381], [5,238], [6,136], [7,84], [8,51], [9,39], [10,28], [11,28], [12,13], [13,7], [14,13], [15,8], [16,10], [17,8], [18,6], [19,4], [20,2], [21,4], [22,3], [23,4], [24,1], [25,2], [26,1], [28,3], [29,1], [30,2])
Again: same data, same code, no randomness anywhere in what I do, and still random results.
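For reference, the tail of the query (groupBy key_a/key_b, count, then groupBy key_a, count, then groupBy count_3, count) builds a histogram: for each value of count_3, how many key_a values have that many distinct key_b values. The plain-Scala sketch below models only that tail, under the assumption that the rows surviving the joins and the filter can be represented as (key_a, key_b) pairs; the object and method names are hypothetical. Because this part depends only on which rows reach it, differing outputs would have to come from an earlier step that produces a different set of rows on each run.

```scala
// Plain-Scala model (not Spark) of:
// .groupBy(key_a, key_b).count -> .groupBy(key_a).count -> .groupBy(count_3).count
object CountHistogram {
  // Each pair stands for one (key_a, key_b) row left after the joins and the filter.
  def histogram(rows: Seq[(String, String)]): Map[Long, Long] = {
    // Step 1: groupBy(key_a, key_b).count yields one row per distinct pair;
    // the count_2 value itself is not used by the later steps.
    val pairs = rows.distinct
    // Step 2: groupBy(key_a).count -> count_3 = number of distinct key_b per key_a.
    val count3PerKeyA: Map[String, Long] =
      pairs.groupBy(_._1).map { case (a, ps) => a -> ps.size.toLong }
    // Step 3: groupBy(count_3).count -> how many key_a values share each count_3.
    count3PerKeyA.values.groupBy(identity).map { case (c3, cs) => c3 -> cs.size.toLong }
  }
}
```

For example, the rows ("a","x"), ("a","x"), ("a","y"), ("b","x"), ("c","z") give count_3 = 2 for "a" and 1 for "b" and "c", so the histogram is Map(1 -> 2, 2 -> 1), the same shape as the [count_3, count] pairs in the results above.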
Any thoughts are highly appreciated.
OK, the "problem" I encountered is related to the following:
Spark: sort by key and then group by to get an ordered iterable?
Basically, one has to be careful with combinations of sort and groupBy.
Example: let's say we have a DataFrame df with columns person, status and date, and we'd like the latest status of each person. One might think of doing something like:
df.sort($"date").groupBy($"person").agg(sql.functions.last($"status"))
Unfortunately, if you .collect the result and try it many times, you'll realize that the outcomes can differ (I guess they would be identical if the data underlying df consisted of only one partition). This is because groupBy triggers a shuffle: each group's rows are fetched from the sorted input partitions in no guaranteed order, so the "global order" of $"date" established by .sort is by no means guaranteed to be maintained within the groups.
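The effect can be reproduced without Spark. The sketch below is a plain-Scala toy model, not Spark's actual shuffle: the hypothetical `Record` type stands for a row of df, each inner `Seq` stands for one date-sorted partition, and the order of the outer `Seq` stands for the unspecified order in which groupBy receives those partitions. It contrasts the sort-then-last pattern with an order-independent aggregation that picks the row with the greatest date per person.

```scala
// Toy model of df.sort($"date").groupBy($"person").agg(last($"status"))
object LatestStatus {
  // Hypothetical record type mirroring the columns person, status and date.
  final case class Record(person: String, status: String, date: Int)

  // Order-dependent: takes the last status per person in whatever order rows arrive.
  // The outer Seq models the (not guaranteed) order in which the shuffle
  // concatenates the already-sorted input partitions.
  def lastArrived(partitionsInArrivalOrder: Seq[Seq[Record]]): Map[String, String] =
    partitionsInArrivalOrder.flatten
      .groupBy(_.person)
      .map { case (p, rs) => p -> rs.last.status }

  // Order-independent: picks the record with the greatest date per person,
  // so the arrival order of rows no longer matters (ties on date aside).
  def latestByDate(records: Seq[Record]): Map[String, String] =
    records.groupBy(_.person).map { case (p, rs) => p -> rs.maxBy(_.date).status }
}
```

With the date-sorted partitions Seq(Record("ann","active",1), Record("ann","away",2)) and Seq(Record("ann","inactive",3)), lastArrived returns "inactive" or "away" depending on which partition arrives first, while latestByDate returns "inactive" either way. The same idea should carry over to Spark: use an aggregation whose result does not depend on row order (for example, a per-key reduction on df.rdd that keeps the row with the larger date) instead of relying on a sort before the groupBy.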