By default, the DataNode uses a round-robin policy to write new blocks. Even so, in a long-running cluster, a DataNode can end up with significantly imbalanced volumes after events like massive file deletion in HDFS or the addition of new disks via the disk hot-swap feature. Even if you use the available-space-based volume-choosing policy instead, volume imbalance can still lead to less efficient disk I/O: for example, every new write will go to the newly added empty disk while the other disks sit idle, creating a bottleneck on the new disk.
Recently, the Apache Hadoop community developed offline scripts (as discussed in HDFS-1312, the [email protected] mailing list, and GitHub) to alleviate the data imbalance issue. However, because these scripts live outside the HDFS codebase, they require that the DataNode be offline before moving data between disks. As a result, HDFS-1312 also introduces an online disk balancer, designed to re-balance the volumes on a running DataNode based on various metrics. Similar to the HDFS Balancer, the HDFS disk balancer runs as a thread in the DataNode to move block files across volumes of the same storage type.
This is a good read and sounds like a very useful feature.
The first method works fine for non-production scenarios where you can stop all of the producers and consumers, but let’s say that you want to flush the topic while leaving your producers and consumers up (but maybe you have a downtime window where you know the producers aren’t pushing anything). In this case, we can change the retention period to something very short, let the queue flush, and bring it back to normal, all using the kafka-configs shell script.
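The shrink-flush-restore procedure boils down to two kafka-configs invocations. Here is a small, hedged Python sketch that composes them; the topic name, ZooKeeper address, and the one-second temporary retention are illustrative placeholders, not values from the original post.

```python
# Sketch of the "shrink retention, let the log flush, restore" procedure.
# Topic name, ZooKeeper address, and retention values are illustrative.

def retention_cmd(topic, zookeeper, retention_ms=None):
    """Build a kafka-configs argv: set retention.ms when given, else remove the override."""
    cmd = ["kafka-configs.sh", "--zookeeper", zookeeper,
           "--entity-type", "topics", "--entity-name", topic, "--alter"]
    if retention_ms is None:
        cmd += ["--delete-config", "retention.ms"]   # fall back to the broker default
    else:
        cmd += ["--add-config", "retention.ms=%d" % retention_ms]
    return cmd

# Step 1: drop retention to one second so old segments age out quickly.
shrink = retention_cmd("Flights", "localhost:2181", 1000)
# Step 2 (after Kafka has deleted the old segments): remove the override.
restore = retention_cmd("Flights", "localhost:2181")

# In practice you would run these with subprocess.run(shrink), wait for the
# segments to disappear, then subprocess.run(restore).
```

Deleting the config override (rather than re-adding a seven-day value) puts the topic back on the broker-level default, which is usually what you want.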
Points deducted for slipping and writing “queue” there, but otherwise, I prefer the second method, as things are still online. In less-extreme scenarios, you might drop the retention period to a few minutes, especially if your consumers are all caught up.
From here, I hook into the OnMessage event just like before and, like before, decode the Kafka payload and turn it into a string. Unlike before, however, I call Newtonsoft’s DeserializeObject method and return a Flight type, which I’ve defined above. This is the same definition as in the Producer, so in a production-quality environment, I’d pull that out to a single location rather than duplicating the code.
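The post's code is F# with Newtonsoft.Json; as a rough Python analogue of the same decode-then-deserialize step, something like the following works. The Flight fields here are a guessed minimal shape, not the post's actual type definition.

```python
# Hedged analogue of the F# OnMessage handler: decode bytes, deserialize JSON
# into a typed record. Field names are assumptions for illustration.
import json
from dataclasses import dataclass

@dataclass
class Flight:
    dest_state: str   # hypothetical subset of the real Flight type's fields
    arr_delay: int

def on_message(payload: bytes) -> Flight:
    """Decode the Kafka payload and deserialize it, like DeserializeObject<Flight>."""
    text = payload.decode("utf-8")   # Kafka delivers raw bytes
    data = json.loads(text)
    return Flight(dest_state=data["dest_state"], arr_delay=data["arr_delay"])

f = on_message(b'{"dest_state": "NC", "arr_delay": 12}')
```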
Going back to the main function, I call the consumer.Start() method and let ‘er rip. When I’m ready to aggregate, I’ll hit the Enter key, which calls consumer.Stop(). When that happens, I’ll have up to 7 million records in a list called flights. Out of all of this information, I only need two attributes: the destination state and the arrival delay in minutes. I get those by using the map function on my sequence of flights, taking advantage of F#’s match syntax to handle all relevant scenarios safely and put the results into tuples. The resulting sequence of tuples is called flightTuple, and I pass it into the delaysByState function.
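In Python terms, the projection-then-aggregation described above might look like the sketch below. The field names, the treat-missing-delay-as-zero rule, and the averaging inside delaysByState are assumptions; the post's actual F# match cases aren't reproduced here.

```python
# Sketch of the flight -> (destination state, arrival delay) projection and a
# plausible delaysByState aggregation. Field names and the handling of missing
# delays are illustrative assumptions.

def to_tuple(flight):
    """Mirror the F# match: a missing delay becomes zero so every flight yields a tuple."""
    delay = flight.get("arr_delay")
    return (flight["dest_state"], delay if delay is not None else 0)

def delays_by_state(flight_tuples):
    """Average arrival delay per destination state."""
    totals = {}
    for state, delay in flight_tuples:
        count, total = totals.get(state, (0, 0))
        totals[state] = (count + 1, total + delay)
    return {state: total / count for state, (count, total) in totals.items()}

flights = [{"dest_state": "NC", "arr_delay": 10},
           {"dest_state": "NC", "arr_delay": None},
           {"dest_state": "OH", "arr_delay": 4}]
flight_tuples = [to_tuple(f) for f in flights]
result = delays_by_state(flight_tuples)
```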
By the time I give this presentation, I’m going to change the way I aggregate just a little bit to cut down on the gigs of RAM necessary to do this operation. But hey, at least it works…
Spark provides a comprehensive framework to manage big data processing for a variety of data set types, including text and graph data. It can also handle batch pipelines and real-time streaming data. Using Spark libraries, you can create big data analytics apps in Java, Scala, Clojure, R, and Python.
Spark brings analytics pros an improved, MapReduce-style query capability with more performant data processing, in memory or on disk. It can be used with datasets larger than the aggregate memory of a cluster. Spark also has savvy lazy evaluation of big data queries, which helps with workflow optimization and reuse of intermediate results in memory. The Spark API is easy to learn.
One of my taglines is, Spark is not the future of Hadoop; Spark is the present of Hadoop. If you want to get into this space, learn how to work with Spark.
We’re doing a bunch of setup work here, so let’s take it from the top. First, I declare a consumer group, which I’m calling “Airplane Enricher.” Kafka uses the concept of consumer groups to allow consumers to work in parallel. Imagine that we have ten separate servers available to process messages from the Flights topic. Each flight message is independent, so it doesn’t matter which consumer gets it. What does matter, though, is that multiple consumers don’t get the same message, as that’s a waste of resources and could lead to duplicate data processing, which would be bad.
The way Kafka works around this is to use consumer groups: within a consumer group, only one consumer will get a particular message. That way, I can have my ten servers processing messages “for real” and maybe have another consumer in a different consumer group just reading through the messages getting a count of how many records are in the topic. Once you treat topics as logs rather than queues, consumer design changes significantly.
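The delivery rule can be sketched with a toy model: within a group, each partition (and so each message) goes to exactly one member, while a second group sees everything independently. This is a simplification for illustration, not the real group-coordination protocol.

```python
# Toy model of consumer-group semantics: within a group each partition has
# exactly one owner, so no message is processed twice; a second group reads
# the same messages independently. Simplified sketch, not the real protocol.

def assign(partitions, consumers):
    """Give each partition a single owner within one consumer group."""
    return {p: consumers[p % len(consumers)] for p in partitions}

partitions = list(range(6))
workers = assign(partitions, ["proc-1", "proc-2", "proc-3"])  # the "for real" group
counters = assign(partitions, ["counter-1"])                  # a separate group just counting

messages = [(i % 6, "flight-%d" % i) for i in range(12)]      # (partition, payload)

worker_inbox = {c: [] for c in set(workers.values())}
for part, payload in messages:
    worker_inbox[workers[part]].append(payload)               # exactly one worker sees it

counted = sum(1 for part, _ in messages if counters[part] == "counter-1")
```

Every message lands in exactly one worker's inbox, yet the counting group still sees all twelve, which is the "logs, not queues" point.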
This is a fairly lengthy read, but directly business-applicable, so I think it’s well worth it.
That’s the core of our code. The main function instantiates a new Kafka producer and gloms onto the Flights topic. From there, we call the loadEntries function, which takes a topic and a filename. It streams entries from the 2008.csv file and uses the ParallelSeq library to operate in parallel on data streaming in (one of the nice advantages of using functional code: writing thread-safe code is easy!). We filter out any records whose length is zero—there might be newlines somewhere in the file, and those aren’t helpful. We also want to throw away the header row (if it exists), and I know that it starts with “Year” whereas all other records simply include the numeric year value. Finally, once we throw away garbage rows, we want to call the publish function for each entry in the list. The publish function encodes our text as a UTF-8 bytestream and pushes the results onto our Kafka topic.
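Stripped of the F# and ParallelSeq specifics, the pipeline can be sketched like this; an in-memory list stands in for the Kafka producer, so this shows the filtering and encoding logic only.

```python
# Sketch of the loadEntries pipeline: stream lines, drop empties and the
# header row, then "publish" UTF-8 bytes. The list stands in for the real
# Kafka producer; the original is F# using ParallelSeq.

topic = []  # stand-in for the Kafka "Flights" topic

def publish(topic, entry: str):
    """Encode the record as UTF-8 and push it onto the topic."""
    topic.append(entry.encode("utf-8"))

def load_entries(topic, lines):
    for line in lines:
        if len(line) == 0:            # stray newlines in the file aren't helpful
            continue
        if line.startswith("Year"):   # header row; data rows start with a numeric year
            continue
        publish(topic, line)

sample = ["Year,Month,DayofMonth", "2008,1,3", "", "2008,1,4"]
load_entries(topic, sample)
```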
All this plus a bonus F# pitch.
Before we get to the numbers, an overview of the test environment, query set, and data is in order. The Impala and Hive numbers were produced on the same 10-node cluster of d2.8xlarge EC2 VMs. To prepare the Impala environment, the nodes were re-imaged and re-installed with Cloudera’s CDH version 5.8 using Cloudera Manager. The Cloudera Manager defaults were used to set up and configure Impala 2.6.0. It is worth pointing out that Impala’s Runtime Filtering feature was enabled for all queries in this test.
Data: While Hive works best with ORCFile, Impala works best with Parquet, so Impala testing was done with all data in Parquet format, compressed with Snappy compression. Data was partitioned the same way for both systems, along the date_sk columns. This was done to benefit from Impala’s Runtime Filtering and from Hive’s Dynamic Partition Pruning.
I’m impressed with both of these projects.
S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. Due to its underlying infrastructure, S3 is excellent for retrieving objects with known keys. S3 maintains an index of object keys in each region and partitions the index based on the key name. For best performance, keys that are often read together should not have sequential prefixes. Keys should be distributed across many partitions rather than on the same partition.
For large datasets like genomics, population-level analyses of these data can require many concurrent S3 reads by many Spark executors. To maximize performance of high-concurrency operations on S3, we need to introduce randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions.
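One common way to introduce that randomness is to prepend a short hash of the key itself, so related part files no longer share one sequential prefix. A hedged sketch, with made-up bucket paths and a hash scheme of my own choosing rather than the article's exact approach:

```python
# Sketch of key randomization: prefix each Parquet part file's key with a few
# hex characters of a hash so keys spread across S3's index partitions instead
# of sharing one sequential prefix. Paths and prefix length are illustrative.
import hashlib

def randomized_key(base_key: str, prefix_len: int = 4) -> str:
    """Derive a short, stable hash prefix from the key itself."""
    prefix = hashlib.md5(base_key.encode("utf-8")).hexdigest()[:prefix_len]
    return "%s/%s" % (prefix, base_key)

keys = [randomized_key("genomics/sample-set/part-%05d.parquet" % i)
        for i in range(100)]
distinct_prefixes = {k.split("/", 1)[0] for k in keys}
```

Because the prefix is derived from the key, it's stable across runs, so readers can recompute it instead of storing a lookup table.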
Reading the title, I wanted this to be an article on knobs to turn in S3 to maximize read performance. It’s still well worth reading, but it comes at the problem from the other side: how to write to S3 without stepping on your own toes.
There are three important things here: first, our Zookeeper port is 2181. Zookeeper is great for centralized configuration and coordination; if you want to learn more, check out this Sean Mackrory post.
The second bit of important information is how long our retention period is. Right now, it’s set to 7 days, and that’s our default. Remember that messages in a Kafka topic don’t go away simply because some consumer somewhere accessed them; they stay in the log until we say they can go.
Finally, we have a set of listeners. For the sandbox, the only listener is on port 6667. We connect to listeners from our outside applications, so knowing those addresses and ports is vital.
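All three of these settings live in the broker's server.properties. Here's a small sketch of pulling them out; the snippet mirrors the sandbox values mentioned above, but the hostname is a placeholder and the file is trimmed to just these keys.

```python
# The three sandbox settings above, as they'd appear in server.properties.
# Hostname is a placeholder; a real broker config has many more keys.
config_text = """
zookeeper.connect=sandbox.example.com:2181
log.retention.hours=168
listeners=PLAINTEXT://sandbox.example.com:6667
"""

config = {}
for line in config_text.strip().splitlines():
    key, _, value = line.partition("=")
    config[key] = value

zk_port = int(config["zookeeper.connect"].rsplit(":", 1)[1])   # 2181
retention_days = int(config["log.retention.hours"]) // 24      # 168 hours = 7 days
listener_port = int(config["listeners"].rsplit(":", 1)[1])     # 6667
```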
This is still quick-start level stuff, but I’m building up to custom development, honest!
Or one might want some assignment that results in uniform workloads, based on the number of messages in each partition. But until we have pluggable assignment functions, the reference implementation has a straightforward assignment strategy called Range Assignment. There is also a newer Round Robin assignor which is useful for applications like Mirror Maker, but most applications just use the default assignment algorithm.
The Range Assignor tries to land on a uniform distribution of partitions, at least within each topic, while at the same time avoiding the need to coordinate and bargain between nodes. This last goal, independent assignment, is achieved by having each node execute a fixed algorithm: sort the partitions, sort the consumers, and then, for each topic, take same-sized ranges of partitions for each consumer. Where the sizes cannot be equal, the consumers at the beginning of the sorted list end up with one extra partition. With this algorithm, each application node can see the entire layout by itself and, from there, take up the right assignments.
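The algorithm as described can be sketched directly. This is a simplified, single-topic version; the real assignor also handles multiple topics and consumer metadata.

```python
# Simplified single-topic Range Assignment, per the description above:
# sort partitions, sort consumers, hand out same-sized ranges, with the
# consumers at the front of the sorted list absorbing the remainder.

def range_assign(partitions, consumers):
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        size = per_consumer + (1 if i < extra else 0)  # first `extra` consumers get one more
        assignment[consumer] = partitions[start:start + size]
        start += size
    return assignment

# Seven partitions over three consumers: range sizes 3, 2, 2.
layout = range_assign(range(7), ["c1", "c2", "c3"])
```

Because the inputs are sorted and the arithmetic is deterministic, every node computes the same layout without any inter-node coordination, which is the whole point.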
Click through to see an example of how this is implemented.