Press "Enter" to skip to content

Category: Hadoop

Resource Allocation in Spark Applications

The folks at Beginner’s Hadoop take us through resource allocation in Spark applications:

Tiny executors essentially means one executor per core. The following table depicts the values of our spark-config params with this approach:

Analysis: With only one executor per core, as we discussed above, we won’t be able to take advantage of running multiple tasks in the same JVM. Also, shared/cached variables like broadcast variables and accumulators will be replicated on each core of the node, which is 16 times. Moreover, we are not leaving enough memory overhead for Hadoop/YARN daemon processes and we are not accounting for the ApplicationManager. NOT GOOD!
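To put rough numbers on it, here is a sketch of what the tiny-executors configuration would look like in a SparkSession; the 16 cores per node comes from the quoted analysis, while the 10-node and 64 GB-per-node figures are my own assumptions for illustration:

import org.apache.spark.sql.SparkSession

// "Tiny executors": one core per executor on an assumed 10-node cluster
// with 16 cores and 64 GB of RAM per node.
val spark = SparkSession.builder
  .appName("tiny-executors")
  .config("spark.executor.cores", "1")        // one core per executor
  .config("spark.executor.instances", "160")  // 10 nodes * 16 cores
  .config("spark.executor.memory", "4g")      // 64 GB / 16 executors per node
  .getOrCreate()

The usual recommendation is a middle ground: fewer, fatter executors that can run several tasks per JVM and share a single copy of broadcast variables, while leaving headroom for the OS and the Hadoop/YARN daemons.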

Read on for the full analysis.


KarelDB: SQL on Kafka

George Leopold informs us on a new project called KarelDB:

Unlike Confluent’s Kafka-based platform, KarelDB is not a streaming database. Yokota nevertheless flagged the relational database largely because it’s based on open-source components backed by Kafka. Hence, he reckons there’s a chance it could take off.

Those open source components include Calcite, an SQL framework that pushes relational queries to the data store, an approach seen as providing more efficient processing. Yokota noted that KarelDB would “automatically benefit” from upcoming Calcite optimizations.

Other open source projects, such as the Apache Flink stream processing engine, have also leveraged Calcite, including its SQL API. Calcite also includes an SQL parser.

Kafka already had KSQL for Kafka Streams, but this is a totally different validation of Feasel’s Law.


HBase and S3

Krishna Maheshwari, et al, explain how we can allow Apache HBase to use S3 for storage:

Cloudera Data Platform (CDP) provides an out-of-the-box solution that allows Apache HBase deployments to use Amazon Simple Storage Service (S3) as their main persistence layer for saving table data. Amazon S3 is an object store which offers a high degree of durability with a pay-per-use cost structure. There is no server-side component to run or manage for S3; all that is needed is the S3 client library and AWS credentials. However, HBase requires a consistent and atomic filesystem, which means that it cannot directly use S3 because it is an eventually consistent object store. Both CDH and HDP have only provided HBase running on HDFS because there have been long-standing impediments that prevented HBase from natively using S3. To address these issues, we’ve built an out-of-the-box solution which we are delivering for the first time via CDP. When you launch an Operational Database (HBase) cluster on CDP, HBase StoreFiles (the backing files for HBase tables) are stored in S3 and HBase write-ahead logs (WALs) are stored in an HDFS instance run alongside HBase, as usual.
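CDP wires this up for you, but conceptually the split amounts to pointing the HBase root directory at S3 while keeping the WALs on HDFS. A rough hbase-site.xml sketch of that idea (bucket name, namenode address, and paths are hypothetical; credentials and tuning omitted):

<property>
  <name>hbase.rootdir</name>
  <value>s3a://my-hbase-bucket/hbase</value>
  <!-- StoreFiles (table data) live in S3 via the s3a connector -->
</property>
<property>
  <name>hbase.wal.dir</name>
  <value>hdfs://namenode:8020/hbase/wal</value>
  <!-- write-ahead logs stay on HDFS, which provides the consistency HBase needs -->
</property>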

I hadn’t thought of using S3, but it’s an interesting post.


Delta Lake Schema Enforcement

Burak Yavuz, et al, explain the concept of schema enforcement with Databricks Delta Lake:

Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table’s schema. Like the front desk manager at a busy restaurant that only accepts reservations, it checks to see whether each column in data inserted into the table is on its list of expected columns (in other words, whether each one has a “reservation”), and rejects any writes with columns that aren’t on the list.
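In practice, the rejection shows up as an AnalysisException when a write’s columns don’t match the table. A minimal sketch of the behavior (paths and column names are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("schema-enforcement").getOrCreate()
import spark.implicits._

// Create a Delta table with two columns: id and amount.
Seq((1L, 10.0), (2L, 20.0)).toDF("id", "amount")
  .write.format("delta").save("/tmp/delta/loans")

// Appending a DataFrame with an unexpected extra column fails schema validation.
Seq((3L, 30.0, "oops")).toDF("id", "amount", "note")
  .write.format("delta").mode("append").save("/tmp/delta/loans")
// => org.apache.spark.sql.AnalysisException: A schema mismatch detected ...

If you actually want the new column, schema evolution (the mergeSchema write option) is the sanctioned way to change the table’s schema rather than bypassing the check.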

Something something “relational database” something something. They also walk us through some examples in a Databricks notebook, so check that out.


Calculating YARN Utilization Metrics

Dmitry Tolpeko shows how you can calculate per-second cluster utilization measures from YARN’s resource manager logs:

But even if you query the YARN REST API every second, it can still only provide a snapshot of the YARN resources in use. It does not show which application allocates or releases containers, their memory and CPU capacity, in which order these events occur, what their exact timestamps are, and so on.

For this reason I prefer a different approach, based on the YARN Resource Manager logs, to calculate exact per-second utilization metrics for a Hadoop cluster.
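The general idea is simple even if the details take work: scan the Resource Manager log for container allocation and release events, pull out the timestamp and capacity, and aggregate per second. A hypothetical Scala sketch (the log line format here is an assumption; match the regexes to whatever your RM version actually emits):

import scala.io.Source
import scala.collection.mutable

// Assumed log line shape:
//   2019-09-01 10:00:00,123 ... Assigned container ... of capacity <memory:4096, vCores:1> ...
val assigned = raw"^(\S+ \S+).*Assigned container.*<memory:(\d+), vCores:(\d+)>".r.unanchored
val released = raw"^(\S+ \S+).*Released container.*<memory:(\d+), vCores:(\d+)>".r.unanchored

// Net change in allocated memory (MB) per second of the log.
val deltaPerSecond = mutable.Map.empty[String, Long]

for (line <- Source.fromFile("yarn-resourcemanager.log").getLines()) line match {
  case assigned(ts, mem, _) =>
    val sec = ts.take(19)
    deltaPerSecond(sec) = deltaPerSecond.getOrElse(sec, 0L) + mem.toLong
  case released(ts, mem, _) =>
    val sec = ts.take(19)
    deltaPerSecond(sec) = deltaPerSecond.getOrElse(sec, 0L) - mem.toLong
  case _ =>
}

// A running sum over the deltas gives the memory allocated at each second with an event.
var allocatedMb = 0L
deltaPerSecond.toSeq.sortBy(_._1).foreach { case (sec, delta) =>
  allocatedMb += delta
  println(s"$sec  $allocatedMb MB allocated")
}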

This is a bit more complicated than hitting the REST API, but Dmitry shows some of the benefits of doing so.


Spark Streaming DStreams

Manish Mishra explains the fundamental abstraction of Spark Streaming:

Before going into details of the operations available on the DStream API, let us look at the input sources from which we can start a stream. There are multiple sources from which we can get input, e.g. Kafka, Flume, etc., or simply files sitting in a directory. To get the details on the available input sources supported by Spark, you can refer to this section. As part of this blog, we will take the example of Kafka.
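For reference, the Kafka case looks roughly like this with the spark-streaming-kafka-0-10 integration; the broker address, topic, and group id below are placeholders, not the post's own example:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("dstream-kafka-example")
val ssc = new StreamingContext(conf, Seconds(5))  // 5-second microbatches

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "dstream-demo",
  "auto.offset.reset" -> "latest"
)

// Each batch interval produces an RDD of ConsumerRecords from the subscribed topic.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

stream.map(record => record.value).count().print()

ssc.start()
ssc.awaitTermination()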

Read on to see an example of pulling data from Kafka and converting inputs into microbatches.


Multi-Region Replication with Confluent Platform

David Arthur walks us through multi-region replication of Kafka clusters in the Confluent Platform 5.4 preview:

Running a single Apache Kafka® cluster across multiple datacenters (DCs) is a common, yet somewhat taboo, architecture. This architecture, referred to as a stretch cluster, provides several operational benefits and unlocks the door to many use cases. Stretch clusters provide better durability guarantees and make disaster recovery much easier by avoiding the problem of offset translation and restarting clients. However, in order to operate a reliable stretch cluster, datacenters must be relatively close to each other and have a very stable, low-latency, and high-bandwidth connection among the DCs.

This changes with the preview release of Confluent Platform 5.4, which includes multi-region replication built directly into Confluent Server. Now operators can choose to replicate data on a per-region basis, synchronously or asynchronously, per topic. This functionality allows operators to increase data durability and automate client failover in the event of a disaster.

And of course all of those rules about RPO, RTO, etc. apply to this.


Diagnosing TCP SACKs-Related Slowdown in Databricks

Chris Stevens, et al, walk us through troubleshooting a slowdown after using Linux images which have been patched for the TCP SACKs vulnerabilities:

In order to figure out why the straggler task took 15 minutes, we needed to catch it in the act. We reran the benchmark while monitoring the Spark UI, knowing that all but one of the tasks for the save operation would complete within a few minutes. Sorting the tasks in that stage by the Status column, it did not take long for there to be only one task in the RUNNING state. We had found our skewed task and the IP address in the Host column pointed us at the executor experiencing the regression.

This is a nice case study of network troubleshooting, so of course there are Wireshark screenshots in it.


RDDs, DataFrames, and Datasets in Spark

Brad Llewellyn walks us through the three key data structures in Apache Spark:

We see that creating an RDD can be done with one easy function. In this snippet, sc represents the default SparkContext. This is extremely important, but is better left for a later post. SparkContext offers the .textFile() function, which creates an RDD from a text file, parsing each line into its own element in the RDD. These lines happen to represent CSV records. However, there are many other common examples that use lines of free text.

It should also be noted that we can use the .collect() and .take() functions to view the contents of an RDD.  The difference between .collect() and .take() is that .take() allows us to specify the number of elements we want to retrieve, whereas .collect() returns the entire RDD.
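As a quick sketch of those calls (the file path is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("rdd-basics").getOrCreate()
val sc = spark.sparkContext   // the SparkContext the post refers to as sc

// Each line of the file becomes one element of the RDD.
val lines = sc.textFile("/data/sample.csv")

lines.take(5).foreach(println)   // just the first five elements
val everything = lines.collect() // the entire RDD, pulled back to the driver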

My tendencies are probably skewed pretty heavily, but I live in DataFrames and almost never use raw RDDs anymore.


Flink’s State Processor API

Seth Wiesman and Fabian Hueske show off Apache Flink’s State Processor API:

The State Processor API that comes with Flink 1.9 is a true game-changer in how you can work with application state! In a nutshell, it extends the DataSet API with Input and OutputFormats to read and write savepoint or checkpoint data. Due to the interoperability of DataSet and Table API, you can even use relational Table API or SQL queries to analyze and process state data.

For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly. Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application. It’s also possible to fix inconsistent state entries now. Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started. For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on.
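A rough sketch of the reading side against Flink 1.9; the savepoint path, operator uid, and state name below are made up for illustration:

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

val env = ExecutionEnvironment.getExecutionEnvironment

// Load a savepoint taken from the running streaming job.
val savepoint = Savepoint.load(env, "hdfs:///savepoints/savepoint-abc123", new MemoryStateBackend())

// Read a (hypothetical) operator list state registered under uid "my-operator"
// into a DataSet, where it can be analyzed like any other batch data.
val counts = savepoint.readListState("my-operator", "count-state", Types.LONG)
counts.print()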

Read on to learn more about how this works.
