Exam 203 back-end services Spark Delta Lake

Delta Lake is a layer on top of Spark that adds relational database capabilities to data lake storage.

By using Delta Lake, you can implement a data lakehouse architecture in Spark.

Delta Lake supports:

  1. CRUD (create, read, update, and delete) operations
  2. ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-progress transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
  3. Data versioning and time travel (see the sketch after this list)
  4. Streaming as well as batch data, via the Spark Structured Streaming API
  5. Underlying data in Parquet format only, not CSV
  6. Querying from the serverless SQL pool in Synapse Studio
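
Point 3 (versioning and time travel) means earlier versions of the table remain readable via the versionAsOf or timestampAsOf read options. A minimal sketch, assuming a Delta table already exists at /delta/mydata (the version number and timestamp are illustrative):

# Read the current version of the table
df = spark.read.format("delta").load("/delta/mydata")

# Time travel: read the table as it was at version 0
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta/mydata")

# Or as it was at a given point in time
df_then = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load("/delta/mydata")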

Create a Delta Lake table

Use the .write method on a dataframe to create a Delta Lake table, specifying .format("delta"):

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Q: does this save into Gen2 storage? (In Azure Synapse, a relative path like this resolves to the workspace's default Data Lake Storage Gen2 account, so yes.)

This also creates a _delta_log folder containing the transaction log for the table.
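
The resulting layout on disk looks roughly like this (the Parquet file name is illustrative):

/delta/mydata
    part-00000-<guid>.snappy.parquet
    _delta_log
        00000000000000000000.json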

Note that you have not given the table a name here, so it exists only as its underlying files; nothing is registered in the metastore.
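
If a named catalog table over these files is wanted, one option is to register an external table on the path with Spark SQL. A minimal sketch; the table name MyExternalTable is an assumption:

# Register an external table over the existing Delta files (table name is illustrative)
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/delta/mydata'")

# The table can then be queried by name
spark.sql("SELECT * FROM MyExternalTable").show(5)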

Give the table a name, and update it

To give the table a name in your code (in this case the Python variable deltaTable), run the DeltaTable.forPath function:

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
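
To confirm the update, the table can be read back from the same path (a usage sketch, assuming the Category and Price columns used above):

# Reload the table and inspect the updated rows
updated_df = spark.read.format("delta").load(delta_table_path)
updated_df.filter("Category == 'Accessories'").show(5)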

Overwrite the table

To overwrite the table data with new data from a new dataframe, run:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

(where new_df is a dataframe that already contains the required new data).

Or append the new data instead of overwriting it, with:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)
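
Point 4 of the list above mentions the Spark Structured Streaming API. As a hedged sketch, a Delta table can act as a streaming source or sink; the checkpoint and output paths below are hypothetical:

# Treat the Delta table as a streaming source
stream_df = spark.readStream.format("delta").load(delta_table_path)

# Stream the rows into a second Delta table (paths are illustrative)
stream_query = (stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/checkpoints/mydata")
    .start("/delta/mydata_stream_copy"))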