Exam 203 back-end services Spark Delta Lake

From MillerSql.com

Spark Delta Lake.

This is a storage layer on top of Spark that adds relational database capabilities (tables with schemas and transactions) to data held in a data lake.

By using Delta Lake, you can implement a data lakehouse architecture in Spark.

Delta Lake supports:

  1. CRUD (create, read, update, and delete) operations
  2. ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-progress transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
  3. Data versioning and time travel
  4. Streaming as well as batch data, via the Spark Structured Streaming API (see the sketch after this list)
  5. The underlying data is stored in Parquet format only, not CSV
  6. You can query Delta Lake tables with the serverless SQL pool in Synapse Studio
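
A minimal sketch of point 4, assuming a Delta Lake table already exists at /delta/mydata; the output and checkpoint paths are placeholders:

# Read an existing Delta Lake table as a streaming source
stream_df = spark.readStream.format("delta").load("/delta/mydata")

# Write the stream out to another Delta Lake table.
# Structured Streaming requires a checkpoint location to track progress.
# The output and checkpoint paths below are placeholders.
streaming_query = (stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/checkpoints/mydata")
    .start("/delta/mydata_copy"))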

Create a Delta Lake table

Use the .write method on a dataframe to create a Delta Lake table, specifying .format("delta"):

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Q: does this save into Gen2 storage?

A: Yes, in Synapse the path is resolved against the Spark pool's default file system, which is the workspace's primary ADLS Gen2 storage account. The metastore only becomes involved once the table is registered with a name (see saveAsTable below).

This also creates a _delta_log folder containing the transaction log for the table.

Note that the table has not been given a name here, so it exists only as its underlying files and is not registered in the metastore.
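
Because there is no name to refer to, you read the data back by pointing at the same path, for example:

# Read the Delta Lake files back into a dataframe by path
df = spark.read.format("delta").load(delta_table_path)
df.show()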

Overwrite the table

To overwrite the table data with new data from a new dataframe, run:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

(where new_df is a dataframe that already contains the replacement data).

Or append the new data instead of overwriting it, with:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Create a DeltaTable object

To manipulate the table through the Delta Lake API, create a DeltaTable object for its path by running the DeltaTable.forPath function (this does not register a name in the catalog; deltaTable is just a Python variable):

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

Update the table

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Previous Versions

Use the .option("versionAsOf") and .option("timestampAsOf") methods to query historical data. Which comes from the data stored in the transaction log.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

saveAsTable

You can also use the .saveAsTable method on a dataframe to save it as a named table (which can also be done with non-Delta Lake tables). A table registered this way is called a "catalog" table.

# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")

# Specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

Me: it is the path option that determines whether this is a managed (internal) table or an external table. Without a path the table is managed and Spark owns the data files; with a path it is external, and dropping the table leaves the data files in place.
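
One way to check which kind of table you have created is to look at the catalog metadata; a sketch, assuming the two tables above exist:

# tableType is MANAGED for managed tables and EXTERNAL for path-based tables
for t in spark.catalog.listTables():
    print(t.name, t.tableType)

# DESCRIBE EXTENDED also shows the table type and its location
spark.sql("DESCRIBE EXTENDED MyExternalTable").show(truncate=False)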

You can also use SQL code to create the table: either over existing Delta files at a given location, or as a new empty table with an explicit schema:

%%pyspark
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")

%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'

%%sql
CREATE TABLE ManagedSalesOrders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA
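
Once the empty table exists, it can be loaded and queried with ordinary SQL; for example, from PySpark (the values are made up):

# Insert a sample row into the managed Delta Lake table
spark.sql("INSERT INTO ManagedSalesOrders VALUES (1, current_timestamp(), 'Contoso', 120.50)")

# Query the table back
spark.sql("SELECT * FROM ManagedSalesOrders").show()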

DeltaTables API