Exam 203 back-end services Spark


The second back-end service is: Spark

For Spark Delta Lake, see: Spark Delta Lake

Languages supported in Spark include Python, Scala, Java, SQL, and C#.

To run Spark code in Synapse Studio, it is first necessary to create a Spark pool in the Manage - Apache Spark Pools tab. Then, in the Develop tab, create a new Notebook and attach the Spark pool you created to it. Then paste the following code into it and run:

%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/product_data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Note that the first time it runs it will take several minutes to complete, because of the time taken to start up the Spark pool.

Then run:

df_counts = df.groupby(df.Category).count()
display(df_counts)

The result can also be output as a chart.

You can load notebooks into Synapse Studio Develop tab, from .ipynb files, by clicking Import.

Overview

The SparkContext connects to the cluster manager, which allocates resources across applications using an implementation of Apache Hadoop YARN.

The nodes read and write data from and to the file system and cache transformed data in-memory as Resilient Distributed Datasets (RDDs).

The cluster manager assigns work to many executors.

The SparkContext is responsible for converting an application to a directed acyclic graph (DAG).
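
As a rough sketch of how this surfaces in a notebook (assuming the built-in spark session that Synapse notebooks provide; the RDD itself is illustrative):

%%pyspark
# In a Synapse notebook the SparkSession is already created as `spark`;
# its SparkContext (the connection to the cluster manager) is reached from it.
sc = spark.sparkContext
print(sc.appName, sc.defaultParallelism)

# Transformations are lazy and only build up the DAG; an action such as count()
# makes the SparkContext submit the DAG to the executors as tasks.
rdd = sc.parallelize(range(1000)).map(lambda x: x * 2)
print(rdd.count())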

When creating the Spark pool, you choose the size of virtual machine (VM) used for the nodes in the pool (including the option to use hardware-accelerated GPU-enabled nodes), the number of nodes, the Autoscale minimum and maximum node counts, and the version of Spark.

Code is written in Synapse Studio notebooks, which are organized into cells.

The default language is PySpark, a Spark version of Python. Scala and Spark SQL can also be used, and frameworks such as Java and Microsoft .NET can be applied.

Dataframes

A "dataframe" is an in-memory table.

To load data into a dataframe, use the spark.read.load function:

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

The equivalent command in Scala is:

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

(note the magic command is %%spark instead of %%pyspark)

The difference between the two commands is that in PySpark the format and header are passed as parameters to the load function, whereas in Scala they are specified by chaining .format() and .option() method calls before .load().

Schemas

The dataframe schema can be independently defined in a variable, and applied as a parameter for the load:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

df.printSchema() #prints the schema
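
Alternatively (a sketch, assuming a source file that does have a header row, such as products.csv above), the schema can be inferred from the data instead of being declared, at the cost of an extra pass over the file:

df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True,
    inferSchema=True)  # infer column types from the data
df.printSchema() #shows the inferred column names and types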

Parameter chaining

Once the dataframe object is loaded from the source file, operations such as:

  1. Selecting specific columns
  2. Where clauses
  3. Group by
  4. Count(*)

Can be chained one after the other, like:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))

display(bikes_df)

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df) #Displays the counts grouped by Category

sum_df = df.select("Category", "Quantity").groupBy("Category").sum().orderBy("Category")
display(sum_df) #Displays the sum of Quantity grouped by Category, ordered by Category

yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year")
display(yearlySales)  #Creates virtual (alias) column "Year" as the year of OrderDate, and groups by it. Outputs the count ordered by the year.

The pipe | means OR.
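
As a further (hypothetical) example on the same columns, the ampersand & means AND, and each comparison must be wrapped in its own parentheses:

cheap_bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") & (df["ListPrice"]<1000))
display(cheap_bikes_df) #Displays mountain bikes with a list price under 1000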

Create Spark database temp views and permanent tables

df.createOrReplaceTempView("products")

creates a temp view named "products", which can then be queried using Spark SQL. Note this is deleted automatically at the end of the session.

Me: I'm not sure how this is different from a dataframe.

Alternatively, you can create tables that are persisted in the metastore, either with spark.catalog.createTable or from the dataframe using .saveAsTable:

order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')

When these (managed) tables are deleted, their underlying files are also deleted.

Or you can run spark.catalog.createExternalTable to create an external table. When an external table is deleted, its data files are left in place.
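
A minimal sketch of creating such an external table over the Parquet files written above (the table name is illustrative; in newer Spark versions spark.catalog.createTable with a path argument behaves the same way):

# Register an external table over files that already exist at the given path
spark.catalog.createExternalTable('sales_orders_ext', path='/sales_orders_table', source='parquet')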

These tables can then be queried by running SQL code in one of the following two ways:

%%pyspark
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Chart output

This is useful if you don't have groupings in the SQL code: the groupings can instead be done in the chart, a la Power BI.

Matplotlib can do this in PySpark code, with the creation of a figure in code:

from matplotlib import pyplot as plt

# Get the data as a Pandas dataframe
data = spark.sql("SELECT Category, COUNT(ProductID) AS ProductCount \
                  FROM products \
                  GROUP BY Category \
                  ORDER BY Category").toPandas()

# Clear the plot area
plt.clf()

# Create a Figure
fig = plt.figure(figsize=(12,8))

# Create a bar plot of product counts by category
plt.bar(x=data['Category'], height=data['ProductCount'], color='orange')

# Customize the chart
plt.title('Product Counts by Category')
plt.xlabel('Category')
plt.ylabel('Products')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=70)

# Show the plot area
plt.show()

Note it has been necessary to use the .toPandas() function to convert the Spark dataframe into a Pandas dataframe prior to applying the display logic to it.

Note also that although the figure object "fig" has been assigned from plt.figure, it isn't necessary to keep that reference: simply calling plt.figure is sufficient to create the output, and the chart can then be built up with further plt method calls.

The output of a regular SQL query in Spark can instead be rendered as a chart rather than as a table results set. Here you can use the pop-up menu items to aggregate by a column and choose count, sum, etc.

Subplots

You can create subplots with the plt.subplots function. This returns a figure and an array of axes, with each subplot addressed as a numbered element of that array (ax[0], ax[1]):

fig, ax = plt.subplots(1, 2, figsize = (10,4))
ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
ax[0].set_title('Revenue by Year')

ax[1].pie(yearly_counts)
ax[1].set_title('Orders per Year')
ax[1].legend(yearly_counts.keys().tolist())

Seaborn and other libraries

Other libraries such as Seaborn can be used in order to add advanced features to the output:

import seaborn as sns

# Clear the plot area
plt.clf()

# Create a bar chart
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

sns.set_theme(style="whitegrid")

# Create a bar chart
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Dataframe transformations

You can define new dataframes as the outputs of functions (e.g. group by) run on the existing dataframes.

In addition to group by, there are functions like withColumn. The example below generates two new columns, "FirstName" and "LastName", populating them with a text function that splits the content of the existing CustomerName column on a space. It then deletes the CustomerName column.

from pyspark.sql.functions import split, col

# Create the new FirstName and LastName fields
transformed_df = order_details \
    .withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)) \
    .withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")

display(transformed_df.limit(5))

Export dataframe to file

You can export the data in a dataframe to a Parquet file using the dataframe's .write interface:

transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")

Note that here the output path is passed as a parameter to the .parquet function.
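
For comparison, a sketch of the equivalent write expressed with an explicit format and a generic save call (same transformed_df as above):

transformed_df.write.mode("overwrite").format("parquet").save('/transformed_data/orders.parquet')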

Partitioning in this export

You can write the data into a different subdirectory for each year, for example, by using the .partitionBy function in the write, and by giving a directory rather than a file name as the output path.

from pyspark.sql.functions import year, col

# Load source data
df = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)

# Add Year column
dated_df = df.withColumn("Year", year(col("OrderDate")))

# Partition by year
dated_df.write.partitionBy("Year").mode("overwrite").parquet("/data")

Partitioning on import

Having saved the data to a partitioned directory structure, one can elect to load into a new dataframe only a single partition value, by specifying a particular directory:

orders_2020 = spark.read.parquet('/partitioned_data/Year=2020')
display(orders_2020.limit(5))