Solving Spark error: “detected implicit cartesian product for FULL OUTER join between logical plans”

Error

I encountered an error when trying to outer join two dataframes using PySpark.

joined_df = (
    df1
    .join(df2, how='outer')
)

org.apache.spark.sql.AnalysisException:
detected implicit cartesian product for FULL OUTER join between logical plans

Solution

The join above has no join condition, so Spark would have to compute a cartesian product, which it refuses to do implicitly. Enabling cross joins in the SparkSession solves this problem.

spark.sql.crossJoin.enabled: true

Code example

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)
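
With this config in place, the condition-less outer join goes through. For completeness, here is a minimal end-to-end sketch, continuing from the spark session above (the toy dataframes are made up for illustration):

# Two dataframes with no join condition between them; an outer join
# without one degenerates into a cartesian product.
df1 = spark.createDataFrame([(1,), (2,)], ['a'])
df2 = spark.createDataFrame([('x',), ('y',)], ['b'])

# Without spark.sql.crossJoin.enabled this line raises the AnalysisException.
joined_df = df1.join(df2, how='outer')
joined_df.show()  # 2 x 2 = 4 rows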

Solving Spark error: “Cannot broadcast the table that is larger than 8GB”

Error

Although I had already set crossJoin.enabled to true and autoBroadcastJoinThreshold to -1, I still got an error.

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)

java.util.concurrent.ExecutionException:
org.apache.spark.SparkException: Cannot broadcast the table that is larger than 8GB: 8 GB

This error comes from Spark’s hard-coded limit on the size of a broadcast table, which is 8 GB. Note that autoBroadcastJoinThreshold only governs automatic broadcasts; an explicit broadcast() hint still forces one, which is why setting the threshold to -1 did not prevent the error.

Solution

There is no specific code or config we can set to solve this problem (at least I didn’t find one).

What we can do is optimize our code.
Here are some of my ideas.

  • Use select(cols) or selectExpr(cols) to keep only the columns we actually need before the join, reducing the dataframe’s size.
  • Use filter(expr) to drop the data we don’t need.
  • Use a normal df1.join(df2) instead of df1.join(broadcast(df2)).

I selected fewer columns from my dataframe (from 7 down to 3), roughly as in the sketch below, to solve my problem.
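
As an illustration, here is a minimal sketch that combines these ideas: trim both dataframes, then do a plain join with no broadcast hint. The column names and the filter condition are hypothetical:

from pyspark.sql import functions as F

# Keep only the columns the downstream logic actually needs
# (hypothetical column names).
df1_small = df1.select('id', 'name', 'amount')
df2_small = df2.select('id', 'category')

# Drop the rows we don't need before the join (hypothetical condition).
df1_small = df1_small.filter(F.col('amount') > 0)

# Plain join with no broadcast() hint, so Spark is free to pick a
# shuffle-based join instead of shipping a huge broadcast table.
joined_df = df1_small.join(df2_small, on='id', how='outer')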

Solving Spark error: “TaskMemoryManager: Failed to allocate a page”

Error

This error appeared over and over while my PySpark code was running.

TaskMemoryManager: Failed to allocate a page.

Solution

I added one Spark config to the SparkSession that solved my problem:
set autoBroadcastJoinThreshold to -1.

spark.sql.autoBroadcastJoinThreshold: -1

Code example

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder.appName('my_spark')
    .config("spark.sql.autoBroadcastJoinThreshold", '-1')
    .getOrCreate()
)
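
Setting spark.sql.autoBroadcastJoinThreshold to -1 disables automatic broadcast joins altogether, so Spark falls back to shuffle-based joins that can spill to disk rather than holding a whole table in executor memory; presumably that reduced memory pressure is why the page-allocation failures stopped in my case.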