apache spark

Blog

Apache Spark Tips and Tricks

We all know that when you have to process a massive amount of data, Apache Spark is a perfect choice. Apache Spark is a distributed processing framework easy to use and designed for performing a lot of computations and stream processing. However, it can present a range of problems if not properly optimized.

A Spark job can be optimized by many techniques, so let’s dig deeper into those techniques one by one.

 

Data Serialization

By default, Spark uses Java Serialization and serialization plays an important role in the performance, especially when the data is moved over the network or when we want to reduce memory usage by storing spark RDDs in serialized form. We recommend using Kryo Serialization which is more compact and 10x faster than Java Serialization. To set Kryo, we need to set the serializer properties:       

# Kryo Serialization is much faster than the default Java Serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Kryo Serialization does not support all Serializable types and it requires you to register the classes you’ll use. 

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

 

Caching and Persistence of Data

Both contribute to better performance when you want to access a data set repeatedly. They are synonymous, but persist() provides more control over where (memory or disk) and how (serialized or unserialized) your data is stored.

val df = spark.range(1 * 5000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache()
val size = df.count() // materialize the cache

println(size) //first action is slower

println(df.count()) // second call gets data from the cache and it’s faster

It is good to know that not all use cases require cache or persist. In some scenarios, it is faster to execute the DAG from the beginning instead of caching at a certain transformation step. Also, be careful when your data set is too big to fit in memory. As well, in some cases when the DataFrame is not used anymore and more computations follow it would be useful to call async nonblocking unpersist.

df.unpersist(false

 

Feed as much input as possible

We have noticed a common mistake, especially among junior developers, that when they want to read files that are spread across many folders, they read those files sequentially on each folder, instead of reading all the files at once. 

  • - wrong way

val dates = Seq("2020-07-07", "2020-07-09")

val df = dates.map(_.split("-"))
    .map {
        case Array(y, m, d) => spark.read.json(s"/data/exchange-rates/y=$y/m=$m/d=$d")
    }.reduce(_ union _)

  • - correct way

val dates = Seq("2020-07-07", "2020-07-09")

val filesPaths = dates.map(_.split("-"))
    .map {
        case Array(y, m, d) => s"/data/exchange-rates/y=$y/m=$m/d=$d"
    }

val df = spark.read.json(filesPaths: _*)

 

  

Skip schema inference

When you have to process .csv or .json files and you don’t specify the schema, then Spark will go through the input to determine the input schema. To avoid the extra scan, you can explicitly specify the schema using a config file:    

val conf = ConfigFactory.parseResources("schema.conf")
val jsonSchema = conf.root().render(ConfigRenderOptions.concise)
val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
 
val df = spark.read.schema(schema).json(filesPaths: _*)

When you have multiple small files, another approach is to read only one file to deduce the schema and apply the same schema to the rest of the files:  

val jsonSchema = spark.read.json(filesPaths(0)).schema.json
val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]

val df = spark.read.schema(schema).json(filesPaths: _*)

 

Coalesce vs. Repartition

Coalesce should be preferred over repartition to reduce the number of partitions because it avoids a shuffle, but the number of tasks depends on the number of partitions of the output of the stage. If there are heavy computations performed upstream, you would like them to be performed in parallel, so it may be better to use repartition(1) that adds a shuffle step but keeps the execution in parallel, rather than coalesce(1) that will execute on only one node.

df.doSth().coalesce(10).write(...) //faster

df.doSth().repartition(10).write(...) //slower

  • If you have loaded a dataset that includes huge data and a lot of transformations that need an equal distribution of load on executors, you need to use repartition.
  • Once all the transformations are applied and you want to save all the data into fewer files  (no. of files = no. of partitions) instead of many files, use coalesce.

 

Conclusion 

We have seen the most important and common tuning configurations. Spark has many configurations for tuning and you can find them in the official documentation. Another important optimization with a significant effect on execution time is the join operations tuning, but I will cover this in a future article.

    

 

About the author 

Lucian Neghina is an experienced big data architect, leading eSolutions’ big data team. With a passion for open source technologies, Lucian has extensive experience in designing and implementing complex big data projects, being also a Certified Google Cloud Platform Architect.

 

Got a question or need advice? We're just one click away.