Exam 203 back-end services Spark Delta Lake
Spark Delta Lake
Delta Lake is an open-source storage layer that runs on top of Spark and adds relational database semantics (tables, ACID transactions, schema enforcement) to data lake storage.
By using Delta Lake, you can implement a data lakehouse architecture in Spark.
Delta Lake supports:
- CRUD (create, read, update, and delete) operations
- ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-progress transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
- Data versioning and time travel
- Both batch and streaming data, via the Spark Structured Streaming API (a Delta table can act as a streaming source or sink)
- The underlying data files are stored in Parquet format (not CSV)
- Delta tables can be queried from a serverless SQL pool in Azure Synapse Analytics
Create a Delta Lake table
Use the write method on a dataframe, specifying .format("delta"), to create a Delta Lake table:
```python
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
```
Q: does this save into Gen2 storage?
This also creates a _delta_log folder containing the transaction log for the table.
Note that you have not given the table a name here, so it exists only as its underlying files; it is not registered in the metastore.
Create a DeltaTable object, and update the table
To manipulate the table programmatically, create a DeltaTable object (assigned here to the variable deltaTable) with the DeltaTable.forPath function, then call its update method:
```python
from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object for the files at delta_table_path
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
```
Overwrite the table
To overwrite the table data with new data from a new dataframe, run:
```python
# new_df is a dataframe that already contains the replacement data
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
```
To append the new data instead of overwriting it:

```python
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
```