Solving Spark error: “detected implicit cartesian product for FULL OUTER join between logical plans”

Error

I encountered this error when I wanted to outer join two DataFrames using PySpark.

joined_df = (
    df1
    .join(df2, how='outer')
)

org.apache.spark.sql.AnalysisException:
detected implicit cartesian product for FULL OUTER join between logical plans

Solution

Enabling cross joins in the SparkSession solves this problem.

spark.sql.crossJoin.enabled: true

Code example

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)
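
With this config in place, the same full outer join should run without the AnalysisException. A quick sanity check, assuming df1 and df2 are still the DataFrames from above:

# Confirm the setting in the current session
print(spark.conf.get("spark.sql.crossJoin.enabled"))  # 'true'

# The full outer join from the beginning now works
joined_df = df1.join(df2, how='outer')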

Solving Spark error: “Cannot broadcast the table that is larger than 8GB”

Error

Although I had already set crossJoin.enabled to true and autoBroadcastJoinThreshold to -1, I still got an error.

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)

java.util.concurrent.ExecutionException:
org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB

This error is due to Spark's hard limit on the size of a broadcast table, which is 8 GB.

Solution

There is no specific code or config we can set to solve this problem (at least I didn’t find one).

What we can do is optimize our code.
Here are some of my ideas.

  • Use select(cols) or selectExpr(cols) to keep only the columns we actually need before the join, reducing the DataFrame's size.
  • Use filter(expr) to drop the data we don't need.
  • Use a plain df1.join(df2) instead of df1.join(broadcast(df2)).

I selected fewer columns from my DataFrame (from 7 columns down to 3) to solve my problem. A rough sketch of these ideas is shown below.
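
In this sketch, the column names (user_id, country, segment) and the filter condition are made-up placeholders, not from my actual job:

from pyspark.sql import functions as F

# Keep only the columns the join and downstream logic actually need
df1_small = df1.select("user_id", "country")
df2_small = df2.select("user_id", "segment")

# Drop rows we don't need before joining
df1_small = df1_small.filter(F.col("country") == "TW")

# Plain join: let Spark choose the join strategy instead of forcing broadcast(df2)
joined_df = df1_small.join(df2_small, on="user_id", how="outer")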

Solving Spark error: “TaskMemoryManager: Failed to allocate a page”

Error

This error kept appearing repeatedly while my PySpark code was running.

TaskMemoryManager: Failed to allocate a page.

Solution

I added one Spark config to the SparkSession that solved my problem:
set autoBroadcastJoinThreshold to -1, which disables automatic broadcast joins.

spark.sql.autoBroadcastJoinThreshold: -1

Code example

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)
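
Note that the same setting can also be applied at runtime on an existing SparkSession (for example, in a notebook) through the standard spark.conf API; a minimal sketch:

# Apply the same config at runtime on an existing SparkSession
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", '-1')

# Read it back to confirm it took effect
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # '-1'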

Solving Spark error: “Decompression error: Version not supported” on GCP Dataproc

My gcloud command to create the cluster from the terminal

sudo gcloud dataproc clusters create my-project \
    --bucket my-bucket \
    --project my-gcp-project \
    --region asia-east1 \
    --zone asia-east1-b \
    --image-version=2.0-ubuntu18 \
    --master-machine-type n1-highmem-8 \
    --master-boot-disk-size 30 \
    --worker-machine-type n1-highmem-8 \
    --worker-boot-disk-size 100 \
    --num-workers 6 \
    --metadata='PIP_PACKAGES=xxhash' \
    --optional-components=JUPYTER \
    --initialization-actions gs://goog-dataproc-initialization-actions-asia-east1/python/pip-install.sh \
    --subnet=default


Error

This error occurred while running a specific piece of PySpark code.

java.io.IOException: Decompression error: Version not supported

Solution

Changing image-version from 2.0-ubuntu18 to 2.1-ubuntu20 in the cluster-creation command solves this “Version not supported” error.

--image-version=2.1-ubuntu20 \

Solving git pull error: “Need to specify how to reconcile divergent branches”

  1. Cause of the problem
    1. Error when running git push
    2. Error when running git pull
  2. Check git status
  3. Solution
    1. Run git config pull.rebase false and then git pull again
    2. Check git status again
    3. Finally, run git push once more and it succeeds

Cause of the problem

Someone else had probably been working on the branch and pushed their changes, so master was newer than my local copy, which caused an error when I tried to push after committing.

Error when running git push

But when I tried to pull instead, another error occurred: Need to specify how to reconcile divergent branches.

Error when running git pull
