Press "Enter" to skip to content

Category: Spark

Adding An Index To A Spark RDD

Arijit Tarafdar gives us a good method for adding an index column to a Spark data frame based on a non-unique value:

The basic idea is to create a lookup table of distinct categories indexed by unique integer identifiers. The approach to avoid is to collect the unique categories to the driver, loop through them to add a corresponding index to each and build the lookup table (as a Map or equivalent), and then broadcast the lookup table to all executors. The amount of data that can be collected at the driver is controlled by the spark.driver.maxResultSize configuration, which by default is set at 1 GB for Spark 1.6.1. Both collect and broadcast will eventually run into the physical memory limits of the driver and the executors respectively beyond a certain number of distinct categories, resulting in a non-scalable solution.

The solution is pretty interesting:  build out a new RDD of unique results, and then join that set back.  If you’re using SQL (including Spark SQL), I would use the DENSE_RANK() window function.
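
As a rough illustration of that join-back idea (not Arijit's actual code; the Record type, its fields, and the helper name are invented for the example):

```scala
import org.apache.spark.rdd.RDD

case class Record(category: String, value: Double)

def addCategoryIndex(records: RDD[Record]): RDD[(Long, Record)] = {
  // Distinct categories, each tagged with a unique Long identifier.
  // No collect, no broadcast, so the driver never holds the whole lookup table.
  val lookup: RDD[(String, Long)] = records.map(_.category).distinct().zipWithIndex()

  // Join the full data set back against the lookup to attach the index.
  records.keyBy(_.category)
    .join(lookup)
    .map { case (_, (record, index)) => (index, record) }
}
```

In SQL terms, DENSE_RANK() OVER (ORDER BY category) hands out the same dense integer identifiers per distinct category without the explicit join.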


Aggregating Clickstream Data

Ofer Habushi solves a clickstream aggregation problem using Spark:

At this point, an interesting question came up for us: How can we keep the data partitioned and sorted? 

That’s a challenge. When we sort the entire data set, we shuffle in order to get sorted RDDs and create new partitions, which are different than the partitions we got from Step 1. And what if we do the opposite?

Sort first by creation time and then partition the data? We’ll encounter the same problem. The re-partitioning will cause a shuffle and we’ll lose the sort. How can we avoid that?

Partition→sort = losing the original partitioning

Sort→partition = losing the original sort

There’s a solution for that in Spark. In order to partition and sort in Spark, you can use repartitionAndSortWithinPartitions. 

This is an interesting solution to an ever-more-common problem.
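
For a concrete picture of the pattern, here is a minimal Scala sketch of repartitionAndSortWithinPartitions doing a secondary sort; the Event type, the key layout, and the partition count are assumptions for illustration, not code from Ofer's post:

```scala
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

case class Event(userId: String, timestamp: Long, url: String)

// Partition on userId only, so every event for a given user lands in the same partition.
class UserPartitioner(partitions: Int) extends Partitioner {
  private val delegate = new HashPartitioner(partitions)
  override def numPartitions: Int = delegate.numPartitions
  override def getPartition(key: Any): Int = key match {
    case (userId: String, _) => delegate.getPartition(userId)
  }
}

def partitionAndSort(events: RDD[Event]): RDD[((String, Long), Event)] = {
  // The composite key (userId, timestamp) drives the within-partition sort,
  // while the partitioner above ignores the timestamp component.
  events
    .map(e => ((e.userId, e.timestamp), e))
    .repartitionAndSortWithinPartitions(new UserPartitioner(200))
}
```

A single shuffle does both jobs, which is exactly why the partition-then-sort and sort-then-partition orderings above each lose half the work.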


Analysis Of Fantasy Sports Using Spark

Jordan Volz knows how to get to my heart:

Although the data involved is not large in volume, the types of data processing, data analytics, and machine-learning techniques used in this area are common to many Apache Hadoop use cases. So, fantasy sports analytics provides a good (and fun) use case for exploring the Hadoop ecosystem.

Apache Spark is a natural fit in this environment. As a data processing platform with embedded SQL and machine-learning capabilities, Spark gives programmatic access to data while still providing an easy SQL access point and simple APIs to churn through the data. Users can write code in Python, Java, or Scala, and then use Apache Hive, Apache Impala (incubating), or even Cloudera Search (Apache Solr) for exploratory analysis.

Baseball was my introduction to statistics, and I think that fantasy sports is a great way of driving interest in stats and machine learning.  I’m looking forward to the other two parts of this series.


MapR Goes Spark-First

MapR has introduced a new version of their platform which is based on Spark:

With the emergence of Spark as a unified computing engine, developers can perform ETL and advanced analytics in both continuous (streaming) and batch mode either programmatically (using Scala, Java, Python, or R) or with procedural SQL (using Spark SQL or Hive QL).

With MapR converging the data management platform, you can now take a preferential Spark-first approach. This differs from the traditional approach of starting with extended Hadoop tools and then adding Spark as part of your big data technology stack. As a unified computing engine, Spark can be used for faster batch ETL and analytics (with Spark core instead of MapReduce and Hive), machine learning (with Spark MLlib instead of Mahout), and streaming ETL and analytics (with Spark Streaming instead of Storm).

MapReduce is so 2012…


Resilient Distributed Datasets

Spark is built around the concept of Resilient Distributed Datasets.  If you have not read Matei Zaharia et al.'s paper on the topic, I highly recommend it:

Spark exposes RDDs through a language-integrated API similar to DryadLINQ [31] and FlumeJava [8], where each dataset is represented as an object and transformations are invoked using methods on these objects.

Programmers start by defining one or more RDDs through transformations on data in stable storage (e.g., map and filter). They can then use these RDDs in actions, which are operations that return a value to the application or export data to a storage system. Examples of actions include count (which returns the number of elements in the dataset), collect (which returns the elements themselves), and save (which outputs the dataset to a storage system). Like DryadLINQ, Spark computes RDDs lazily the first time they are used in an action, so that it can pipeline transformations.

In addition, programmers can call a persist method to indicate which RDDs they want to reuse in future operations. Spark keeps persistent RDDs in memory by default, but it can spill them to disk if there is not enough RAM. Users can also request other persistence strategies, such as storing the RDD only on disk or replicating it across machines, through flags to persist. Finally, users can set a persistence priority on each RDD to specify which in-memory data should spill to disk first.
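
To see those three ideas in code, here is a small sketch of lazy transformations, actions, and persist; the file paths and the filter predicate are made up for the example:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch"))

// Transformations are lazy; nothing executes yet.
val lines  = sc.textFile("hdfs:///logs/app.log")
val errors = lines.filter(_.contains("ERROR")).map(_.trim)

// Ask Spark to keep this RDD around for reuse, spilling to disk if memory is tight.
errors.persist(StorageLevel.MEMORY_AND_DISK)

// Actions force the (pipelined) computation.
val errorCount = errors.count()                        // returns a value to the driver
errors.saveAsTextFile("hdfs:///logs/errors-cleaned")   // exports the dataset to storage
```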

The link also has a video of their initial presentation at NSDI.  Check it out.


Spark SQL For ETL

Ben Snively discusses using Spark SQL as part of an ETL process:

Now interact with SparkSQL through a Zeppelin UI, but re-use the table definitions you created in the Hive metadata store.  You'll create another table in SparkSQL later in this post to show how that would have been done there.

Connect to the Zeppelin UI and create a new notebook under the Notebook tab. Query to show the tables. You can see that the two tables you created in Hive are also available in SparkSQL.

There are a bunch of tools in here, but for me, the moral of the story is that SQL is a great language for data processing.  Spark SQL has gaps, but has filled many of those gaps over the past year or so, and I recommend giving it a shot.
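
As a rough sketch of the metastore-sharing idea: in the Spark 1.6 setup the post uses this would be a HiveContext, while in Spark 2.x it looks like the following (table and column names are placeholders, not from the post):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-sql-etl-sketch")
  .enableHiveSupport()   // point Spark SQL at the existing Hive metastore
  .getOrCreate()

// Tables defined in Hive are visible here as well.
spark.sql("SHOW TABLES").show()

// And Spark SQL can define new tables alongside them.
spark.sql("""
  CREATE TABLE cleaned_events AS
  SELECT user_id, event_time, url
  FROM raw_events
  WHERE url IS NOT NULL
""")
```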


Crime Analysis

Raghavan Madabusi combines Zeppelin, R, and Spark to perform crime analysis:

Apache Zeppelin, a web-based notebook, enables interactive data analytics including Data Ingestion, Data Discovery, and Data Visualization all in one place. The Zeppelin interpreter concept allows any language or data-processing backend to be plugged into Zeppelin. Currently, Zeppelin supports many interpreters such as Spark (Scala, Python, R, SparkSQL), Hive, JDBC, and others. Zeppelin can be configured against an existing Spark ecosystem and share a SparkContext across Scala, Python, and R.

This links to a rather long post on how to set up and use all of these pieces.  I’m more familiar with Jupyter than Zeppelin, but regardless of the notebook you choose, this is a good exercise to become familiar with the process.


Spark Optimizations

Over at the DZone blog, we learn how to use Distribute By and Cluster By to optimize Spark performance:

Your DataFrame is skewed if most of its rows are located on a small number of partitions, while the majority of the partitions remain empty. You really should avoid such a situation. Why? It makes your application virtually non-parallel – most of the time you will be waiting for a single task to finish. Even worse, in some cases you can run out of memory on some executors or cause an excessive spill of data to disk. All of this can happen if your data is not evenly distributed.

To deal with the skew, you can repartition your data using distribute by. For the expression to partition by, choose something that you know will evenly distribute the data. You can even use the primary key of the DataFrame!

It’s interesting to see how cluster by, distribute by, and sort by can have such different performance consequences.
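
A minimal sketch of the two clauses, with invented table and column names; CLUSTER BY is simply DISTRIBUTE BY plus SORT BY on the same expressions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("distribute-by-sketch").getOrCreate()

// Repartition on a well-distributed key to fight skew.
val distributed = spark.sql("SELECT * FROM events DISTRIBUTE BY event_id")

// Partition and sort within each partition in one pass.
val clustered = spark.sql("SELECT * FROM events CLUSTER BY event_id")

// The roughly equivalent DataFrame calls:
val events = spark.table("events")
val viaApi = events.repartition(events("event_id")).sortWithinPartitions(events("event_id"))
```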


Tungsten Engine

Sameer Agarwal, Davies Liu, and Reynold Xin show off major Spark engine improvements:

From the above observation, a natural next step for us was to explore the possibility of automatically generating this handwritten code at runtime, which we are calling “whole-stage code generation.” This idea is inspired by Thomas Neumann’s seminal VLDB 2011 paper on Efficiently Compiling Efficient Query Plans for Modern Hardware. For more details on the paper, Adrian Colyer has coordinated with us to publish a review on The Morning Paper blog today.

The goal is to leverage whole-stage code generation so the engine can achieve the performance of hand-written code, yet provide the functionality of a general purpose engine. Rather than relying on operators for processing data at runtime, these operators together generate code at runtime and collapse each fragment of the query, where possible, into a single function and execute that generated code instead.

The possibility of getting an order of magnitude better performance is certainly enticing.
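
If you want to see which operators the engine fuses, a quick explain() on a simple aggregation shows it (a small sketch, not from the post; requires Spark 2.0 or later):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("wholestage-sketch").getOrCreate()

spark.range(1000L * 1000 * 1000)
  .selectExpr("id % 100 AS bucket")
  .groupBy("bucket")
  .count()
  .explain()   // operators marked with * in the physical plan run inside generated code
```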


Using Python 3.4 With EMR And Spark

Bruno Faria shows how to use Python 3.4 with Spark on Amazon's Elastic MapReduce:

An EMR 4.6 cluster running Spark 1.6.1 will still use Python 2.7 as the default interpreter. If you want to change this, you will need to set the environment variable: PYSPARK_PYTHON=python34. You can do this when you launch a cluster by using the configurations API and supplying the configuration shown in the snippet below:

I’m more of a SQL and Scala guy, but if you like Python and are on the Python 3 side of the divide, here’s a solution for you.
