Exam 203 back-end services Spark Delta Lake
Spark Delta Lake.
Delta Lake is a storage layer on top of Spark that adds relational database capabilities (such as transactions and schema enforcement) to data held in files.
By using Delta Lake, you can implement a data lakehouse architecture in Spark.
Delta Lake supports:
- CRUD (create, read, update, and delete) operations
- ACID transactions: atomicity (transactions complete as a single unit of work), consistency (transactions leave the database in a consistent state), isolation (in-process transactions can't interfere with one another), and durability (when a transaction completes, the changes it made are persisted)
- Data versioning and time travel
- Streaming as well as batch data, through the Spark Structured Streaming API
- Underlying data is in Parquet format only, not CSV.
- Can be queried from the serverless SQL pool in Synapse Studio.
Create a Delta Lake table
Use the .write property of a dataframe to create a Delta Lake table, specifying .format("delta"):
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Q: does this save into Gen2 storage?
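A: Yes, when run in a Synapse Spark pool. The files are written to the given path, and a relative path like this one resolves against the workspace's default file system, which is Data Lake Storage Gen2. (It is managed catalog tables, created with saveAsTable and no path, whose data files are stored within the storage used by the metastore.)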
This also creates a _delta_log folder containing the transaction log for the table. The log is stored as JSON files, one per commit, with periodic Parquet checkpoint files.
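To make the role of the _delta_log folder concrete, here is a minimal sketch in plain Python (no Spark) that replays the JSON commit files and lists which data files make up the table at a given version. It is a simplification, not how Delta Lake itself reads the log: it ignores checkpoint files and every action other than "add" and "remove", and it assumes the log folder is reachable as a local path. The function name active_files is made up for illustration.

```python
import json
from pathlib import Path


def active_files(delta_log_dir, version=None):
    """Return the data files that make up the table at `version`
    (the latest version if `version` is None).

    Simplified sketch: ignores Parquet checkpoints and all actions
    other than "add" and "remove".
    """
    files = set()
    # Commit files are named by zero-padded version number, so sorting
    # by file name replays them in commit order.
    for commit in sorted(Path(delta_log_dir).glob("*.json")):
        if not commit.stem.isdigit():
            continue  # skip anything that is not a plain commit file
        if version is not None and int(commit.stem) > version:
            break
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)  # one JSON action per line
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return sorted(files)
```

Calling active_files(log_dir, version=0) and then active_files(log_dir) on the same folder shows how an overwrite removes the old Parquet files from the current version while leaving them available to earlier versions.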
Note that you have not given the table a name here. So it is only present as its underlying files.
Overwrite the table
To overwrite the table data with new data from a new dataframe, run:
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
(where new_df is the new dataframe that already contains the new data required).
Or append the new data instead of overwriting it, with:
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
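Get a DeltaTable object for the table
To work with the table through the Delta Lake API, create a DeltaTable object (held here in a Python variable called deltaTable) with the DeltaTable.forPath function. This does not register a table name in the catalog; it only gives you a reference to the files at that path.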
from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
Update the table
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
Check constraints
ALTER TABLE people ADD CONSTRAINT gendercheck CHECK (gender in ('M', 'F'))
Merge Schemas
You can use .option("mergeSchema", "true") on the write to add new columns to a table when a dataframe that contains extra columns is inserted into it.
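A minimal sketch of a schema-merging append, assuming new_df is a dataframe with one or more columns the table does not have yet and delta_table_path is the table folder used earlier. The function name append_with_merged_schema is made up for illustration; mergeSchema is the Delta Lake writer option.

```python
def append_with_merged_schema(new_df, delta_table_path):
    """Append new_df to the Delta table at delta_table_path, letting
    Delta Lake add any columns that are in new_df but not in the table."""
    (new_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")  # allow the table schema to grow
        .save(delta_table_path))
```

Existing rows get nulls in the newly added columns.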
Previous Versions
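Use .option("versionAsOf", ...) or .option("timestampAsOf", ...) when reading to query historical data (time travel). Earlier versions are reconstructed from the transaction log, which records which data files belonged to the table at each version. Use deltaTable.history to list the versions.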
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)

deltaTable.history(10).show(20, False, True)
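The two time travel options can be wrapped in one helper so that a caller cannot pass both by accident. This is only a sketch: the function name read_delta_as_of and its parameters are made up for illustration, and spark is assumed to be the session a notebook already provides.

```python
def read_delta_as_of(spark, delta_table_path, version=None, timestamp=None):
    """Return a dataframe of the Delta table as it was at a version
    number or at a timestamp string. Exactly one must be given."""
    if (version is None) == (timestamp is None):
        raise ValueError("Pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(delta_table_path)
```

For example, read_delta_as_of(spark, delta_table_path, version=0) returns the table as first written, which can then be compared with the current data.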
SaveAsTable
You can also use the .saveAsTable method on a dataframe's writer to save it as a named table (this also works for non-Delta tables). A table created this way is registered in the metastore and is called a "catalog" table.
# Save a dataframe as a managed table
df.write.format("delta").saveAsTable("MyManagedTable")

## specify a path option to save as an external table
df.write.format("delta").option("path", "/mydata").saveAsTable("MyExternalTable")
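Me: What determines whether this is a managed table or an external table is the path. The table is managed unless you specify a location, in which case it is external. Dropping a managed table also deletes its data files; dropping an external table leaves the files in place.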
You can also create the table with SQL, either pointing it at an existing location (external) or creating it empty with an explicit schema (managed):
%%pyspark
spark.sql("CREATE TABLE MyExternalTable USING DELTA LOCATION '/mydata'")

%%sql
CREATE TABLE MyExternalTable
USING DELTA
LOCATION '/mydata'

%%sql
CREATE TABLE ManagedSalesOrders
(
    Orderid INT NOT NULL,
    OrderDate TIMESTAMP NOT NULL,
    CustomerName STRING,
    SalesTotal FLOAT NOT NULL
)
USING DELTA
DeltaTable API
You can also create a catalog table in Spark with the DeltaTable builder API:
from delta.tables import *

DeltaTable.create(spark) \
    .tableName("default.ManagedProducts") \
    .addColumn("Productid", "INT") \
    .addColumn("ProductName", "STRING") \
    .addColumn("Category", "STRING") \
    .addColumn("Price", "FLOAT") \
    .execute()
You can also use DeltaTable.createIfNotExists or DeltaTable.createOrReplace in place of DeltaTable.create.
Streaming Data in Delta Lake
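Spark Structured Streaming moves data from a source to a sink.
Sources include Azure Event Hubs. A Delta Lake table can be used as a source or as a sink.
Read a stream with spark.readStream and write one with the writeStream property of the streaming dataframe.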
Streaming Example 1
The following is an example streaming dataflow that uses a Delta Lake table as the source (reading new data from the underlying files) and writes each new batch to the console in append mode.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
Streaming Example 2
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
OpenRowSet
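You can query a Delta table's underlying Parquet files from a serverless SQL pool (a SQL script, not a Spark notebook) using the OPENROWSET function with FORMAT = 'DELTA'. This returns a rowset rather than a dataframe:
SELECT *
FROM OPENROWSET(
    BULK 'https://mystore.dfs.core.windows.net/files/delta/mytable/',
    FORMAT = 'DELTA'
) AS deltadata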