Press "Enter" to skip to content

Category: Hadoop

Streaming Foreign Key Joins in Kafka Streams

John Roesler and Adam Bellemare give us an in-depth look at a feature:

Before 2.4.0, the absence of foreign-key joins in Kafka Streams was palpable. As soon as you have a KTable abstraction, you start to think of relational-DB-esque things that you’d like to do with it, and joining two tables is near the top of the list. In addition, Kafka users often started out by implementing change data capture (CDC) of their main database tables, resulting in the production of normalized record streams reflecting the database model. These records often contain foreign-key references, requiring you to either denormalize entirely within your source database (which can be quite expensive), or handle them downstream in your consumer. The ability to compute denormalization on the fly is exactly in the sweet spot of use cases for Kafka Streams.

In versions prior to 2.4, there were workarounds available to compute a foreign-key join, using the ability to transform the table, filter it, aggregate on properties, and join on primary keys. But these workarounds were complex, prone to bugs, and not very efficient. A concrete plan to implement first-class support for this crucial operation was first put together when Jan Filipiak proposed KIP-213 in 2017. Adam Bellemare took over driving the proposal in 2018 and brought it to a conclusion in time for the 2.4.0 release.
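
For flavor, here is a minimal Scala sketch of the foreign-key join API that shipped in 2.4. Everything in it is hypothetical: string-valued orders and customers topics, an order value of the form "customerId|amount", and made-up topic names.

```scala
import java.util.function.{Function => JFunction}

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KTable, Produced, ValueJoiner}

object ForeignKeyJoinSketch extends App {
  val builder = new StreamsBuilder

  // Orders keyed by orderId; each value is "customerId|amount".
  val orders: KTable[String, String] =
    builder.table("orders", Consumed.`with`(Serdes.String(), Serdes.String()))

  // Customers keyed by customerId.
  val customers: KTable[String, String] =
    builder.table("customers", Consumed.`with`(Serdes.String(), Serdes.String()))

  // The extractor pulls the foreign key (customerId) out of the order value.
  val extractCustomerId: JFunction[String, String] =
    (orderValue: String) => orderValue.split('|')(0)

  // Combine each order with its matching customer into a denormalized record.
  val joiner: ValueJoiner[String, String, String] =
    (order: String, customer: String) => s"$order,$customer"

  // Foreign-key join, available since Kafka Streams 2.4.
  val enriched: KTable[String, String] =
    orders.join(customers, extractCustomerId, joiner)

  enriched.toStream.to("orders-enriched",
    Produced.`with`(Serdes.String(), Serdes.String()))

  val topology = builder.build() // hand this to a KafkaStreams instance as usual
}
```

Unlike a primary-key join, the extractor pulls the join key out of the order's value, and Kafka Streams handles the repartitioning and subscription mechanics behind the scenes.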

Click through for examples of how it all works, as well as how you might optimize foreign key joins.


Consistency and Completeness in Kafka Streams

Guozhang Wang announces a whitepaper:

Recently, however, some streaming engines, such as Apache Kafka® and its ecosystem component Kafka Streams, have been able to claim strong correctness guarantees, with the primary dual metrics being consistency and completeness. Consistency is a guarantee that a stream processing application can recover from failures to a consistent state, such that final results will not contain duplicates or lose any data. Completeness is a guarantee that a stream processing application does not generate incomplete partial outputs as final results, even when input stream records may arrive out of order.
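
To make those two guarantees a bit more tangible, here is a hedged Scala sketch of where they surface in the Kafka Streams API: consistency largely rides on the exactly-once processing guarantee, and completeness on tolerating out-of-order records, for example with window grace periods. The application ID, broker address, and timings are all made up.

```scala
import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.TimeWindows

object CorrectnessConfigSketch extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "correctness-demo") // made-up
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  // Consistency: exactly-once processing, so recovery from a failure
  // neither duplicates nor loses final results. Newer brokers also
  // support the more efficient "exactly_once_v2" setting.
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)

  // Completeness: give out-of-order records up to one minute of grace
  // to land in their five-minute window before finals are emitted.
  val windows = TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))
}
```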

Click through for more details and a link to the paper itself. It’s good to understand as much as you can about the distributed system you use, especially because many times, the claims for consistency should come with large asterisks.


Spark SQL and Merge Errors from Multiple Source Rows Matched

Manoj Pandey explains an error message in Spark SQL:

UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge

The above error says that, while doing a MERGE operation on the Target table, there shouldn't be any duplicates in the Source table. The SQL engine applies this check implicitly to avoid unnecessary updates and inconsistent data.
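
The usual preprocessing step is to collapse the source down to a single row per merge key before running MERGE. Here is a sketch in Scala against the Delta Lake API, where target_table, updates_source, id, and updated_at are all hypothetical names:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object MergeDedupeSketch extends App {
  val spark = SparkSession.builder().appName("merge-dedupe").getOrCreate()

  // Keep only the latest source row per merge key, using updated_at
  // (or whatever column marks recency) to break ties.
  val w = Window.partitionBy("id").orderBy(col("updated_at").desc)
  val dedupedSource = spark.table("updates_source")
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") === 1)
    .drop("rn")

  // With exactly one source row per target key, the ambiguity the
  // error complains about cannot arise.
  DeltaTable.forName(spark, "target_table").as("t")
    .merge(dedupedSource.as("s"), "t.id = s.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}
```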

Read on for a reproduction and what you can do to resolve the issue.


Using Spark in CDP’s Operational Database Experience

Gokul Kamaraj, et al, take us through using Apache Spark in Cloudera Data Platform’s Operational Database Experience:

Apache Spark is a very popular analytics engine used for large-scale data processing. It is widely used for many big data applications and use cases. CDP Operational Database Experience (COD) is a CDP Public Cloud service that lets you create and manage operational database instances, and it is powered by Apache HBase and Apache Phoenix.

To learn more about Apache Spark in CDP and CDP Operational Database Experience, see Apache Spark Overview and CDP Operational Database Experience Overview.

Apache Spark enables you to connect directly to databases that support JDBC. When integrating Apache Spark with Apache Phoenix in COD, you can leverage capabilities provided by Apache Phoenix to save and query data across multiple worker nodes, and use SELECT columns and pushdown predicates for filtering. 

In this blog post, let us look at how you can read and write data to COD from Apache Spark. We are going to use an Operational Database (COD) instance and the Apache Spark present in the Cloudera Data Engineering experience.
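
As a rough idea of what the read side looks like, here is a hedged Scala sketch using Spark's generic JDBC source with the Phoenix driver; the JDBC URL, table, and column names are placeholders, and the real endpoint comes from your COD instance's console.

```scala
import org.apache.spark.sql.SparkSession

object CodPhoenixReadSketch extends App {
  val spark = SparkSession.builder().appName("cod-phoenix-demo").getOrCreate()

  // Placeholder endpoint; COD surfaces the actual Phoenix JDBC URL
  // for each database instance.
  val url = "jdbc:phoenix:zk-host-1,zk-host-2:2181:/hbase"

  val df = spark.read
    .format("jdbc")
    .option("url", url)
    .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
    .option("dbtable", "MY_TABLE")
    .load()

  // Column pruning and the filter predicate are pushed down to Phoenix.
  df.select("ID", "NAME").filter("ID > 100").show()

  spark.stop()
}
```

Writes generally go through the Phoenix-Spark connector rather than the generic JDBC writer, since Phoenix speaks UPSERT rather than plain INSERT.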

Read on for the process.


Change Data Capture in Delta Lake

Surya Sai Turaga and John O’Dwyer take us through change data capture in Delta Lake:

Change data capture (CDC) is a use case that we see many customers implement in Databricks – you can check out our previous deep dive on the topic here. Typically we see CDC used in an ingestion-to-analytics architecture called the medallion architecture. The medallion architecture takes raw data landed from source systems and refines it through bronze, silver, and gold tables. CDC and the medallion architecture provide multiple benefits to users, since only changed or added data needs to be processed. In addition, the different tables in the architecture allow different personas, such as Data Scientists and BI Analysts, to use the correct up-to-date data for their needs. We are happy to announce the exciting new Change Data Feed (CDF) feature in Delta Lake, which, together with the MERGE operation and log versioning of Delta Lake, makes this architecture simpler to implement!
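
To make that concrete, here is a minimal Scala sketch of enabling CDF on a table and reading the resulting feed; silver_table and the version range are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object ChangeDataFeedSketch extends App {
  val spark = SparkSession.builder().appName("cdf-demo").getOrCreate()

  // Opt an existing Delta table in to the change data feed.
  spark.sql(
    "ALTER TABLE silver_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

  // Read only the row-level changes between two table versions. Each row
  // carries the metadata columns _change_type (insert, update_preimage,
  // update_postimage, delete), _commit_version, and _commit_timestamp.
  val changes = spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "5")
    .option("endingVersion", "10")
    .table("silver_table")

  changes.show()
}
```

Downstream silver-to-gold jobs can then process just those changed rows instead of rescanning the whole table.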

Read on to gain an understanding of how it works.


Announcements from Data+AI Summit

Ryan Boyd summarizes Databricks announcements:

The Delta Lake open source project is a key enabler of the lakehouse, as it fixes many of the limitations of data lakes: data quality, performance and governance. The project has come a long way since its initial release, and the Delta Lake 1.0 release was just certified by the community. The release represents a variety of new features, including generated columns and cloud independence with multi-cluster writes and my favorite — Delta Lake standalone, which reads from Delta tables but doesn’t require Apache Spark™.

We also announced a bunch of new committers to the Delta Lake project, including QP Hou, R. Tyler Croy, Christian Williams, Mykhailo Osypov and Florian Valeye.

Learn more about Delta Lake 1.0 in the keynotes from co-creator and Distinguished Engineer Michael Armbrust.
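
As a small taste of the standalone reader mentioned above, here is a hedged sketch against the delta-standalone library, which walks a Delta table's transaction log with no Spark dependency; the table path is a placeholder:

```scala
import io.delta.standalone.DeltaLog
import org.apache.hadoop.conf.Configuration

import scala.collection.JavaConverters._

object DeltaStandaloneSketch extends App {
  // Open the table's transaction log directly; no SparkSession involved.
  val log = DeltaLog.forTable(new Configuration(), "/tmp/delta/events")

  val snapshot = log.snapshot()
  println(s"Table version: ${snapshot.getVersion}")

  // Enumerate the data files making up the current version of the table.
  snapshot.getAllFiles.asScala.foreach(file => println(file.getPath))
}
```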

Read on for a variety of announcements in this vein.


Ranger and Jersey Clients

Jon Morisi troubleshoots an irksome issue:

Just a quick blog here about an issue I had with HDP-3.1.4.0.  I was recently setting up a new user with specific rights in Ranger for Hive access.  After creating the new policy and attempting to validate it, I received an error message stating that the hive user does not have use privilege.  This error was produced even though I had just created the policy specifically granting those privileges.

Upon further review I noticed that the plugin was downloading the policy, but not applying it.  

Read on to learn what the problem was and how Jon corrected it.


Scaling HDFS to an Exabyte

Konstantin Shvachko, et al, explain some of the changes to the Hadoop Distributed File System needed to scale to one exabyte of data:

LinkedIn runs its big data analytics on Hadoop. During the last five years, the analytics infrastructure has experienced tremendous growth, almost doubling every year in data size, compute workloads, and in all other dimensions. It recently reached two important milestones.

1. LinkedIn now stores 1 exabyte of total data across all Hadoop clusters.

2. Our largest 10,000-node cluster stores 500 PB of data. It maintains 1 billion objects (directories, files, and blocks) on a single NameNode serving RPCs with an average latency under 10 milliseconds, making it one of the largest (if not the largest) Hadoop cluster in the industry.

From the early days of LinkedIn, Apache Hadoop was the basis of our analytics infrastructure. Many teams assisted in this effort to make Hadoop our canonical big data platform.

Read on for different techniques they’ve used, as well as code changes implemented in HDFS to support this data size.


NameNode and Secondary NameNode in Hadoop

The Hadoop in Real World team hit on a naming scheme that I think is bad:

NameNode is the heart of HDFS. NameNode maintains the metadata of HDFS – files, list of blocks, directories, permissions, etc. The metadata is persisted in a file named FSIMAGE. During the startup of NameNode, the FSIMAGE file is read and loaded into memory.

Any ongoing changes to the files and directories in FSIMAGE are written to memory and to a temporary log file. NameNode does not save the ongoing changes to FSIMAGE directly because the FSIMAGE file could be big for a big HDFS, and updating a big file at runtime would be quite expensive and slow.
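
A toy Scala model of that design (purely illustrative, nothing like the real HDFS code) may help: mutations go to memory plus an append-only edit log, and a periodic checkpoint merges them into a fresh image so the log can be truncated.

```scala
import scala.collection.mutable

// Illustrative stand-ins for the namespace mutations an edit log records.
sealed trait Edit
final case class CreateFile(path: String) extends Edit
final case class DeleteFile(path: String) extends Edit

final class ToyNameNode(fsimage: Set[String]) {
  // Startup: the image is read once and loaded into memory.
  private val namespace = mutable.Set(fsimage.toSeq: _*)
  // Ongoing changes hit memory plus this append-only log; the
  // (potentially huge) image file is never rewritten per edit.
  private val editLog = mutable.ArrayBuffer[Edit]()

  def applyEdit(edit: Edit): Unit = {
    edit match {
      case CreateFile(p) => namespace += p
      case DeleteFile(p) => namespace -= p
    }
    editLog += edit
  }

  // Merge image + edits into a new image and truncate the log. In HDFS,
  // this periodic merge is the checkpointing work handed off the primary.
  def checkpoint(): Set[String] = {
    val newImage = namespace.toSet
    editLog.clear()
    newImage
  }
}

object ToyNameNodeDemo extends App {
  val nn = new ToyNameNode(Set("/data/a"))
  nn.applyEdit(CreateFile("/data/b"))
  println(nn.checkpoint()) // Set(/data/a, /data/b)
}
```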

Read on to learn what the secondary NameNode does. As a hint, it’s not a secondary NameNode in the sense of high availability. If you’re a new Hadoop administrator, the name can be deceiving, letting you think you have high availability when you really don’t.
