Category: Streaming

Enabling Exactly-Once Kafka Streams

Guozhang Wang wraps up his series on exactly-once semantics in Kafka:

When restarting the application from the point of failure, we would then try to resume processing from the previously remembered position in the input Kafka topic, i.e. the committed offset. However, since the application was not able to commit the offset of the processed message before crashing last time, upon restarting it would fetch A again. The processing logic will then be triggered a second time to update the state, and generate the output messages. As a result, the application state will be updated twice (e.g. from S’ to S’’) and the output messages will be sent and appended to topic TB twice as well. If, for example, your application is calculating a running count from the input data stream stored in topic TA, then this “duplicated processing” error would mean over-counting in your application, resulting in incorrect results.

Today, many stream processing systems that claim to provide “exactly-once” semantics actually depend on users themselves to cooperate with the underlying source and destination streaming data storage layer like Kafka, because they simply treat this layer as a black box and hence do not try to handle these failure cases at all. Application user code then has to either coordinate with these data systems (for example, via a two-phase commit mechanism) to guarantee no data duplicates, or handle duplicated records that could be generated from the clients talking to these systems when the above-mentioned failure happens.

There’s some good information in here, so check it out.
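
The duplicated-processing failure in the first quoted paragraph can be reproduced with a few lines of plain Python. This is only a toy model, not Kafka code: the function name, the `crash_before_commit_at` parameter, and the in-memory list standing in for topic TB are all hypothetical, and it assumes the application state survives the crash while the offset commit does not.

```python
def run(messages, crash_before_commit_at=None):
    """Count messages, committing the input offset only after each one is processed."""
    count = 0              # application state (the running count)
    committed_offset = 0   # last position recorded for the input topic
    output = []            # stands in for the output topic TB

    def process_from(offset, crash_at):
        nonlocal count, committed_offset
        for i in range(offset, len(messages)):
            count += 1                            # update state
            output.append((messages[i], count))   # emit an output message
            if crash_at == i:
                return False                      # crash: offset i was never committed
            committed_offset = i + 1              # commit after state and output
        return True

    if not process_from(committed_offset, crash_before_commit_at):
        # Restart: resume from the committed offset, so the same message is fetched again.
        process_from(committed_offset, None)
    return count, output


if __name__ == "__main__":
    print(run(["A", "B", "C"]))
    print(run(["A", "B", "C"], crash_before_commit_at=0))
```

With a crash after processing the first message but before its commit, the count ends one higher than the number of input messages and the first message appears twice in the output, which is the over-counting the excerpt describes.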

Comments closed

Kafka Streams And Time-Based Batching

Vladimir Vajda provides a warning for people using Kafka Streams:

To completely understand the problem, we will first go into detail on how ingestion and processing occur by default in Kafka Streams. For example purposes, the punctuate method is configured to occur every ten seconds, and in the input stream, we have exactly one message per second. The purpose of the job is to parse input messages, collect them, and, in the punctuate method, do a batch insert in the database, and then send metrics.

After running the Kafka Streams application, the Processor will be created, followed by the init method. Here is where all the connections are established. Upon successful start, the application will listen to the input topic for incoming messages. It will remain idle until the first message arrives. When the first message arrives, the process method is called; this is where transformations occur and where the result is stored for later use. If no messages are in the input topic, the application will go idle again, waiting for the next message. After each successful process, the application checks if punctuate should be called. In our case, we will have ten process calls followed by one punctuate call, with this cycle repeating indefinitely as long as there are messages.

A pretty obvious behavior, isn’t it? Then why is one bolded?

Read on for more, including how to handle this edge case.
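
The loop described in the excerpt, where the punctuate check runs only after a process call, can be modelled in plain Python. This is a rough sketch rather than the Kafka Streams API: the class, its attributes, and the use of message timestamps as the clock are assumptions made for illustration.

```python
class BatchingProcessor:
    """Toy model of a processor whose punctuate check runs only after process()."""

    def __init__(self, interval_s, start_s=0):
        self.interval_s = interval_s
        self.last_punctuate = start_s
        self.buffer = []    # records collected since the last punctuate
        self.flushed = []   # batches that were "inserted into the database"

    def process(self, timestamp_s, value):
        self.buffer.append(value)
        # The check lives here, so with no incoming message there is no punctuate.
        if timestamp_s - self.last_punctuate >= self.interval_s:
            self.punctuate(timestamp_s)

    def punctuate(self, timestamp_s):
        self.flushed.append(list(self.buffer))
        self.buffer.clear()
        self.last_punctuate = timestamp_s


if __name__ == "__main__":
    processor = BatchingProcessor(interval_s=10)
    for second in range(1, 26):          # one message per second, then silence
        processor.process(second, f"msg-{second}")
    print(len(processor.flushed), "batches flushed;", len(processor.buffer), "records still buffered")
```

In this model, the records that arrive after the last punctuate stay in the buffer for as long as the input is silent, because nothing triggers another check.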

Testing Event Hub To Stream Analytics Performance

Rolf Tesmer tries a few different settings for optimizing performance when streaming data from Azure Event Hub to Azure Stream Analytics:

When you configure Azure Stream Analytics you only have 2 levers:

  • Streaming Units (SU) – Each SU is a blend of compute, memory and throughput between 1 and 48 (or more by contacting support).  The factors that impact SU are query complexity, latency, and volume of data. SU can be used to scale out a job to achieve higher throughput. Depending on query complexity and throughput required, more SUs may be necessary to achieve your performance requirements.  A level of SU6 assigns an entire Stream Analytics node.  For our test we won’t change SU.

  • SQL Query Design – Queries are expressed in a SQL-like query language. These queries are documented in the query language reference guide and include several common query patterns.  The design of the query can greatly affect the job throughput, in particular if and/or how the PARTITION BY clause is used.

Rolf tests along three margins:  2 versus 16 input partitions, 2 versus 16 output partitions, and whether to partition the data or not.  Read on to see which combination was fastest.

Tracking Kafka Consumer Lag

Simarpreet Kaur Monga has a Scala-based example showing how to calculate Kafka offset lag for consumers:

The Consumer can subscribe to multiple topics; you need to pass the list of topics you want to consume from. For the sake of simplicity, I have just passed a single topic to consume from.

Now that the consumer has subscribed to the topic, it can consume from that topic.

The consumer maintains an offset to keep track of the next record it needs to read.

Now, let us see how we can find the consumer offsets.

The Consumer offsets can be found using the method offset of class ConsumerRecord. This offset points to the record in a Kafka partition. The consumer consumes the records from the topic in the form of an object of class ConsumerRecord. The class ConsumerRecord also consists of a topic name and a partition number from which the record is being received, and a timestamp as marked by the corresponding ProducerRecord (the record sent by the producer).

Click through for the rest of the story.
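
The linked example is in Scala, but the lag arithmetic itself is small enough to sketch in plain Python. The function below is hypothetical and uses dictionaries instead of a real Kafka client: it assumes you already have each partition's log end offset and the consumer's position, where the position is the offset of the next record to read (the offset of the last consumed ConsumerRecord plus one).

```python
def partition_lag(end_offsets, positions):
    """Return per-partition lag: log end offset minus the consumer's next offset."""
    lag = {}
    for partition, end_offset in end_offsets.items():
        position = positions.get(partition, 0)   # assume 0 if nothing was consumed yet
        lag[partition] = max(end_offset - position, 0)
    return lag


if __name__ == "__main__":
    # Keys are (topic, partition) pairs; the values are made-up offsets.
    end_offsets = {("events", 0): 120, ("events", 1): 45}
    positions = {("events", 0): 100}
    print(partition_lag(end_offsets, positions))
```

Summing the values gives a single lag figure per topic, while keeping them per partition makes it easier to spot one slow partition.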

Using C# To Stream Data Into Power BI

Chris Koester shows us how to pass data from our .NET applications into a Power BI streaming dataset:

This post will demonstrate how to push data into Power BI Streaming Datasets with C#. For demo purposes I normally use LINQPad to run the code, but you could also create a .Net or .Net Core console application. LINQPad is an excellent, lightweight scratchpad for C# and other .Net languages.

Power BI Streaming Datasets are a very cool feature because dashboard tiles that use them update in real time. You don’t have to refresh the browser window to display new data. With this feature you can watch your data in near real-time. This could be compelling in scenarios involving sensors, IoT, website traffic, etc.

Click through for the demo script.  This shows how easy it can be to take your on-premises data and feed it into live Power BI dashboards.

Avro And Streaming Data

Pat Patterson shows how to get the advantages of the Avro file format while streaming individual records:

Avro is a very efficient way of storing data in files, since the schema is written just once, at the beginning of the file, followed by any number of records (contrast this with JSON or XML, where each data element is tagged with metadata). Similarly, Avro is well suited to connection-oriented protocols, where participants can exchange schema data at the start of a session and exchange serialized records from that point on. Avro works less well in a message-oriented scenario since producers and consumers are loosely coupled and may read or write any number of records at a time. To ensure that the consumer has the correct schema, it must either be exchanged “out of band” or accompany every message. Unfortunately, sending the schema with every message imposes significant overhead — in many cases, the schema is as big as the data or even bigger!

Read on to see how the Confluent Schema Registry can solve this problem.
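
One way to avoid shipping the schema with every message is to send a short schema identifier and let the consumer look the schema up. The sketch below assumes the framing used by Confluent's serializers, a zero magic byte followed by a 4-byte big-endian schema ID, and treats the Avro-encoded record as opaque bytes. The helper names and the sample payload are made up, and no registry lookup is performed.

```python
import struct

MAGIC_BYTE = 0  # format marker that precedes the schema ID


def frame(schema_id, payload):
    """Prefix an already-serialized record with a 5-byte header."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message):
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id, message[5:]


if __name__ == "__main__":
    record_bytes = b"\x06abc"                      # placeholder for an Avro-encoded record
    message = frame(42, record_bytes)
    print(len(message) - len(record_bytes), "bytes of per-message overhead")
    print(unframe(message))
```

The per-message overhead is a fixed five bytes regardless of how large the schema is, which is the saving the excerpt is pointing toward.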

Tips For Running Kafka Streams On AWS

Ian Duffy and Nina Hanzlikova have some advice if you’re looking to spin up some EC2 instances to run Kafka Streams:

With upgrades in the underlying Kafka Streams library, the Kafka community introduced many improvements to the underlying stream configuration defaults. In previous, more unstable iterations of the client library, we spent a lot of time tweaking config values such as session.timeout.ms, max.poll.interval.ms, and request.timeout.ms to achieve some level of stability.

With new releases we found ourselves discarding these custom values and achieving better results. However, some timeout issues persisted on some of our services, where a service would frequently get stuck in a rebalancing state. We noticed that reducing the max.poll.records value for the stream configs would sometimes alleviate issues experienced by these services. From partition lag profiles we also saw that the consuming issue seemed to be confined to only a few partitions, while the others would continue processing normally between re-balances. Ultimately we realised that the processing time for a record in these services could be very long (up to minutes) in some edge cases. Kafka has a fairly large maximum offset commit time before a stream consumer is considered dead (5 minutes) but with larger message batches of data this timeout was still being exceeded. By the time the processing of the record was finished the stream was already marked as failed and so the offset could not be committed. On rebalance, this same record would once again be fetched from Kafka, would fail to process in a timely manner and the situation would repeat. Therefore for any of the affected applications we introduced a processing timeout, ensuring there was an upper bound on the time taken by any of our edge cases.

There are some interesting tidbits in here.
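
The excerpt does not show how the authors implemented their processing timeout. As one possible shape for the idea, here is a minimal Python sketch that puts an upper bound on how long the caller waits for a single record. The function and parameter names are hypothetical, and note the limitation in the comment: the worker thread keeps running after the timeout.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def process_with_timeout(executor, handler, record, timeout_s):
    """Run handler(record); return (True, result), or (False, None) on timeout."""
    future = executor.submit(handler, record)
    try:
        return True, future.result(timeout=timeout_s)
    except TimeoutError:
        # The worker thread is not interrupted; we only stop waiting for it.
        future.cancel()
        return False, None


if __name__ == "__main__":
    def slow_handler(record):
        time.sleep(0.5)          # stands in for a pathological edge case
        return record

    with ThreadPoolExecutor(max_workers=2) as executor:
        print(process_with_timeout(executor, str.upper, "fast", timeout_s=1.0))
        print(process_with_timeout(executor, slow_handler, "slow", timeout_s=0.1))
```

A record that times out could then be skipped, logged, or routed elsewhere so that the consumer keeps polling and committing within the broker's limits.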

Page Ranking With Kafka Streams

Hunter Kelly walks through a page ranking algorithm:

Once you have the adjacency matrix, you perform some straightforward matrix calculations to calculate a vector of Hub scores and a vector of Authority scores as follows:

  • Sum across the columns and normalize; this becomes your Hub vector
  • Multiply the Hub vector element-wise across the adjacency matrix
  • Sum down the rows and normalize; this becomes your Authority vector
  • Multiply the Authority vector element-wise down the adjacency matrix
  • Repeat

An important thing to note is that the algorithm is iterative: you perform the steps above until eventually you reach convergence (that is, the vectors stop changing) and you’re done. For our purposes, we just pick a set number of iterations, execute them, and then accept the results from that point.  We’re mostly interested in the top entries, and those tend to stabilize pretty quickly.

This is an architectural-level post, so there’s no code but there is a useful discussion of the algorithm.
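
Although the post itself has no code, the Hub and Authority iteration quoted above is short enough to sketch in plain Python. This follows the textbook formulation with a fixed number of iterations and Euclidean normalization, so it may differ in detail from the author's Kafka Streams version. The function names and the convention that `adjacency[i][j]` is 1 when page i links to page j are assumptions.

```python
import math


def normalize(vector):
    """Scale a vector to unit Euclidean length (left unchanged if all zeros)."""
    norm = math.sqrt(sum(v * v for v in vector))
    return [v / norm for v in vector] if norm else vector


def hits(adjacency, iterations=20):
    """Return (hub_scores, authority_scores) after a fixed number of iterations."""
    n = len(adjacency)
    hubs = [1.0] * n
    auths = [1.0] * n
    for _ in range(iterations):
        # Authority of page j: sum of the hub scores of the pages linking to j.
        auths = normalize(
            [sum(adjacency[i][j] * hubs[i] for i in range(n)) for j in range(n)]
        )
        # Hub of page i: sum of the authority scores of the pages i links to.
        hubs = normalize(
            [sum(adjacency[i][j] * auths[j] for j in range(n)) for i in range(n)]
        )
    return hubs, auths


if __name__ == "__main__":
    # Page 0 links to pages 1 and 2; page 1 links to page 2; page 2 links nowhere.
    links = [[0, 1, 1],
             [0, 0, 1],
             [0, 0, 0]]
    hub_scores, authority_scores = hits(links)
    print("hubs:", hub_scores)
    print("authorities:", authority_scores)
```

In this small graph, page 0 comes out as the strongest hub and page 2 as the strongest authority, and the ordering is already settled after the first iteration.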

Stateful Processing In Spark Streaming

Bill Chambers and Jules Damji look at a couple of stateful scenarios within Spark Streaming:

No streaming events are free of duplicate entries. Dropping duplicate entries in record-at-a-time systems is imperative, and often a cumbersome operation, for a couple of reasons. First, you’ll have to process small or large batches of records at a time to discard them. Second, some events, because of high network latencies, may arrive out-of-order or late, which may force you to reiterate or repeat the process. How do you account for that?

Structured Streaming, which ensures exactly-once semantics, can drop duplicate messages as they come in based on arbitrary keys. To deduplicate data, Spark will maintain a number of user-specified keys and ensure that duplicates, when encountered, are discarded.

Just as other stateful processing APIs in Structured Streaming are bounded by declaring watermarking for late data semantics, so is dropping duplicates. Without watermarking, the maintained state can grow infinitely over the course of your stream.

In this scenario, you would still want some sort of de-duplication code at the far end of your process if you can never allow duplicates over the lifetime of the application.  This sounds like it’s more about preventing bursty duplicates from sensors.
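
To make the trade-off concrete, here is a toy Python model of watermark-bounded deduplication. It is not Spark's implementation or API: the class, its fields, and the eviction rule are assumptions chosen to show why state stays bounded and why an old duplicate can still slip through.

```python
class WatermarkedDeduplicator:
    """Drop duplicate keys, but only remember keys newer than the watermark."""

    def __init__(self, max_lateness_s):
        self.max_lateness_s = max_lateness_s
        self.seen = {}             # key -> event time of its first occurrence
        self.max_event_time = 0

    def accept(self, key, event_time_s):
        """Return True if the event should be emitted, False if it is dropped."""
        self.max_event_time = max(self.max_event_time, event_time_s)
        watermark = self.max_event_time - self.max_lateness_s
        # Evict keys older than the watermark so the state cannot grow forever.
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}
        if event_time_s < watermark:
            return False           # too late to be considered at all
        if key in self.seen:
            return False           # duplicate within the retained window
        self.seen[key] = event_time_s
        return True


if __name__ == "__main__":
    dedup = WatermarkedDeduplicator(max_lateness_s=10)
    for key, event_time in [("a", 100), ("a", 105), ("b", 120), ("a", 125)]:
        print(key, event_time, dedup.accept(key, event_time))
```

The second "a" is dropped as a duplicate, but the third one is emitted because the first had already been evicted, which is why a final de-duplication step may still be needed downstream.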

Optimizing Apache Flink

Ivan Mushketyk has a few tips for speeding up programs using Apache Flink:

One more way to optimize your Flink application is to provide some information about what your user-defined functions are doing with input data. Since Flink can’t parse and understand code, you can provide crucial information that will help to build a more efficient execution plan. There are three annotations that we can use:

  1. @ForwardedFields: Specifies what fields in an input value were left unchanged and are used in an output value.

  2. @NotForwardedFields: Specifies fields that were not preserved in the same positions in the output.

  3. @ReadFields: Specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.

Click through for his four tips.
