Exam 203 back-end services Spark Delta Lake
Spark Delta Lake.
This is an open-source storage layer on top of Spark that adds relational database capabilities (tables and transactions) to a data lake.
By using Delta Lake, you can implement a data lakehouse architecture in Spark.
Delta Lake supports:
- CRUD (create, read, update, and delete) operations
- ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
- Data versioning and time travel
- Streaming as well as batch data, via the Spark Structured Streaming API
- Underlying data is in Parquet format only, not CSV.
- Can use a serverless SQL pool in Synapse Studio to query it
Create a Delta Lake table
Use the .write method on a dataframe to create a Delta Lake table, specifying .format("delta"):
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Q: does this save into Gen2 storage?
A: Yes. The path is resolved against the Spark pool's default file system, which in Synapse is the workspace's primary ADLS Gen2 account. (It is managed catalog tables, created without a path, whose data files are stored in the storage used by the metastore.)
This also creates a _delta_log folder containing the transaction log for the table (the log entries are JSON files).
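To see what was actually written, you can list the target folder; a minimal sketch, assuming a Synapse notebook where mssparkutils is available (Databricks has dbutils.fs.ls for the same job):

# List the delta table folder (assumes Synapse; mssparkutils is an assumption here)
from notebookutils import mssparkutils

for item in mssparkutils.fs.ls(delta_table_path):
    # expect part-*.snappy.parquet data files plus the _delta_log folder
    print(item.name)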
Note that you have not given the table a name here, so it exists only as its underlying files; there is no entry for it in the metastore catalog.
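Because there is no catalog entry, you read the data back by pointing at the path again; a minimal sketch using the delta_table_path defined above:

# Read the (unnamed) delta table back into a dataframe by path
df2 = spark.read.format("delta").load(delta_table_path)
df2.show(5)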
Overwrite the table
To overwrite the table data with new data from a new dataframe, run:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
(where new_df is the new dataframe that already contains the new data required).
Or append the new data instead of overwriting it, with:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Get a DeltaTable object
To work with the table through the Delta Lake API, create a DeltaTable object (assigned here to the variable deltaTable) with the DeltaTable.forPath function. Note that this does not give the table a name in the catalog; it is just a reference to the files at the path.
from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
Update the table
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
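The same deltaTable object also covers the delete part of CRUD; a minimal sketch with a made-up condition:

# Delete rows that match a condition (the condition is hypothetical, for illustration)
deltaTable.delete(condition = "Price < 0")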
Check constraints
ALTER TABLE people ADD CONSTRAINT gendercheck CHECK (gender in ('M', 'F'))
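Once the constraint is in place, writes that violate it fail; a minimal sketch, assuming the people table above exists as a Delta table (the inserted values are hypothetical):

# An insert whose gender value fails the CHECK constraint is rejected
# (column values here are made up for illustration)
try:
    spark.sql("INSERT INTO people VALUES ('Sam', 'Q')")
except Exception as e:
    print("Insert rejected:", e)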
Previous Versions
Use the .option("versionAsOf", ...) or .option("timestampAsOf", ...) option on the reader to query historical data, which comes from the version history stored in the transaction log.
# Read a specific version of the table by version number
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

# Or read the table as it was at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

# Show the last 10 entries in the table's history
deltaTable.history(10).show(20, False, True)
SaveAsTable
You can also use the .saveAsTable method on a dataframe's writer to save it as a named table (which can also be done with non-Delta-Lake formats). This is called a "catalog" table because it is registered in the metastore catalog.
# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")

# Specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
Me: What determines this is the path option: the table is managed (internal) unless you specify a path/location, in which case it is external. Dropping a managed table deletes its data files; dropping an external table leaves them in place.
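One way to check is to describe the tables; the Type row in the output shows MANAGED or EXTERNAL, and Location shows where the data files live:

# Inspect the catalog entries created above
spark.sql("DESCRIBE TABLE EXTENDED MyManagedTable").show(truncate=False)
spark.sql("DESCRIBE TABLE EXTENDED MyExternalTable").show(truncate=False)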
You can also use SQL code to create catalog tables, either over existing Delta files at a given location, or as a new empty table with an explicit schema:
%%pyspark
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")

%%sql
CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'

%%sql
CREATE TABLE ManagedSalesOrders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA
DeltaTableBuilder API
You can also create a catalog table programmatically by running the following in Spark:
from delta.tables import *

DeltaTable.create(spark) \
  .tableName("default.ManagedProducts") \
  .addColumn("Productid", "INT") \
  .addColumn("ProductName", "STRING") \
  .addColumn("Category", "STRING") \
  .addColumn("Price", "FLOAT") \
  .execute()
Can also use createIfNotExists or createOrReplace.
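A minimal sketch of the createOrReplace variant, using the same builder pattern (the table name is made up for illustration):

from delta.tables import *

# Same builder pattern, but replaces the table definition if it already exists
DeltaTable.createOrReplace(spark) \
  .tableName("default.ProductsStaging") \
  .addColumn("Productid", "INT") \
  .addColumn("ProductName", "STRING") \
  .execute()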
Streaming Data in Delta Lake
A Delta Lake table can be used as either the source or the sink of a stream.
Spark Structured Streaming handles the streaming, from sources including Azure Event Hubs.
Read a stream with spark.readStream; write one with dataframe.writeStream.
Streaming Example 1
The following is an example streaming dataflow that uses a Delta Lake table as its source (reading the data from the underlying files) and writes the streamed rows to the console output in append mode.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
Streaming Example 2
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
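Once the query is running, you can inspect and stop it from the notebook; a minimal sketch using the delta_stream handle from the example above:

# Check whether the streaming query is still active, and inspect its status
print(delta_stream.isActive)
print(delta_stream.status)

# Read the delta table the stream is writing to, as a normal (batch) dataframe
df = spark.read.format("delta").load(table_path)
df.show()

# Stop the stream when finished
delta_stream.stop()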
OPENROWSET
From a serverless SQL pool, you can query the Delta table's files directly in storage (the Parquet data plus its transaction log) using OPENROWSET with FORMAT = 'DELTA':
SELECT *
FROM OPENROWSET(
    BULK 'https://mystore.dfs.core.windows.net/files/delta/mytable/',
    FORMAT = 'DELTA'
) AS deltadata