Solving Spark error: "detected implicit cartesian product for FULL OUTER join between logical plans"

Error

I hit this error when trying to full outer join two dataframes in PySpark.

joined_df = (
    df1
    .join(df2, how='outer')
)

org.apache.spark.sql.AnalysisException:
detected implicit cartesian product for FULL OUTER join between logical plans

Solution

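Enabling cross joins in the SparkSession solves this problem. The join above has no join condition, so Spark treats it as a Cartesian product and refuses to run it unless cross joins are enabled.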

spark.sql.crossJoin.enabled: true

Code example

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)
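
If the two dataframes actually share a key column, passing it explicitly avoids the Cartesian product in the first place. This is only a sketch under that assumption: the original snippet joins without any key, and the helper name full_outer_join and its keys argument are made up for illustration.

```python
def full_outer_join(left, right, keys):
    """Full outer join of two PySpark DataFrames on explicit key columns.

    `keys` is a column name or a list of column names that exist in both
    DataFrames. This is an assumption: the original snippet has no join key.
    """
    if not keys:
        raise ValueError("keys must name at least one shared column")
    # With an explicit `on`, Spark plans an equi-join instead of a
    # Cartesian product, so the crossJoin setting is not involved.
    return left.join(right, on=keys, how="outer")
```

For example, full_outer_join(df1, df2, ["id"]) would join on a hypothetical shared column named id. If a Cartesian product really is what you want, df1.crossJoin(df2) states that intent explicitly.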

Solving Spark error: "Cannot broadcast the table that is larger than 8GB"

Error

Although I had already set spark.sql.crossJoin.enabled to true and spark.sql.autoBroadcastJoinThreshold to -1, I still got an error.

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)

java.util.concurrent.ExecutionException:
org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB

This error comes from Spark's limit on the size of a broadcast table, which is 8 GB.

Solution

There is no specific code or config that solves this problem (at least I didn't find one).

What we can do is optimize our code.
Here are some ideas.

  • Use select(cols) or selectExpr(cols) to keep only the columns we actually need before the join, which reduces the dataframe's size.
  • Use filter(expr) to drop the rows we don't need.
  • Use a normal df1.join(df2) instead of df1.join(broadcast(df2)).

I solved my problem by selecting fewer columns from my dataframe (from 7 columns down to 3).
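
The first two ideas can be combined into a small helper. This is a rough sketch, not the code I actually ran: the function name slim_join, its parameters, and the choice to filter only the left side are all assumptions made for illustration.

```python
def slim_join(left, right, key, left_cols, right_cols, left_filter=None, how="inner"):
    """Shrink two PySpark DataFrames before joining them on `key`.

    All parameter names are hypothetical. `left_filter` is an optional SQL
    expression string (or Column) applied to `left` before selecting columns.
    """
    def with_key(cols):
        # Always keep the join key, and avoid selecting it twice.
        return [key] + [c for c in cols if c != key]

    if left_filter is not None:
        # Filter first, so the expression may use columns we drop afterwards.
        left = left.filter(left_filter)

    left_small = left.select(*with_key(left_cols))
    right_small = right.select(*with_key(right_cols))

    # Plain join with no broadcast() hint on either side.
    return left_small.join(right_small, on=key, how=how)
```

A call might look like slim_join(df1, df2, "id", ["a"], ["b"], left_filter="a > 0"), where id, a, and b are placeholder column names.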

Solving Spark error: "TaskMemoryManager: Failed to allocate a page"

Error

This error occurs repeatedly while the PySpark code is running.

TaskMemoryManager: Failed to allocate a page.

Solution

Adding one Spark config to the SparkSession solved my problem: set autoBroadcastJoinThreshold to -1, which disables automatic broadcast joins.

spark.sql.autoBroadcastJoinThreshold: -1

Code example

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)
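
If the session already exists (for example in a notebook), the same setting can most likely be applied at runtime through spark.conf.set instead of rebuilding the session. This is a minimal sketch: the helper name disable_auto_broadcast is hypothetical, and I have only confirmed the builder-based config above.

```python
def disable_auto_broadcast(spark):
    """Turn off automatic broadcast joins on an existing SparkSession.

    `spark` is assumed to be an already created SparkSession.
    Returns the value Spark reports after the change.
    """
    key = "spark.sql.autoBroadcastJoinThreshold"
    # -1 disables the size-based automatic broadcast of small tables.
    spark.conf.set(key, "-1")
    return spark.conf.get(key)
```

The change applies to queries planned after the call, so run it before building the join.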

Solving Spark error: "Decompression error: Version not supported" on GCP Dataproc

The gcloud command I ran in the terminal to create the cluster:

sudo gcloud dataproc clusters create my-project \
    --bucket my-bucket \
    --project my-gcp-project \
    --region asia-east1 \
    --zone asia-east1-b \
    --image-version=2.0-ubuntu18 \
    --master-machine-type n1-highmem-8 \
    --master-boot-disk-size 30 \
    --worker-machine-type n1-highmem-8 \
    --worker-boot-disk-size 100 \
    --num-workers 6 \
    --metadata='PIP_PACKAGES=xxhash' \
    --optional-components=JUPYTER \
    --initialization-actions gs://goog-dataproc-initialization-actions-asia-east1/python/pip-install.sh \
    --subnet=default


Error

This error occurs while running certain PySpark code.

java.io.IOException: Decompression error: Version not supported

Solution

Changing image-version from 2.0-ubuntu18 to 2.1-ubuntu20 solves this "Version not supported" error.

--image-version=2.1-ubuntu20 \