The use case is as follows:
I have water meter telemetry that I would like to run analytics on. Events are ingested from water meters and collected into a data lake in Parquet format. The data is partitioned by year, month and day based on the timestamp contained in the events themselves, not on the time at which Azure Stream Analytics processes them. This is a frequent requirement.
Events are sent from the on-premises SCADA systems to Event Hub and then processed by Stream Analytics, which can easily convert the JSON events to Parquet and partition the output by the event timestamp.
The result can immediately be queried with serverless Synapse SQL pool.
My Stream Analytics input stream, named inputEventHub, reads JSON events from an Event Hub.
The output stream is the interesting part, because it defines the partition scheme.
Its path pattern is based on a pseudo column named "time_struct", and all the partitioning logic lies in how this pseudo column is constructed.
Let's have a look at the Stream Analytics query.
The pseudo column "time_struct" contains the path. Stream Analytics takes it literally, including the "/" characters, so each "/" creates a folder level in the output.
Here is the query code:
select
concat('year=',substring(createdAt,1,4),'/month=',substring(createdAt,6,2),'/day=',substring(createdAt,9,2)) as time_struct,
eventId,
[type],
deviceId,
deviceSequenceNumber,
createdAt,
Value,
complexData,
EventEnqueuedUtcTime AS enqueuedAt,
EventProcessedUtcTime AS processedAt,
cast(UDF.GetCurrentDateTime('') as datetime) AS storedAt,
PartitionId
into
[lionelpdl]
from
[inputEventHub]
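To make the path construction concrete, here is a minimal Python sketch of what the concat/substring expression produces for a single event. It assumes createdAt is an ISO 8601 string such as 2021-06-23T10:15:00Z (the sample value is made up for illustration). The helper name time_struct_for is hypothetical: it only mirrors the query and is not part of Stream Analytics.

```python
def time_struct_for(created_at: str) -> str:
    """Mirror the query's concat/substring logic (SQL substring is 1-based)."""
    year = created_at[0:4]    # substring(createdAt, 1, 4)
    month = created_at[5:7]   # substring(createdAt, 6, 2)
    day = created_at[8:10]    # substring(createdAt, 9, 2)
    return "year=" + year + "/month=" + month + "/day=" + day


# Hypothetical sample timestamp, assumed to be ISO 8601
print(time_struct_for("2021-06-23T10:15:00Z"))  # year=2021/month=06/day=23
```

Because the month and day are sliced out as text, they keep their leading zeros, which is the same zero-padded layout the %02d formatting in the Spark code further down expects.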
After a few days of processing, the output folder contains one year=YYYY/month=MM/day=DD subfolder for each day of events.
Now I can directly query the output of the stream with a serverless SQL pool.
The metadata functions are also fully functional without any additional work. For example, I can run the following query using the filepath metadata function:
SELECT top 100
[result].filepath(1) AS [year]
,[result].filepath(2) AS [month]
,[result].filepath(3) AS [day]
,*
FROM
OPENROWSET(
BULK 'https://lionelpdl.dfs.core.windows.net/parquetzone/deplasa1/year=*/month=*/day=*/*.parquet',
FORMAT='PARQUET'
) AS [result]
where [result].filepath(2)=6
and [result].filepath(3)=23
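As a rough illustration of what filepath(n) returns, the sketch below emulates it in Python: filepath(n) yields the part of the file path matched by the n-th wildcard of the BULK pattern. The function emulate_filepath and the sample file name are hypothetical; the real matching is done by the serverless SQL pool.

```python
import re


def emulate_filepath(pattern: str, path: str, n: int) -> str:
    """Return the text matched by the n-th '*' (1-based) of pattern in path."""
    # Turn every '*' into a capture group that stays within one path segment
    regex = "".join("([^/]*)" if part == "*" else re.escape(part)
                    for part in re.split(r"(\*)", pattern))
    match = re.fullmatch(regex, path)
    if match is None:
        raise ValueError("path does not match pattern")
    return match.group(n)


pattern = "parquetzone/deplasa1/year=*/month=*/day=*/*.parquet"
# Hypothetical file name, for illustration only
path = "parquetzone/deplasa1/year=2021/month=06/day=23/part-0001.parquet"

print(emulate_filepath(pattern, path, 1))  # 2021
print(emulate_filepath(pattern, path, 2))  # 06
print(emulate_filepath(pattern, path, 3))  # 23
```

The values come back as text ('06', not 6), so the numeric comparisons in the WHERE clause above depend on an implicit conversion. Comparing against '06' and '23' would be the explicit alternative.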
Finally, to optimize query performance, I can schedule a daily Spark job that processes all events from the previous day and compacts them into fewer, larger Parquet files.
As an example, I've decided to rebuild the partitions with files containing up to 2 million rows each.
Here are two versions of the same code: the first is meant to run in a notebook, the second as a standalone Spark job that takes its parameters from the command line.
# Version 1: notebook (the `spark` session is provided by the notebook runtime)
import datetime
account_name = "storage_account_name"
container_name = "container_name"
source_root = "source_directory_name"
target_root = "target_directory_name"
days_backwards = 4  # number of days back from today; typically 1 for a daily job
adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
hier = datetime.date.today() - datetime.timedelta(days = days_backwards)
day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year,hier.month,hier.day)
file_pattern='*.parquet'
print((adls_path + day_to_process + file_pattern))
df = spark.read.parquet(adls_path + day_to_process + file_pattern)
adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)
print(adls_result + day_to_process + file_pattern)
# Parquet files have no header row, so no "header" option is needed
df.coalesce(1).write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 2000000) \
    .parquet(adls_result + day_to_process)
# Version 2: standalone Spark job (parameters are passed as command-line arguments)
import sys
import datetime
from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # create the Spark session with the necessary configuration
    conf = SparkConf().setAppName("dailyconversion").set("spark.hadoop.validateOutputSpecs", "false")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    account_name = sys.argv[1]         # storage account name
    container_name = sys.argv[2]       # container name
    source_root = sys.argv[3]          # source directory name
    target_root = sys.argv[4]          # target directory name
    days_backwards = int(sys.argv[5])  # number of days back to reprocess, typically 1

    hier = datetime.date.today() - datetime.timedelta(days=days_backwards)
    day_to_process = '/year=%04d/month=%02d/day=%02d/' % (hier.year, hier.month, hier.day)
    file_pattern = '*.parquet'

    # read all Parquet files of the selected day
    adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, source_root)
    print(adls_path + day_to_process + file_pattern)
    df = spark.read.parquet(adls_path + day_to_process + file_pattern)
    # df.printSchema()  # uncomment to inspect the schema

    # rewrite the day as fewer files of at most 2 million rows each
    adls_result = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, target_root)
    print(adls_result + day_to_process + file_pattern)
    df.coalesce(1).write \
        .mode("overwrite") \
        .option("maxRecordsPerFile", 2000000) \
        .parquet(adls_result + day_to_process)
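The date arithmetic and the effect of maxRecordsPerFile can be checked without a Spark cluster. The following sketch reuses the path format from the code above. The function names, the run date and the row count of 5 million are made up for illustration, and the file count assumes a single output partition, as produced by coalesce(1).

```python
import datetime
import math


def day_partition(today: datetime.date, days_backwards: int) -> str:
    """Build the partition folder for the day to compact, as in the job."""
    day = today - datetime.timedelta(days=days_backwards)
    return "/year=%04d/month=%02d/day=%02d/" % (day.year, day.month, day.day)


def expected_file_count(row_count: int, max_records_per_file: int) -> int:
    """Files written from ONE partition when maxRecordsPerFile caps each file."""
    return math.ceil(row_count / max_records_per_file)


# Hypothetical run date: a daily job on 1 July 2021 compacts 30 June
print(day_partition(datetime.date(2021, 7, 1), 1))  # /year=2021/month=06/day=30/
# Hypothetical volume: 5 million events in the day, 2 million rows per file
print(expected_file_count(5_000_000, 2_000_000))    # 3
```

Subtracting a timedelta handles month and year boundaries, so the job keeps pointing at the right folder on the first day of a month.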
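In this article we have covered how to partition Stream Analytics output by event time using a custom path pattern, how to query the result with a serverless SQL pool and the filepath metadata function, and how to compact each daily partition into fewer, larger Parquet files with a Spark job.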