The basic idea is to create a lookup table of distinct categories indexed by unique integer identifiers. The approach to avoid is to collect the unique categories to the driver, loop through them to assign the corresponding index to each to create the lookup table (as a Map or equivalent), and then broadcast the lookup table to all executors. The amount of data that can be collected at the driver is controlled by the spark.driver.maxResultSize configuration, which by default is set to 1 GB in Spark 1.6.1. Both collect and broadcast will eventually run into the physical memory limits of the driver and the executors, respectively, beyond a certain number of distinct categories, resulting in a non-scalable solution.
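To make the failure mode concrete, here is a minimal sketch of that collect-and-broadcast pattern, using a hypothetical records pair RDD (the dataset and names are illustrative, not from the original article):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("category-lookup"))

// Hypothetical (category, value) data standing in for the real dataset.
val records = sc.parallelize(Seq(("apple", 1.0), ("banana", 2.0), ("apple", 3.0)))

// The non-scalable pattern: pull every distinct category to the driver,
// number them there, and broadcast the resulting Map to all executors.
// The collect is capped by spark.driver.maxResultSize, and both the
// driver-side Map and each executor's broadcast copy must fit in memory.
val lookup: Map[String, Long] = records.keys
  .distinct()
  .collect()                       // all distinct categories land on the driver
  .zipWithIndex
  .map { case (category, index) => (category, index.toLong) }
  .toMap

val lookupBroadcast = sc.broadcast(lookup)

// Replace each category with its integer id via the broadcast table.
val indexed = records.map { case (category, value) =>
  (lookupBroadcast.value(category), value)
}
```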
The solution is pretty interesting: build out a new RDD of unique results, and then join that set back. If you’re using SQL (including Spark SQL), I would use the DENSE_RANK() window function.
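A rough sketch of that scalable alternative, continuing with the same hypothetical records RDD as above: zipWithIndex assigns the integer ids in a distributed fashion, and a join attaches them back to the data, so nothing has to fit on the driver.

```scala
// Build the lookup table as a distributed RDD instead of a driver-side Map.
val lookupRdd = records.keys.distinct().zipWithIndex()   // RDD[(category, id)]

// Join the ids back onto the original records; no collect, no broadcast.
val indexedRecords = records
  .join(lookupRdd)                         // RDD[(category, (value, id))]
  .map { case (_, (value, id)) => (id, value) }
```

And the DENSE_RANK() variant, expressed through Spark SQL (on Spark 1.x window functions require a HiveContext; the table and column names are again illustrative):

```scala
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

records.toDF("category", "value").registerTempTable("records")

// DENSE_RANK() over the ordered categories yields consecutive integer ids,
// one per distinct category, with no driver-side lookup table.
val indexedDf = hiveContext.sql(
  """SELECT category,
    |       value,
    |       DENSE_RANK() OVER (ORDER BY category) AS category_id
    |FROM records""".stripMargin)
```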