Exam 203 back-end services Spark

The second back-end service is: Spark

Languages supported in Spark include Python, Scala, Java, SQL, and C#.

To run Spark code in Synapse Studio, it is necessary to first create a Spark pool in the Manage - Apache Spark pools tab. Then in the Develop tab, create a new Notebook and attach the created Spark pool to it. Then paste the following code into a cell and run it:

%%pyspark
df = spark.read.load('abfss://files@datalakexxxxxxx.dfs.core.windows.net/product_data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Note that the first time it runs it will take several minutes to complete, because the Spark pool has to start up.

Then run:

df_counts = df.groupby(df.Category).count()
display(df_counts)

The result can also be output as a chart.

Overview

The SparkContext connects to the cluster manager, which allocates resources across applications using an implementation of Apache Hadoop YARN.

The nodes read and write data from and to the file system and cache transformed data in-memory as Resilient Distributed Datasets (RDDs).

The cluster manager assigns work to many executors.

The SparkContext is responsible for converting an application to a directed acyclic graph (DAG).
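
As an illustration (a minimal sketch, not part of the original notes), the pre-created spark session in a Synapse notebook exposes this SparkContext directly:

%%pyspark
# The notebook's pre-created SparkSession ('spark') wraps a SparkContext,
# which is the connection to the cluster manager.
sc = spark.sparkContext
print(sc.applicationId)        # the application's ID as registered with the cluster manager
print(sc.defaultParallelism)   # default parallelism available across the executors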

When creating the Spark pool, you choose the size of virtual machine (VM) used for the nodes in the pool (including the option to use hardware-accelerated GPU-enabled nodes), the number of nodes, and the version of Spark.

Code is written in Synapse Studio notebooks, which are made up of cells.

The default language is PySpark - the Spark API for Python. Notebooks can also use Scala and Spark SQL, and frameworks such as Java and Microsoft .NET can also be applied.

Dataframes

A "dataframe" is an in-memory table.

To load data into a dataframe, use the spark.read.load function:

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

The equivalent command in Scala is:

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

(note the magic command is %%spark instead of %%pyspark)

The difference between the two is that in PySpark the format and header are passed as parameters inside the .load() call, whereas in Scala they are applied as .format() and .option() method calls on the reader before .load().
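
For CSV files specifically, the PySpark load can also be written with the format-specific reader (a sketch assuming the same container and file path as above):

%%pyspark
# Equivalent load using the CSV-specific reader instead of the generic load
df = spark.read.csv('abfss://container@store.dfs.core.windows.net/products.csv', header=True)
display(df.limit(10))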

Schemas

The dataframe schema can be independently defined in a variable, and applied as a parameter for the load:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))
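
To confirm the schema that was applied (a small check, not in the original notes), print the dataframe schema:

%%pyspark
# Print the column names and data types applied from productSchema
df.printSchema()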

Parameter chaining

Once the dataframe object is loaded from the source file, operations such as:

  1. Selecting specific columns
  2. Where clauses
  3. Group by
  4. Count(*)

can be chained one after the other, for example:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))

display(bikes_df)

counts_df = df.select("ProductID", "Category").groupBy("Category").count()

display(counts_df)

The pipe | means OR.
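
These operations can also be combined into a single chained expression (a sketch reusing the same dataframe and columns as above):

%%pyspark
# Select, filter, group and count in one chained expression
bike_counts_df = (df.select("ProductID", "Category")
                    .where((df["Category"] == "Mountain Bikes") | (df["Category"] == "Road Bikes"))
                    .groupBy("Category")
                    .count())
display(bike_counts_df)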

Using Spark to create database temp views

df.createOrReplaceTempView("products")

creates a temp view named "products", which can then be queried using Spark SQL. Note this is deleted automatically at the end of the session.
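
For example (a minimal sketch), the temp view can then be queried with Spark SQL from PySpark:

%%pyspark
# Query the temp view registered above using Spark SQL
sql_df = spark.sql("SELECT Category, COUNT(*) AS ProductCount FROM products GROUP BY Category")
display(sql_df)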

Me: I'm not sure how this is different from a dataframe.