Exam 203 back-end services Spark Delta Lake

From MillerSql.com

Spark Delta Lake.

This is a layer on top of Spark that provides for relational databases.

By using Delta Lake, you can implement a data lakehouse architecture in Spark.

Delta Lake supports:

  1. CRUD (create, read, update, and delete) operations
  2. ACID atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted
  3. Data versioning and time travel
  4. Streaming as well as Batch data. Spark Structured Streaming API
  5. Underlying data is in Parquet format only, not CSV.
  6. Can use the Serverless pool in Synapse Studio to query it.

Create a Delta Lake table

Use the .Write method on a dataframe to create a Delta Lake table, specifying .format("delta"):

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Q: does this save into Gen2 storage?

A: No, the data files are stored within the storage used by the metastore.

This also creates a _delta_log folder containing the transaction log for the table (edit: I think these are json files)

Note that you have not given the table a name here. So it is only present as its underlying files.

Overwrite the table

To overwrite the table data with new data from a new dataframe, run:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

(where new_df is the new dataframe that already contains the new data required).

Or append the new data instead of overwriting it, with:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Give the table a name

To give the table a name (in this case "deltaTable"), run the DeltaTable.forPath function

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

Update the table

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Check constraints

ALTER TABLE people ADD CONSTRAINT gendercheck CHECK (gender in ('M', 'F'))

Merge Schemas

You can use .option("MergeSchema") to add new columns into a table, when data from a dataframe is inserted into it.

Previous Versions

Use the .option("versionAsOf") and .option("timestampAsOf") methods to query historical data. Which comes from the data stored in the transaction log.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

deltaTable.history(10).show(20, False, True)

SaveAsTable

You can also use the .SaveAsTable method on a dataframe, to save it as a table (which can also be done with non delta-lake tables). This is called a "catalog" table

# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")

## specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")

Me: It is not clear what determines whether this is a normal table or an external table. I think that it is internal unless you specify a location.

You can also use SQL code to create the table empty, giving the schema:

%%pyspark
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")

%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'

%%sql
CREATE TABLE ManagedSalesOrders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA

DeltaTables API

You can also run the following in Spark:

from delta.tables import *

DeltaTable.create(spark) \
  .tableName("default.ManagedProducts") \
  .addColumn("Productid", "INT") \
  .addColumn("ProductName", "STRING") \
  .addColumn("Category", "STRING") \
  .addColumn("Price", "FLOAT") \
  .execute()

Can also use createIfNotExists or createOrReplace

Streaming Data in Delta Lake

Source to Sink.

Spark Structured Streaming

From sources including Azure Event Hub

Spark.readStream. Spark.writeStream

Streaming Example 1

The following is an example streaming dataflow, from a Delta Lake table source (reading the data from the underlying files) to an output dataframe that is appended.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()


Streaming Example 2

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

OpenRowSet

You can load the delta files from their source (parquet files) into a dataframe using OpenRowSet:

%%sql
SELECT *
FROM
    OPENROWSET(
        BULK 'https://mystore.dfs.core.windows.net/files/delta/mytable/',
        FORMAT = 'DELTA'
    ) AS deltadata