Cache and Persist in Spark Scala | Dataframe | Dataset

Parmanand
3 min read · Nov 14, 2020

Caching a Dataset or DataFrame is one of the best features of Apache Spark. It improves the performance of a data pipeline by letting you store a DataFrame or Dataset in memory. Here, "memory" can mean RAM, disk, or both, depending on the storage level passed when calling the function.

In this article, we will talk about the cache and persist functions one by one.

Let’s get started!

cache(): In the DataFrame API, there is a function called cache() which can be used to store the intermediate computation of a Spark DataFrame.

Let’s understand this with an example.

package in.crazyschools.spark.dataframe

import org.apache.spark.sql.{DataFrame, SparkSession}

object CacheAndPersist {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .appName("TestAPP").master("local[2]").getOrCreate()

    // Read the web log CSV into a DataFrame
    val rawData: DataFrame = sparkSession.read.option("header", "true")
      .option("delimiter", ",").csv("Data/weblog.csv")

    // Mark the DataFrame for caching (lazy - nothing is stored yet)
    val rawCachedDF: DataFrame = rawData.cache()

    // count() is an action, so it triggers the actual caching
    val count: Long = rawCachedDF.count()

    // Ask Spark's CacheManager whether the logical plan is cached
    if (sparkSession.sharedState.cacheManager
        .lookupCachedData(rawCachedDF.queryExecution.logical).isDefined) {
      println("DF has been cached !")
    }
  }
}

In the above example, the rawData DataFrame has been cached and can be reused further in the code. Because caching is a lazy operation, the DataFrame will not actually be cached until an action is triggered.

But where is it stored?

If cache() is called on a DataFrame or Dataset, the data is stored at the MEMORY_AND_DISK level, but if it is an RDD, it is stored at MEMORY_ONLY by default.
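You can check this yourself. Here is a minimal sketch, reusing rawCachedDF and rawData from the example above (Dataset.storageLevel and RDD.getStorageLevel are the standard Spark APIs for inspecting the current level):

import org.apache.spark.storage.StorageLevel

// A cached DataFrame reports MEMORY_AND_DISK as its storage level
println(rawCachedDF.storageLevel == StorageLevel.MEMORY_AND_DISK)   // true

// An RDD cached the same way defaults to MEMORY_ONLY
val cachedRDD = rawData.rdd.cache()
println(cachedRDD.getStorageLevel == StorageLevel.MEMORY_ONLY)      // true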

When to use caching?

If you are only going to process a dataset once, caching it will only slow you down, because it adds serialization, deserialization, and storage costs. But if you are going to use the same DataFrame in multiple places, caching saves you from recomputing it each time.
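For instance, a rough sketch reusing rawCachedDF from the earlier example (the column names IP and Status are only assumptions about the weblog.csv schema):

// Without cache(), each of these actions would re-read and re-parse the CSV.
// With cache(), the file is scanned once and later actions hit the cached data.
val totalRows: Long = rawCachedDF.count()
val distinctIPs: Long = rawCachedDF.select("IP").distinct().count()   // "IP" is an assumed column name
rawCachedDF.filter(rawCachedDF("Status") === "404").show(10)          // "Status" is an assumed column name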

persist(): In the DataFrame API, there is also a function called persist() which can be used to store the intermediate computation of a Spark DataFrame, while letting you choose the storage level explicitly.

For example -

import org.apache.spark.storage.StorageLevel

// Persist with an explicit storage level; count() triggers the materialization
val rawPersistDF: DataFrame = rawData.persist(StorageLevel.MEMORY_ONLY)
val rowCount: Long = rawPersistDF.count()

What if the data does not fit into memory?

With MEMORY_ONLY, partitions that do not fit in memory are simply not cached and are recomputed on the fly each time they are needed. (This is also the default level for RDD persistence.)
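If that recomputation is expensive, a common alternative is to let Spark spill to disk instead. A small sketch (someDF stands for any DataFrame that has not been persisted yet):

import org.apache.spark.storage.StorageLevel

// Partitions that don't fit in memory are written to disk
// instead of being recomputed on the next action
val spillableDF: DataFrame = someDF.persist(StorageLevel.MEMORY_AND_DISK)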

How to remove it from memory?

val dfPersist = rawPersistDF.unpersist()
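unpersist() also accepts a blocking flag if you want to wait until every cached block has actually been removed (the default is non-blocking):

// Block until all cached blocks of this DataFrame are dropped
rawPersistDF.unpersist(blocking = true)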

Below are the different storage levels.

  • MEMORY_ONLY — Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
  • MEMORY_AND_DISK- Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
  • MEMORY_ONLY_SER — Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER — Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
  • DISK_ONLY — Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. — same as the levels above, but replicate each partition on two cluster nodes.
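All of these levels live in org.apache.spark.storage.StorageLevel and can be passed straight to persist(). A short sketch on the RDD behind the same web log data (the variable name is illustrative):

import org.apache.spark.storage.StorageLevel

val weblogRDD = rawData.rdd

// Keep partitions serialized in memory and spill the rest to disk
weblogRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

// An RDD can only have one storage level; to use a replicated level instead,
// call weblogRDD.persist(StorageLevel.MEMORY_ONLY_2) on a fresh, unpersisted RDD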

Thanks for reading!

Please do share the article if you liked it. Any comments or suggestions are welcome.
