Hard Problems In Stream Processing

Kartik Paramasivam discusses tough issues within the Lambda architecture:

During a data center failover like the example above, we could have a “late arrival,” i.e. the stream processor might see the AdClickEvent possibly a few minutes after the AdViewEvent. A poorly written stream processor might deduce that the ad was a low-quality ad when instead the ad might have actually been good. Another anomaly is that the stream processor might see the AdClickEvent before it sees the corresponding AdViewEvent. To ensure that the output of the stream processor is correct there has to be logic to handle this “out of order message arrival.”

In the example above, the geo-distributed nature of the data centers makes it easy to explain the delays. However, delays can exist even within the same data center due to GC issues, Kafka cluster upgrades, partition rebalances, and other naturally occurring distributed system phenomena.
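To make the late-arrival and out-of-order problem concrete, here is a minimal sketch of a view/click join that tolerates both. It is not LinkedIn's implementation; the Event type, the impression_id key, join_clicks, and the allowed_lateness parameter are all hypothetical. Clicks that show up before their view are buffered, and a view is only declared "not clicked" once a watermark (the largest event time seen so far minus allowed_lateness) has passed it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    kind: str           # "view" or "click" (hypothetical event types)
    impression_id: str  # hypothetical key shared by a view and its click
    event_time: int     # seconds, stamped by the producer, not on arrival


def join_clicks(events, allowed_lateness):
    """Pair views with clicks by impression_id, processing events in arrival order.

    Clicks that arrive before their view are buffered, and a view is only
    declared "not clicked" once the watermark (largest event_time seen so far
    minus allowed_lateness) has moved past it.
    """
    pending_views = {}    # impression_id -> view still waiting for a click
    early_clicks = set()  # clicks whose view has not arrived (yet)
    clicked, not_clicked = [], []
    max_seen = None
    for ev in events:
        max_seen = ev.event_time if max_seen is None else max(max_seen, ev.event_time)
        if ev.kind == "click":
            if pending_views.pop(ev.impression_id, None) is not None:
                clicked.append(ev.impression_id)
            else:
                early_clicks.add(ev.impression_id)
        elif ev.impression_id in early_clicks:
            early_clicks.remove(ev.impression_id)
            clicked.append(ev.impression_id)
        else:
            pending_views[ev.impression_id] = ev
        # Give up on views whose event time is older than the watermark.
        watermark = max_seen - allowed_lateness
        for iid in [i for i, v in pending_views.items() if v.event_time < watermark]:
            del pending_views[iid]
            not_clicked.append(iid)
    return clicked, not_clicked, sorted(pending_views)


events = [
    Event("click", "a", 105),  # click arrives before its view
    Event("view", "a", 100),
    Event("view", "b", 101),
    Event("view", "c", 102),
    Event("view", "d", 150),
    Event("click", "c", 160),  # late click for view "c"
]
```

With allowed_lateness=120 the late click for "c" is matched and "b" and "d" are still waiting. With allowed_lateness=30, "c" has already been written off as unclicked by the time its click arrives, which is exactly the misjudged "low-quality ad" from the quote. A real processor would also need to expire orphaned entries in early_clicks and persist its buffers across failovers.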

This is a pretty long article and absolutely worth a read if you are looking at streaming data.

Overlapping Ranges Using U-SQL

Michael Rys explains how to merge overlapping ranges of data using U-SQL:

If you look at the problem, you will at first notice that you want to define something like a user-defined aggregation to combine the overlapping time intervals. However, if you look at the input data, you will notice that since the data is not ordered, you will either have to maintain the state for all possible intervals and then merge disjoint intervals as bridging intervals appear, or you need to preorder the intervals for each user name to make the merging of the intervals easier.

The ordered aggregation is simpler to scale out, but U-SQL does not provide ordered user-defined aggregators (UDAGGs) yet. In addition, UDAGGs normally produce one row per group, while in this case, I may have multiple rows per group if the ranges are disjoint.

Luckily, U-SQL provides a scalable user-defined operator called a reducer which gives us the ability to aggregate a set of rows based on a grouping key set using custom code.
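The core of the preordered approach fits in a few lines. This is a Python stand-in for the reducer logic, not U-SQL and not Michael's code; merge_ranges and the (user, start, end) row shape are assumptions. Sorting by user and start time plays the role of preordering the intervals per user name, and each group can emit more than one row when its ranges are disjoint.

```python
from itertools import groupby
from operator import itemgetter


def merge_ranges(rows):
    """Merge overlapping (user, start, end) ranges per user.

    Returns merged rows sorted by user, then start. Ranges that merely touch
    (next start == current end) are treated as overlapping here, which is an
    assumption; use < instead of <= to keep them separate.
    """
    merged = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # preorder by user, then start
    for user, group in groupby(ordered, key=itemgetter(0)):
        cur_start = cur_end = None
        for _, start, end in group:
            if cur_start is None:
                cur_start, cur_end = start, end
            elif start <= cur_end:  # overlaps the current range: extend it
                cur_end = max(cur_end, end)
            else:  # disjoint: emit the current range and start a new one
                merged.append((user, cur_start, cur_end))
                cur_start, cur_end = start, end
        merged.append((user, cur_start, cur_end))
    return merged


rows = [("ann", 1, 5), ("bob", 2, 3), ("ann", 4, 8), ("ann", 10, 12), ("bob", 3, 7), ("ann", 11, 11)]
print(merge_ranges(rows))
```

Because every user's rows are handled independently once sorted, this is the kind of per-key work a reducer can scale out.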

There are some good insights here, so read the whole thing.

Polybase DMVs

I look at the DMVs associated with Polybase and external table creation:

Let’s walk through this one step at a time and understand what the DMV is telling us.  Unfortunately, the DMV documentation is a little sparse, so some of this is guesswork on my part.

  1. A RandomIDOperation appears to create a temporary table.  In this case, the table (whose name is randomly generated) is named TEMP_ID_53.  I’m not sure where that name comes from; the session I ran this from was 54, so it wasn’t a session ID.

  2. After the table gets created, each Compute node gets told to create a table called TEMP_ID_53 in tempdb whose structure matches our external table’s structure.  One thing you can’t see from the screenshot is that this table is created with DATA_COMPRESSION = PAGE.  I have to wonder if that’d be the same if my Compute node were on Standard edition.

  3. We add an extended property on the table, flagging it as IS_EXTERNAL_STREAMING_TABLE.

  4. We then update the statistics on that temp table based on expected values.  629 rows are expected here.

  5. Then, we create the dest stat, meaning that the temp table now has exactly the same statistics as our external table.

  6. The next step is that the Head node begins a MultiStreamOperation, which tells the Compute nodes to begin working.  This operator does not show up in the documentation, but we can see that the elapsed time is 58.8 seconds, which is just about as long as my query took.  My guess is that this is where the Head node passes code to the Compute nodes and tells them what to do.

  7. We have a HadoopRoundRobinOperation on DMS, which stands for “Data Movement Step” according to the location_type documentation.  What’s interesting is that according to the DMV, that operation is still going.  Even after I checked it 40 minutes later, it still claimed to be running.  If you check the full query, it’s basically a SELECT * from our external table.

  8. Next is a StreamingReturnOperation, which includes our predicate WHERE dest = ‘ORD’ in it.  This is a Data Movement Step and includes all of the Compute nodes (all one of them, that is) sending data back to the Head node so that I can see the results.

  9. Finally, we drop TEMP_ID_53 because we’re done with the table.

This post was about 70% legwork and 30% guesswork.  That’s a bit higher a percentage than I’d ideally like, but there isn’t that much information readily available yet, so I’m trying (in my own small way) to fix that.

You Should Use Biml

Meagan Longoria explains why you should use Biml if you’re building Integration Services packages:

Biml provides a way to automate SSIS design patterns. This reduces the time required to complete a data integration project, and it helps employ consistent design patterns within and across projects. Re-generating multiple packages after making a change to a design pattern takes just a few minutes, so small changes to several similar packages are no longer a significant effort.

Automating SSIS design patterns allows teams to work more efficiently. Senior developers can stop solving the same problems over and over again. Instead, they can solve them once, automate the solution, and move on to new and interesting challenges. Junior developers still learn good development practices with Biml, but they require less supervision to create quality output in a shorter amount of time. SSIS developers that prefer typing code over the drag-and-drop interface of SQL Server Data Tools now get a better way to work in addition to the automation capabilities.

If there’s one piece of advice I can give ETL developers, it’s “learn Biml.”

Stored Procedure Last Run Times

Richie Lee has a script to see when stored procedures were last executed:

Quick script to get the last time a stored procedure was executed in the database. The reason for the seemingly over-engineered script is that different query plans can be generated, meaning that stored procedures can appear more than once in the list.

The query doesn’t quite work as-is, but making qs.execution_count into an aggregation and removing it from the GROUP BY would work.  I’d probably rewrite it to look a bit more like:

WITH querystats AS
(
	SELECT	OBJECT_NAME(qt.objectid) AS ProcedureName,
		SUM(qs.execution_count) OVER (PARTITION BY OBJECT_NAME(qt.objectid)) AS ExecutionCount,
		qs.creation_time AS CreationTime,
		qs.last_execution_time AS LastExecutionTime,
		-- A procedure can have several cached plans; number them most recently executed first.
		ROW_NUMBER() OVER (PARTITION BY OBJECT_NAME(qt.objectid) ORDER BY qs.last_execution_time DESC) AS rownum
	FROM sys.dm_exec_query_stats AS qs
		CROSS APPLY sys.dm_exec_sql_text(qs.[sql_handle]) AS qt
	WHERE	qt.[dbid] = DB_ID()
		AND OBJECT_NAME(qt.objectid) LIKE '%%'	-- replace %% with a name filter if needed
)
SELECT	qs.ProcedureName, qs.ExecutionCount, qs.CreationTime, qs.LastExecutionTime
FROM querystats qs
WHERE	qs.rownum = 1;

KPI Indicators

Devin Knight’s Power BI Custom Visuals class continues:

Change the Banding type property to one of the following:

  • Increasing is better –  Increasing is best when you’re measuring things like sales or profit. If you go over your profit target that’s a good thing!

  • Decreasing is better – Decreasing is probably best when you’re looking at something like budgeting. Staying under budget is usually a good thing, unless being too far under budget means you won’t get that money again next year, which leads to the last option.

  • Closer is better – This is for when you need your data to land in the middle of a bell curve, meaning that going too high or too low is a bad thing. This is often useful when looking at medical data.  For example, if your blood pressure is too high then that’s a bad thing, but if your blood pressure is too low that’s also a bad thing. You need to land in the middle somewhere, which is what this option allows.
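The three banding types boil down to three different comparisons against the target. Here is a minimal sketch of that logic; kpi_status, the "increasing"/"decreasing"/"closer" labels, and the tolerance parameter are hypothetical and are not the visual's actual properties or internal calculation.

```python
def kpi_status(actual, target, banding, tolerance=0.0):
    """Classify a KPI value following the three banding types.

    banding: "increasing", "decreasing", or "closer" (hypothetical labels for
    Increasing/Decreasing/Closer is better). tolerance only applies to
    "closer": how far from the target still counts as good.
    """
    if banding == "increasing":  # e.g. sales or profit
        return "good" if actual >= target else "bad"
    if banding == "decreasing":  # e.g. spend against a budget
        return "good" if actual <= target else "bad"
    if banding == "closer":      # e.g. blood pressure: too high or too low is bad
        return "good" if abs(actual - target) <= tolerance else "bad"
    raise ValueError(f"unknown banding type: {banding!r}")


print(kpi_status(115, 120, "closer", tolerance=10))
```

The "closer" case is the only one that needs a second number, because "how close is close enough" has to come from somewhere.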

There’s plenty of good advice here, so check out the video.

Auto-Seeding Availability Groups

John Sterrett is the latest smart person to take a look at automatic seeding of Availability Groups:

The 600 GB databases took about 66 minutes to seed across the network from a primary replica to the secondary replica.  I noticed 1.4 Gbps of consistent throughput during the seeding process. This makes a lot of sense as it caps out around what the storage system can deliver in this environment.

The first thing I would look at for benchmarking throughput for network activity would be the bytes sent per second from the primary replica and bytes received per second on the secondary replicas.

John also includes an extended event session statement to track seeding.  Great read.
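As a quick sanity check on the seeding numbers, moving 600 GB in 66 minutes implies an average of about 1.21 Gbps if GB means 10^9 bytes, or about 1.30 Gbps if it means GiB, which is in the same range as the roughly 1.4 Gbps observed during seeding. The helper below just does that arithmetic; average_gbps and its binary_units flag are illustrative names.

```python
def average_gbps(size_gb, minutes, binary_units=False):
    """Average throughput in gigabits per second for size_gb moved in `minutes`.

    binary_units=True treats size_gb as GiB (2**30 bytes) instead of 10**9 bytes.
    """
    size_bytes = size_gb * (2**30 if binary_units else 10**9)
    return size_bytes * 8 / (minutes * 60) / 10**9


print(round(average_gbps(600, 66), 2))                     # 1.21
print(round(average_gbps(600, 66, binary_units=True), 2))  # 1.3
```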

Finding Dates

Derik Hammer shows the right way and the wrong way of using date functions in a WHERE clause:

I then changed my thought process to find the date of birth of a 65-year-old whose birthday is today. Then I compare the DateOfBirth column to that static value.


With the above query I bought myself an index seek and 345 logical reads. That works out to <3% of the cost.

This applies not just to date functions but to almost any scalar function wrapped around a column in a predicate.
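The difference is easy to reproduce outside SQL Server. This sketch uses SQLite through Python's sqlite3 module purely so it runs anywhere; the people table, its columns, and the index name are hypothetical, and the plan text is SQLite's, not SQL Server's. Wrapping date_of_birth in a function forces a scan, while computing the cutoff date once and comparing the bare column lets the engine search the index.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name TEXT, date_of_birth TEXT)")
conn.execute("CREATE INDEX ix_people_dob ON people (date_of_birth)")
conn.executemany(
    "INSERT INTO people (name, date_of_birth) VALUES (?, ?)",
    [("Ann", "1950-01-01"), ("Bob", "1951-06-15"), ("Cy", "1951-06-16"), ("Di", "1980-03-03")],
)
TODAY = "2016-06-15"

# Non-sargable: the function wraps the column, so it is evaluated for every row.
WRAPPED = "SELECT name FROM people WHERE date(date_of_birth, '+65 years') <= ?"
# Sargable: compute the 65-years-ago cutoff once and compare the bare column to it.
CUTOFF = "SELECT name FROM people WHERE date_of_birth <= date(?, '-65 years')"


def plan_and_rows(sql):
    # The last column of each EXPLAIN QUERY PLAN row is the readable plan detail.
    plan = " | ".join(row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, (TODAY,)))
    names = sorted(row[0] for row in conn.execute(sql, (TODAY,)))
    return plan, names


print(plan_and_rows(WRAPPED))
print(plan_and_rows(CUTOFF))
```

Both queries return the same people; only the cutoff version's plan reports a SEARCH using ix_people_dob rather than a SCAN of the table. Edge cases such as February 29 birthdays are ignored here.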

Startup Stored Procedures

Dave Mason looks at using sp_procoption to execute stored procedures at startup:

If you have more than one task you want to run at startup, you could include code for each task in a single stored procedure. But that’s generally bad coding practice. Go with the modular approach and create a separate stored procedure for each distinct task. With sp_procoption, you can set more than one stored procedure for automatic execution. The MSDN documentation was not clear regarding the order of execution, though. I thought there might be something similar to sp_settriggerorder, but I wasn’t able to find anything like that. I ran a test on SQL 2014 with 3 stored procedures set for automatic execution.

Dave has interesting notes on procedure run order, where these procedures need to live, and even some ideas on what you might put into startup stored procedures.

CASE Statements In GROUP BY Clauses

Kevin Feasel

Grant Fritchey looks at CASE statements within GROUP BY clauses:

The same basic set of structures, scans against both tables, to arrive at the data. Cost estimates between the two plans are very different though, with the targeted queries having a much lower estimated cost.

Performance-wise, interestingly enough, the first query, returning only the 10 rows, averages 157ms, while the query grouping directly on the SalesPersonID averages about 190ms. Now, the reads tell a slightly different story, with 17428 on the generic query and 5721 on the specific query. So, maybe a server under load will see a significant performance increase. However, let’s deal with what we have in front of us and say that, at least for these tests, the catch-all GROUP BY query performs well.

Grant’s recommendation is to split this out into several procedures, and if you’re having performance problems, that’s a solid move.  I’m a bit more likely to keep them (especially in warehousing reports), but it’s nice to have options.
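For anyone who hasn't seen the pattern, here is the general shape of a catch-all GROUP BY next to a targeted query. It is a hypothetical sales table run through SQLite via Python's sqlite3 so the example is self-contained, not Grant's AdventureWorks query, and it says nothing about SQL Server's plan costs. The grouping column is picked by a parameter inside a CASE expression, and the same expression is repeated in GROUP BY because T-SQL does not allow grouping by a column alias.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (sales_person_id INTEGER, territory_id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [(1, 10, 100.0), (1, 20, 50.0), (2, 10, 25.0), (3, 20, 75.0)],
)

# Catch-all: one statement whose grouping column is chosen by a parameter.
CATCH_ALL = """
SELECT CASE :group_by WHEN 'person' THEN sales_person_id ELSE territory_id END AS grp,
       SUM(amount) AS total
FROM sales
GROUP BY CASE :group_by WHEN 'person' THEN sales_person_id ELSE territory_id END
ORDER BY grp
"""

# Targeted: a dedicated statement per grouping (the split-it-out approach).
BY_PERSON = """
SELECT sales_person_id AS grp, SUM(amount) AS total
FROM sales
GROUP BY sales_person_id
ORDER BY grp
"""


def catch_all(group_by):
    return conn.execute(CATCH_ALL, {"group_by": group_by}).fetchall()


print(catch_all("person"))
print(catch_all("territory"))
```

Both forms return the same rows for a given grouping; the trade-off is one flexible statement versus several simpler ones that the optimizer can plan individually.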


June 2016