Press "Enter" to skip to content

Category: Hadoop

.NET Producer For Kafka

I build a simple .NET console app to push messages to a Kafka topic:

That’s the core of our code.  The main function instantiates a new Kafka producer and gloms onto the Flights topic.  From there, we call the loadEntries function.  The loadEntries function takes a topic and filename.  It streams entries from the 2008.csv file and uses the ParallelSeq library to operate in parallel on data streaming in (one of the nice advantages of using functional code:  writing thread-safe code is easy!).  We filter out any records whose length is zero—there might be newlines somewhere in the file, and those aren’t helpful.  We also want to throw away the header row (if it exists) and I know that that starts with “Year” whereas all other records simply include the numeric year value.  Finally, once we throw away garbage rows, we want to call the publish function for each entry in the list.  The publish function encodes our text as a UTF-8 bytestream and pushes the results onto our Kafka topic.

All this plus a bonus F# pitch.

Comments closed

Hive And Impala

Carter Shanklin and Nita Dembla run a performance comparison of Hive LLAP versus Impala:

Before we get to the numbers, an overview of the test environment, query set and data is in order. The Impala and Hive numbers were produced on the same 10 node d2.8xlarge EC2 VMs. To prepare the Impala environment the nodes were re-imaged and re-installed with Cloudera’s CDH version 5.8 using Cloudera Manager. The defaults from Cloudera Manager were used to setup / configure Impala 2.6.0. It is worth pointing out that Impala’s Runtime Filtering feature was enabled for all queries in this test.

Data: While Hive works best with ORCFile, Impala works best with Parquet, so Impala testing was done with all data in Parquet format, compressed with Snappy compression. Data was partitioned the same way for both systems, along the date_sk columns. This was done to benefit from Impala’s Runtime Filtering and from Hive’s Dynamic Partition Pruning.

I’m impressed with both of these projects.

Comments closed

Optimizing S3 For High Concurrency

Aaron Friedman looks at how to optimize highly-concurrent, distributed workloads writing data to S3 buckets:

S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. Due to its underlying infrastructure, S3 is excellent for retrieving objects with known keys. S3 maintains an index of object keys in each region and partitions the index based on the key name. For best performance, keys that are often read together should not have sequential prefixes. Keys should be distributed across many partitions rather than on the same partition.

For large datasets like genomics, population-level analyses of these data can require many concurrent S3 reads by many Spark executors. To maximize performance of high-concurrency operations on S3, we need to introduce randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions.

Reading the title, I wanted it to be an article on knobs to turn in S3 to maximize read performance.  It’s still an article well worth reading, but focuses from the other side:  how to write to S3 without stepping on your own toes.

Comments closed

Working With Topics In Kafka

I show how to do the basics of creating, deleting, and pushing messages on topics in Apache Kafka:

There are three important things here:  first, our Zookeeper port is 2181.  Zookeeper is great for centralized configuration and coordination; if you want to learn more, check out this Sean Mackrory post.

The second bit of important information is how long our retention period is.  Right now, it’s set to 7 days, and that’s our default.  Remember that messages in a Kafka topic don’t go away simply because some consumer somewhere accessed them; they stay in the log until we say they can go.

Finally, we have a set of listeners.  For the sandbox, the only listener is on port 6667.  We connect to listeners from our outside applications, so knowing those addresses and ports is vital.

This is still quick-start level stuff, but I’m building up to custom development, honest!

Comments closed

Kafka Consumer Group Assignment

David Brinegar discusses how consumers within an Apache Kafka consumer group get assigned work:

Or one might want some assignment that results in uniform workloads, based on the number of messages in each partition.  But until we have pluggable assignment functions, the reference implementation has a straightforward assignment strategy called Range Assignment.  There is also a newer Round Robin assignor which is useful for applications like Mirror Maker, but most applications just use the default assignment algorithm.

The Range Assignor tries to land on a uniform distribution of partitions, at least within each topic, while at the same time avoiding the need to coordinate and bargain between nodes.  This last goal, independent assignment, is done by each node executing a fixed algorithm:  sort the partitions, sort the consumers, then for each topic take same-sized ranges of partitions for each consumer.  Where the sizes cannot be the same, the consumers at the beginning of the sorted list will end up with one extra partition.  With this algorithm, each application node can see the entire layout by itself, and from there take up the right assignments.

Click through to see an example of how this is implemented.

Comments closed

What Is Kafka?

I start a new series on Apache Kafka:

The broker serves several purposes:

  1. Know who the producers are and who the consumers are.  This way, the producers don’t care who exactly consumes a message and aren’t responsible for the message after they hand it off.
  2. Buffer for performance.  If the consumers are a little slow at the moment but don’t usually get overwhelmed, that’s okay—messages can sit with the broker until the consumer is ready to fetch.
  3. Let us scale out more easily.  Need to add more producers?  That’s fine—tell the broker who they are.  Need to add consumers?  Same thing.
  4. What about when a consumer goes down?  That’s the same as problem #2:  hold their messages until they’re ready again.

So brokers add a bit of complexity, but they solve some important problems.  The nice part about a broker is that it doesn’t need to know anything about the messages, only who is supposed to receive it.

This is an introduction to the product and part one of an eight-part series.

Comments closed

Kafka Consumer Groups

David Brinegar discusses consumer groups and lag in Apache Kafka:

While the Consumer Group uses the broker APIs, it is more of an application pattern or a set of behaviors embedded into your application.  The Kafka brokers are an important part of the puzzle but do not provide the Consumer Group behavior directly.  A Consumer Group based application may run on several nodes, and when they start up they coordinate with each other in order to split up the work.  This is slightly imperfect because the work, in this case, is a set of partitions defined by the Producer.  Each Consumer node can read a partition and one can split up the partitions to match the number of consumer nodes as needed.  If the number of Consumer Group nodes is more than the number of partitions, the excess nodes remain idle. This might be desirable to handle failover.  If there are more partitions than Consumer Group nodes, then some nodes will be reading more than one partition.

Read the whole thing.  It’s part one of a series.

Comments closed

Hive Going In-Memory

Carter Shanklin and Nita Dembla discuss Hive memory-handling optimizations:

Let’s put this architecture to the test with a realistic dataset size and workload. Our previous performance blog, “Announcing Apache Hive 2.1: 25x Faster Queries and Much More”, discussed 4 reasons that LLAP delivers dramatically faster performance versus Hive on Tez. In that benchmark we saw 25+x performance boosts on ad-hoc queries with a dataset that fit entirely into the cluster’s memory.

In most cases, datasets will be far too large to fit in RAM so we need to understand if LLAP can truly tackle the big data challenge or if it’s limited to reporting roles on smaller datasets. To find out, we scaled the dataset up to 10 TB, 4x larger than aggregate cluster RAM, and we ran a number of far more complex queries.

Table 3 below shows how Hive LLAP is capable of running both At Speed and At Scale. The simplest query in the benchmark ran in 2.68 seconds on this 10 TB dataset while the most complex query, Query 64 performed a total of 37 joins and ran for more than 20 minutes.

Given how much faster memory is than disk, and given Spark’s broad adoption, this makes sense as a strategy for Hive’s continued value.

Comments closed

Use Folders With Polybase

Andrew Peterson argues that you should use folders instead of individual files when creating external tables:

Why?
1) Add more files to the directory, and Polybase External table will automagically read them.
2) Do INSERTS and UPDATES from PolyBase back to your files in Hadoop.
( See PolyBase – Insert data into a Hadoop Hue Directory ,
PolyBase – Insert data into new Hadoop Directory    ).
3) It’s cleaner.

This is good advice.  Also, if you’re using some other process to load data—for example, a map-reduce job or Spark job—you might have many smaller file chunks based on what the reducers spit out.  It’s not a bad idea to cat those file chunks together, but at least if you use a folder for your external data location, your downstream processes will still work as expected regardless of how the data is laid out.

Comments closed

Sentiment Analysis With Nifi

Timothy Spann ties together a bunch of interesting things with Apache Nifi, including integrations with Twitter, Slack, Tensorflow, and Zeppelin:

First up, I used GetTwitter to read tweets and filtered on these terms:

strata, stratahadoop, strataconf, NIFI, FutureOfData, ApacheNiFi, Hortonworks, Hadoop, ApacheHive, HBase, ApacheSpark, ApacheTez, MachineLearning, ApachePhoenix, ApacheCalcite,ApacheStorm, ApacheAtlas, ApacheKnox, Apache Ranger, HDFS, Apache Pig, Accumulo, Apache Flume, Sqoop, Apache Falcon

Input:

InvokeHttp: I used this to download the first image URL from tweets.

It’s interesting to see this all tie together relatively easily.

Comments closed