Kafka Streams And Time-Based Batching

Vladimir Vajda provides a warning for people using Kafka Streams:

To completely understand the problem, we will first go into detail how ingestion and processing occur by default in Kafka Streams. For example purposes, the punctuate method is configured to occur every ten seconds, and in the input stream, we have exactly one message per second. The purpose of the job is to parse input messages, collect them, and, in the punctuate method, do a batch insert in the database, then to send metrics.

After running the Kafka Stream application, the Processor will be created, followed by the init method. Here is where all the connections are established. Upon successful start, the application will listen to input topic for incoming messages. It will remain idle until the first message arrives. When the first message arrives, the process method is called; this is where transformations occur and where the result is stored for later use. If no messages are in the input topic, the application will go idle again, waiting for the next message. After each successful process, the application checks if punctuate should be called. In our case, we will have ten process calls followed by one punctuate call, with this cycle repeating indefinitely as long as there are messages.

A pretty obvious behavior, isn’t it? Then why is one bolded?

Read on for more, including how to handle this edge case.
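
To make the quoted behavior concrete, here is a minimal Python sketch (a toy model, not the Kafka Streams API). It assumes, as the quote describes, that punctuate is only checked after a process call. The class name, the `start_s` parameter, and the message shapes are hypothetical.

```python
class BatchingProcessor:
    """Toy model: punctuate is evaluated only after a message is processed."""

    def __init__(self, interval_s, start_s=0):
        self.interval_s = interval_s
        self.last_punctuate = start_s  # time of the last flush
        self.buffer = []               # parsed messages waiting for the batch insert
        self.flushed = []              # batches that were "inserted"

    def process(self, timestamp_s, message):
        self.buffer.append(message)
        # The punctuate check happens here, so it needs an incoming message.
        if timestamp_s - self.last_punctuate >= self.interval_s:
            self.punctuate(timestamp_s)

    def punctuate(self, timestamp_s):
        self.flushed.append(list(self.buffer))
        self.buffer.clear()
        self.last_punctuate = timestamp_s


processor = BatchingProcessor(interval_s=10)
# One message per second for 25 seconds, then the input goes quiet.
for second in range(1, 26):
    processor.process(second, f"msg-{second}")

batch_sizes = [len(batch) for batch in processor.flushed]
stranded = len(processor.buffer)
print(batch_sizes, stranded)
```

In this model the first twenty messages go out in two batches of ten, while the last five sit in the buffer until another message shows up to trigger the check.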

Testing Event Hub To Stream Analytics Performance

Rolf Tesmer tries a few different settings for optimizing performance when streaming data from Azure Event Hub to Azure Stream Analytics:

When you configure Azure Stream Analytics you only have 2 levers;

  • Streaming Units (SU) – Each SU is a blend of compute, memory and throughput between 1 and 48 (or more by contacting support). The factors that impact SU are query complexity, latency, and volume of data. SU can be used to scale out a job to achieve higher throughput. Depending on query complexity and throughput required, more SU units may be necessary to achieve your performance requirements. A level of SU6 assigns an entire Stream Analytics node. For our test we won't change SU.

  • SQL Query Design – Queries are expressed in a SQL-like query language. These queries are documented in the query language reference guide and include several common query patterns. The design of the query can greatly affect the job throughput, in particular if and/or how the PARTITION BY clause is used.

Rolf tests along three margins:  2 versus 16 input partitions, 2 versus 16 output partitions, and whether to partition the data or not.  Read on to see which combination was fastest.

Tracking Kafka Consumer Lag

Simarpreet Kaur Monga has a Scala-based example showing how to calculate Kafka offset lag for consumers:

The Consumer can subscribe to multiple topics; you need to pass the list of topics you want to consume from. For the sake of simplicity, I have just passed a single topic to consume from.

Now that the consumer has subscribed to the topic, it can consume from that topic.

The consumer maintains an offset to keep track of the next record it needs to read.

Now, let us see how we can find the consumer offsets.

The Consumer offsets can be found using the method offset of class ConsumerRecord. This offset points to the record in a Kafka partition. The consumer consumes the records from the topic in the form of an object of class ConsumerRecord. The class ConsumerRecord also consists of a topic name and a partition number from which the record is being received, and a timestamp as marked by the corresponding ProducerRecord (the record sent by the producer).

Click through for the rest of the story.
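
The post itself is in Scala; as a language-neutral illustration of the arithmetic, here is a small Python sketch. It assumes you already have, per partition, the log end offset and the consumer's next offset to read (in the Java client these would typically come from calls such as `endOffsets` and `position`). The function name and sample data are hypothetical.

```python
def consumer_lag(end_offsets, positions):
    """Per-partition lag: log end offset minus the consumer's next offset."""
    lag = {}
    for partition, end in end_offsets.items():
        # Assumption for this sketch: a partition with no recorded position
        # is treated as unread from offset 0.
        position = positions.get(partition, 0)
        lag[partition] = max(end - position, 0)
    return lag


# Keys are (topic, partition) pairs; values are offsets.
end_offsets = {("events", 0): 120, ("events", 1): 80}
positions = {("events", 0): 100}

lag = consumer_lag(end_offsets, positions)
total_lag = sum(lag.values())
print(lag, total_lag)
```

Summing across partitions gives a single number to alert on, though the per-partition values are usually more useful for spotting a stuck consumer.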

Using C# To Stream Data Into Power BI

Chris Koester shows us how to pass data from our .NET applications into a Power BI streaming dataset:

This post will demonstrate how to push data into Power BI Streaming Datasets with C#. For demo purposes I normally use LINQPad to run the code, but you could also create a .Net or .Net Core console application. LINQPad is an excellent, lightweight scratchpad for C# and other .Net languages.

Power BI Streaming Datasets are a very cool feature because dashboard tiles that use them update in real time. You don’t have to refresh the browser window to display new data. With this feature you can watch your data in near real-time. This could be compelling in scenarios involving sensors, IoT, website traffic, etc.

Click through for the demo script.  This shows how easy it can be to take your on-premises data and feed it into live Power BI dashboards.

Avro And Streaming Data

Pat Patterson shows how to get the advantages of the Avro file format while streaming individual records:

Avro is a very efficient way of storing data in files, since the schema is written just once, at the beginning of the file, followed by any number of records (contrast this with JSON or XML, where each data element is tagged with metadata). Similarly, Avro is well suited to connection-oriented protocols, where participants can exchange schema data at the start of a session and exchange serialized records from that point on. Avro works less well in a message-oriented scenario since producers and consumers are loosely coupled and may read or write any number of records at a time. To ensure that the consumer has the correct schema, it must either be exchanged “out of band” or accompany every message. Unfortunately, sending the schema with every message imposes significant overhead — in many cases, the schema is as big as the data or even bigger!

Read on to see how the Confluent Schema Registry can solve this problem.
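
To show why a registry helps, here is a rough Python sketch of the general idea: register the schema once and prefix each message with a small schema ID instead of the schema itself. The 5-byte header (a zero magic byte plus a 4-byte big-endian ID) follows my understanding of the Confluent wire format. The schema, the ID 42, and the payload bytes are made-up stand-ins, and no real Avro library is used here.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of the framing header


def frame_with_schema_id(schema_id, payload):
    """Prefix an already-serialized record with a 5-byte header."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message):
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, message[5:]


# Hypothetical schema and a stand-in for an Avro-encoded record.
schema_json = json.dumps({
    "type": "record",
    "name": "User",
    "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}],
}).encode("utf-8")
payload = b"\x02\x06abc"

framed = frame_with_schema_id(42, payload)
with_full_schema = schema_json + payload

schema_id, body = unframe(framed)
print(len(framed), len(with_full_schema))
```

Even for this tiny record, the schema dwarfs the data, which is the overhead the quote describes; the ID-based header stays at five bytes regardless of schema size.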

Tips For Running Kafka Streams On AWS

Ian Duffy and Nina Hanzlikova have some advice if you’re looking to spin up some EC2 instances to run Kafka Streams:

With upgrades in the underlying Kafka Streams library, the Kafka community introduced many improvements to the underlying stream configuration defaults. In previous, more unstable iterations of the client library, we spent a lot of time tweaking config values such as session.timeout.ms, max.poll.interval.ms, and request.timeout.ms to achieve some level of stability.

With new releases we found ourselves discarding these custom values and achieving better results. However, some timeout issues persisted on some of our services, where a service would frequently get stuck in a rebalancing state. We noticed that reducing the max.poll.records value for the stream configs would sometimes alleviate issues experienced by these services. From partition lag profiles we also saw that the consuming issue seemed to be confined to only a few partitions, while the others would continue processing normally between re-balances. Ultimately we realised that the processing time for a record in these services could be very long (up to minutes) in some edge cases. Kafka has a fairly large maximum offset commit time before a stream consumer is considered dead (5 minutes) but with larger message batches of data this timeout was still being exceeded. By the time the processing of the record was finished the stream was already marked as failed and so the offset could not be committed. On rebalance, this same record would once again be fetched from Kafka, would fail to process in a timely manner and the situation would repeat. Therefore for any of the affected applications we introduced a processing timeout, ensuring there was an upper bound on the time taken by any of our edge cases.

There are some interesting tidbits in here.
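
The quote does not say how the authors implemented their processing timeout, so the following Python sketch is only one way to put an upper bound on per-record work. The helper name, the record shape, and the timeout values are all hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def process_with_timeout(executor, handler, record, timeout_s):
    """Run handler(record); return (True, result), or (False, None) on timeout."""
    future = executor.submit(handler, record)
    try:
        return True, future.result(timeout=timeout_s)
    except FutureTimeout:
        # The worker thread keeps running in the background. A real service
        # would also need to stop or isolate the slow work and decide where
        # the skipped record goes (for example, a dead-letter topic).
        return False, None


def handler(record):
    """Stand-in for record processing that sometimes takes far too long."""
    time.sleep(record["cost_s"])
    return record["id"]


executor = ThreadPoolExecutor(max_workers=2)
fast = process_with_timeout(executor, handler, {"id": 1, "cost_s": 0.0}, timeout_s=0.3)
slow = process_with_timeout(executor, handler, {"id": 2, "cost_s": 1.0}, timeout_s=0.3)
executor.shutdown(wait=False)
print(fast, slow)
```

The point is that the consumer loop regains control within a known bound, so it can keep polling and committing rather than being marked dead mid-record.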

Page Ranking With Kafka Streams

Hunter Kelly walks through a page ranking algorithm:

Once you have the adjacency matrix, you perform some straightforward matrix calculations to calculate a vector of Hub scores and a vector of Authority scores as follows:

  • Sum across the columns and normalize, this becomes your Hub vector
  • Multiply the Hub vector element-wise across the adjacency matrix
  • Sum down the rows and normalize, this becomes your Authority vector
  • Multiply the Authority vector element-wise down the adjacency matrix
  • Repeat

An important thing to note is that the algorithm is iterative: you perform the steps above until  eventually you reach convergence—that is, the vectors stop changing—and you’re done. For our purposes, we just pick a set number of iterations, execute them, and then accept the results from that point.  We’re mostly interested in the top entries, and those tend to stabilize pretty quickly.

This is an architectural-level post, so there’s no code but there is a useful discussion of the algorithm.
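
Since the post has no code, here is a small Python sketch of the textbook hub/authority iteration with a fixed number of rounds. This is an assumption on my part about the general technique and may differ in detail from the steps quoted above (for instance, it normalizes by the sum of scores). The function names and the three-page graph are hypothetical.

```python
def normalize(vector):
    """Scale a vector so its entries sum to 1 (left unchanged if all zero)."""
    total = sum(vector)
    return [v / total for v in vector] if total else list(vector)


def hits(adjacency, iterations=20):
    """adjacency[i][j] is 1 when page i links to page j."""
    n = len(adjacency)
    hubs = [1.0] * n
    authorities = [1.0] * n
    for _ in range(iterations):
        # A page's authority is the sum of hub scores of pages linking to it.
        authorities = normalize(
            [sum(adjacency[i][j] * hubs[i] for i in range(n)) for j in range(n)]
        )
        # A page's hub score is the sum of authority scores of pages it links to.
        hubs = normalize(
            [sum(adjacency[i][j] * authorities[j] for j in range(n)) for i in range(n)]
        )
    return hubs, authorities


# Page 0 links to pages 1 and 2; page 1 links to page 2.
adjacency = [
    [0, 1, 1],
    [0, 0, 1],
    [0, 0, 0],
]
hubs, authorities = hits(adjacency, iterations=20)
top_hub = max(range(3), key=lambda i: hubs[i])
top_authority = max(range(3), key=lambda i: authorities[i])
print(top_hub, top_authority)
```

As the quote suggests, the ordering of the top entries settles after only a few rounds, which is why a fixed iteration count is a reasonable shortcut.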

Stateful Processing In Spark Streaming

Bill Chambers and Jules Damji look at a couple of stateful scenarios within Spark Streaming:

No streaming events are free of duplicate entries. Dropping duplicate entries in record-at-a-time systems is imperative, and often a cumbersome operation, for a couple of reasons. First, you’ll have to process small or large batches of records at a time to discard them. Second, some events, because of high network latencies, may arrive out-of-order or late, which may force you to reiterate or repeat the process. How do you account for that?

Structured Streaming, which ensures exactly-once semantics, can drop duplicate messages as they come in based on arbitrary keys. To deduplicate data, Spark will maintain a number of user-specified keys and ensure that duplicates, when encountered, are discarded.

Just as other stateful processing APIs in Structured Streaming are bounded by declaring watermarking for late data semantics, so is dropping duplicates. Without watermarking, the maintained state can grow infinitely over the course of your stream.

In this scenario, you would still want some sort of de-duplication code at the far end of your process if duplicates must never get through at any point in the lifetime of the application.  This sounds like it’s more about preventing bursty duplicates from sensors.
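
Here is a toy Python model of watermark-bounded de-duplication, to show both why state stays bounded and why far-apart duplicates can slip through. It is not Spark's implementation; in Structured Streaming you would express this with `withWatermark` and `dropDuplicates`. The class name and the event data are hypothetical.

```python
class WatermarkedDeduplicator:
    """Drop repeated keys, keeping state only for events newer than the watermark."""

    def __init__(self, max_delay_s):
        self.max_delay_s = max_delay_s
        self.max_event_time = float("-inf")
        self.seen = {}  # key -> event time of its first occurrence

    def accept(self, key, event_time_s):
        """Return True if the event should be emitted, False if it is dropped."""
        self.max_event_time = max(self.max_event_time, event_time_s)
        watermark = self.max_event_time - self.max_delay_s
        # Evict state older than the watermark so memory stays bounded.
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        if event_time_s < watermark:
            return False  # too late to compare against evicted state
        if key in self.seen:
            return False  # duplicate within the watermark window
        self.seen[key] = event_time_s
        return True


dedup = WatermarkedDeduplicator(max_delay_s=10)
events = [("a", 0), ("a", 1), ("b", 5), ("c", 30), ("a", 31)]
results = [dedup.accept(key, ts) for key, ts in events]
state_size = len(dedup.seen)
print(results, state_size)
```

The second "a" is dropped because it arrives within the window, but the third one is emitted again because the state for "a" was evicted once the watermark moved past it.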

Optimizing Apache Flink

Ivan Mushketyk has a few tips for speeding up programs using Apache Flink:

One more way to optimize your Flink application is to provide some information about what your user-defined functions are doing with input data. Since Flink can’t parse and understand code, you can provide crucial information that will help to build a more efficient execution plan. There are three annotations that we can use:

  1. @ForwardedFields: Specifies what fields in an input value were left unchanged and are used in an output value.

  2. @NotForwardedFields: Specifies fields that were not preserved in the same positions in the output.

  3. @ReadFields: Specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.

Click through for his four tips.

Anomaly Detection With Kafka Streams

Ajmal Karuthakantakath shows us an application which performs fairly simple anomaly detection using Kafka Streams:

The problem is in the banking loan payment domain, where customers have taken a loan and they need to make monthly payments to repay the loan amount.

Assume there are millions of customers in the system and all these customers need to make monthly payments to their account. Each customer may have a different monthly due date depending on their monthly loan due date.

Each customer payment will appear as a PaymentScheduleEvent event. Customers can make more than one PaymentScheduleEvent per month. Each monthly due date for a customer will appear as a PaymentDueEvent.

An arbitrarily chosen anomaly condition for this example is that if the amount due is more than $150 for any customer at any point in time, this generates an anomaly.

Click through for instructions, the application, and further resources.  If you want to learn Kafka Streams, this should keep you busy for a little while.
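
As a rough, non-streaming sketch of the anomaly rule, the Python below keeps a running amount due per customer and flags it when it exceeds $150. It assumes that a PaymentDueEvent adds to the amount due and a PaymentScheduleEvent subtracts from it; the tuple layout and customer IDs are hypothetical, and the linked post does this with Kafka Streams rather than a plain loop.

```python
from collections import defaultdict

THRESHOLD = 150.0


def detect_anomalies(events, threshold=THRESHOLD):
    """Return (customer, amount_due) each time the amount due exceeds the threshold."""
    due = defaultdict(float)
    anomalies = []
    for kind, customer, amount in events:
        if kind == "PaymentDueEvent":
            due[customer] += amount   # a new monthly amount becomes due
        elif kind == "PaymentScheduleEvent":
            due[customer] -= amount   # a payment reduces what is owed
        else:
            raise ValueError(f"unknown event type: {kind}")
        if due[customer] > threshold:
            anomalies.append((customer, due[customer]))
    return anomalies


events = [
    ("PaymentDueEvent", "c1", 100.0),
    ("PaymentScheduleEvent", "c1", 40.0),
    ("PaymentDueEvent", "c1", 100.0),
    ("PaymentDueEvent", "c2", 150.0),
    ("PaymentScheduleEvent", "c1", 20.0),
]
anomalies = detect_anomalies(events)
print(anomalies)
```

Customer c2 owes exactly $150 and is not flagged, since the quoted condition is "more than $150".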


December 2017