But now we run into a problem: certain ports need to be open for Polybase to work, including port 50010 on each of the data nodes against which we want to run MapReduce jobs. This goes back to the issue we see with spinning up data nodes in Docker: ports are not available. If you’ve put your HDInsight cluster into an Azure VNet and monkeyed around with ports, you might be able to open all of the ports necessary to get this working, but that’s a lot more than I’d want to mess with, as somebody who hasn’t taken the time to learn much about cloud networking.
As I mention in the post, I’d much rather build my own Hadoop cluster; I don’t think you save much maintenance time in the long run going with HDInsight.
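Before fighting with VNet rules, it can help to confirm which data nodes actually accept connections on port 50010 from the SQL Server machine. Here is a minimal Python sketch, assuming you run it from the Polybase host; the helper names are my own and not part of any Polybase tooling.

```python
import socket
from typing import Iterable, List


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


def unreachable_ports(hosts: Iterable[str], port: int = 50010, timeout: float = 2.0) -> List[str]:
    """Hosts on which the given port (default: the data node port 50010) is not reachable."""
    return [h for h in hosts if not port_is_open(h, port, timeout)]
```

Usage would look like `unreachable_ports(["dn1.example.internal", "dn2.example.internal"])`, where the host names are hypothetical placeholders; an empty list means every data node answered on 50010.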
This query succeeds but returns results we don’t really want:
This brings back all 9 records tied to products 1 and 2 (because product 3 didn’t exist on July 2nd at 8 AM UTC). But it gives us the same start and end date, so that’s not right. What I really want to do is replace the hard-coded date with DatePredictionMade, so let’s try that:
This returns a syntax error. It would appear that at the time FOR SYSTEM_TIME is resolved, QuantitySoldPrediction does not yet exist. This stops us dead in our tracks.
This is one of the two things I’d really like to change about temporal tables; the other thing (now that auto-retention is slated for release) is the ability to backfill data without turning off system versioning.
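To make the limitation concrete, here is a small in-memory Python model of the AS OF rule: a row version is visible at time t when its period start is at or before t and its period end is after t. `as_of` applies one timestamp to every row, which is what the first query did; `as_of_per_row` looks up each product at its own timestamp, which is what referencing DatePredictionMade was meant to do. This is a sketch of the semantics under assumed names (`ProductVersion`, `valid_from`, `valid_to`, `quantity` are hypothetical), not T-SQL.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

# Stand-in for the "still current" period end that temporal tables use.
END_OF_TIME = datetime(9999, 12, 31, 23, 59, 59)


@dataclass(frozen=True)
class ProductVersion:
    product_id: int
    quantity: int  # hypothetical attribute
    valid_from: datetime
    valid_to: datetime = END_OF_TIME


def as_of(versions: List[ProductVersion], t: datetime) -> List[ProductVersion]:
    """Versions visible to FOR SYSTEM_TIME AS OF t: valid_from <= t < valid_to."""
    return [v for v in versions if v.valid_from <= t < v.valid_to]


def as_of_per_row(versions: List[ProductVersion], product_id: int,
                  t: datetime) -> Optional[ProductVersion]:
    """The version of one product valid at a row-specific time t, or None if it didn't exist."""
    matches = [v for v in as_of(versions, t) if v.product_id == product_id]
    return matches[0] if matches else None
```

The per-row version is just a range lookup repeated with a different timestamp for each prediction; the syntax error above says the AS OF clause won't take that per-row value directly.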
Using a view, we were able to create a “partitioned” Polybase experience, similar to what we had in SQL Server 2000. This form of poor man’s partitioning allows us to segment out data sets and query them independently, something which can be helpful when storing very large amounts of data off-site and only occasionally needing to query it. The thing to remember, though, is that if you store this in Azure Blob Storage, you will need to pull down the entire table’s worth of data to do any processing.
This leads to a concept I first heard from Ginger Grant: pseudo-StretchDB. Instead of paying for what Stretch offers, you get an important subset of the functionality at a much, much lower price. If you do store the data in Azure Blob Storage, you’re paying pennies per gigabyte per month. For cold storage, like a scenario in which you need to keep data around to keep the auditors happy but your main application doesn’t use that information, it can work fine. But if you need to query this data frequently, performance might be a killer.
For Polybase tables without the ability to perform external pushdown, coming up with a good partitioning strategy is probably one of the two best ways to improve performance; the other is creating a Polybase scale-out cluster.
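The routing idea behind the view can be modeled in a few lines of Python. This is a simplified sketch, assuming each hypothetical external table holds one non-overlapping [start, end) date range; it shows which segments a date-bounded query needs to read, not how SQL Server’s optimizer actually treats the view.

```python
from dataclasses import dataclass
from datetime import date
from typing import List


@dataclass(frozen=True)
class Segment:
    name: str    # hypothetical external table behind the view
    start: date  # inclusive
    end: date    # exclusive


def segments_to_scan(segments: List[Segment], q_start: date, q_end: date) -> List[str]:
    """Names of segments whose [start, end) range overlaps the query range [q_start, q_end)."""
    return [s.name for s in segments if s.start < q_end and q_start < s.end]


# Hypothetical one-table-per-year layout.
SEGMENTS = [
    Segment("Sales2014", date(2014, 1, 1), date(2015, 1, 1)),
    Segment("Sales2015", date(2015, 1, 1), date(2016, 1, 1)),
    Segment("Sales2016", date(2016, 1, 1), date(2017, 1, 1)),
]
```

The fewer segments a query touches, the less data has to come back over the wire, which matters most when the underlying files sit in Azure Blob Storage.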
Even for a simple query, I’m not going to make you read 174 lines of XML; I’m not a sadist, after all…
What follows is a look at significant lines and my commentary.
Don’t listen to me there; that guy really is a sadist who wants you to read 174 lines of XML.
As a reminder, three things need to be in place for predicate pushdown to occur. First, we need to hit a Hadoop cluster; we can’t use predicate pushdown on other systems like Azure Blob Storage. Second, we need to have a resource manager link set up in our external data source. Third, we need to make sure that everything is configured correctly on the Polybase side. Once you have those items in place, it’s possible to use the FORCE EXTERNALPUSHDOWN query hint like so:
There’s also discussion of preventing MapReduce job creation as well as a pushdown-related error I had received in the past.
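The three prerequisites work well as a checklist. This Python sketch is illustrative only: `ExternalDataSource` is a hypothetical stand-in whose fields loosely mirror the TYPE, LOCATION, and RESOURCE_MANAGER_LOCATION options of CREATE EXTERNAL DATA SOURCE, and `polybase_configured` is a single assumed flag standing in for all of the Polybase-side configuration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExternalDataSource:
    """Hypothetical stand-in for the options of CREATE EXTERNAL DATA SOURCE."""
    source_type: str                                 # e.g. "HADOOP"
    location: str                                    # hdfs://... for Hadoop, wasbs://... for Blob Storage
    resource_manager_location: Optional[str] = None
    polybase_configured: bool = False                # one flag for the Polybase-side setup


def pushdown_blockers(src: ExternalDataSource) -> List[str]:
    """List the predicate pushdown prerequisites that are not yet met."""
    blockers: List[str] = []
    # Blob Storage also uses TYPE = HADOOP, so check the URI scheme as well.
    if src.source_type != "HADOOP" or not src.location.startswith("hdfs://"):
        blockers.append("data source is not a Hadoop cluster")
    if not src.resource_manager_location:
        blockers.append("no resource manager location on the data source")
    if not src.polybase_configured:
        blockers.append("Polybase-side configuration is incomplete")
    return blockers
```

An empty list means FORCE EXTERNALPUSHDOWN at least has a chance; anything else tells you which of the three items to fix first.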
Notice how 3bd shows up for pretty much all of these services. This is not what you’d want to do in a real production environment, but because we want to use Docker and easily pass ports through, it’s the simplest way for me to set this up. If you knew beforehand which node would host which service, you could modify the run.sh shell script that we discussed earlier and open those specific ports.
After assigning masters, we next have to define which nodes are clients in which clusters.
Click through for a screenshot-laden walkthrough.
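If you do know which node hosts which services, generating the port mappings is mechanical. Here is a minimal Python sketch, assuming Docker’s `-p host:container` publish syntax and a hypothetical port list built from ports mentioned in these posts (8020 for the name node, 8080 for Ambari, 50010 for data nodes); substitute whatever your own layout needs.

```python
from typing import Iterable, List


def publish_flags(ports: Iterable[int]) -> List[str]:
    """Build docker run arguments that publish each container port on the same host port."""
    flags: List[str] = []
    for port in sorted(set(ports)):
        flags += ["-p", f"{port}:{port}"]
    return flags


# Hypothetical port list for the node hosting most services; adjust to your own layout.
MASTER_NODE_PORTS = [8020, 8080, 50010]

# Example: splice the flags into a docker run command for that node, e.g.
# print(" ".join(["docker", "run", "-d", *publish_flags(MASTER_NODE_PORTS), "<image>"]))
```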
This is a very interesting set of results. First, 7Zip archived files do not work with the default encoding. I’m not particularly surprised by this result, as 7Zip support is relatively scarce across the board and it’s a niche file format (though a very efficient format).
The next failure case is tar. Tar is a weird case because it missed the first row in the file but was able to collect the remaining 776 records. Same goes for .tar.gz. I unpackaged the .tar file and the constituent SecondBasemen.csv file did in fact have all 777 records, so it’s something weird about the codec.
Stick to BZip2 and GZip if you’re using flat files.
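The check I did by hand (unpack, count, compare with 777) is easy to script for the codecs that worked. Here is a Python sketch, assuming UTF-8 CSV files that are either uncompressed or named with .gz or .bz2 extensions; compare its result with a SELECT COUNT(*) against the external table. Tar and 7Zip files are rejected rather than guessed at, since they are archives that bundle files instead of a single compressed stream.

```python
import bz2
import csv
import gzip
import os


def count_rows(path: str) -> int:
    """Count CSV rows in a plain, .gz, or .bz2 file (UTF-8 assumed)."""
    lower = path.lower()
    if lower.endswith((".tar", ".tar.gz", ".tgz", ".7z")):
        # Archives bundle files; unpack them first and count the CSV inside.
        raise ValueError(f"unpack the archive first: {path}")
    ext = os.path.splitext(lower)[1]
    opener = {".gz": gzip.open, ".bz2": bz2.open}.get(ext, open)
    with opener(path, "rt", encoding="utf-8", newline="") as f:
        return sum(1 for _ in csv.reader(f))
```

This counts every CSV row, so subtract one if your file has a header row that the external file format skips.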
There are a couple of things I want to point out here. First, the Type is HADOOP, one of the three types currently available: HADOOP (for Hadoop, Azure SQL Data Warehouse, and Azure Blob Storage), SHARD_MAP_MANAGER (for sharded Azure SQL Database Elastic Database queries), and RDBMS (for cross-database Elastic Database queries on Azure SQL Database).
Second, the Location is my name node on port 8020. If you’re curious about how we figure that one out, go to Ambari (which, for me, is http://sandbox.hortonworks.com:8080) and go to HDFS and then Configs. In the Advanced tab, you can see the name node:
There are different options available for different sources, but this post is focused on Hadoop.
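Once you have the name node address, turning it into a LOCATION value is simple. In Hadoop configuration the same address is typically stored in core-site.xml as fs.defaultFS. This Python sketch normalizes such a value into hdfs://host:port, assuming 8020 as the fallback port when none is given, to match the sandbox above.

```python
from urllib.parse import urlparse


def namenode_location(default_fs: str, fallback_port: int = 8020) -> str:
    """Turn an fs.defaultFS-style value into an hdfs://host:port LOCATION string."""
    parsed = urlparse(default_fs.strip())
    if parsed.scheme != "hdfs" or not parsed.hostname:
        raise ValueError(f"expected an hdfs:// URI, got {default_fs!r}")
    port = parsed.port or fallback_port  # assumed default name node port
    return f"hdfs://{parsed.hostname}:{port}"
```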
We’re doing a bunch of setup work here, so let’s take it from the top. First, I declare a consumer group, which I’m calling “Airplane Enricher.” Kafka uses the concept of consumer groups to allow consumers to work in parallel. Imagine that we have ten separate servers available to process messages from the Flights topic. Each flight message is independent, so it doesn’t matter which consumer gets it. What does matter, though, is that multiple consumers don’t get the same message, as that’s a waste of resources and could lead to duplicate data processing, which would be bad.
The way Kafka works around this is to use consumer groups: within a consumer group, only one consumer will get a particular message. That way, I can have my ten servers processing messages “for real” and maybe have another consumer in a different consumer group just reading through the messages getting a count of how many records are in the topic. Once you treat topics as logs rather than queues, consumer design changes significantly.
This is a fairly lengthy read, but directly business-applicable, so I think it’s well worth it.
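The consumer group rule can be simulated in a few lines of Python. This is a simplified sketch: real Kafka hands out partitions rather than individual messages, using a configurable assignment strategy, while this model just round-robins partitions across each group’s consumers. Within a group each message reaches exactly one consumer, and every group sees every message. Apart from Airplane Enricher, the group and consumer names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Message = Tuple[int, str]  # (partition, payload)


def assign_partitions(partitions: int, consumers: List[str]) -> Dict[int, str]:
    """Map each partition to exactly one consumer in the group (simple round-robin)."""
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    return {p: consumers[p % len(consumers)] for p in range(partitions)}


def deliver(messages: List[Message], groups: Dict[str, List[str]],
            partitions: int) -> Dict[str, Dict[str, List[str]]]:
    """Return {group: {consumer: [payloads]}}: one owner per message within each group."""
    result: Dict[str, Dict[str, List[str]]] = {}
    for group, consumers in groups.items():
        owner = assign_partitions(partitions, consumers)
        received: Dict[str, List[str]] = defaultdict(list)
        for partition, payload in messages:
            received[owner[partition]].append(payload)
        result[group] = dict(received)
    return result
```

One consequence falls out of the model: with more consumers than partitions, the extras own nothing and sit idle, so partition count caps a group’s parallelism.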
There are three important things here: first, our Zookeeper port is 2181. Zookeeper is great for centralized configuration and coordination; if you want to learn more, check out this Sean Mackrory post.
The second bit of important information is how long our retention period is. Right now, it’s set to 7 days, and that’s our default. Remember that messages in a Kafka topic don’t go away simply because some consumer somewhere accessed them; they stay in the log until we say they can go.
Finally, we have a set of listeners. For the sandbox, the only listener is on port 6667. We connect to listeners from our outside applications, so knowing those addresses and ports is vital.
This is still quick-start level stuff, but I’m building up to custom development, honest!
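Those three settings live in the broker configuration. Here is a Python sketch that pulls them out of server.properties-style text, assuming the property names zookeeper.connect, listeners, and log.retention.hours/minutes/ms. Per the Kafka broker docs, the ms setting overrides minutes, which overrides hours, and a negative ms value means no time limit. The parser is deliberately simplified (plain key=value lines only), and the sample values are hypothetical but match the sandbox ports above.

```python
from typing import Dict, List, Optional


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # comments (no line continuations)."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def retention_days(props: Dict[str, str]) -> Optional[float]:
    """Retention in days; ms overrides minutes, which overrides hours. None means unlimited."""
    if "log.retention.ms" in props:
        ms = int(props["log.retention.ms"])
        return None if ms < 0 else ms / 86_400_000
    if "log.retention.minutes" in props:
        return int(props["log.retention.minutes"]) / 1440
    return int(props.get("log.retention.hours", "168")) / 24  # 168 hours = 7-day default


def zookeeper_port(props: Dict[str, str]) -> int:
    """Port of the first ZooKeeper server in zookeeper.connect (chroot path ignored)."""
    first = props["zookeeper.connect"].split(",")[0].split("/")[0]
    return int(first.rsplit(":", 1)[1])


def listeners(props: Dict[str, str]) -> List[str]:
    """Listener URIs that outside applications connect to."""
    return [item.strip() for item in props.get("listeners", "").split(",") if item.strip()]


SAMPLE = """
# Hypothetical sandbox values
zookeeper.connect=sandbox.hortonworks.com:2181
log.retention.hours=168
listeners=PLAINTEXT://sandbox.hortonworks.com:6667
"""
```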