Working with Columns in Spark

Achilleus has a two-parter on working with columns in Spark. Part 1 covers some of the basic syntax and several functions:

Also, we can have typed columns, which are basically columns with an expression encoder specified for the expected input and return type.

scala> val name = $"name".as[String]
name: org.apache.spark.sql.TypedColumn[Any,String] = name
scala> val name = $"name"
name: org.apache.spark.sql.ColumnName = name

There are more than 50 methods (67 the last time I counted) that can be used for transformations on the column object. We will be covering some of the important methods that are generally used.

Part 2 covers other functions including window functions:

17) over
This is one of the most important functions used in many window operations. We can talk about window functions in detail when we discuss aggregation in Spark, but for now it is fair to say that the over method provides a way to apply an aggregation over a window specification, which in turn can be used to specify the partition, order, and frame boundaries of the aggregation.

Check out both of these posts for useful tidbits.
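
To make "partition, order and frame boundaries" a little more concrete without needing a Spark cluster, here is a plain-Java sketch of my own (it is not Spark code and not from Achilleus's posts). The class name, the Row fields, and the sample data are all made up for illustration. It computes a running total per partition, ordered within each partition, with a frame that runs from the first row of the partition to the current row. In Spark you would express the same idea by building a window specification and passing it to over.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowSketch {
    /** One input row: a partition key and a value to aggregate (hypothetical schema). */
    static final class Row {
        final String dept;
        final int amount;

        Row(String dept, int amount) {
            this.dept = dept;
            this.amount = amount;
        }
    }

    /**
     * Running sum of amount per dept, ordered by amount ascending.
     * Frame: first row of the partition up to and including the current row.
     * Returns dept -> running totals in window order.
     */
    static Map<String, List<Integer>> runningTotals(List<Row> rows) {
        // "partition by dept": group rows by key, keeping first-seen key order
        Map<String, List<Row>> partitions = new LinkedHashMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(r.dept, k -> new ArrayList<>()).add(r);
        }

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Row>> entry : partitions.entrySet()) {
            List<Row> partition = entry.getValue();
            // "order by amount": sort rows inside the partition
            partition.sort(Comparator.comparingInt((Row r) -> r.amount));

            // frame = start of partition .. current row, so the sum accumulates
            List<Integer> totals = new ArrayList<>();
            int sum = 0;
            for (Row r : partition) {
                sum += r.amount;
                totals.add(sum);
            }
            result.put(entry.getKey(), totals);
        }
        return result;
    }
}
```

Given rows ("a", 30), ("b", 5), ("a", 10), ("a", 20), this returns [10, 30, 60] for "a" and [5] for "b". Changing the frame (say, to only the previous row and the current row) would change the totals without touching the partitioning or ordering, which is why the three pieces are specified separately.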

Creating Threadpools with ExecutorService in Kafka

Prasanth Nair shows how we can use Java’s ExecutorService to create threadpools for Kafka consumers:

Apache Kafka is one of today’s most commonly used event streaming platforms. While using the Kafka platform, quite often, we run into a scenario where we have to process a large number of events/messages that are placed on a broker. Traditional approaches, where a consumer is listening to a topic and then processes these messages within the consumer itself, can become a performance bottleneck if the number of messages being placed on the topic is high. In such cases, the rate at which a consumer can process messages will be very low, as there are a large number of messages getting placed on the topic. A potential solution that can be applied in such a scenario is to offload message processing to the worker threads in a thread pool.

Click through for the Java code.
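
As a rough illustration of the offloading idea (this is my own sketch, not Prasanth's code), the snippet below uses only java.util.concurrent. A list of strings stands in for the records a Kafka consumer would get back from a poll; the class name, the handle method, and the pool size are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class OffloadSketch {
    /** Hypothetical per-message work; a real handler would do I/O or computation. */
    static String handle(String message) {
        return message.toUpperCase();
    }

    /**
     * Hands each message to a fixed-size pool, then waits for every result.
     * Results come back in the same order the messages were submitted.
     */
    static List<String> processBatch(List<String> batch, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String message : batch) {
                Callable<String> task = () -> handle(message);
                pending.add(pool.submit(task));
            }

            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get()); // blocks until that task has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("a worker task failed", e);
        } finally {
            pool.shutdown(); // stop accepting work; already-submitted tasks still finish
        }
    }
}
```

Calling OffloadSketch.processBatch(Arrays.asList("a", "b", "c"), 2) returns [A, B, C]. A real consumer would keep one long-lived pool rather than creating one per batch, and would also need to decide when to commit offsets relative to when the workers finish.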

Flink and Stateful Streaming

Himanshu Gupta explains some of the benefits Apache Flink offers for stateful streaming applications:

The 2 main types of stream processing done are:
1. Stateless: Where every event is handled completely independent from the preceding events.
2. Stateful: Where a “state” is shared between events and therefore past events can influence the way current events are processed.

Stateless stream processing is easy to scale up because events are processed independently. But Stateful stream processing is difficult to scale up because the “state” needs to be shared across the events.

Himanshu does point out alternatives, but this isn’t a comparison exercise.
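
The stateless/stateful distinction is easy to see in a few lines of plain Java. This is a sketch of my own and does not use the Flink API; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class StreamSketch {
    /** Stateless: the result depends only on the event passed in. */
    static String normalize(String event) {
        return event.trim().toLowerCase();
    }

    /** Stateful: remembers how many times each key has been seen so far. */
    static final class RunningCount {
        private final Map<String, Integer> counts = new HashMap<>();

        int onEvent(String key) {
            // merge adds 1 to the stored count, or starts at 1 for a new key
            return counts.merge(key, 1, Integer::sum);
        }
    }
}
```

Any number of workers can call normalize in parallel with no coordination. RunningCount only gives correct answers if every event for a given key reaches the same instance (or the map is shared somehow), and if the map survives a restart. That is the scaling difficulty the quote describes.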

Performance Testing Aiven Kafka

Kevin Feasel



Heikki Nousiainen tests the Aiven platform’s Kafka implementation on different cloud providers at different service levels:

We used a single topic for our write operations with a partition count set to either 3 or 6, depending on the number of brokers in each test cluster. As the test clusters were regular Aiven services, the partitions and replicas were spread out across availability zones.

Messages were produced via the librdkafka_performance tool with a message size of 512 bytes, a default batch size of 10,000 and no compression. Continuing our quest to simulate real-world use, client connections were made over TLS.

We used Kafka version 2.1 running with Java 8; as a side note, it’ll be interesting to benchmark Aiven Kafka running with Java 11 in future tests because we expect Java improvements to positively impact its performance.

During the test, we kept increasing the number of producing clients until we reached the maximum throughput rate each plan tier’s cluster could accept. To verify our readings, we left the load running for some time.

There are some interesting results here.

MRAppMaster Errors Running MapReduce Jobs

I have a post looking at potential causes when PolyBase MapReduce jobs are unable to find the MRAppMaster class:

Let me tell you about one of my least favorite things I like to see in PolyBase:

Error: Could not find or load main class

This error is not limited to PolyBase but is instead an issue when trying to run MapReduce jobs in Hadoop. There are several potential causes, so let’s cover each of them as they relate to PolyBase and hopefully one of these solves your issue.

Click through for four potential solutions to what ails you.

Database-First or Kafka-First for Event Streaming

Gwen Shapira takes us through a scenario where database-first writes for event streaming make the most sense:

Note that the DB does quite a lot for you: it enforces serializability, locks, your logical constraints, etc. If the DB is distributed (Vitess, Cockroach, Spanner, Yugabyte), it does even more.

If you were to go Kafka-first… well, it isn’t impossible. But all those responsibilities now belong to you as a developer. And if you are thinking there may be multiple webservers handling user requests and passing them to Kafka, you have to solve fairly challenging problems.

Read the whole thing.

Handling Errors in Kafka Connect

Robin Moffatt shows us some techniques for handling errors in your Kafka topics:

We’ve seen how setting errors.tolerance = all will enable Kafka Connect to just ignore bad messages. When it does, by default it won’t log the fact that messages are being dropped. If you do set errors.tolerance = all, make sure you’ve carefully thought through if and how you want to know about message failures that do occur. In practice that means monitoring/alerting based on available metrics, and/or logging the message failures.

The most simplistic approach to determining if messages are being dropped is to tally the number of messages on the source topic with those written to the output:

Read on for a few different tactics and how you can implement them.
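
For a sense of what "logging the message failures" can look like in connector configuration, here is a minimal properties-style fragment. It is a sketch rather than Robin's exact setup: the dead letter queue topic name is made up, and the dead letter queue settings apply to sink connectors only.

```properties
# Keep the connector running when a message fails conversion or transformation
errors.tolerance=all

# Write failures to the Connect worker log, including the failed message itself
errors.log.enable=true
errors.log.include.messages=true

# Route failed messages to a separate topic (sink connectors only).
# "dlq_my_sink" is a hypothetical topic name.
errors.deadletterqueue.topic.name=dlq_my_sink
errors.deadletterqueue.context.headers.enable=true
```

With the context headers enabled, each message on the dead letter topic carries headers describing why it failed, which makes the topic more useful than a bare count of dropped messages.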

Batch Consumption from Kafka with Spark

Swapnil Chougule shares a few tips on performing batch processing of a Kafka topic using Apache Spark:

Spark as a compute engine is very widely accepted by most industries. Most of the old data platforms based on MapReduce jobs have been migrated to Spark-based jobs, and some are in the phase of migration. In short, batch computation is being done using Spark. As a result, organizations’ infrastructure and expertise have been developed around Spark.

So, now the question is: can Spark solve the problem of batch consumption of data inherited from Kafka? The answer is yes.

The advantages of doing this are: having a unified batch computation platform, reusing existing infrastructure, expertise, monitoring, and alerting.

Click through to get to the starting point on this as well as a few tips to avoid stumbling blocks.

PolyBase and Hive Shim Errors

I ran into a problem with Hive 3 and PolyBase:

My initial plan was to google things. The specific error: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number. That pops up HIVE-15326 and HIVE-15016 but gave me no immediate joy.

After reaching out to James Rowland-Jones, we (by which I mean he) eventually figured out the issue.

Click through for the solution.

Securely Accessing External Resources From Databricks AWS

Itai Weiss shows how you can securely hit external data sources when using Databricks for AWS:

For security purposes, Databricks Apache Spark clusters are deployed in an isolated VPC dedicated to Databricks within the customer’s account. In order to run their data workloads, there is a need to have secure connectivity between the Databricks Spark Clusters and the above data sources.

It is straightforward for Databricks clusters located within the Databricks VPC to access data from AWS S3 which is not a VPC specific service. However, we need a different solution to access data from sources deployed in other VPCs such as AWS Redshift, RDS databases, streaming data from Kinesis or Kafka. This blog will walk you through some of the options you have available to access data from these sources securely and their cost considerations for deployments on AWS. In order to establish a secure connection to these data sources, we will have to configure the Databricks VPC with either one of the following two available options:

Read on for those two options.

