Exam 203 back-end services Spark Delta Lake
Spark Delta Lake.
Delta Lake is a storage layer on top of Spark that adds relational database capabilities (tables and transactions) to data lake storage.
By using Delta Lake, you can implement a data lakehouse architecture in Spark.
Delta Lake supports:
- CRUD (create, read, update, and delete) operations
- ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
- Data versioning and time travel
- Both batch and streaming data, via the Spark Structured Streaming API
- The underlying data is stored in Parquet format, not CSV
- Delta Lake tables can be queried from a serverless SQL pool in Synapse Studio
Create a Delta Lake table
Use the .write method on a dataframe to create a Delta Lake table, specifying .format("delta"):
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Q: does this save into Gen2 storage?
A: No, the data files are stored within the storage used by the metastore.
This also creates a _delta_log folder containing the transaction log for the table.
Note that you have not given the table a name here, so it is present only as its underlying files.
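You can read the unnamed table back into a dataframe from its path; a minimal sketch, reusing the delta_table_path variable from above:
# Read the delta table back into a dataframe from its files
df = spark.read.format("delta").load(delta_table_path)
df.show()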
Overwrite the table
To overwrite the table data with new data from a new dataframe, run:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
(where new_df is the new dataframe that already contains the new data required).
Or append the new data instead of overwriting it, with:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Give the table a name
To refer to the table through a named object in code (in this case the variable deltaTable), run the DeltaTable.forPath function:
from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object for the table stored at delta_table_path
deltaTable = DeltaTable.forPath(spark, delta_table_path)
Update the table
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
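The CRUD bullet at the top also covers delete; a minimal sketch using the same DeltaTable object (the condition shown is just an illustration):
# Delete rows that match a condition
deltaTable.delete(condition = "Price == 0")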
Previous Versions
Use the .option("versionAsOf", ...) or .option("timestampAsOf", ...) option when reading to query historical data, which is reconstructed from the information stored in the transaction log.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)
saveAsTable
You can also use the .saveAsTable method on a dataframe to save it as a named table (which can also be done with non-Delta-Lake tables). This is called a "catalog" table.
# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")
# Specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
Me: What determines whether this is a managed (internal) table or an external table is the location: the table is managed unless you specify a path or LOCATION, in which case it is external.
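One way to check is to describe the table and look at the Type row, which shows MANAGED or EXTERNAL (a sketch using the table name from the example above):
# Inspect the table metadata; the "Type" row shows MANAGED or EXTERNAL
spark.sql("DESCRIBE TABLE EXTENDED MyManagedTable").show(truncate=False)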
You can also use SQL to create the catalog table, either over existing Delta Lake files (by specifying a LOCATION) or as a new, empty table with an explicit schema:
%%pyspark
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")
%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'
%%sql
CREATE TABLE ManagedSalesOrders
(
Orderid INT NOT NULL,
OrderDate TIMESTAMP NOT NULL,
CustomerName STRING,
SalesTotal FLOAT NOT NULL
)
USING DELTA
DeltaTableBuilder API
You can also run the following in Spark:
from delta.tables import *
DeltaTable.create(spark) \
.tableName("default.ManagedProducts") \
.addColumn("Productid", "INT") \
.addColumn("ProductName", "STRING") \
.addColumn("Category", "STRING") \
.addColumn("Price", "FLOAT") \
.execute()
Can also use createIfNotExists or createOrReplace
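A minimal sketch of the createIfNotExists variant, using the same columns as above:
from delta.tables import *

# Create the table only if it does not already exist
DeltaTable.createIfNotExists(spark) \
    .tableName("default.ManagedProducts") \
    .addColumn("Productid", "INT") \
    .addColumn("ProductName", "STRING") \
    .addColumn("Category", "STRING") \
    .addColumn("Price", "FLOAT") \
    .execute()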
Streaming Data in Delta Lake
A Delta Lake table can be used as either a source or a sink for streaming data.
This uses Spark Structured Streaming: spark.readStream to read a stream, and the writeStream method on a streaming dataframe to write one.
Streams can come from sources including Azure Event Hubs.
Streaming Example 1
The following is an example streaming dataflow from a Delta Lake table source (reading the data from the underlying files) to the console sink, using append output mode:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Streaming Example 2
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
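Once the stream has written some data, you can read the sink table back and stop the query; a sketch assuming the table_path and delta_stream variables above:
# Read the delta table that the stream is writing to
df = spark.read.format("delta").load(table_path)
df.show()

# Stop the streaming query when finished
delta_stream.stop()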
OPENROWSET
From a serverless SQL pool, you can query the delta files in their source location using the OPENROWSET function: