Kafka Consumer

Kevin Feasel



I build a consumer and aggregator of Kafka data:

From here, I hook into the OnMessage event just like before, and like before we decode the Kafka payload and turn it into a string.  Unlike before, however, I call Newtonsoft’s DeserializeObject method and return a Flight type, which I’ve defined above.  This is the same definition as in the Producer, so in a production-quality environment, I’d pull that out to a single location rather than duplicating it.

Going back to the main function, I call the consumer.Start() method and let ‘er rip.  When I’m ready to aggregate, I’ll hit the enter key and that’ll call consumer.Stop().  When that happens, I’m going to have up to 7 million records in a list called flights.  Out of all of this information, I only need two attributes:  the destination state and the arrival delay in minutes.  I get those by using the map function on my sequence of flights, taking advantage of F#’s match syntax to get all relevant scenarios safely and put the result into a tuple.  The resulting sequence of tuples is called flightTuple.  I pass that into the delaysByState function.

By the time I give this presentation, I’m going to change the way I aggregate just a little bit to cut down on the gigs of RAM necessary to do this operation.  But hey, at least it works…

Related Posts

Enabling Exactly-Once Kafka Streams

Guozhang Wang wraps up his exactly-once series in Kafka: When restarting the application from the point of failure, we would then try to resume processing from the previously remembered position in the input Kafka topic, i.e. the committed offset. However, since the application was not able to commit the offset of the processed message A before crashing […]

Read More

Avro Schemas In Kafka

Stephane Maarek explains the value of using Apache Avro as a schema structure for your Kafka topics: Avro has support for primitive types ( int, string, long, bytes, etc…), complex types (enum, arrays, unions, optionals), logical types (dates, timestamp-millis, decimal), and data record (name and namespace). All the types you’ll ever need. Avro has support for embedded documentation. Although documentation is optional, in my workflow I […]

Read More


October 2016
« Sep Nov »