Category: Hadoop

Aggregating And Joining Using Kafka Streams

Michael Noll digs into Kafka Streams, showing how to enrich data and collect aggregates:

The stream of user click events is considered to be a record stream, where each data record represents a self-contained datum.  In contrast, the stream of user geo-location updates is interpreted as a changelog stream, where each data record represents an update (i.e. any previous data records having the same record key will be replaced by the latest update).  In Kafka Streams, a record stream is represented via the so-called KStream interface and a changelog stream via the KTable interface.  Going from the high-level view to the technical view, this means that our streaming application will demonstrate how to perform a join operation between a KStream and a KTable, i.e. it is an example of a stateful computation.  This KStream-KTable join also happens to be Kafka Streams’ equivalent of performing a table lookup in a streaming context, where the table is updated continuously and concurrently.  Specifically, for each user click event in the KStream, we will look up the user’s region (e.g. “europe”) in the KTable in order to subsequently compute the total number of user clicks per region.

Let’s showcase the beginning (input) and the end (expected output) of this data pipeline with some example data.

This article is fairly detailed, but it covers a rather interesting topic in a good way.
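
To make the record stream versus changelog stream distinction concrete, here is a minimal sketch in plain Python rather than the Kafka Streams API.  The event tuples, the user names, the click counts, and the "UNKNOWN" fallback for users without a known region are all assumptions of mine for illustration; they are not taken from Michael's article, and Kafka Streams itself handles missing keys according to the join type you pick.

```python
from collections import Counter


def clicks_per_region(events):
    """Replay interleaved events and return the total clicks per region.

    Each event is a tuple:
      ("region", user, region) -> changelog update (latest value per user wins)
      ("click", user, count)   -> record-stream event (every record counts)
    """
    user_region = {}    # plays the role of the KTable: latest region per user
    totals = Counter()  # running aggregate: total clicks per region

    for kind, user, value in events:
        if kind == "region":
            # An update replaces whatever region was stored for this user.
            user_region[user] = value
        elif kind == "click":
            # Table lookup at the moment the click is processed.
            region = user_region.get(user, "UNKNOWN")
            totals[region] += value
        else:
            raise ValueError(f"unexpected event kind: {kind!r}")
    return dict(totals)


# Hypothetical sample data.
events = [
    ("region", "alice", "asia"),
    ("region", "bob", "americas"),
    ("click", "alice", 13),
    ("region", "alice", "europe"),  # alice moves; later clicks count for europe
    ("click", "alice", 40),
    ("click", "bob", 4),
    ("click", "carol", 2),          # no region has been seen for carol
]
result = clicks_per_region(events)
print(result)
```

Note that alice's first 13 clicks stay with "asia" because the lookup happens when each click is processed, which is the behavior the quoted passage describes for a continuously updated table.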

Using Sqoop To Transfer Data

Sai Sriparasa shows how to use Sqoop to transfer data from a Hadoop cluster into a relational database:

Here are a few best practices for exporting with Sqoop:

  • Options file: Because Sqoop export and Sqoop import commands tend to be long, I recommend storing them in an options file. Keeping a command in an options file also lets you put it under version control to track changes.

  • Field termination: With Sqoop export, I recommend providing field termination metadata using the “--fields-terminated-by” option. Other formatting options such as “--lines-terminated-by”, “--enclosed-by”, “--escaped-by”, etc., can be used as required.

  • Mapper tuning: When an export job is submitted, Sqoop creates a Java class and submits a MapReduce job based on input splits; then, each mapper connects to the database to export the data. The default number of mappers is 4, so I recommend tuning the number of mappers depending on the availability of processors on the cluster. Too many mappers might increase the load on the database. I recommend that you monitor the number of connections and keep track of the process list in MySQL.

  • Staging table: The Sqoop export job is broken down into multiple transactions based on the mappers. Each transaction is therefore atomic and does not have any dependencies on other transactions. I recommend using the “--staging-table” option, which acts as a buffer table for the separate transactions. After all transactions have been committed, the data is moved to the final destination in a single transaction. Use the “--clear-staging-table” option to clean up the staging table after the export job.

There’s a lot in here which is Amazon-specific and there are a couple of things you’d have to change to deploy to SQL Server, but there’s a lot of useful information here.  I like that Sai shows how to use the Hadoop credential API instead of doing something silly like saving your password in plaintext.
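
As a rough illustration of the options file idea, here is a small Python sketch that generates one for an export job.  The flags are standard Sqoop export options, but the connection string, user, password alias, table names, and HDFS path are placeholders I made up; none of them come from Sai's post.  It also assumes the Hadoop credential provider holding the alias is configured separately.

```python
def sqoop_export_options(connect, username, password_alias, table,
                         export_dir, staging_table, num_mappers=4,
                         field_sep=","):
    """Return the text of a Sqoop options file for an export job.

    An options file lists the tool name first, then one option or value
    per line, in the order they would appear on the command line.
    """
    if num_mappers < 1:
        raise ValueError("num_mappers must be at least 1")

    pairs = [
        ("--connect", connect),
        ("--username", username),
        ("--password-alias", password_alias),  # alias in a credential store
        ("--table", table),
        ("--export-dir", export_dir),
        ("--fields-terminated-by", field_sep),
        ("--num-mappers", str(num_mappers)),
        ("--staging-table", staging_table),
    ]
    lines = ["# Sqoop export options (generated)", "export"]
    for flag, value in pairs:
        lines.extend([flag, value])
    lines.append("--clear-staging-table")  # flag without a value
    return "\n".join(lines) + "\n"


# All values below are hypothetical placeholders.
options_text = sqoop_export_options(
    connect="jdbc:mysql://db.example.com:3306/reports",
    username="etl_user",
    password_alias="reports.db.password",
    table="daily_sales",
    export_dir="/user/hive/warehouse/daily_sales",
    staging_table="daily_sales_staging",
    num_mappers=8,
)
print(options_text)
```

You would save the output to a file (say, export_options.txt, another made-up name), check it into version control, and run it with sqoop --options-file export_options.txt.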

Hive With LLAP

Carter Shanklin looks at Hive 2’s performance improvements:

LLAP KEY BENEFITS

  • LLAP enables sub-second queries in Hive by keeping all data and servers running and in memory all the time, while retaining the ability to scale elastically within a YARN cluster.

  • LLAP, along with Apache Ranger, enables fine-grained security for the Hadoop ecosystem, including data masking and filtering, by providing interfaces for external clients like Spark to read.

  • LLAP is great for cloud because it caches data in memory and keeps it compressed, overcoming long cloud storage access times and stretching the amount of data you can fit in RAM.

This sounds very much like a response to Spark.

Hard Problems In Stream Processing

Kartik Paramasivam discusses tough issues within the Lambda architecture:

During a data center failover like the example above, we could have a “late arrival,” i.e. the stream processor might see the AdClickEvent possibly a few minutes after the AdViewEvent. A poorly written stream processor might deduce that the ad was a low-quality ad when instead the ad might have actually been good. Another anomaly is that the stream processor might see the AdClickEvent before it sees the corresponding AdViewEvent. To ensure that the output of the stream processor is correct there has to be logic to handle this “out of order message arrival.”

In the example above, the geo-distributed nature of the data centers makes it easy to explain the delays. However, delays can exist even within the same data center due to GC issues, Kafka cluster upgrades, partition rebalances, and other naturally occurring distributed system phenomena.

This is a pretty long article and absolutely worth a read if you are looking at streaming data.
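
Here is a toy Python sketch of the kind of logic the quote is asking for: buffer views for a grace period before deciding they were never clicked, and hold on to clicks that show up before their view.  This is my own simplification, not Kartik's implementation; the event shape, the impression IDs, the use of arrival time as the clock, and the 300-second grace period are all assumptions.

```python
def join_views_and_clicks(events, grace_seconds):
    """Pair ad views with clicks, tolerating late and out-of-order arrival.

    events: iterable of (arrival_time, kind, impression_id) ordered by
            arrival, where kind is "view" or "click".
    Returns (outcome, too_late): outcome maps impression_id to True if it
    was clicked, and too_late lists clicks that arrived after their view
    had already been judged "not clicked".
    """
    pending_views = {}    # impression_id -> arrival time of the view
    early_clicks = set()  # clicks that arrived before their view
    outcome = {}
    too_late = []

    for now, kind, imp in events:
        # Finalize views whose grace period expired without a click.
        for old_imp, seen_at in list(pending_views.items()):
            if now - seen_at > grace_seconds:
                outcome[old_imp] = False
                del pending_views[old_imp]

        if kind == "view":
            if imp in early_clicks:
                early_clicks.discard(imp)
                outcome[imp] = True
            else:
                pending_views[imp] = now
        elif kind == "click":
            if imp in pending_views:
                del pending_views[imp]
                outcome[imp] = True
            elif imp in outcome:
                if not outcome[imp]:
                    too_late.append(imp)  # verdict was already issued
            else:
                early_clicks.add(imp)     # wait for the matching view

    # End of input: anything still pending never received a click.
    for imp in pending_views:
        outcome[imp] = False
    return outcome, too_late


# Hypothetical events: (arrival time in seconds, kind, impression id).
events = [
    (0, "view", "a"),
    (30, "click", "a"),    # the ordinary case
    (40, "click", "b"),    # click arrives before its view
    (45, "view", "b"),
    (50, "view", "c"),
    (250, "click", "c"),   # late, but inside the grace period
    (260, "view", "d"),
    (600, "view", "e"),    # by now d's grace period has expired
    (1000, "click", "d"),  # far too late to change d's verdict
]
outcome, too_late = join_views_and_clicks(events, grace_seconds=300)
print(outcome, too_late)
```

The trade-off is the usual one: a longer grace period catches more late clicks but delays every "not clicked" verdict and keeps more state in memory.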

Polybase DMVs

I look at the DMVs associated with Polybase and external table creation:

Let’s walk through this one step at a time and understand what the DMV is telling us.  Unfortunately, the DMV documentation is a little sparse, so some of this is guesswork on my part.

  1. A RandomIDOperation appears to create a temporary table.  In this case, the table (whose name is randomly generated) is named TEMP_ID_53.  I’m not sure where that name comes from; the session I ran this from was 54, so it wasn’t a session ID.

  2. After the table gets created, each Compute node gets told to create a table called TEMP_ID_53 in tempdb whose structure matches our external table’s structure.  One thing you can’t see from the screenshot is that this table is created with DATA_COMPRESSION = PAGE.  I have to wonder if that’d be the same if my Compute node were on Standard edition.

  3. We add an extended property on the table, flagging it as IS_EXTERNAL_STREAMING_TABLE.

  4. We then update the statistics on that temp table based on expected values.  629 rows are expected here.

  5. Then, we create the dest stat, meaning that the temp table now has exactly the same statistics as our external table.

  6. The next step is that the Head node begins a MultiStreamOperation, which tells the Compute nodes to begin working.  This operator does not show up in the documentation, but we can see that the elapsed time is 58.8 seconds, which is just about as long as my query took.  My guess is that this is where the Head node passes code to the Compute nodes and tells them what to do.

  7. We have a HadoopRoundRobinOperation on DMS, which stands for “Data Movement Step” according to the location_type documentation.  What’s interesting is that according to the DMV, that operation is still going.  Even after I checked it 40 minutes later, it still claimed to be running.  If you check the full query, it’s basically a SELECT * from our external table.

  8. Next is a StreamingReturnOperation, which includes our predicate WHERE dest = ‘ORD’ in it.  This is a Data Movement Step and includes all of the Compute nodes (all one of them, that is) sending data back to the Head node so that I can see the results.

  9. Finally, we drop TEMP_ID_53 because we’re done with the table.

This post was about 70% legwork and 30% guesswork.  That’s a bit higher a percentage than I’d ideally like, but there isn’t that much information readily available yet, so I’m trying (in my own small way) to fix that.

Analyzing Real-Time Data

Manjeet Chayel connects Spark Streaming to Amazon Kinesis and shows how to analyze the data in real time:

To use this post to play around with streaming data, you need an AWS account and AWS CLI configured on your machine. The entire pattern can be implemented in a few simple steps:

  1. Create an Amazon Kinesis stream.

  2. Spin up an EMR cluster with Hadoop, Spark, and Zeppelin applications from advanced options.

  3. Use a Simple Java producer to push random IoT events data into the Amazon Kinesis stream.

  4. Connect to the Zeppelin notebook.

  5. Import the Zeppelin notebook from GitHub.

  6. Analyze and visualize the streaming data.

This is a good way of getting started with streaming data.  I’ve grown quite fond of notebooks in the short time that I’ve used them, as they make it very easy for people who know what they’re doing to provide code and information to people who want to know what they’re doing.

Where Polybase Stats Live

I dig into where the statistics against a Polybase table actually live:

Today, we learned that Polybase statistics are stored in the same way as other statistics; as far as SQL Server is concerned, they’re just more statistics built from a table (remembering that the way stats get created involves loading data into a temp table and building stats off of that temp table).  We can do most of what you’d expect with these stats, but beware calling sys.dm_db_stats_properties() on Polybase stats, as they may not show up.

Also, remember that you cannot maintain, auto-create, auto-update, or otherwise modify these stats.  The only way to modify Polybase stats is to drop and re-create them, and if you’re dealing with a large enough table, you might want to take a sample.

The result isn’t very surprising in retrospect, and it’s good that “stats are stats are stats” is the correct answer.

Spark Metrics

Swaroop Ramachandra looks at some key metrics for Spark administration:

Once you have identified and broken down the Spark and associated infrastructure and application components you want to monitor, you need to understand the metrics you should really care about: those that affect the performance of your application as well as your infrastructure. Let’s dig deeper into some of the things you should care about monitoring.

  1. In Spark, it is well known that memory-related issues are typical if you haven’t paid attention to memory usage when building your application. Make sure you track garbage collection and memory across the cluster on each component, specifically the executors and the driver. Garbage collection stalls or abnormal patterns can increase back pressure.

There are a few metrics of note here.  Check it out.

Detecting Web Traffic Anomalies

Jan Kunigk combines a few Apache products to perform near-real-time analysis of web traffic data:

meinestadt.de web servers generate up to 20 million user sessions per day, which can easily result in up to several thousand HTTP GET requests per second during peak times (and expected to scale to much higher volumes in the future). Although there is a permanent fraction of bad requests, at times the number of bad requests jumps.

The meinestadt.de approach is to use a Spark Streaming application to feed an Impala table every n minutes with the current counts of HTTP status codes within the n-minute window. Analysts and engineers query the table via standard BI tools to detect bad requests.

What follows is a fairly detailed architectural walkthrough as well as configuration and implementation work.  It’s a fairly long read, but if you’re interested in delving into Hadoop, it’s a good place to start.
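
The core idea of counting status codes per n-minute window is easy to sketch without Spark.  The following plain-Python version is my own illustration, not Jan's code: the (timestamp, status) input shape and the sample requests are assumptions, and a real deployment would do this in Spark Streaming and append the rows to Impala.

```python
from collections import Counter, defaultdict


def status_counts_per_window(requests, window_minutes):
    """Count HTTP status codes in fixed, non-overlapping time windows.

    requests: iterable of (epoch_seconds, http_status) with integer times.
    Returns rows of (window_start, http_status, count), sorted by window
    and then by status code.
    """
    width = window_minutes * 60
    windows = defaultdict(Counter)
    for ts, status in requests:
        window_start = ts - (ts % width)  # align to the window boundary
        windows[window_start][status] += 1

    return [
        (start, status, count)
        for start in sorted(windows)
        for status, count in sorted(windows[start].items())
    ]


# Hypothetical request log: (epoch seconds, HTTP status code).
requests = [
    (0, 200), (10, 200), (59, 404),
    (60, 200), (119, 500),
    (120, 500), (130, 500),
]
rows = status_counts_per_window(requests, window_minutes=1)
for row in rows:
    print(row)
```

With rows shaped like this in a table, spotting a jump in bad requests comes down to comparing the 4xx and 5xx counts of one window against the windows before it.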

Polybase Setup Errors

Murshed Zaman on the Azure CAT team covers a number of Polybase configuration errors:

SSMS Error:

Any Select query fails with the following error.
Msg 106000, Level 16, State 1, Line 1
Java heap space

Possible Reason:

Illegal input may cause the Java out-of-memory error.  In this particular case, the file was not in UTF8 format. DMS tries to read the whole file as one row since it cannot decode the row delimiter, and it runs into a Java heap space error.

Possible Solution:

Convert the file to UTF8 format since PolyBase currently requires UTF8 format for text delimited files.

I imagine that this page will get quite a few hits over the years, as there currently exists limited information on how to solve these issues if you run into them, and some of the error messages (especially the one quoted above) have nothing to do with root causes.
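
If you hit this one, the conversion itself is simple.  Here is a minimal Python sketch that re-encodes a delimited text file as UTF-8; the file names, the sample rows, and the assumption that the source file is UTF-16 are mine, so check what encoding your file actually uses before running anything like this.

```python
import os
import tempfile


def convert_to_utf8(src_path, dst_path, source_encoding):
    """Re-encode a text file as UTF-8, reading it in chunks.

    newline="" keeps the original line endings untouched, so the row
    delimiter in the output matches the one in the source file.
    """
    with open(src_path, "r", encoding=source_encoding, newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        for chunk in iter(lambda: src.read(1024 * 1024), ""):
            dst.write(chunk)


# Demo with a throwaway UTF-16 file (hypothetical names and contents).
with tempfile.TemporaryDirectory() as workdir:
    src = os.path.join(workdir, "airports_utf16.csv")
    dst = os.path.join(workdir, "airports_utf8.csv")
    with open(src, "w", encoding="utf-16", newline="") as f:
        f.write("ORD,Chicago\nRDU,Raleigh\n")

    convert_to_utf8(src, dst, source_encoding="utf-16")

    with open(dst, "rb") as f:
        converted_bytes = f.read()

print(converted_bytes)
```

After converting, you would upload the new file to the external location in place of the original and re-run the query.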
