Skip to content

Spark getting started

Basic programming concepts

RDD: Resilient Distributed Dataset

RDD is a dataset distributed against the cluster nodes. RDDs are fault-tolerant, immutable distributed collections of objects. Each dataset in RDD is divided into logical partitions, which can be computed on different nodes of the cluster. To create a RDD, we use the spark context object and then one of its APIs depending of the data source (JDBC, Hive, HDFS, Cassandra, HBase, ElasticSearch, CSV, json,...). In the code below, movies variable is a RDD.

from pyspark import SparkConf, SparkContext

# get spark session
sparkConfiguration = SparkConf().setAppName("WorseMovie")
sparkSession = SparkContext(conf = sparkConfiguration)
# load movie ratings from a csv file as a RDD
movies = sparkSession.textFile('../data/movielens/u.data')
results = movies.take(10)
for result in results:
    print(result[0], result[1])

This program is not launched by using Python interpreter, but by the spark-submit tool. This tool is available in the Dockerfile we defined, with a python 3.7 interpreter.

/spark/bin/spark-submit nameoftheprogram.py

Creating a RDD can be done from different data sources, text file, csv, database, Hive, Cassandra...

On Spark RDD, we can perform two kinds of operations: RDD Transformations and RDD actions.

Spark context

Created by the driver program, it is responsible for making the RDD resilient and distributed. Here is an example of a special context creation for Spark Streaming, using local server with one executor per core, and using a batch size of 1 second.

val scc = new StreamingContext("local[*]", "TelemetryAlarmer", Seconds(1))

Transforming RDDs

Use map, flatmap, filter, distinct, union, intersection, substract, ... functions, and then applies one of the action.

Nothing actually happens in your drive program until an action is called. Here is the python API documentation.

The wordscale.scala code uses RDD to count word occurence in a text. To be able to get an executor running the code, the scala program needs to be an object and have a main function:

object wordcount {

  def main(args: Array[String]) {
  }
}
  • Map transforms one row into another row:

    // Now extract the text of each tweeter status update into DStreams:
    val statuses = tweets.map(status => status.getText())
    
  • while mapFlat transforms one row into multiple ones:

    // Blow out each word into a new DStream
        val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
    
  • filter helps to remove row not matching a condition:

        // Now eliminate anything that's not a hashtag
        val hashtags = tweetwords.filter(word => word.startsWith("#"))
    

A classical transformation, is to create key-value pair to count occurence of something like words using a reduce approach. reduce(f,l) applies the function f to elements of the list l by pair: (i,j) where i is the result of f(i-1,j-1).

valrdd.reduce((x,y) => x + y)
// Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

val counts = hashtagKeyValues.reduceByKey()

DataFrames

Spark 2.0 supports exposing data in RDD as data frames to apply SQL queries. DataFrame is a distributed collection of data organized into named columns. DataFrames contain Row Objects and may be easier to manipulate.

In the example below, the movies rating file includes records like:

0   50  5   881250949
0   172 5   881250949

The python code using RDD and data frame:

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

# Create a SparkSession 
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
movies = spark.sparkContext.textFile("../data/movielens/u.data")
# Convert it to a RDD of Row objects with (movieID, rating)
movieRows = lines.map(parseInput)
# Convert that to a DataFrame
movieDataset = spark.createDataFrame(movieRows)

Then in data frame we can do SQL type of transformation

 # Compute average rating for each movieID
averageRatings = movieDataset.groupBy("movieID").avg("rating")

# Compute count of ratings for each movieID
counts = movieDataset.groupBy("movieID").count()

Since DataFrames are structure format that contains names and column, we can get the schema of the DataFrame using the df.printSchema().

Scala

Create scala project with maven

See this article to create a maven project for scala project, and package it.

SBT the scala CLI

Scala SBT is a tool to manage library dependencies for Scala development. It also helps to package all dependencies in a single jar.

See sbt by examples note and this SBT essential tutorial.

Example to create a project template: sbt create scala/helloworld.g8.

Once code and unit tests done, package the scala program and then submit it to spark cluster:

# In spark-studies/src/scala-wordcount
sbt package
# start a docker container with spark image (see previous environment notes)
docker run --rm -it --network spark_network -v $(pwd):/home jbcodeforce/spark bash
# in the shell within the container
cd /home
spark-submit target/scala-2.12/wordcount_2.12-1.0.jar

Proof Delta Lake is cool

  • Connect to the docker container:
docker exec -ti distracted_satoshi bash
  • Start spark-shell:
spark-shell
  • Write a code to create 100 records in a file with the classical dataframe API, and a second one that overwrites it but generates an exception:
// first job
spark.range(100).repartition(1).write.mode("overwrite").csv("./tmp/test/")
// second job
scala.util.Try(spark.range(100).repartition(1).map{ i=>
    if (i>50) {
        Thread.sleep(5000)
        throw new RuntimeException("Too bad crash !")
    }
    i
}.write.mode("overwrite").csv("./tmp/test"))

The spark is variable is a SparkSession and sc is the spark context, predefined in the shell.

We should observe the file is deleted after the exception, so we lost data. As a general statement, due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records from a data lake.

With Delta lake API we can keep the file created even if the second job could not complete the update. The code uses:

spark.range(100).select($"id".as("id")).repartition(1).write.mode("overwrite").format("delta").save("./tmp/test/")

scala.util.Try(spark.range(100).repartition(1).map{ i=>
    if (i>50) {
        Thread.sleep(5000)
        throw new RuntimeException("Too bad crash !")
    }
    i
}.select($"value".as("id")).write.mode("overwrite").format("delta").save("./tmp/test"))

It is important to note that now a transaction log was created under the _delta_log folder. And in the second job, the exception is created and delta could not create a commit file, so the first file is preserved. A read operation via the delta api will read file with a commit file only.

(See source from Learning journal)

Next step... Deployment with local run >>>