
Why does .collect() cause a shuffle while .show() does not?

VaderK
New Contributor

I’m learning Spark using the book Spark: The Definitive Guide and came across some behavior I’m trying to understand.

I am reading a CSV file with three columns: DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, and count. The dataset has 256 rows in total.

Here’s the code I'm running:

data = (spark
.read
.format('csv')
.option('inferSchema', 'true')
.option('header', 'true')
.option('path', 'dbfs:/FileStore/tables/spark_definitive_guide/data/data/csv/2015_summary.csv')
.load())

# Set the number of shuffle partitions to 5

spark.conf.set("spark.sql.shuffle.partitions", "5")


Now, I’m trying to sort the DataFrame by the count column. The physical plan looks like this:

data.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#337 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#337 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=255]
+- FileScan csv [DEST_COUNTRY_NAME#335,ORIGIN_COUNTRY_NAME#336,count#337] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/spark_definitive_guide/data/data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
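
(A side note, as a minimal sketch using the data DataFrame above: the rangepartitioning(..., 5) in the Exchange comes from spark.sql.shuffle.partitions, which only controls how many partitions come out of a shuffle, not how the file is scanned.)

# Partitions of the initial scan (typically 1 for a small CSV like this)
print(data.rdd.getNumPartitions())

# Partitions after the sort's exchange: up to 5 here because of
# spark.sql.shuffle.partitions; AQE may coalesce a tiny shuffle to fewer
print(data.sort("count").rdd.getNumPartitions())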

Now here's where I’m confused:

Question 1
Why does .collect() cause a shuffle, but .show(1000) does not?

When I run: 

data.sort("count").show(1000)

I don’t see any shuffle in the Spark UI DAG — the data just gets scanned and displayed.

But when I run:

data.sort("count").collect()


I do see a shuffle in the DAG and execution plan.

Both commands retrieve all 256 rows, so why the difference? Why does .collect() trigger a shuffle while .show(1000) does not?

DAG of .show (attached: show.png)

DAG of .collect (attached: collect.png)

Question 2
Why does the DAG for .collect() show Scan csv returning 512 rows, when the file only has 256 rows as shown above?

Question 3
When we do .collect() and the data is loaded into driver memory, how does this happen under the hood? Is the data sent directly from the executors to driver memory over the network, or is it first written to disk and then read by the driver into memory?


Any help in understanding this behavior would be much appreciated. Thank you!

ACCEPTED SOLUTION

-werners-
Esteemed Contributor III

Q1: collect() moves all data to the driver, hence a shuffle. show() just shows x records from the DataFrame, from one partition (or more partitions if x > partition size). No shuffling needed.
For display purposes the results are of course gathered on the driver, but this is not a Spark shuffle.
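
A quick way to compare the two from the plans themselves (a sketch, assuming the same data DataFrame as above; show(n) is executed roughly like a limit on the query):

# Full sort, which is what collect() has to execute: note the
# range-partitioning Exchange (the shuffle)
data.sort("count").explain()

# Roughly what show(1000) runs: a limited sort, which Spark can plan as a
# top-n (TakeOrderedAndProject) without a full shuffle
data.sort("count").limit(1000).explain()
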
Q2: I'd say that with collect, the file is read twice, perhaps across multiple stages. Spark can read a file multiple times if necessary.
Q3: the data is not written to disk, so it goes worker RAM -> network -> driver RAM.
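
If driver memory is a concern for Q3, a related sketch (toLocalIterator() is the standard lower-memory alternative; the total size a collect() may return is capped by spark.driver.maxResultSize):

# collect() pulls every partition's rows over the network into driver RAM at once
rows = data.sort("count").collect()

# toLocalIterator() instead streams one partition at a time to the driver,
# so the driver only holds roughly one partition's rows in memory
for row in data.sort("count").toLocalIterator():
    pass  # process each Row on the driver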

