During a data center failover like the example above, we could have a “late arrival,” i.e. the stream processor might see the AdClickEvent a few minutes after the AdViewEvent. A poorly written stream processor might deduce that the ad was a low-quality ad when the ad might actually have been good. Another anomaly is that the stream processor might see the AdClickEvent before it sees the corresponding AdViewEvent. To ensure that the output of the stream processor is correct, there has to be logic to handle this “out of order message arrival.”
In the example above, the geo-distributed nature of the data centers makes it easy to explain the delays. However, delays can exist even within the same data center due to GC issues, Kafka cluster upgrades, partition rebalances, and other naturally occurring distributed system phenomena.
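To make the late-arrival and out-of-order cases concrete, here is a minimal sketch in plain Python, not the article's implementation. It assumes each view and its click share a hypothetical view_id, that the caller supplies timestamps in seconds, and that a five-minute wait is an acceptable tolerance; the class and method names are made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ClickJoiner:
    """Pairs view and click events by view_id, whichever arrives first.

    All names here are hypothetical; wait_seconds is an assumed tolerance.
    """
    wait_seconds: float = 300.0
    pending_views: dict = field(default_factory=dict)   # view_id -> view time
    pending_clicks: dict = field(default_factory=dict)  # view_id -> click time

    def on_view(self, view_id, ts):
        # Out-of-order case: the click was seen before its view.
        if view_id in self.pending_clicks:
            del self.pending_clicks[view_id]
            return [(view_id, "clicked")]
        self.pending_views[view_id] = ts
        return []

    def on_click(self, view_id, ts):
        # Normal or late-arrival case: the view is still buffered.
        if view_id in self.pending_views:
            del self.pending_views[view_id]
            return [(view_id, "clicked")]
        # No view yet, so hold the click until the view shows up.
        self.pending_clicks[view_id] = ts
        return []

    def expire(self, now):
        # Only declare a view unclicked after the wait window has passed.
        expired = [v for v, ts in self.pending_views.items()
                   if now - ts >= self.wait_seconds]
        for v in expired:
            del self.pending_views[v]
        return [(v, "not_clicked") for v in expired]
```

The point of the design is that a view is never judged until expire() says its window has closed, so a click that shows up a few minutes late still counts. A real stream processor would also need to expire orphaned clicks and persist this state, which the sketch leaves out.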
This is a pretty long article and absolutely worth a read if you are looking at streaming data.
Let’s walk through this one step at a time and understand what the DMV is telling us. Unfortunately, the DMV documentation is a little sparse, so some of this is guesswork on my part.
A RandomIDOperation appears to create a temporary table. In this case, the table (whose name is randomly generated) is named TEMP_ID_53. I’m not sure where that name comes from; the session I ran this from was 54, so it wasn’t a session ID.
After the table gets created, each Compute node gets told to create a table called TEMP_ID_53 in tempdb whose structure matches our external table’s structure. One thing you can’t see from the screenshot is that this table is created with DATA_COMPRESSION = PAGE. I have to wonder if that’d be the same if my Compute node were on Standard edition.
We add an extended property on the table, flagging it as IS_EXTERNAL_STREAMING_TABLE.
We then update the statistics on that temp table based on expected values. 629 rows are expected here.
Then, we create the dest stat, meaning that the temp table now has exactly the same statistics as our external table.
The next step is that the Head node begins a MultiStreamOperation, which tells the Compute nodes to begin working. This operator does not show up in the documentation, but we can see that the elapsed time is 58.8 seconds, which is just about as long as my query took. My guess is that this is where the Head node passes code to the Compute nodes and tells them what to do.
We have a HadoopRoundRobinOperation on DMS, which stands for “Data Movement Step” according to the location_type documentation. What’s interesting is that according to the DMV, that operation is still going. Even after I checked it 40 minutes later, it still claimed to be running. If you check the full query, it’s basically a SELECT * from our external table.
Next is a StreamingReturnOperation, which includes our predicate WHERE dest = 'ORD' in it. This is a Data Movement Step and includes all of the Compute nodes (all one of them, that is) sending data back to the Head node so that I can see the results.
Finally, we drop TEMP_ID_53 because we’re done with the table.
This post was about 70% legwork and 30% guesswork. That’s a bit higher a percentage than I’d ideally like, but there isn’t that much information readily available yet, so I’m trying (in my own small way) to fix that.
To use this post to play around with streaming data, you need an AWS account and AWS CLI configured on your machine. The entire pattern can be implemented in a few simple steps:
Create an Amazon Kinesis stream.
Spin up an EMR cluster with Hadoop, Spark, and Zeppelin applications from advanced options.
Use a simple Java producer to push random IoT event data into the Amazon Kinesis stream.
Connect to the Zeppelin notebook.
Import the Zeppelin notebook from GitHub.
Analyze and visualize the streaming data.
This is a good way of getting started with streaming data. I’ve grown quite fond of notebooks in the short time that I’ve used them, as they make it very easy for people who know what they’re doing to provide code and information to people who want to know what they’re doing.
Today, we learned that Polybase statistics are stored in the same way as other statistics; as far as SQL Server is concerned, they’re just more statistics built from a table (remembering that the way stats get created involves loading data into a temp table and building stats off of that temp table). We can do most of what you’d expect with these stats, but beware calling sys.dm_db_stats_properties() on Polybase stats, as they may not show up.
Also, remember that you cannot maintain, auto-create, auto-update, or otherwise modify these stats. The only way to modify Polybase stats is to drop and re-create them, and if you’re dealing with a large enough table, you might want to take a sample.
The result isn’t very surprising in retrospect, and it’s good that “stats are stats are stats” is the correct answer.
Once you have identified and broken down the Spark and associated infrastructure and application components you want to monitor, you need to understand which metrics really affect the performance of your application as well as your infrastructure. Let’s dig deeper into some of the things you should care about monitoring.
In Spark, it is well known that memory-related issues are typical if you haven’t paid attention to memory usage when building your application. Make sure you track garbage collection and memory across the cluster on each component, specifically the executors and the driver. Garbage collection stalls or abnormal patterns can increase back pressure.
There are a few metrics of note here. Check it out.
meinestadt.de web servers generate up to 20 million user sessions per day, which can easily result in up to several thousand HTTP GET requests per second during peak times (and this is expected to scale to much higher volumes in the future). Although there is a permanent fraction of bad requests, at times the number of bad requests jumps.
The meinestadt.de approach is to use a Spark Streaming application to feed an Impala table every n minutes with the current counts of HTTP status codes within the n-minute window. Analysts and engineers query the table via standard BI tools to detect bad requests.
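The windowed counting itself is simple enough to sketch outside of Spark. The following is plain Python rather than the Spark Streaming code from the article, and it assumes each request has already been parsed into a hypothetical (epoch_seconds, status_code) pair; the function name is made up for illustration.

```python
from collections import Counter, defaultdict


def count_status_codes(requests, window_minutes):
    """Count HTTP status codes per fixed (tumbling) n-minute window.

    requests: iterable of (epoch_seconds, status_code) pairs (assumed shape).
    Returns {window_start_seconds: {status_code: count}}.
    """
    window_seconds = window_minutes * 60
    counts = defaultdict(Counter)
    for ts, status in requests:
        # Round the timestamp down to the start of its window.
        window_start = int(ts // window_seconds) * window_seconds
        counts[window_start][status] += 1
    return {start: dict(c) for start, c in sorted(counts.items())}
```

Each entry in the result corresponds to the rows that would be written to the table for one window, which is what lets an analyst spot a jump in 4xx or 5xx counts from one window to the next.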
What follows is a fairly detailed architectural walkthrough as well as configuration and implementation work. It’s a fairly long read, but if you’re interested in delving into Hadoop, it’s a good place to start.
Any Select query fails with the following error.
Msg 106000, Level 16, State 1, Line 1
Java heap space
Illegal input may cause the Java out-of-memory error. In this particular case the file was not in UTF8 format. DMS tries to read the whole file as one row since it cannot decode the row delimiter, and runs into a Java heap space error.
Convert the file to UTF8 format since PolyBase currently requires UTF8 format for text delimited files.
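If you need to do that conversion yourself, a minimal Python sketch looks like the following. The source encoding is an assumption (UTF-16 is used here only as an example default); you have to know or detect what the file really is, and the function name is hypothetical.

```python
def convert_to_utf8(src_path, dst_path, src_encoding="utf-16"):
    """Re-encode a delimited text file as UTF-8.

    src_encoding is an assumption; pass the file's actual encoding.
    """
    # newline="" leaves row delimiters (\n or \r\n) exactly as they were.
    with open(src_path, "r", encoding=src_encoding, newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        # Copy in 1 MiB chunks so large files are not held in memory.
        for chunk in iter(lambda: src.read(1 << 20), ""):
            dst.write(chunk)
```

The output is written without a byte order mark and with the original row delimiters intact, so the external file format definition should not need to change.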
I imagine that this page will get quite a few hits over the years, as there currently exists limited information on how to solve these issues if you run into them, and some of the error messages (especially the one quoted above) have nothing to do with root causes.
Polybase offers the ability to create statistics on tables, the same way that you would on normal tables. There are a few rules about statistics:
Stats are not auto-created. You need to create all statistics manually.
Stats are not auto-updated. You will need to update all statistics manually, and currently, the only way you can do that is to drop and re-create the stats.
When you create statistics, SQL Server pulls the data into a temp table, so if you have a billion-row table, you’d better have the tempdb space to pull that off. To mitigate this, you can run stats on a sample of the data.
Round one did not end on a high note, so we’ll see what round two has to offer.
We’ll look at both zTot and nTot, and consider the player’s age and experience. The latter is potentially important because there have been shifts in what ages players joined the league over the timespan we are considering. It used to be rare for players to skip college, then it wasn’t, now they are required to play at least one year. It will be interesting to see if we see a difference in age versus experience in the numbers.
We start with the RDD containing all the raw stats, z-scores, and normalized z-scores. Another piece of data to consider is how a player’s z-score and normalized z-score change each year, so we’ll calculate the change in both from year to year. We’ll save off two sets of data, one a key-value pair of age-values, and one a key-value pair of experience-values. (Note that in this analysis, we disregard all players who played in 1980, as we don’t have sufficient data to determine their experience level.)
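As a rough illustration of that year-over-year step, here is a sketch in plain Python rather than the Spark RDD code the post uses. The field names (player, year, age, experience, z_tot, n_tot) are hypothetical stand-ins for the post's columns, and skipping non-consecutive seasons is my assumption, not something the post states.

```python
from collections import defaultdict


def yearly_changes(rows):
    """Group year-over-year changes in (z_tot, n_tot) by age and by experience.

    rows: iterable of dicts with hypothetical keys
          player, year, age, experience, z_tot, n_tot.
    Returns (by_age, by_experience), each mapping key -> list of deltas.
    """
    by_player = defaultdict(list)
    for row in rows:
        by_player[row["player"]].append(row)

    by_age = defaultdict(list)
    by_experience = defaultdict(list)
    for seasons in by_player.values():
        seasons.sort(key=lambda r: r["year"])
        for prev, cur in zip(seasons, seasons[1:]):
            # Assumption: only compare back-to-back seasons.
            if cur["year"] != prev["year"] + 1:
                continue
            delta = (cur["z_tot"] - prev["z_tot"],
                     cur["n_tot"] - prev["n_tot"])
            by_age[cur["age"]].append(delta)
            by_experience[cur["experience"]].append(delta)
    return dict(by_age), dict(by_experience)
```

The two returned dictionaries play the role of the age-values and experience-values key-value pairs described above.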
Jordan also looks at player performance over time and makes data analysis look pretty easy.
An interesting thing about FIELD_TERMINATOR is that it can be multi-character. MSDN uses ~|~ as a potential delimiter. The reason you’d look at a multi-character delimiter is that not all file formats handle quoted identifiers—for example, putting quotation marks around strings that have commas in them to indicate that commas inside quotation marks are punctuation marks rather than field separators—very well. For example, the default Hive SerDe (Serializer and Deserializer) does not handle quoted identifiers; you can easily grab a different SerDe which does offer quoted identifiers and use it instead, or you can make your delimiter something which is guaranteed not to show up in the file.
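To see why a multi-character delimiter sidesteps the quoting problem, here is a tiny Python sketch. It is only an illustration of the idea, not how PolyBase or Hive parses files, and the function name is made up.

```python
def split_fields(line, field_terminator="~|~"):
    """Split one row on a literal multi-character field terminator."""
    # Strip the row delimiter first, then split on the exact terminator.
    return line.rstrip("\r\n").split(field_terminator)
```

A row such as 1~|~Smith, John~|~ORD splits cleanly into three fields with no quoting at all, because the comma inside the name is no longer special.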
You can also set some defaults such as date format, string format, and data compression codec you’re using, but we don’t need those here. Read the MSDN doc above if you’re interested in digging into that a bit further.
It’s a bit of a read, but the end result is that we can retrieve data from a Hadoop cluster as though it were coming from a standard SQL Server table. This is easily my favorite feature in SQL Server 2016.