Partition Stream Analytics output and query it with serverless Synapse SQL pool
Published Jul 19 2021 10:18 AM
Microsoft

The use case is as follows:
I have water meter telemetry that I would like to run analytics on. Events are ingested from water meters and collected into a data lake in parquet format. The data is partitioned by year, month and day based on the timestamp contained in the events themselves, not on the time at which Stream Analytics processes them. Partitioning by event time is a frequent requirement.

 

Events are sent from the on-premises SCADA systems to Event Hub and then processed by Stream Analytics, which can easily:

  1. Convert events sent in JSON format into partitioned parquet.
  2. Partition the output by Year/Month/Day.
  3. Take the date used for partitioning from within the event.

The result can immediately be queried with serverless Synapse SQL pool.

Input Stream

My Stream Analytics input stream, named inputEventHub, is plugged into an Event Hub and receives events in JSON format.

Output Stream

The output stream is the interesting part and will define the partition scheme:

 

lionelp_0-1624896450197.png

 

We see that its path pattern is based on a pseudo column named "time_struct". All the partitioning logic lies in how this pseudo column is constructed.

 

Let's have a look at the Stream Analytics query:

 

lionelp_1-1624896696879.png

 

lionelp_2-1624896721550.png

 

We can see now that the pseudo column "time_struct" contains the path. Stream Analytics understands it and processes it literally, including the "/" sign.

 

Here is the query code:

 

 

select 
    concat('year=',substring(createdAt,1,4),'/month=',substring(createdAt,6,2),'/day=',substring(createdAt,9,2)) as time_struct,
    eventId,
    [type],
    deviceId,
    deviceSequenceNumber,
    createdAt,
    Value,
    complexData,
    EventEnqueuedUtcTime AS enqueuedAt,
    EventProcessedUtcTime AS processedAt,
    cast(UDF.GetCurrentDateTime('') as datetime) AS storedAt,
    PartitionId
into
    [lionelpdl]
from 
    [inputEventHub]
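
To make the substring logic easier to follow, here is a minimal Python sketch of the same path construction. It is an illustration only, not part of the Stream Analytics job. It assumes that createdAt is an ISO 8601 string such as "2021-06-23T08:15:30.000Z", which is what the substring positions in the query suggest. The function name and the sample timestamp are hypothetical.

```python
def time_struct(created_at: str) -> str:
    """Build the partition path from an ISO 8601 timestamp string.

    Mirrors the query's 1-based substring(createdAt, start, length) calls
    with 0-based Python slices.
    """
    year = created_at[0:4]    # substring(createdAt, 1, 4)
    month = created_at[5:7]   # substring(createdAt, 6, 2)
    day = created_at[8:10]    # substring(createdAt, 9, 2)
    return f"year={year}/month={month}/day={day}"


# Hypothetical event timestamp. Whenever the event is processed,
# the path reflects the date carried by the event itself.
example_path = time_struct("2021-06-23T08:15:30.000Z")
print(example_path)
```

Because the values are cut directly out of the string, month and day keep their leading zeros (for example month=06).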

 

 

After a few days of processing, the output folder looks like this:

 

lionelp_0-1624896848841.png

 

lionelp_1-1624896848841.png

 

Query results with serverless SQL pool and take advantage of partitioning

Now I can directly query my Output Stream with serverless SQL pool:

 

lionelp_0-1624898246159.png

 

We can also notice that the metadata functions work without any additional effort. For example, I can run the following query using the filepath metadata function:

 

 

SELECT TOP 100
    [result].filepath(1) AS [year]
    ,[result].filepath(2) AS [month]
    ,[result].filepath(3) AS [day]
    ,*
FROM
    OPENROWSET(
        BULK 'https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/year=*/month=*/day=*/*.parquet',
        FORMAT='PARQUET'
    ) AS [result]
WHERE [result].filepath(2)=6
  AND [result].filepath(3)=23
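
As a rough illustration of what filepath(n) returns, the Python sketch below extracts the text matched by each wildcard of the BULK path, in order. This only imitates the idea and is not how serverless SQL pool is implemented. The helper name and the sample file name are hypothetical, and the sketch assumes that a wildcard never spans a "/".

```python
import re
from typing import List

PATTERN = "parquetzone/deplasa1/year=*/month=*/day=*/*.parquet"


def wildcard_values(pattern: str, path: str) -> List[str]:
    """Return the text matched by each '*' in pattern, in order."""
    # Escape the literal parts and turn every '*' into a capture group
    # that does not cross folder boundaries.
    regex = "([^/]*)".join(re.escape(part) for part in pattern.split("*"))
    match = re.fullmatch(regex, path)
    if match is None:
        raise ValueError(f"{path!r} does not match {pattern!r}")
    return list(match.groups())


# Hypothetical file name; real names are generated by Stream Analytics.
values = wildcard_values(
    PATTERN, "parquetzone/deplasa1/year=2021/month=06/day=23/part-0001.parquet"
)
# values[0], values[1], values[2] play the role of filepath(1..3).
year, month, day = values[0], values[1], values[2]
print(year, month, day)
```

The values come back as text, so the month extracted here is "06" rather than the number 6.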

 

 

Spark post processing

Finally, to optimize query performance, I can schedule a daily Spark job that processes all events from the previous day and compacts them into fewer, larger parquet files.

As an example, I've decided to rebuild the partitions with files containing up to 2 million rows each.

 

Here are two versions of the same code:

 

PySpark notebook (for interactive testing for instance)

 

 

import datetime

account_name = "storage_account_name"
container_name = "container_name"
source_root = "source_directory_name"
target_root = "target_directory_name"
# Number of days back from today; as a daily job it would typically be set to 1
days_backwards = 4
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)

# Partition folder of the day to compact, in the year=/month=/day= layout
hier = datetime.date.today() - datetime.timedelta(days=days_backwards)
day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year, hier.month, hier.day)
file_pattern = '*.parquet'

print(adls_path + day_to_process + file_pattern)

# `spark` is the session that the notebook provides
df = spark.read.parquet(adls_path + day_to_process + file_pattern)

adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)

print(adls_result + day_to_process + file_pattern)

# Write from a single partition so that files are split only by the row cap
df.coalesce(1).write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 2000000) \
        .parquet(adls_result + day_to_process)
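
To get a feel for the effect of the row cap, the sketch below estimates how many files a compacted day should contain. It assumes that all rows go through a single write task (which is what coalesce(1) arranges) and that the writer starts a new file each time the cap is reached. The function name is hypothetical, and the result is an estimate rather than a guarantee.

```python
import math


def expected_file_count(row_count: int, max_records_per_file: int = 2_000_000) -> int:
    """Estimate the number of files one write task produces under a row cap."""
    if row_count < 1:
        raise ValueError("row_count must be at least 1")
    if max_records_per_file < 1:
        raise ValueError("max_records_per_file must be at least 1")
    return math.ceil(row_count / max_records_per_file)


# Hypothetical day with 5 million events and the 2 million row cap used above.
print(expected_file_count(5_000_000))
```

In a notebook, the row count of the day could come from df.count(), at the cost of an extra pass over the data.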

 

 

Spark job (with input parameters scheduled daily)

 

lionelp_0-1625501958062.png

 

 

import sys
import datetime
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":

    # Create the Spark session with the necessary configuration
    conf = SparkConf().setAppName("dailyconversion").set("spark.hadoop.validateOutputSpecs", "false")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    account_name = sys.argv[1]         # storage account name
    container_name = sys.argv[2]       # container name
    source_root = sys.argv[3]          # source directory name
    target_root = sys.argv[4]          # target directory name
    days_backwards = int(sys.argv[5])  # number of days back to reprocess, typically 1

    # Partition folder of the day to compact, in the year=/month=/day= layout
    hier = datetime.date.today() - datetime.timedelta(days=days_backwards)
    day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year, hier.month, hier.day)
    file_pattern = '*.parquet'

    adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)

    print(adls_path + day_to_process + file_pattern)

    df = spark.read.parquet(adls_path + day_to_process + file_pattern)

    adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)

    print(adls_result + day_to_process + file_pattern)

    # Write from a single partition so that files are split only by the row cap
    df.coalesce(1).write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 2000000) \
        .parquet(adls_result + day_to_process)
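
The date arithmetic is the part of the job that is easiest to get wrong around month and year boundaries. One possible way to check it without a Spark pool is to isolate it in a small function that receives the current date as a parameter. This is a sketch and not part of the original job. The function name is hypothetical, and the dates are made-up test values.

```python
import datetime


def partition_folder(today: datetime.date, days_backwards: int) -> str:
    """Return the year=/month=/day= folder for the day to reprocess."""
    target = today - datetime.timedelta(days=days_backwards)
    return "/year=%04d/month=%02d/day=%02d/" % (target.year, target.month, target.day)


# Running on the first day of a month targets the last day of the previous one.
print(partition_folder(datetime.date(2021, 7, 1), 1))
# The same holds across a year boundary.
print(partition_folder(datetime.date(2021, 1, 1), 1))
```

Note that the folders are named after the date in the events, while the job derives the day from the clock of the machine running it, so it is worth confirming that both refer to the same time zone.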

 

 

Conclusion

In this article we have covered:

  • How to easily use Stream Analytics to write its output as partitioned parquet files.
  • How to use serverless Synapse SQL pool to query Stream Analytics output.
  • How to reduce the number of parquet files using serverless Apache Spark pool.


Last update: Dec 03 2021 02:18 AM