Stream-To-Stream Joins In Spark

Ayush Tiwari shows how to join a pair of streams in Apache Spark 2.3:

In Spark 2.3, it added support for stream-stream joins, i.e, we can join two streaming Datasets/DataFrames and in this blog we are going to see how beautifully spark now give support for joining the two streaming dataframes.

I this example, I am going to use

Apache Spark 2.3.0
Apache Kafka
Scala 2.11.8

Click through for the demo.

Spark: DataFrame To RDD For Data Cleansing

Gilad Moscovitch walks us through a common data cleansing problem with Spark data frames:

A problem can arise when one of the inner fields of the json, has undesired non-json values in some of the records.
For instance, an inner field might contains HTTP errors, that would be interpreted as a string, rather than as a struct.
As a result, our schema would look like:
 |– headers: struct (nullable = true)
 |    |– items: array (nullable = true)
 |    |    |– element: struct (containsNull = true)
 |– requestBody: string (nullable = true)
Instead of
 |– headers: struct (nullable = true)
 |    |– items: array (nullable = true)
 |    |    |– element: struct (containsNull = true)
 |– requestBody: struct (nullable = true)
 |    |– items: array (nullable = true)
 |    |    |– element: struct (containsNull = true)
When trying to explode a “string” type, we will get a miss type error:
org.apache.spark.sql.AnalysisException: Can’t extract value from requestBody#10

Click through to see how to handle this scenario cleanly.

Visualization Over Kafka And KSQL

Shant Hovsepian shows off a data visualization tool which can read Kafka Streams data:

KSQL is a game-changer not only for application developers but also for non-technical business users. How? The SQL interface opens up access to Kafka data to analytics platforms based on SQL. Business analysts who are accustomed to non-coding, drag-and-drop interfaces can now apply their analytical skills to Kafka. So instead of continually building new analytics outputs due to evolving business requirements, IT teams can hand a comprehensive analytics interface directly to the business analysts. Analysts get a self-service environment where they can independently build dashboards and applications.

Arcadia Data is a Confluent partner that is leading the charge for integrating visual analytics and BI technology directly with KSQL. We’ve been working to combine our existing analytics stack with KSQL to provide a platform that requires no complicated new skills for your analysts to visualize streaming data. Just as they will create semantic layers, build dashboards, and deploy analytical applications on batch data, they can now do the same on streaming data. Real-time analytics and visualizations for business users have largely been a misnomer until now. For example, some architectures enabled visualizations for end users by staging Kafka data into a separate data store, which added latency. KSQL removes that latency to let business users see the most recent data directly in Kafka and react immediately.

Click through for a couple repos and demos.

Installing Kafka On Ubuntu

Gaurav Garg has an article on installing Apache Kafka on a fresh Ubuntu installation:

For beginners, the default configurations of the Kafka broker are good enough, but for production-level setup, one must understand each configuration. I am going to explain some of these configurations.

  • The ID of the broker instance in a cluster.
  • zookeeper.connect: The ZooKeeper address (can list multiple addresses comma-separated for the ZooKeeper cluster). Example: localhost:2181,localhost:2182.
  • Time to wait before going down if, for some reason, the broker is not able to connect.

Read the whole thing.

Serializing Data In Scala

Akhil Vijayan has a two-parter on serializing data in Scala.  In the first post, he looks at uPickle:

uPickle serializer is a lightweight Json library for scala. uPickle is built on top of uJson which are used for easy manipulation of json without the need of converting it to a scala case class. We can even use uJson as standalone too. In this blog, I will focus only on uPickle library.

Note: uPickle does not support Scala 2.10; only 2.11 and 2.12 are supported

uPickle (pronounced micro-pickle) is a lightweight JSON serialization library which is fast than many other json serializers. I will talk more about the comparison of different serializers in my next blog. This blog will cover all the basic stuff about uPickle.

Then, he follows up with a comparison to other serializers:

In my previous blog, I talked about how uPickle works. Now I will be comparing it will many other json serializers where I will be serializing and deserializing scala case class.

Before that let me discuss all the json serializers that I have used in my comparison. I will compare uPickle with PlayJson, Circe and Argonaut.

Check it out.

Data Governance On Apache Kafka With Lenses

Antonios Chalkipoulos explains how Landoop’s Lenses product helps with data governance:

One of the fundamental requirements of GDPR is the Right to Retrieve Personal Data.

With Lenses SQL the above requirement can be covered via a set of simple but thorough queries into the topics that contain PII data:

SELECT * from topicA WHERE = "XXX"

Lenses will retrieve and deserialize the data from a binary format (i.e. Avro) into a human-readable format and provide full Control Execution.

Control Execution brings into context the fact that streaming SQL is operating on un-bounded streams of events: A query would normally be a never-ending query. In order to bring query termination schemantics into Apache Kafka we introduced 4 controls:

  • LIMIT 10000 – Force the query to terminate when 10,000 records are matched

  • max.bytes = 20000000 – Force the query to terminate once 20 MBytes have been retrieved

  • max.time = 60000 – Force the query to terminate after 60 seconds

  • = 8 – Force the query to terminate after 8 consecutive polls are empty, indicating we have exhausted a topic

GDPR implementation is a lot trickier for a system like Kafka, but it’s still possible.

Choose Your Hadoop File Format

Alex Woodie explains three of the most common Hadoop file formats:

You have many choices when it comes to storing and processing data on Hadoop, which can be both a blessing and a curse. The data may arrive in your Hadoop cluster in a human readable format like JSON or XML, or as a CSV file, but that doesn’t mean that’s the best way to actually store data.

In fact, storing data in Hadoop using those raw formats is terribly inefficient. Plus, those file formats cannot be stored in a parallel manner. Since you’re using Hadoop in the first place, it’s likely that storage efficiency and parallelism are high on the list of priorities, which means you need something else.

Luckily for you, the big data community has basically settled on three optimized file formats for use in Hadoop clusters: Optimized Row Columnar (ORC), Avro, and Parquet. While these file formats share some similarities, each of them are unique and bring their own relative advantages and disadvantages.

Read the whole thing.  I’m partial to ORC and Avro but won’t blink if someone recommends Parquet.

What’s New In Hadoop 3.1?

Wangda Tan, et al, look at some of the new features in Apache Hadoop 3.1:

The diagram below captures the building blocks together at a high level. If you have to tie this back to a fictitious self-flying drone company, the company will collect tons of raw images from the test drones’ built-in cameras for computer vision. Those images can be stored in the Apache Hadoop data lake in a cost-effective (with erasure coding) yet highly available manner (multiple standby namenodes). Instead of providing GPU machines to each of the data scientists, GPU cards are pooled across the cluster for access by multiple data scientists. GPU cards in each server can be isolated for sharing between multiple users.

Support of Docker containerized workloads means that data scientists/data engineers can bring the deep learning frameworks to the Apache Hadoop data lake and there is no need to have a separate compute/GPU cluster. GPU pooling allows the application of the deep learning neural network algorithms and the training of the data-intensive models using the data collected in the data lake at a speed almost 100x faster than regular CPUs.

If the customer wants to pool the FPGA (field programmable gate array) resources instead of GPUs, this is also possible in Apache Hadoop 3.1. Additionally, use of affinity and anti-affinity labels allows us to control how we deploy the microservices in the clusters — some of the components can be set to have anti-affinity so that they are always deployed in separate physical servers.

It’s interesting to see Hadoop evolve over time as the ecosystem solves more real-time problems instead of focusing on giant batch problems.

Scaling Kafka With Consumer Groups

Suhita Goswami explains how to use consumer groups to scale processing from Apache Kafka:

Kafka builds on the publish-subscribe model with the advantages of a message queuing system. It achieves this with:

  • the use of consumer groups
  • message retention by brokers

When consumers join a group and subscribe to a topic, only one consumer from the group actually consumes each message from the topic. The messages are also retained by the brokers in their topic partitions, unlike traditional message queues.

Multiple consumer groups can read from the same set of topics, and at different times catering to different logical application domains. Thus, Kafka provides both the advantage of high scalability via consumers belonging to the same consumer group and the ability to serve multiple independent downstream applications simultaneously.

Consumer groups are a great solution to the problem of long-running consumers when items to process are independent and can run concurrently.

Using The Spark Connector To Speed Up Data Loads

Denzil Riberio explains how you can use the Spark connector for Azure SQL DB and SQL Server to speed up inserting data from Spark into SQL Server 15x over the native JDBC client:

Since the load was taking longer than expected, we examined the sys.dm_exec_requests DMV while load was running, and saw that there was a fair amount of latch contention on various pages, which wouldn’t not be expected if data was being loaded via a bulk API.

Examining the statements being executed, we saw that the JDBC driver uses sp_prepare followed by sp_execute for each inserted row; therefore, the operation is not a bulk insert. One can further example the Spark JDBC connector source code, it builds a batch consisting of singleton insert statements, and then executes the batch via the prep/exec model.

It’s the power of bulk insertion.


May 2018
« Apr