In Spark 2.3, it added support for stream-stream joins, i.e, we can join two streaming Datasets/DataFrames and in this blog we are going to see how beautifully spark now give support for joining the two streaming dataframes.
I this example, I am going to useApache Spark 2.3.0 Apache Kafka 0.11.0.1 Scala 2.11.8
Click through for the demo.
A problem can arise when one of the inner fields of the json, has undesired non-json values in some of the records.For instance, an inner field might contains HTTP errors, that would be interpreted as a string, rather than as a struct.As a result, our schema would look like:root|– headers: struct (nullable = true)| |– items: array (nullable = true)| | |– element: struct (containsNull = true)|– requestBody: string (nullable = true)Instead ofroot|– headers: struct (nullable = true)| |– items: array (nullable = true)| | |– element: struct (containsNull = true)|– requestBody: struct (nullable = true)| |– items: array (nullable = true)| | |– element: struct (containsNull = true)When trying to explode a “string” type, we will get a miss type error:org.apache.spark.sql.AnalysisException: Can’t extract value from requestBody#10
Click through to see how to handle this scenario cleanly.
Since the load was taking longer than expected, we examined the sys.dm_exec_requests DMV while load was running, and saw that there was a fair amount of latch contention on various pages, which wouldn’t not be expected if data was being loaded via a bulk API.
Examining the statements being executed, we saw that the JDBC driver uses sp_prepare followed by sp_execute for each inserted row; therefore, the operation is not a bulk insert. One can further example the Spark JDBC connector source code, it builds a batch consisting of singleton insert statements, and then executes the batch via the prep/exec model.
It’s the power of bulk insertion.
It’s been a few years since Intel was able to push CPU clock rate higher. Rather than making a single core more powerful with higher frequency, the latest chips are scaling in terms of core count. Hence, it is not uncommon for laptops or workstations to have 16 cores, and servers to have 64 or even 128 cores. In this manner, these multi-core single-node machines’ work resemble a distributed system more than a traditional single core machine.
We often hear that distributed systems are slower than single-node systems when data fits in a single machine’s memory. By comparing memory usage and performance between Spark and Pandas using common SQL queries, we observed that is not always the case. We used three common SQL queries to show single-node comparison of Spark and Pandas:
SELECT max(ss_list_price) FROM store_sales
SELECT count(distinct ss_customer_sk) FROM store_sales
SELECT sum(ss_net_profit) FROM store_sales GROUP BY ss_store_sk
To demonstrate the above, we measure the maximum data size (both Parquet and CSV) Pandas can load on a single node with 244 GB of memory, and compare the performance of three queries.
Click through for the results.
When we do a transformation on any RDD, it gives us a new RDD. But it does not start the execution of those transformations. The execution is performed only when an action is performed on the new RDD and gives us a final result.
So once you perform any action on an RDD, Spark context gives your program to the driver.
The driver creates the DAG (directed acyclic graph) or execution plan (job) for your program. Once the DAG is created, the driver divides this DAG into a number of stages. These stages are then divided into smaller tasks and all the tasks are given to the executors for execution.
Click through for more details.
The key component of Spark streaming application is called Receiver. It is responsible for opening new connections with the sources, listening events from them and aggregating incoming data within the memory. If receiver’s worker node is running out of memory, it starts using disk storage for persistence operations. But this negatively impacts the overall application’s performance.
All incoming data is first aggregated within receiver into chunks called Blocks. After preconfigured interval of time called batchInterval Spark does logical aggregation of these blocks into another entity called Batch. Batch has links to all blocks formed by receivers and uses this information for generation of RDD. This is the main Spark’s entity which is used by the engine for the operations upon the data. Normally RDD would consist of a number of partitions where each partition would reference the block generated by the receiver on the start stage. Streaming application can have lots of receivers located at different physical nodes, so the actual data would be distributed across the cluster from the start. Batch interval is global for the whole application and is defined on the stage of creation of Streaming Context. Block generation interval is a receiver based property which could be defined through the configuration of spark.streaming.blockInterval property. By default blocks would be generated every 200ms but you can tune this property according to the nature of your data.
Read the whole thing, which includes some tips on design.
Suppose we want to build a real-time pipeline to flag fraudulent credit card transactions. Ideally, we want to identify and deny a fraudulent transaction as soon as the culprit has swiped his/her credit card. However, we don’t want to delay legitimate transactions as that would annoy customers. This leads to a strict upper bound on the end-to-end processing latency of our pipeline. Given that there are other delays in transit, the pipeline must process each transaction within 10-20 ms.
Let’s try to build this pipeline in Structured Streaming. Assume that we have a user-defined function “isPaymentFlagged” that can identify the fraudulent transactions. To minimize the latency, we’ll use a 0 second processing time trigger indicating that Spark should start each micro batch as fast as it can with no delays.
They also explain how this newer model differs from the prior model of collecting events in microbatches.
The Databricks team has been busy. They’ve recently announced Apache Spark 2.3 on Databricks:
Continuing with the objectives to make Spark faster, easier, and smarter, Spark 2.3 marks a major milestone for Structured Streaming by introducing low-latency continuous processing and stream-to-stream joins; boosts PySpark by improving performance with pandas UDFs; and runs on Kubernetes clusters by providing native support for Apache Spark applications.
In addition to extending new functionality to SparkR, Python, MLlib, and GraphX, the release focuses on usability, stability, and refinement, resolving over 1400 tickets. Other salient features from Spark contributors include:
Vectorized ORC reader [SPARK-16060]
Spark History Server v2 with K-V store [SPARK-18085]
Starting with Spark 2.3, users can run Spark workloads in an existing Kubernetes 1.7+ cluster and take advantage of Apache Spark’s ability to manage distributed data processing tasks. Apache Spark workloads can make direct use of Kubernetes clusters for multi-tenancy and sharing through Namespaces and Quotas, as well as administrative features such as Pluggable Authorization and Logging. Best of all, it requires no changes or new installations on your Kubernetes cluster; simply create a container image and set up the right RBAC rolesfor your Spark Application and you’re all set.
Concretely, a native Spark Application in Kubernetes acts as a custom controller, which creates Kubernetes resources in response to requests made by the Spark scheduler. In contrast with deploying Apache Spark in Standalone Mode in Kubernetes, the native approach offers fine-grained management of Spark Applications, improved elasticity, and seamless integration with logging and monitoring solutions. The community is also exploring advanced use cases such as managing streaming workloads and leveraging service meshes like Istio.
Stream to stream joins looks particularly interesting.
We splitted the pipeline into 2 main units: The aggregator job and the persisting job. The aggregator has one and only one responsibility. To read from the input kafka topic, process the messages and finally emit them to a new kafka topic. The persisting job then takes over and whenever a message is received from topic
temperatures.aggregatedit persists to elasticsearch.
The above approach might seem to be an overkill at first but it provides a lot of benefits (but also some drawbacks). Having two units means that each unit’s health won’t directly affect each other. If the processing job fails due OOM, the persisting job will still be healthy.
One major benefit we’ve seen using this approach is the replay capabilities this approach offers. For example, if at some point we need to persist the messages from
temperatures.aggregatedto Cassandra, it’s just a matter of wiring a new pipeline and start consuming the kafka topic. If we had one job for processing and persisting, we would have to reprocess every record from the
thermostat.data, which comes with a great computational and time cost.
Angelos also discusses some issues he and his team had with Spark Streaming on this data set, so it’s an interesting comparison.
When you’re finished, shut down your cluster using the
aztk spark cluster deletecommand. (While you can delete the nodes from the Pools view in the Azure portal, the command does some additional cleanup for you.) You’ll be charged for each node in the cluster at the usual VM rates for as long as the cluster is provisioned. (One cost-saving option is to use low-priority VMs for the nodes, for savings of up to 90% compared to the usual rates.)
That’s it! Once you get used to it, it’s all quick and easy — the longest part is waiting for the cluster to spin up in Step 5. This is just a summary, but the full details see the guide SparklyR on Azure with AZTK.
It’ll take a bit more than five minutes to get started, but it is a good sight easier than building the servers yourself.