How Kafka Does Exactly-Once

Kevin Feasel



Neha Narkhede explains exactly-once messaging and describes how Kafka implements their exactly-once process:

In a distributed system, the computers that make up the system can always fail independently of one another. In the case of Kafka, an individual broker can crash, or a network failure can happen while the producer is sending a message to a topic. Depending on the action the producer takes to handle such a failure, you can get different semantics:

  • At least once semantics: if the producer receives an acknowledgement (ack) from the Kafka broker and acks=all, it means that the message has been written exactly once to the Kafka topic. However, if a producer ack times out or receives an error, it might retry sending the message assuming that the message was not written to the Kafka topic. If the broker had failed right before it sent the ack but after the message was successfully written to the Kafka topic, this retry leads to the message being written twice and hence delivered more than once to the end consumer. And everybody loves a cheerful giver, but this approach can lead to duplicated work and incorrect results.

  • At most once semantics: if the producer does not retry when an ack times out or returns an error, then the message might end up not being written to the Kafka topic, and hence not delivered to the consumer. In most cases it will be, but in order to avoid the possibility of duplication, we accept that sometimes messages will not get through.

  • Exactly once semantics: even if a producer retries sending a message, it leads to the message being delivered exactly once to the end consumer. Exactly-once semantics is the most desirable guarantee, but also a poorly understood one. This is because it requires a cooperation between the messaging system itself and the application producing and consuming the messages. For instance, if after consuming a message successfully you rewind your Kafka consumer to a previous offset, you will receive all the messages from that offset to the latest one, all over again. This shows why the messaging system and the client application must cooperate to make exactly-once semantics happen.

Read on for a discussion of technical details.  I appreciate how Neha linked to a 60+ page design document as well, for those wanting to dig into the details.

Connecting Hadoop To LDAP

Kevin Feasel



Giovanni Lanzani walks through some of the difficulties of getting LDAP working with Hadoop:

This section could probably could have much less workarounds if I’d knew more about LDAP.

But I’m a data scientist at heart and I want to get things done.

If you ever dealt with Hadoop, you know that there are a bunch of non-interactive users, i.e. users who are not supposed to login, such as hdfs, spark, hadoop, etc. These users are important to have. However the groups with the same name are also important to have. For example when using airflow and launching a spark job, the log folders will be created under the airflow user, in the spark group.

LDAP, however, doesn’t allow you, to my knowledge, to have overlapping user/groups, as Unix does.

The way I solved it was to create, in LDAP, the spark_user (or hdfs_user or …) to work around this limitation.

Also, Giovanni apparently lives in an interesting neighborhood.


Mingliang Liu and Rajesh Balamohan explain why you shouldn’t use S3 as your primary Hadoop data store, as well as a tool which helps mitigate those problems:

Some of the real world use cases which can be impacted due to the S3 eventual consistency model are:

  1. Listing Files. Newly created files might not be visible for data processing. In Hive, Spark and MapReduce, this can lead to erroneous results from incomplete source data or failure to commit all intermediate results.

  2. ETL Workflow. Systems like Oozie rely on marker files to trigger the subsequent workflows. Any delay in the visibility of these files can lead to delays in the subsequent workflows.

  3. Existence-guarded path operations. Any action which fails if the destination path is present may see a deleted file in a listing, and so fail — even though the file has already been deleted.

Read on to see how S3Guard works and how to enable it in HDP 2.6.

Neural Nets On Spark

Nisha Muktewar and Seth Hendrickson show how to use Deeplearning4j to build deep learning models on Hadoop and Spark:

Modern convolutional networks can have several hundred million parameters. One of the top-performing neural networks in the Large Scale Visual Recognition Challenge (also known as “ImageNet”), has 140 million parameters to train! These networks not only take a lot of compute and storage resources (even with a cluster of GPUs, they can take weeks to train), but also require a lot of data. With only 30000 images, it is not practical to train such a complex model on Caltech-256 as there are not enough examples to adequately learn so many parameters. Instead, it is better to employ a method called transfer learning, which involves taking a pre-trained model and repurposing it for other use cases. Transfer learning can also greatly reduce the computational burden and remove the need for large swaths of specialized compute resources like GPUs.

It is possible to repurpose these models because convolutional neural networks tend to learn very general features when trained on image datasets, and this type of feature learning is often useful on other image datasets. For example, a network trained on ImageNet is likely to have learned how to recognize shapes, facial features, patterns, text, and so on, which will no doubt be useful for the Caltech-256 dataset.

This is a longer post, but on an extremely interesting topic.

Running H2O In R On Azure HDInsight

Daisy Deng shows how to configure HDInsight to be able to run the H2O package in R rather than Python or Scala:

We provide a few script actions for installing rsparkling on Azure HDInsight. When creating the HDInsight cluster, you can run the following script action for header node:

And run the following action for the worker node:

Please consult Customize Linux-based HDInsight clusters using Script Action for more details.

Click through for the full process.

Real-Time Streaming ETL With Kafka Streams

Yeva Byzek has a tutorial using Kafka and Kafka Streams to perform real-time ETL:

Let’s consider an application that does some real-time stateful stream processing with the Kafka Streams API. We’ll run through a specific example of the end-to-end reference architecture and show you how to:

  • Run a Kafka source connector to read data from another system (a SQLite3 database), then modify the data in-flight using Single Message Transforms (SMTs) before writing it to the Kafka cluster

  • Process and enrich the data from a Java application using the Kafka Streams API (e.g. count and sum)

  • Run a Kafka sink connector to write data from the Kafka cluster to another system (AWS S3)

Read the whole thing.

Kafka Offset Management With Spark Streaming

Guru Medasana and Jordan Hambleton explain how to perform Kafka offset management when using Spark Streaming:

Enabling Spark Streaming’s checkpoint is the simplest method for storing offsets, as it is readily available within Spark’s framework. Streaming checkpoints are purposely designed to save the state of the application, in our case to HDFS, so that it can be recovered upon failure.

Checkpointing the Kafka Stream will cause the offset ranges to be stored in the checkpoint. If there is a failure, the Spark Streaming application can begin reading the messages from the checkpoint offset ranges. However, Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable, especially if you are using this mechanism for a critical production application. We do not recommend managing offsets via Spark checkpoints.

The authors give several options, so check it out and pick the one that works best for you.

Updates In Apache Kafka

Yeva Byzek announces that Apache Kafka is shipping soon:

We are very excited for the GA for Kafka release which is just days away. This release is bringing many new features as described in the previous Log Compaction blog post.

The most notable new feature is Exactly Once Semantics (EOS).  Kafka’s EOS capabilities provide more stringent idempotent producer semantics with exactly once, in-order delivery per partition, and stronger transactional guarantees with atomic writes across multiple partitions. Together, these strong semantics make writing applications easier and expand Kafka’s addressable use cases. You can learn more about EOS in the online talk on June 29, 2017.

“Exactly once,” if done right, would be crazy—there’s a reason most brokers are either “at least once” or “best effort.”

Re-Shaping Data Flows

Maneesh Varshney explains some methods to trim the fat out of analytical data flows:

Big data comes in a variety of shapes. The Extract-Transform-Load (ETL) workflows are more or less stripe-shaped (left panel in the figure above) and produce an output of a similar size to the input. Reporting workflows are funnel-shaped (middle panel in the figure above) and progressively reduce the data size by filtering and aggregating.

However, a wide class of problems in analytics, relevance, and graph processing have a rather curious shape of widening in the middle before slimming down (right panel in the figure above). It gets worse before it gets better.

In this article, we take a deeper dive into this exploding middle shape: understanding why it happens, why it’s a problem, and what can we do about it. We share our experiences of real-life workflows from a spectrum of fields, including Analytics (A/B experimentation), Relevance (user-item feature scoring), and Graph (second degree network/friends-of-friends).

The examples relate directly to Hadoop, but are applicable in other data platforms as well.

Spark Streaming Vs Kafka Streams

Mahesh Chand Kandpal contrasts Kafka Streams with Spark Streaming:

The low latency and an easy-to-use event time support also apply to Kafka Streams. It is a rather focused library, and it’s very well-suited for certain types of tasks. That’s also why some of its design can be so optimized for how Kafka works. You don’t need to set up any kind of special Kafka Streams cluster, and there is no cluster manager. And if you need to do a simple Kafka topic-to-topic transformation, count elements by key, enrich a stream with data from another topic, or run an aggregation or only real-time processing — Kafka Streams is for you.

If event time is not relevant and latencies in the seconds range are acceptable, Spark is the first choice. It is stable and almost any type of system can be easily integrated. In addition it comes with every Hadoop distribution. Furthermore, the code used for batch applications can also be used for the streaming applications as the API is the same.

Read on for more analysis.


August 2017
« Jul