Press "Enter" to skip to content

Category: Hadoop

Sort-Based Blocking Shuffle in Flink

Yingjie Cao and Daisy Tsang have a multi-part series on sort-based blocking shuffles in Apache Flink. Part 1 acts as an overview:

The hash-based blocking shuffle has been supported in Flink for a long time. However, compared to the sort-based approach, it can have several weaknesses:

1. Stability: For batch jobs with high parallelism (tens of thousands of subtasks), the hash-based approach opens many files concurrently while writing or reading data, which can give high pressure to the file system (i.e. maintenance of too many file metas, exhaustion of inodes or file descriptors). We have encountered many stability issues when running large-scale batch jobs via the hash-based blocking shuffle.

2. Performance: For large-scale batch jobs, the hash-based approach can produce too many small files: for each data shuffle (or connection), the number of output files is (producer parallelism) * (consumer parallelism) and the average size of each file is (shuffle data size) / (number of files). The random IO caused by writing/reading these fragmented files can influence the shuffle performance a lot, especially on spinning disks. See the benchmark results section for more information.

By introducing the sort-based blocking shuffle implementation, fewer data files will be created and opened, and more sequential reads are done. As a result, better stability and performance can be achieved.

Part 2 provides some design considerations:

As discussed above, the hash-based blocking shuffle would produce too many small files for large-scale batch jobs. Producing fewer files can help to improve both stability and performance. The sort-merge approach has been widely adopted to solve this problem. By first writing to the in-memory buffer and then sorting and spilling the data into a file after the in-memory buffer is full, the number of output files can be reduced, which becomes (total data size) / (in-memory buffer size). Then by merging the produced files together, the number of files can be further reduced and larger data blocks can provide better sequential reads.

Flink’s sort-based blocking shuffle adopts a similar logic. A core difference is that data spilling will always append data to the same file so only one file will be spilled for each output, which means fewer files are produced.

Check it out for a behind-the-scenes look at how the sort-based blocking shuffle works.
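To put rough numbers to the file-count formulas quoted above, here is a quick back-of-the-envelope sketch in Python. The parallelism, data size, and buffer size are made-up figures for illustration only.

```python
# Back-of-the-envelope comparison of output file counts, using the formulas
# quoted above. The parallelism and data sizes are hypothetical.
producer_parallelism = 1_000
consumer_parallelism = 1_000
shuffle_data_bytes = 1 * 1024**4          # 1 TiB of shuffle data
in_memory_buffer_bytes = 512 * 1024**2    # 512 MiB sort buffer

# Hash-based: one file per producer/consumer pair.
hash_files = producer_parallelism * consumer_parallelism
hash_avg_file_size = shuffle_data_bytes / hash_files

# Sort-based: roughly one spill per full in-memory buffer, before any
# merging; Flink's implementation appends spills to a single file per
# output, reducing the count even further.
sort_files = shuffle_data_bytes // in_memory_buffer_bytes

print(f"hash-based: {hash_files:,} files, ~{hash_avg_file_size / 1024**2:.1f} MiB each")
print(f"sort-based: at most {sort_files:,} spill files before merging")
```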


GPU-Accelerated Analysis on Databricks using PyTorch + Huggingface

Srijith Rajamohan walks us through an example of sentiment analysis using the PyTorch and Huggingface libraries on Databricks:

Sentiment analysis is commonly used to analyze the sentiment present within a body of text, which could range from a review, an email or a tweet. Deep learning-based techniques are one of the most popular ways to perform such an analysis. However, these techniques tend to be very computationally intensive and often require the use of GPUs, depending on the architecture and the embeddings used. Huggingface (https://huggingface.co) has put together a framework with the transformers package that makes accessing these embeddings seamless and reproducible. In this work, I illustrate how to perform scalable sentiment analysis by using the Huggingface package within PyTorch and leveraging the ML runtimes and infrastructure on Databricks.

Click through for a description of the process, as well as a link to a notebook you can walk through yourself.
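If you want a small taste before opening the notebook, the Huggingface side of this boils down to something like the sketch below. The model name is the pipeline's usual sentiment default spelled out explicitly, the sample texts are placeholders, and the Databricks-specific scaling (ML runtime, GPUs, Spark) is what Srijith's post covers.

```python
# Minimal sentiment-analysis sketch with Huggingface transformers.
# Assumes `pip install transformers torch`; the model name and texts
# are illustrative placeholders.
from transformers import pipeline

sentiment = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=0,  # first GPU; use -1 to fall back to CPU
)

texts = [
    "The new release is impressively fast.",
    "The upgrade broke half of our jobs.",
]
for text, result in zip(texts, sentiment(texts)):
    print(text, "->", result["label"], round(result["score"], 3))
```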


A Primer on Kafka Streams

Bill Bejeck has an introduction to Kafka Streams:

Kafka Streams is an abstraction over Apache Kafka® producers and consumers that lets you forget about low-level details and focus on processing your Kafka data. You could of course write your own code to process your data using the vanilla Kafka clients, but the Kafka Streams equivalent will have far fewer lines, because it’s declarative rather than imperative. As a library, Kafka Streams lets you create a standalone application that can be run anywhere that can connect to a Kafka broker, whether that’s a laptop or a hefty cloud server. You just need to provide it with the host and port name of a broker. Combining Kafka Streams with Confluent Cloud grants you even more processing power with very little code investment.

Click through for a description as well as a whole series of embedded videos.
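For a sense of what the "vanilla clients" alternative Bill mentions looks like, here is a rough consume-transform-produce loop using the kafka-python package. The topic names, broker address, and the upper-casing "transformation" are placeholders; this is the imperative plumbing that Kafka Streams abstracts away.

```python
# The imperative consume-transform-produce loop that Kafka Streams hides.
# Uses the kafka-python package; topics and broker address are placeholders.
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer("input-topic", bootstrap_servers="localhost:9092")
producer = KafkaProducer(bootstrap_servers="localhost:9092")

for record in consumer:                      # blocks, yielding one record at a time
    value = record.value.decode("utf-8")     # values arrive as raw bytes
    transformed = value.upper()              # stand-in for real processing logic
    producer.send("output-topic", transformed.encode("utf-8"))
```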


From Kafka to Azure Data Explorer

Niels Berglund uses Kafka Connect to link an Apache Kafka topic to Azure Data Explorer:

If you follow my blog, you probably know that I am a huge fan of Apache Kafka and event streaming/stream processing. Recently Azure Data Explorer (ADX) has caught my eye. In fact, in the last few weeks, I did two conference sessions about ADX. A month ago, I published a blog post related to Kafka and ADX: Run Self-Managed Kusto Kafka Connector Serverless in Azure Container Instances.

As the title of that post implies, it looked at the ADX Kafka sink connector and how to run it in Azure. What the post did not look at was how to configure the connector and connect it to ADX. That is what we will do in this post (and maybe in a couple of more posts).

This post serves as a complete tutorial, though Niels does promise future posts on other ingestion methods, so stay tuned.
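For reference, registering a sink connector with Kafka Connect boils down to POSTing a JSON document to the Connect REST API. The sketch below shows the general shape in Python; the Kusto-specific property names and angle-bracket values are approximations and placeholders, so check the ADX sink connector documentation (and Niels's post) for the authoritative list.

```python
# Registering a sink connector via the Kafka Connect REST API.
# The REST call itself is standard Kafka Connect; the Kusto-specific
# property names below are approximate placeholders -- verify them
# against the ADX Kafka sink connector documentation.
import json
import requests

connector = {
    "name": "adx-sink",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "topics": "telemetry",
        "kusto.ingestion.url": "https://ingest-<cluster>.<region>.kusto.windows.net",
        "aad.auth.appid": "<app-id>",
        "aad.auth.appkey": "<app-secret>",
        "aad.auth.authority": "<tenant-id>",
        "kusto.tables.topics.mapping": json.dumps(
            [{"topic": "telemetry", "db": "mydb", "table": "Telemetry", "format": "json"}]
        ),
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()
```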


Push-Based Shuffle in Apache Spark 3.2 via Project Magnet

Venkata Krishnan Sowrirajan and Min Shen announce that Project Magnet will be in Apache Spark 3.2:

Push-based shuffle is an implementation of shuffle where the shuffle blocks are pushed to the remote shuffle services from the mapper tasks in order to address shuffle scalability and reliability issues. In a nutshell, with push-based shuffle, a large number of small, random reads is converted into a small number of large, sequential reads, which significantly improves disk I/O efficiency and shuffle data locality.

This is explained in greater detail in an earlier blog post, Magnet: A scalable and performant shuffle architecture for Apache Spark, which you can read for more information about how we achieve push-based shuffle.

Read on to see when this matters and how you can make use of it once you’re in Spark 3.2 (whose first release was exactly two weeks ago, October 13th).
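As a rough idea of what opting in looks like, push-based shuffle is enabled through Spark configuration; in 3.2 it targets YARN deployments with the external shuffle service. The sketch below is a minimal, assumption-laden example, so confirm the property names against the Spark 3.2 configuration docs.

```python
# Hedged sketch: opting in to push-based shuffle in Spark 3.2.
# Requires YARN with the external shuffle service; double-check the
# property names in the Spark 3.2 configuration documentation.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("push-based-shuffle-demo")
    .config("spark.shuffle.service.enabled", "true")  # external shuffle service
    .config("spark.shuffle.push.enabled", "true")     # client-side opt-in
    .getOrCreate()
)
```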


The Architecture of Apache Hive

The Hadoop in Real World team explains what the Apache Hive architecture looks like:

Metastore database

Metastore database is not part of HiveServer2 (and it is not shown in the picture). Every Hive installation needs to have an RDBMS like Derby (good for dev environments only), Oracle or MySQL.

Hive stores the metadata of the tables and database that is managed by Hive in the metastore database. Note that this database doesn’t hold the actual data. The data will reside in HDFS.

Click through for the full architecture.
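One quick way to see the metadata/data split for yourself is to describe a Hive table from Spark: the schema, format, and storage location come back from the metastore, while the rows live under the warehouse directory (HDFS on a typical cluster). A minimal sketch, assuming a Spark session with Hive support and an arbitrary table name:

```python
# The metastore holds only metadata (schema, format, location);
# the table's files live in HDFS / the warehouse directory.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS demo_orders (id INT, amount DOUBLE) STORED AS PARQUET")

# Metadata comes back from the metastore; note the 'Location' row,
# which points at the directory where the data files actually sit.
spark.sql("DESCRIBE FORMATTED demo_orders").show(truncate=False)
```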


SQL User-Defined Functions in Spark SQL

Serge Rielau and Allison Wang announce a new type of user-defined function in Spark SQL:

SQL UDFs are simple yet powerful extensions to Spark SQL. As functions, they provide a layer of abstraction to simplify query construction – making SQL queries more readable and modularized. Unlike UDFs that are written in a non-SQL language, SQL UDFs are more lightweight for SQL users to create. SQL function bodies are transparent to the query optimizer thus making them more performant than external UDFs. SQL UDFs can be created as either temporary or permanent functions, be reused across multiple queries, sessions and users, and be access-controlled via Access Control Language (ACL). In this blog, we will walk you through some key use cases of SQL UDFs with examples.

I look forward to dealing with cardinality issues and performance tuning these things in 5 years.
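For a flavor of the syntax, a minimal SQL UDF looks something like the sketch below, here wrapped in spark.sql() calls. It assumes a runtime that supports SQL UDFs (the post targets Databricks), and the function name and body are arbitrary examples.

```python
# Minimal SQL UDF sketch. Assumes a runtime with SQL UDF support;
# the function is an arbitrary example.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    CREATE OR REPLACE TEMPORARY FUNCTION to_fahrenheit(celsius DOUBLE)
    RETURNS DOUBLE
    RETURN celsius * 9.0 / 5.0 + 32.0
""")

spark.sql("SELECT to_fahrenheit(100.0) AS boiling").show()
```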


Session Windows in Spark Structured Streaming

Jungtaek Lim, et al, announce support for session windows in Spark Structured Streaming:

Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input can only be bound to a single window.

Sliding windows are similar to the tumbling windows from the point of being “fixed-sized”, but windows can overlap if the duration of the slide is smaller than the duration of the window, and in this case, an input can be bound to the multiple windows.

Session windows have a different characteristic compared to the previous two types. Session window has a dynamic size of the window length, depending on the inputs. A session window starts with an input and expands itself if the following input has been received within the gap duration. A session window closes when there’s no input received within the gap duration after receiving the latest input. This enables you to group events until there are no new events for a specified time duration (inactivity).

Click through for more details. You could implement session windows when querying existing data using a gaps and islands approach (where you increment the island count when you have a lagged difference greater than the cutoff point), but for streaming scenarios, it’s very handy to have this as a native window type.
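For reference, a session window aggregation in PySpark 3.2+ looks roughly like the sketch below. The rate source, column names, gap duration, and watermark are illustrative stand-ins for a real Kafka-fed pipeline.

```python
# Hedged sketch of a session window aggregation in Spark 3.2+.
# Uses the toy "rate" source; swap in Kafka or another real source.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

events = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .withColumn("user_id", F.col("value") % 5)   # fabricate a grouping key
)

sessions = (
    events
    .withWatermark("timestamp", "10 minutes")
    # Sessions close after 5 minutes of inactivity per user.
    .groupBy(F.session_window("timestamp", "5 minutes"), "user_id")
    .count()
)

query = sessions.writeStream.outputMode("append").format("console").start()
```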
