Exam 203 back-end services Spark Delta Lake
Spark Delta Lake.
Delta Lake is an open-source storage layer on top of Spark that adds relational database capabilities (tables with transactional semantics) to data lake storage.
By using Delta Lake, you can implement a data lakehouse architecture in Spark.
Delta Lake supports:
- CRUD (create, read, update, and delete) operations
- ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
- Data versioning and time travel
- Streaming as well as batch data, via the Spark Structured Streaming API (see the sketch after this list)
- Underlying data is in Parquet format only, not CSV.
- Delta tables can be queried from a serverless SQL pool in Synapse Studio.
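To use a Delta table with streaming data, read it with spark.readStream and write with writeStream. A minimal sketch of the pattern (the paths are illustrative and assume an existing SparkSession named spark and a Delta table at /delta/mydata):

# Read the Delta table as a streaming source
stream_df = spark.readStream.format("delta").load("/delta/mydata")

# Write the stream out to another Delta table; Delta sinks require a checkpoint location
query = (stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/mydata_checkpoint")
    .start("/delta/mydata_copy"))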
Create a Delta Lake table
Use the .write method on a dataframe to create a Delta Lake table, specifying .format("delta"):
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Q: does this save into Gen2 storage?
A: Yes. When you save to an explicit path like this, the files are written to that path, which in Synapse resolves to the workspace's default ADLS Gen2 storage. Managed catalog tables created with saveAsTable (covered below) are stored in the metastore-managed storage location instead.
This also creates a _delta_log folder containing the transaction log for the table.
Note that the table has not been given a name here, so it exists only as its underlying files and is not registered in the metastore catalog.
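Because the table has no catalog name, you read it back by pointing Spark at the folder path. A minimal sketch, reusing delta_table_path from above:

# Load the Delta files back into a dataframe by path
df = spark.read.format("delta").load(delta_table_path)
df.show(10)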
Overwrite the table
To overwrite the table data with new data from a new dataframe, run:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
(where new_df is the new dataframe that already contains the new data required).
Or append the new data instead of overwriting it, with:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Create a DeltaTable object
To work with the table's data in place, create a DeltaTable object (here assigned to a Python variable named deltaTable; this does not register a name in the catalog) with the DeltaTable.forPath function:
from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
Update the table
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
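The DeltaTable object also supports deletes, which covers the D in the CRUD operations listed earlier. A minimal sketch (the condition value is made up for illustration):

# Delete rows matching a SQL-style condition
deltaTable.delete("Category == 'Discontinued'")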
Previous Versions
Use the .option("versionAsOf", ...) or .option("timestampAsOf", ...) option to query historical data; the version history is reconstructed from the data recorded in the transaction log.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)
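To see which versions and timestamps are available for time travel, you can list the entries in the transaction log with the DeltaTable history method. A minimal sketch, reusing the deltaTable object created earlier:

# Each history entry corresponds to a version recorded in the _delta_log folder
deltaTable.history().select("version", "timestamp", "operation").show()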
SaveAsTable
You can also use the .saveAsTable method on a dataframe to save it as a named "catalog" table (this also works for non-Delta Lake tables).
# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")

# Specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
Me: What determines whether this is a managed (internal) table or an external table? It is managed unless you specify a path option; with a path the table is external and its data files stay at that path. Dropping an external table removes only the catalog entry and leaves the files, whereas dropping a managed table also deletes its underlying files.
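Once a table is registered in the catalog with saveAsTable, it can be queried by name using Spark SQL rather than by path. A minimal sketch, using the MyManagedTable name from above:

# Query the catalog table by name instead of by path
spark.sql("SELECT * FROM MyManagedTable LIMIT 10").show()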