Press "Enter" to skip to content

Category: Spark

Working With Key-Value Pairs In Spark

Teena Vashist shows us a few of the functions available with Spark for working with key-value pairs:

1. Creating Key/Value Pair RDD: 
The pair RDD arranges the data of a row into two parts. The first part is the Key and the second part is the Value. In the below example, I used a parallelize method to create a RDD, and then I used the length method to create a Pair RDD. The key is the length of the each word and the value is the word itself.

scala> val rdd = sc.parallelize(List("hello","world","good","morning"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val pairRdd = rdd.map(a => (a.length,a))
pairRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:26
scala> pairRdd.collect().foreach(println)
(5,hello)
(5,world)
(4,good)
(7,morning)

Click through for more operations.  Spark is a bit less KV-centric than classic MapReduce jobs, but there are still plenty of places where you want to use them.

Comments closed

Load Testing Spark To MongoDB

Abdelghani Tassi has a quick load test to see how fast Spark can load data into MongoDB:

Recently, my company faced the serious challenge of loading a 10 million rows of CSV-formatted geographic data to MongoDB in real-time.

We first tried to make a simple Python script to load CSV files in memory and send data to MongoDB. Processing 10 million rows this way took 26 minutes!

26 minutes for processing a dataset in real-time is unacceptable so we decided to proceed differently.

I’m not sure the test was totally fair, but the results comport to my biases…  There is some good advice here:  storing data in optimized formats (Parquet in this instance) can make a big difference, Spark is useful for ETL style operations, and Scala is generally the fastest language in the Spark world.

Comments closed

Hive And Spark Integrated Together

Bikas Saha and Saumitra Buragohain share some of the direction the Apache Hive team is going in version 3:

The latest release of Apache Hive 3 (part of HDP 3) provides significant new capabilities including ACID support for data ingest. This functionality has many applications, a crucial one being privacy support for data modifications and deletions for GDPR. In addition, ACID also significantly reduces the time to ingest for data, thereby improving data freshness for Hive queries. To provide these features, Hive needs to take full control of the files that store the table data and thus this data is no longer directly accessible by third party systems like Apache Spark. Thus Apache Spark’s built-in support for Hive table data is no longer supported for data managed by Hive 3.

At the same time, Apache Spark has become the de-facto standard for a wide variety complex processing use cases on Big Data. This includes data stored in Hive 3 tables and thus we need a way to provide efficient, high-performance, ACID compliant access to Hive 3 table data from Spark. Fortunately, Apache Spark supports a pluggable approach for various data sources and Apache Hive itself can also be considered as one data source. We have implemented the Hive Warehouse Connector (HWC) as library to provide first class support for Spark to read Hive 3 data for subsequent complex processing (like machine learning) in Spark.

Spark is also commonly used to ETL raw data into Hive tables and this scenario should continue to be supported in the Hive ACID world. To do that, HWC integrates with the latest Hive Streaming APIs to support ingest into Hive both from batch jobs as well as structured streaming jobs.

Overall the Hive Warehouse connector provide efficient read write access to Hive warehouse data from Spark jobs, while providing transparent user identity propagation and maintaining consistent security and access control.

Spark has had some dependencies on Hive (or at least expectations of certain Hive conventions like /tmp/hive existing), but the two systems have historically been more (friendly) competitors than tools integrated in the same chain.

Comments closed

Apache Avro Now Supported In Spark 2.4

Gengliang Wang, et al, announce built-in support for Apache Avro in Spark 2.4:

Apache Avro is a popular data serialization format. It is widely used in the Apache Spark and Apache Hadoop ecosystem, especially for Kafka-based data pipelines. Starting from Apache Spark 2.4 release, Spark provides built-in support for reading and writing Avro data. The new built-in spark-avro module is originally from Databricks’ open source project Avro Data Source for Apache Spark (referred to as spark-avro from now on). In addition, it provides:

  • New functions from_avro() and to_avro() to read and write Avro data within a DataFrame instead of just files.
  • Avro logical types support, including Decimal, Timestamp, and Date types. See the related schema conversions for details.
  • 2X read throughput improvement and 10% write throughput improvement.

In this blog, we examine each of the above features through examples, giving you a flavor of its easy API usage, performance improvements, and merits.

Avro is one of the better rowstore data formats in the Hadoop world, so it’s good to see built-in support here.

Comments closed

Spark MLflow 0.8.0 Released

Aaron Davidson and Jules Damji announce MLflow 0.8.0 on the Spark platform:

Improved MLflow UI Experience

  1. Compact Display for Metrics and Parameters: To avoid clutter and an explosion of columns for each metric or parameter, now we group them together in a single tabular column by default. That way, each runs’ parameters and metrics are listed nearby. Users can still click each parameter or metric to display it in a separate column or sort by it and customize their view this way.

  2. Nesting Runs: For nested MLflow runs, which are common in hyperparameter search or multi-step workflows, the UI will display a collapsible tree underneath each parent run. This makes it much easier to organize and visualize multi-step workflows.

  3. Labeling Runs: While MLflow gives each run a UUID by default, you can also now assign each run a name through the API. These names can also be edited in the UI.

  4. UI Persistence: The MLflow UI now remembers your filters, sorting and column setup in browser local storage so you no longer need to reconfigure the view each time.

Looks like there are some nice additions here.

Comments closed

Working With The Databricks API Via Powershell

Gerhard Brueckl has a Powershell module for interacting with Databricks, either Azure or AWS:

As most of our deployments use PowerShell I wrote some cmdlets to easily work with the Databricks API in my scripts. These included managing clusters (create, start, stop, …), deploying content/notebooks, adding secrets, executing jobs/notebooks, etc. After some time I ended up having 20+ single scripts which was not really maintainable any more. So I packed them into a PowerShell module and also published it to the PowerShell Gallery (https://www.powershellgallery.com/packages/DatabricksPS) for everyone to use!

This looks like a pretty good module if you work with Databricks.

Comments closed

Tuning Apache Spark Applications

Vidisha Gupta has a few tips for tuning Apache Spark programs:

Data Serialization – Serialization plays an important role in increasing the performance of any application. Spark provides two serialization libraries –

  • Java Serialization: By default, spark uses Java’s ObjectOutputStream framework which can work with any class that implements java.io.serializable. This serialization is flexible but slow and creates large serialized formats for many classes.

  • Kryo Serialization: Spark can use Kryo library to serialize objects. It is much faster and compact but does not support all serializable types. So we must register those classes which we want to be serialized. Therefore, Kryo uses indices instead of full class names to identify data types which reduce the size of the serialized data thereby increasing performance. We can initialize our spark conf by setting the value of the property spark.serializer to org.apache.spark.serializer.KryoSerializer. This serializer has a major impact on performance when we are shuffling or caching a large amount of data. To know more about this serializer, refer  Kryo documentation

There are some good tips in here.

Comments closed

Game Theory With Apache Spark

Konor Unyelioglu has a four-part series on solving game theoretical problems with Apache Spark.  Part one lays out the scenario:

One application of game theory is finding optimal resource allocation. For example, as discussed in this article, resource management for heterogeneous wireless networks involves sharing network links, e.g. 3G, Wi-Fi, WiMAX, LTE, between mobile devices of different types and different bandwidth needs. In such environments, game theory algorithms can be effectively used to decide which devices must be allocated to which network resources. Similarly, game theory can be used for allocation of cloud computing resources, e.g. CPU, storage, memory or network bandwidth, between resource clients, as discussed in this article (also see here). The concept of Mobile Edge Computing, where mobile devices offload computationally intensive tasks to the small scale computing servers located in the network edge, could utilize game theory concepts for resource allocation, as studied here.

Using game theory for resource allocation is not limited to cloud computing or telecommunications. For example, in a recent study, a technique was developed based on game theory for efficient distribution of water supply to consumers. Optimum decision making for traffic flow control at major traffic intersections can also be modeled using concepts from game theory, as studied in this article.

Part two defines an algorithm for maximizing utility given the finite set of resources:

Consider Qi(P) defined previously for i = 1, …, N. Let Qi1(P) be defined as the K-dimensional vector where the j-th entry is 1 if and only if there exists an element in Qi(P) where the j-th entry is greater than 0, j = 1,…, K. In other words, if the j-th entry of Qi1(P) is 0 then for every element in Qi(P) the j-th entry must be 0; if the j-th entry of Qi1(P) is 1 then for at least one element in Qi(P) the j-th entry must be 1.

Part 1 starts with the initial price vector at 0, i.e. P = 0, and then at each iterative step finds a new price vector, built on the previous one, that minimizes C(P). At each step, the newly constructed price vector is guaranteed to be no less than the previous one. When the price vector no longer increases, i.e. the newly constructed and previous price vectors are equal, the optimal price Po has been reached. Along with Po we also obtain Qi1(Po), i = 1, …, N, which we call optimal assignments. If the j-th entry of Qi1(Po) = 0 then agent i will not be allocated any units of resource type j. On the other hand, if the j-th entry of Qi1(Po) = 1 then agent i may be allocated some units of resource type j in Part 2 of the algorithm, although not necessarily.

Part three lays out some helper methods for solving the problem in Spark:

For an agent i, the method getMaxUtility() below calculates Vi(P) at price P, i.e. it solves the maximization problem:

max x ∈ Xi {Ui(x) – P * x}

where Xi is the consumption set of the agent.

Recall that

  • Ui = [ui1 ui2 … uiK]T
  • Ui(x) = UiT  * x = ∑ j = 1, 2, …, K  (uij * xij)
  • Ui(x) – P * x = ∑ j = 1, 2, …, K  (uij – pj)* xij

Part four shows us the code for the solution and wraps up:

In this article, we discussed an algorithm based on game theory for optimal resource allocation. The algorithm provides a fairness-based equilibrium where every agent (bidder) maximizes its utility and the resource manager (auctioneer) maximizes the price of the resources it is allocating. In addition, all the available units are allocated across all resource types and no agent is forced to take more than it is willing to. The algorithm is based on economist Ausubel’s Efficient Dynamic Auction Method.

We showed via two examples that the algorithm can be applied to different types of resource allocation problems. In one example, we applied the algorithm to allocate cloud computing resources, e.g. CPU, memory, bandwidth, to computing clients. Secondly, we applied the algorithm to a logistics example where various types of goods are transported over shared transportation resources.

If you were to create a parlor game around things guaranteed to show up in Curated SQL, “Game theory with Apache Spark” is way up on the list.  If somebody does a post combining Apache Kafka with agorics, that’s an instant link too.

Comments closed

Quick Spark Notes

Leela Prasad has a few quick notes on concepts in Apache Spark:

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

There’s some good stuff on accumulators and the SparkSession object in there as well.

Comments closed

Azure Databricks Geospatial Analysis

Jose Mendes gives us an example of using Azure Databricks to perform geospatial analysis:

Magellan is a distributed execution engine for geospatial analytics on big data. It is implemented on top of Apache Spark and deeply leverages modern database techniques like efficient data layout, code generation and query optimization in order to optimize geospatial queries (further details here).

Although people mentioned in their GitHub page that the 1.0.5 Magellan library is available for Apache Spark 2.3+ clusters, I learned through a very difficult process that the only way to make it work in Azure Databricks is if you have an Apache Spark 2.2.1 cluster with Scala 2.11. The cluster I used for this experience consisted of a Standard_DS3_v2 driver type with 14GB Memory, 4 Cores and auto scaling enabled.

In terms of datasets, I used the NYC Taxicab dataset to create the geometry points and the Magellan NYC Neighbourhoods GeoJSON dataset to extract the polygons. Both datasets were stored in a blob storage and added to Azure Databricks as a mount point.

It sounds like this is much faster than using U-SQL to perform the same task.

Comments closed