# Spark getting started
## Basic programming concepts
### RDD: Resilient Distributed Dataset
An RDD is a dataset distributed across the cluster nodes. RDDs are fault-tolerant, immutable distributed collections of objects. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster. To create an RDD, we use the Spark context object and then one of its APIs, depending on the data source (JDBC, Hive, HDFS, Cassandra, HBase, ElasticSearch, CSV, JSON,...). In the code below, the `movies` variable is an RDD.
```python
from pyspark import SparkConf, SparkContext

# create the Spark context
sparkConfiguration = SparkConf().setAppName("WorseMovie")
sparkContext = SparkContext(conf = sparkConfiguration)
# load movie ratings from a csv file as an RDD
movies = sparkContext.textFile('../data/movielens/u.data')
# take(10) is an action: it returns the first 10 lines of the file
results = movies.take(10)
for result in results:
    print(result)
```
This program is not launched with the Python interpreter, but with the `spark-submit` tool. This tool is available in the Dockerfile we defined, with a Python 3.7 interpreter.

```sh
/spark/bin/spark-submit nameoftheprogram.py
```
An RDD can be created from different data sources: text file, CSV, database, Hive, Cassandra...
On Spark RDDs, we can perform two kinds of operations: transformations and actions.
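To make the distinction concrete, here is a small Scala sketch as it could be run from spark-shell (the file name is an assumption): a transformation like `filter` only describes the computation, while an action like `count` actually triggers it.

```scala
// sc is the Spark context predefined in spark-shell; the path is an assumption
val lines = sc.textFile("data/sample.txt")
val longLines = lines.filter(line => line.length > 80)  // transformation: nothing runs yet
val n = longLines.count()                               // action: the job executes now
```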
### Spark context
Created by the driver program, it is responsible for making the RDDs resilient and distributed. Here is an example of a special context creation for Spark Streaming, using the local master with one executor per core, and a batch interval of 1 second.
```scala
val scc = new StreamingContext("local[*]", "TelemetryAlarmer", Seconds(1))
```
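As a sketch of how such a streaming context is typically used (the socket source on localhost:9999 is an assumption for illustration):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

val scc = new StreamingContext("local[*]", "TelemetryAlarmer", Seconds(1))
// listen on a TCP socket and print the first elements of each 1-second batch
val lines = scc.socketTextStream("localhost", 9999)
lines.print()
scc.start()             // start processing the stream
scc.awaitTermination()  // block until the application is stopped
```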
### Transforming RDDs
Use the `map`, `flatMap`, `filter`, `distinct`, `union`, `intersection`, `subtract`, ... functions, and then apply one of the actions.
Nothing actually happens in your driver program until an action is called. Here is the python API documentation.
The wordscale.scala code uses an RDD to count word occurrences in a text. To get an executor running the code, the Scala program needs to be an object with a main function:
```scala
object wordcount {
  def main(args: Array[String]) {
  }
}
```
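A minimal sketch of what the body could look like, counting word occurrences in a text file (the master URL and input path are assumptions):

```scala
import org.apache.spark.SparkContext

object wordcount {
  def main(args: Array[String]) {
    // local[*] and the input file are assumptions for this sketch
    val sc = new SparkContext("local[*]", "wordcount")
    val lines = sc.textFile("book.txt")
    // split lines into words, map each word to (word, 1), then sum the values per word
    val counts = lines.flatMap(line => line.split("\\W+"))
                      .map(word => (word.toLowerCase, 1))
                      .reduceByKey((a, b) => a + b)
    counts.take(20).foreach(println)
    sc.stop()
  }
}
```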
- `map` transforms one row into another row:

    ```scala
    // Now extract the text of each tweet status update into DStreams:
    val statuses = tweets.map(status => status.getText())
    ```

- while `flatMap` transforms one row into multiple ones:

    ```scala
    // Blow out each word into a new DStream
    val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
    ```

- `filter` helps to remove rows not matching a condition:

    ```scala
    // Now eliminate anything that's not a hashtag
    val hashtags = tweetwords.filter(word => word.startsWith("#"))
    ```
A classical transformation is to create key-value pairs to count occurrences of something, like words, using a reduce approach. `reduce(f)` applies the binary function `f` to pairs of elements of the RDD, folding the whole collection into a single value.

```scala
rdd.reduce((x, y) => x + y)
```
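As a tiny illustration in spark-shell (`sc` is the predefined context), reducing a small RDD of integers with addition:

```scala
val rdd = sc.parallelize(List(1, 2, 3, 4))
// the function is applied pairwise until a single value remains, e.g. ((1 + 2) + 3) + 4 = 10
val sum = rdd.reduce((x, y) => x + y)
```

Applied to the streaming hashtag example, the (hashtag, 1) pairs are summed per key with `reduceByKey`: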
```scala
// Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
val counts = hashtagKeyValues.reduceByKey((x, y) => x + y)
```
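In a streaming job we usually want the counts over a sliding window rather than per micro-batch; here is a sketch using `reduceByKeyAndWindow` (the 300-second window and 1-second slide are arbitrary values for illustration):

```scala
import org.apache.spark.streaming.Seconds

// count hashtags seen over the last 5 minutes, with the result updated every second
val windowedCounts = hashtagKeyValues.reduceByKeyAndWindow((x, y) => x + y, Seconds(300), Seconds(1))
```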
### DataFrames
Spark 2.0 supports exposing the data in an RDD as a DataFrame to apply SQL queries. A DataFrame is a distributed collection of data organized into named columns. DataFrames contain Row objects and may be easier to manipulate.
In the example below, the movie rating file includes records like:
```
0 50 5 881250949
0 172 5 881250949
```
The Python code using an RDD and a DataFrame:
```python
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

# Create a SparkSession
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
# Load the ratings file as an RDD of lines
movies = spark.sparkContext.textFile("../data/movielens/u.data")
# Convert it to a RDD of Row objects with (movieID, rating)
movieRows = movies.map(parseInput)
# Convert that to a DataFrame
movieDataset = spark.createDataFrame(movieRows)
```
Then on the DataFrame we can apply SQL-like transformations:
```python
# Compute average rating for each movieID
averageRatings = movieDataset.groupBy("movieID").avg("rating")
# Compute count of ratings for each movieID
counts = movieDataset.groupBy("movieID").count()
```
Since a DataFrame has a structured format with named columns, we can get its schema with `df.printSchema()`.
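To see what the schema output looks like, a quick sketch from spark-shell (Scala, the shell used later in these notes); the column name is arbitrary:

```scala
// build a tiny DataFrame and inspect its schema
val df = spark.range(5).withColumnRenamed("id", "movieID")
df.printSchema()
// root
//  |-- movieID: long (nullable = false)
```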
## Scala
### Create scala project with maven
See this article to create a maven project for a Scala program, and package it.
### SBT the Scala CLI
Scala SBT is a tool to manage library dependencies for Scala development. It also helps to package all dependencies in a single jar.
See sbt by examples note and this SBT essential tutorial.
Example to create a project from a template: `sbt new scala/hello-world.g8`.
Once the code and unit tests are done, package the Scala program and then submit it to the Spark cluster:
```sh
# In spark-studies/src/scala-wordcount
sbt package
# start a docker container with spark image (see previous environment notes)
docker run --rm -it --network spark_network -v $(pwd):/home jbcodeforce/spark bash
# in the shell within the container
cd /home
spark-submit target/scala-2.12/wordcount_2.12-1.0.jar
```
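For reference, a minimal `build.sbt` that would produce the jar name above is sketched here; the Spark and Scala versions are assumptions, and the `provided` scope keeps Spark itself out of the packaged jar since spark-submit supplies it at runtime:

```scala
// build.sbt (sketch): names and versions are assumptions matching the jar name above
name := "wordcount"
version := "1.0"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5" % "provided"
```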
## Proof Delta Lake is cool
- Connect to the docker container:

    ```sh
    docker exec -ti distracted_satoshi bash
    ```

- Start spark-shell:

    ```sh
    spark-shell
    ```
- Write some code that creates 100 records in a file with the classical DataFrame API, and a second job that overwrites it but throws an exception:
    ```scala
    // first job
    spark.range(100).repartition(1).write.mode("overwrite").csv("./tmp/test/")
    // second job
    scala.util.Try(spark.range(100).repartition(1).map{ i =>
      if (i > 50) {
        Thread.sleep(5000)
        throw new RuntimeException("Too bad crash !")
      }
      i
    }.write.mode("overwrite").csv("./tmp/test"))
    ```
The `spark` variable is a SparkSession and `sc` is the Spark context; both are predefined in the shell.
We should observe that the file is deleted after the exception, so we lost data. As a general statement, due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records in a data lake.
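One way to verify from the same shell (a sketch; passing an explicit schema avoids the schema-inference error on a directory left without data files, and the DDL string is an assumption):

```scala
// count what is left under ./tmp/test after the failed overwrite
spark.read.schema("id LONG").csv("./tmp/test/").count()
```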
With the Delta Lake API we can keep the file created even if the second job could not complete the update. The code becomes:
```scala
spark.range(100).select($"id".as("id")).repartition(1).write.mode("overwrite").format("delta").save("./tmp/test/")

scala.util.Try(spark.range(100).repartition(1).map{ i =>
  if (i > 50) {
    Thread.sleep(5000)
    throw new RuntimeException("Too bad crash !")
  }
  i
}.select($"value".as("id")).write.mode("overwrite").format("delta").save("./tmp/test"))
```
It is important to note that a transaction log is now created under the `_delta_log` folder. In the second job, the exception is raised and Delta cannot create a commit file, so the first file is preserved. A read operation via the Delta API will only read files referenced by a commit file.
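To confirm, reading the table back through the Delta source should still return the 100 rows written by the first, successful job (a sketch from the same spark-shell):

```scala
// only data referenced by a committed transaction in _delta_log is returned
val df = spark.read.format("delta").load("./tmp/test")
df.count()   // expected: 100
```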
(See source from Learning journal)