When compiling a SQL query that references an external table stored in an HDFS file, the PDW Engine Service contacts the Hadoop Namenode for information about the file. This information, combined with the number of DMS instances in the PDW cluster, is used to calculate the portion (offset and length) of the input file(s) each DMS instance should read from HDFS. This information is passed to DMS in the HDFS Shuffle step of the DSQL (distributed SQL) plan along with other information needed to read the file, including the file’s path, the location of the appropriate Namenode, and the name of the RecordReader that the bridge should use.
The system attempts to evenly balance the number of bytes read by each DMS instance. Once the DMS instances obtain split information from the Namenode, each can independently read the portion of the file it is assigned, directly communicating with the appropriate Datanodes without any centralized control.
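To make that concrete, here is a back-of-the-envelope version of the split calculation, assuming a single 1 GB input file and eight DMS instances (both numbers are mine, purely for illustration):

```sql
-- Illustrative only: divide a 1 GB file evenly across 8 DMS readers.
-- (In reality the remainder bytes would also be assigned; this ignores them.)
DECLARE @FileLength BIGINT = 1073741824;  -- 1 GB input file (assumed)
DECLARE @Readers BIGINT = 8;              -- number of DMS instances (assumed)

SELECT r.ReaderId,
       (r.ReaderId - 1) * (@FileLength / @Readers) AS StartOffset,
       @FileLength / @Readers AS BytesToRead
FROM (VALUES (1), (2), (3), (4), (5), (6), (7), (8)) AS r(ReaderId);
```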
This is a very clear paper that describes the core constructs of Polybase. Highly recommended.
One additional question I have involves whether the process for loading data is round-robin on a row-by-row basis. My conjecture is that it is not (particularly given that our first example had 4 files with zero records in them!), but I figured I’d create a new table and test. In this case, I’m using three fixed-width data types and loading 10 million identical records. I chose identical record values to make sure that the text length of each column was exactly the same in every line; the reason is that we’re taking data out of SQL Server (where an int is stored in a 4-byte block) and converting that int to a string (where each digit of the int is stored as a one-byte character). I chose 10 million because I know that’s well above the cutoff point for data to go into each of the eight files, so if there’s special logic to handle tiny row counts, I’d get past it.
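Here is roughly what that setup looks like; the table, column names, and the external table are mine, not the post’s:

```sql
-- Three fixed-width columns, 10 million identical rows (names assumed).
CREATE TABLE dbo.RoundRobinTest
(
    Id INT NOT NULL,
    BigValue BIGINT NOT NULL,
    EventDate DATE NOT NULL
);

INSERT INTO dbo.RoundRobinTest (Id, BigValue, EventDate)
SELECT TOP (10000000) 1, 1, '2016-01-01'
FROM sys.all_columns a
CROSS JOIN sys.all_columns b;

-- Write to an external table (assumed to exist; also requires
-- sp_configure 'allow polybase export', 1) and then inspect how the
-- engine distributed rows across the eight output files.
INSERT INTO dbo.RoundRobinTestExternal
SELECT Id, BigValue, EventDate
FROM dbo.RoundRobinTest;
```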
Read on for the exciting(?) conclusion.
In this case, all of those packets were 1514 bytes, so it’s an easy multiplication problem to see that we downloaded approximately 113 MB. The 2008.csv.bz2 file itself is 108 MB, so factoring in TCP packet overhead and the additional, smaller packets in the stream, I think that’s enough to show that we did in fact download the entire file. Just like in the Hadoop scenario without MapReduce, the Polybase engine needs to take all of the data and load it into a temp table (or a set of temp tables if you’re using a Polybase scale-out cluster) before it can pull out the relevant rows based on our query.
The upshot is that, for non-MapReduce queries, Polybase behaves much the same against Azure Blob Storage as it does against on-prem Hadoop.
To this point, I have focused my Polybase series on interactions with on-premises Hadoop, as it’s the use case most apropos to me. I want to start expanding that out to include other interaction mechanisms, and I’m going to start with one of the easiest: Azure Blob Storage.
Ayman El-Ghazali has a great blog post on the topic, which he turned into a full-length talk. As such, this post will fill in the gaps rather than start from scratch. In today’s post, my intention is to retrieve data from Azure Blob Storage and get an idea of what’s happening. From there, we’ll spend a couple more posts on Azure Blob Storage, looking a bit deeper into the process. That said, my expectation going into this series is that much of what we do with Azure Blob Storage will mimic what we did with Hadoop, as there are no Polybase core concepts unique to Azure Blob Storage, at least none that I am aware of.
Spoilers: I’m still not aware of any core concepts unique to Azure Blob Storage.
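As a refresher, wiring Polybase up to Azure Blob Storage looks something like the sketch below; every name in it is a placeholder, and it assumes a database master key already exists:

```sql
-- Placeholder identity, secret, account, and container throughout.
CREATE DATABASE SCOPED CREDENTIAL AzureStorageCredential
WITH IDENTITY = 'PolybaseUser',  -- arbitrary for blob storage
     SECRET = '<storage account access key>';

CREATE EXTERNAL DATA SOURCE AzureBlobStorage
WITH
(
    TYPE = HADOOP,
    LOCATION = 'wasbs://<container>@<account>.blob.core.windows.net',
    CREDENTIAL = AzureStorageCredential
);
```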
What’s interesting is that the error message itself is correct, but it could be confusing. It’s looking for a path with this name, but it isn’t seeing a path; it’s seeing a file with that name. Therefore, it throws an error.
This proves that you cannot control insertion into a single file by specifying the file at create time. If you do want to keep the files nicely packed (which is a good thing for Hadoop!), you could run a job on the Hadoop cluster to concatenate all of the results of the various files into one big file and delete the other files. You might do this as part of a staging process, where Polybase inserts into a staging table and then something kicks off an append process to put the data into the real tables.
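In other words, the external table’s LOCATION points at a folder, and each insert writes new files underneath it. A minimal sketch, with the data source and file format names assumed:

```sql
CREATE EXTERNAL TABLE dbo.SalesStaging
(
    SaleID INT,
    SaleAmount DECIMAL(10, 2)
)
WITH
(
    LOCATION = '/PolyBase/SalesStaging/',  -- a folder; inserts create files underneath
    DATA_SOURCE = HDP,                     -- assumed external data source
    FILE_FORMAT = CsvFileFormat            -- assumed file format
);
```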
Sometime in the future, I plan to see how it scales: with multiple files writing to a multi-node Hadoop cluster, do I get better write performance with a Polybase scale-out cluster? And if so, how close to linear scale can I get?
Even for a simple query, I’m not going to expect you to read 174 lines of XML; I’m not a sadist, after all…
What follows is a look at significant lines and my commentary.
Don’t listen to me there; that guy really is a sadist who wants you to read 174 lines of XML.
Once we did that and I restarted all of the services, I ended up getting an interesting error message from SQL Server:
Msg 7320, Level 16, State 110, Line 2
Cannot execute the query "Remote Query" against OLE DB provider "SQLNCLI11" for linked server "(null)". EXTERNAL TABLE access failed due to internal error: 'Java exception raised on call to JobSubmitter_SubmitJob: Error [org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=512
The error message is pretty clear: the Polybase service wants to create containers that are 1536 MB in size, but the maximum size I’m allowing is 512 MB. Therefore, the Polybase MapReduce operation fails.
Long story short, I needed enough RAM to be able to give 4 1/2 GB to YARN for creating MapReduce containers in order to run my query.
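For reference, these are the YARN settings involved; the values below are illustrative (4608 MB being the 4 1/2 GB figure above), and you’d set them in yarn-site.xml (or through Ambari) before restarting the services:

```xml
<!-- Maximum memory YARN will grant a single container request; it must be
     at least the 1536 MB that Polybase asks for. -->
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>4608</value>
</property>
<!-- Total memory on the node available for containers. -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>4608</value>
</property>
```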
As a reminder, in order to allow predicate pushdown to occur, we first need to hit a Hadoop cluster; we can’t use predicate pushdown on other systems like Azure Blob Storage. Second, we need to have a resource manager link set up in our external data source. Third, we need to make sure that everything is configured correctly on the Polybase side. But once you have those items in place, it’s possible to use the FORCE EXTERNALPUSHDOWN hint like so (the table and predicate below are stand-ins; the hint itself is the documented syntax):
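```sql
-- Sketch: table name and predicate are assumed, not from the post.
SELECT f.Origin, COUNT(*) AS NumberOfFlights
FROM dbo.Flights f
WHERE f.Year = 2008
GROUP BY f.Origin
OPTION (FORCE EXTERNALPUSHDOWN);  -- force the MapReduce job
```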
There’s also discussion of preventing MapReduce job creation as well as a pushdown-related error I had received in the past.
Stream 2 sends along 27 MB worth of data. It packages everything Polybase needs to perform operations, including JAR files and components for any internal conversion work that the Polybase engine might need to translate Hadoop results into SQL Server results. For example, the sqlsort at the bottom is a DLL that performs ordering using SQL Server-style collations, as per the Polybase academic paper.
Stream 2 is responsible for almost every packet from 43 through 19811; only 479 packets in that range belonged to some other stream (19811 – 43 – 18864 – 425 = 479). We send all of this data to the data node via port 50010.
If you love looking at Wireshark streams, you’ll love this post.
The dm_exec_external_work DMV tells us which execution we care about; in this case, I ended up running the same query twice, but I decided to look at the first run of it. Then, I can get step information from dm_exec_distributed_request_steps. This shows that we created a table in tempdb called TEMP_ID_14 and streamed results into it. The engine also created some statistics (though I’m not quite sure where it got the 24 rows from), and then we perform a round-robin query. Each Polybase compute node queries its temp table and streams the data back to the head node. Even though our current setup only has one compute node, the operation is the same as if we had a dozen Polybase compute nodes.
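A sketch of those DMV queries follows; the execution ID is a placeholder, and in practice you’d pull the real one from the first result set:

```sql
-- Find the execution we care about.
SELECT ew.execution_id, ew.input_name, ew.read_location, ew.bytes_processed, ew.start_time
FROM sys.dm_exec_external_work ew
ORDER BY ew.start_time DESC;

-- Then pull its steps: temp table creation, statistics, round robin query.
SELECT rs.step_index, rs.operation_type, rs.location_type, rs.row_count, rs.command
FROM sys.dm_exec_distributed_request_steps rs
WHERE rs.execution_id = 'QID1234'  -- placeholder execution ID
ORDER BY rs.step_index;
```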
Click through for Wireshark-related fun.