
Category: Spark

Flattening JSON Data With Databricks

Ivan Vazharov gives us a Databricks notebook to parse and flatten JSON using PySpark:

With Databricks you get:

  • An easy way to infer the JSON schema and avoid creating it manually
  • Subtle changes in the JSON schema won’t break things
  • The ability to explode nested lists into rows in a very easy way (see the Notebook below)
  • Speed!

Following is an example Databricks Notebook (Python) demonstrating the above claims. The JSON sample consists of an imaginary JSON result set, which contains a list of car models within a list of car vendors within a list of people. We want to flatten this result into a dataframe.
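
To give a flavor of the pattern (using made-up field names rather than Vazharov's exact sample), schema inference plus explode looks something like this in PySpark:

from pyspark.sql.functions import col, explode

# Let Spark infer the schema; multiLine handles a pretty-printed JSON file.
df = spark.read.option("multiLine", "true").json("/path/to/people.json")

# Explode each nesting level into rows: people -> vendors -> models.
flat = (df
    .select(explode("people").alias("person"))
    .select(col("person.name").alias("person_name"),
            explode("person.vendors").alias("vendor"))
    .select("person_name",
            col("vendor.name").alias("vendor_name"),
            explode("vendor.models").alias("model")))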

Click through for the notebook.


Databricks MLflow

Matei Zaharia announces a new Databricks offering:

MLflow is inspired by existing ML platforms, but it is designed to be open in two senses:

  1. Open interface: MLflow is designed to work with any ML library, algorithm, deployment tool or language. It’s built around REST APIs and simple data formats (e.g., a model can be viewed as a lambda function) that can be used from a variety of tools, instead of only providing a small set of built-in functionality. This also makes it easy to add MLflow to your existing ML code so you can benefit from it immediately, and to share code using any ML library that others in your organization can run.
  2. Open source: We’re releasing MLflow as an open source project that users and library developers can extend. In addition, MLflow’s open format makes it very easy to share workflow steps and models across organizations if you wish to open source your code.

MLflow is still in alpha, but we believe it already offers a useful framework for working with ML code, and we would love to hear your feedback. In this post, we’ll introduce MLflow in detail and explain its components.
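
If you want a sense of how little ceremony the tracking API requires, here is a minimal sketch (the parameter and metric names are invented, and the API was still in alpha, so details may have shifted):

import mlflow

with mlflow.start_run():
    # Log a hyperparameter and a resulting metric against this run;
    # the tracking UI can then compare runs side by side.
    mlflow.log_param("alpha", 0.5)
    mlflow.log_metric("rmse", 0.78)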

Even in alpha, it looks nice.


Spark: DataFrame To RDD For Data Cleansing

Gilad Moscovitch walks us through a common data cleansing problem with Spark data frames:

A problem can arise when one of the inner fields of the JSON has undesired non-JSON values in some of the records.
For instance, an inner field might contain HTTP errors, which would be interpreted as a string rather than as a struct.
As a result, our schema would look like:
root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: string (nullable = true)
Instead of
root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
When trying to explode a “string” type, we get a type mismatch error:
org.apache.spark.sql.AnalysisException: Can’t extract value from requestBody#10
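
The general shape of the fix (file and field names here are placeholders, not Moscovitch's exact code) is to drop down to the underlying RDD of raw strings, filter out the malformed records, and let Spark re-infer the schema from what remains:

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def body_is_struct(line):
    # Keep only records whose requestBody parses as a JSON object.
    try:
        return isinstance(json.loads(line).get("requestBody"), dict)
    except (ValueError, AttributeError):
        return False

raw = spark.sparkContext.textFile("/path/to/logs.json")  # one record per line
clean = spark.read.json(raw.filter(body_is_struct))
clean.printSchema()  # requestBody is now inferred as a struct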

Click through to see how to handle this scenario cleanly.


Using The Spark Connector To Speed Up Data Loads

Denzil Ribeiro explains how you can use the Spark connector for Azure SQL DB and SQL Server to make inserting data from Spark into SQL Server up to 15x faster than the native JDBC client:

Since the load was taking longer than expected, we examined the sys.dm_exec_requests DMV while the load was running, and saw that there was a fair amount of latch contention on various pages, which wouldn’t be expected if data were being loaded via a bulk API.

Examining the statements being executed, we saw that the JDBC driver uses sp_prepare followed by sp_execute for each inserted row; therefore, the operation is not a bulk insert. Examining the Spark JDBC connector source code further, we saw that it builds a batch consisting of singleton insert statements and then executes the batch via the prep/exec model.
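
For comparison, here is roughly what the two write paths look like from PySpark. The connection details and table name are placeholders, and the format name comes from Microsoft's Spark connector for SQL Server rather than from the post itself:

url = "jdbc:sqlserver://myserver.database.windows.net;databaseName=mydb"

# Generic JDBC path: the driver prepares and executes one insert per row.
df.write.jdbc(url=url, table="dbo.store_sales", mode="append",
              properties={"user": "myuser", "password": "mypassword"})

# Microsoft's connector: same DataFrame API, but it inserts via the bulk copy API.
(df.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("append")
    .option("url", url)
    .option("dbtable", "dbo.store_sales")
    .option("user", "myuser")
    .option("password", "mypassword")
    .save())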

It’s the power of bulk insertion.


Single-Node PySpark

Gengliang Wang, et al, explain that even a single Spark node can be useful:

It’s been a few years since Intel was able to push CPU clock rates higher. Rather than making a single core more powerful with higher frequency, the latest chips are scaling in terms of core count. Hence, it is not uncommon for laptops or workstations to have 16 cores, and servers to have 64 or even 128 cores. In this manner, these multi-core single-node machines resemble a distributed system more than a traditional single-core machine.

We often hear that distributed systems are slower than single-node systems when data fits in a single machine’s memory. By comparing memory usage and performance between Spark and Pandas using common SQL queries, we observed that this is not always the case. We used three common SQL queries to show a single-node comparison of Spark and Pandas:

Query 1. SELECT max(ss_list_price) FROM store_sales

Query 2. SELECT count(distinct ss_customer_sk) FROM store_sales

Query 3. SELECT sum(ss_net_profit) FROM store_sales GROUP BY ss_store_sk

To demonstrate the above, we measure the maximum data size (both Parquet and CSV) Pandas can load on a single node with 244 GB of memory, and compare the performance of three queries.
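
Those three queries translate directly to PySpark running in local mode (the Parquet path is a placeholder):

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .master("local[*]")        # use every core on the single node
         .appName("single-node-benchmark")
         .getOrCreate())

store_sales = spark.read.parquet("/path/to/store_sales")

store_sales.agg(F.max("ss_list_price")).show()                         # Query 1
store_sales.agg(F.countDistinct("ss_customer_sk")).show()              # Query 2
store_sales.groupBy("ss_store_sk").agg(F.sum("ss_net_profit")).show()  # Query 3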

Click through for the results.


How Spark Works: RDDs And DAGs

Shubham Agarwal gets into the way that Spark translates operations on Resilient Distributed Datasets into actions:

When we apply a transformation to an RDD, it gives us a new RDD, but it does not start executing those transformations. Execution begins only when an action is performed on the new RDD, which gives us a final result.

So once you perform an action on an RDD, the Spark context hands your program to the driver.

The driver creates the DAG (directed acyclic graph) or execution plan (job) for your program. Once the DAG is created, the driver divides this DAG into a number of stages. These stages are then divided into smaller tasks and all the tasks are given to the executors for execution.
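
You can see the laziness directly in a few lines of PySpark: nothing runs until the final action.

from pyspark.sql import SparkSession

sc = SparkSession.builder.master("local[*]").getOrCreate().sparkContext

numbers = sc.parallelize(range(1, 1001))
evens = numbers.filter(lambda n: n % 2 == 0)  # transformation: lazy, returns a new RDD
doubled = evens.map(lambda n: n * 2)          # transformation: still no work done
total = doubled.reduce(lambda a, b: a + b)    # action: DAG -> stages -> tasks -> result
print(total)  # 501000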

Click through for more details.


Spark Architecture: The Spark Streaming Receiver

Oleksii Yermolenko gives us an overview of the Receiver object in Spark Streaming:

The key component of a Spark streaming application is called the Receiver. It is responsible for opening new connections with the sources, listening for events from them, and aggregating incoming data in memory. If the receiver’s worker node runs out of memory, it starts using disk storage for persistence operations, but this negatively impacts the overall application’s performance.

All incoming data is first aggregated within the receiver into chunks called blocks. After a preconfigured interval of time called batchInterval, Spark performs a logical aggregation of these blocks into another entity called a batch. A batch has links to all blocks formed by the receivers and uses this information to generate an RDD, the main entity Spark’s engine uses to operate on data. Normally an RDD consists of a number of partitions, where each partition references a block generated by a receiver at the start stage. A streaming application can have many receivers located at different physical nodes, so the actual data is distributed across the cluster from the start.

The batch interval is global for the whole application and is defined when the Streaming Context is created. The block generation interval is a per-receiver property which can be defined through the spark.streaming.blockInterval configuration property. By default, blocks are generated every 200ms, but you can tune this property according to the nature of your data.
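
In code, the two intervals live in different places: the batch interval is fixed when the StreamingContext is created, while the block interval is a configuration property. A minimal sketch, with the socket source standing in as an arbitrary receiver:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf().setAppName("receiver-demo")
        .set("spark.streaming.blockInterval", "200ms"))  # per-receiver block interval
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=10)  # global 10-second batch interval

lines = ssc.socketTextStream("localhost", 9999)  # creates a receiver
lines.count().pprint()

ssc.start()
ssc.awaitTermination()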

Read the whole thing, which includes some tips on design.


Continuous Processing Mode With Spark Structured Streaming

Joseph Torres, et al, explain how continuous processing mode works with Apache Spark 2.3’s structured streaming:

Suppose we want to build a real-time pipeline to flag fraudulent credit card transactions. Ideally, we want to identify and deny a fraudulent transaction as soon as the culprit has swiped his/her credit card. However, we don’t want to delay legitimate transactions as that would annoy customers. This leads to a strict upper bound on the end-to-end processing latency of our pipeline. Given that there are other delays in transit, the pipeline must process each transaction within 10-20 ms.

Let’s try to build this pipeline in Structured Streaming. Assume that we have a user-defined function “isPaymentFlagged” that can identify the fraudulent transactions. To minimize the latency, we’ll use a 0 second processing time trigger indicating that Spark should start each micro batch as fast as it can with no delays.
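
The trigger is the only thing that changes between the two modes. In this sketch the built-in rate source stands in for the transaction stream, since the post's "isPaymentFlagged" function and its source are hypothetical:

payments = spark.readStream.format("rate").load()  # stand-in for the real stream

query = (payments.writeStream
         .format("console")
         .trigger(continuous="1 second")          # continuous: ~ms latency,
                                                  # checkpoint every second
         # .trigger(processingTime="0 seconds")   # micro-batch alternative
         .start())
query.awaitTermination()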

They also explain how this newer model differs from the prior model of collecting events in microbatches.


Apache Spark 2.3

The Databricks team has been busy.  They’ve recently announced Apache Spark 2.3 on Databricks:

Continuing with the objectives to make Spark faster, easier, and smarter, Spark 2.3 marks a major milestone for Structured Streaming by introducing low-latency continuous processing and stream-to-stream joins; boosts PySpark by improving performance with pandas UDFs; and runs on Kubernetes clusters by providing native support for Apache Spark applications.

In addition to extending new functionality to SparkR, Python, MLlib, and GraphX, the release focuses on usability, stability, and refinement, resolving over 1400 tickets, along with other salient features from Spark contributors.

Anirudh Ramanathan and Palak Bhatia also get into Kubernetes support in Spark 2.3:

Starting with Spark 2.3, users can run Spark workloads in an existing Kubernetes 1.7+ cluster and take advantage of Apache Spark’s ability to manage distributed data processing tasks. Apache Spark workloads can make direct use of Kubernetes clusters for multi-tenancy and sharing through Namespaces and Quotas, as well as administrative features such as Pluggable Authorization and Logging. Best of all, it requires no changes or new installations on your Kubernetes cluster; simply create a container image and set up the right RBAC roles for your Spark Application and you’re all set.

Concretely, a native Spark Application in Kubernetes acts as a custom controller, which creates Kubernetes resources in response to requests made by the Spark scheduler. In contrast with deploying Apache Spark in Standalone Mode in Kubernetes, the native approach offers fine-grained management of Spark Applications, improved elasticity, and seamless integration with logging and monitoring solutions. The community is also exploring advanced use cases such as managing streaming workloads and leveraging service meshes like Istio.

Stream-to-stream joins look particularly interesting.
