One of the most useful things you can do with Spark is define your own functions, called user-defined functions (UDFs). They let us write our own transformations in Scala, Python, or Java.
In this article, I will walk through Spark UDFs in Scala step by step.
Let's get started!
Step 1: Create a DataFrame.
val tempDF = sparkSession.createDataFrame(Seq(
  ("rahul sharma", 32, "Patna", 20000, "Store Manager"),
  ("Joy don", 30, "NY", 23455, "Developer"),
  ("Steve boy", 42, "Delhi", 294884, "Developer")
)).toDF("name", "age", "city", "salary", "designation")
Output : -
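The snippet in Step 1 assumes that a SparkSession named sparkSession already exists. As a minimal sketch for experimenting locally, it could be created as shown here; the application name and the local[*] master are my own illustrative choices, not something the rest of the article depends on. The code is written script-style, so it can be pasted into spark-shell or run as a Scala script.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; appName and master are illustrative values.
val sparkSession = SparkSession.builder()
  .appName("udf-example")
  .master("local[*]")
  .getOrCreate()

// Same data as in Step 1: each tuple becomes one row.
val tempDF = sparkSession.createDataFrame(Seq(
  ("rahul sharma", 32, "Patna", 20000, "Store Manager"),
  ("Joy don", 30, "NY", 23455, "Developer"),
  ("Steve boy", 42, "Delhi", 294884, "Developer")
)).toDF("name", "age", "city", "salary", "designation")

// Inspect the inferred column types and the rows.
tempDF.printSchema()
tempDF.show()
```

In spark-shell a session is already available, so getOrCreate() simply returns the existing one.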
Step 2: Write a Scala function that returns the first name of a person, and test it.
// Splits the full name on spaces and returns the first token.
val getFistName = (name: String) => {
  val temp: Array[String] = name.split(" ")
  temp(0)
}
If you pass a name to this function, it returns the first name. In Scala we don't need the return keyword, because the last expression in a block is its value. We need to make sure that the input to such a function is never null; here, a null name would make name.split throw a NullPointerException. We can also create Spark UDFs with multiple parameters.
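Because the function is plain Scala, it can be tested without Spark at all. The sketch below repeats the function from Step 2, then adds two things that are my own illustrations rather than part of the original steps: getFistNameSafe, a hypothetical null-tolerant variant, and describe, a hypothetical two-parameter function of the kind that could be registered as a multi-parameter UDF.

```scala
// The function from Step 2, unchanged in behavior.
val getFistName = (name: String) => {
  val temp: Array[String] = name.split(" ")
  temp(0)
}

// Quick local test, no Spark needed.
println(getFistName("rahul sharma")) // prints "rahul"

// Hypothetical null-tolerant variant: Option(null) is None,
// so a null input falls through to the empty-string default.
val getFistNameSafe = (name: String) =>
  Option(name).map(_.trim.split(" ")(0)).getOrElse("")

// Hypothetical two-parameter function; it would be registered
// in the same way as a one-parameter function.
val describe = (name: String, designation: String) =>
  s"$name ($designation)"
```

Returning an empty string for null is only one possible choice; depending on the data, returning null or an Option may fit better.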
Step 3: The function is now ready to be called, but before that we need to register it.
val getFistNameUDF = sparkSession.udf.register("fist_name", getFistName)
Once registered under a name, the function can also be used in a string expression, for example:
tempDF.selectExpr("fist_name(name)").show(2)
Another way to create the UDF, instead of the registration above:
import org.apache.spark.sql.functions.udf
val getFistNameUDF = udf(getFistName)
A UDF created this way can only be used as a DataFrame function; it can't be used within a string expression.
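To make the difference between the two approaches concrete, here is a small self-contained sketch. It assumes a local SparkSession and a temporary view that I have named people purely for illustration; the view name and the first_name alias are not part of the original steps.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Assumption: a throwaway local session for experimentation.
val sparkSession = SparkSession.builder()
  .appName("udf-register-example")
  .master("local[*]")
  .getOrCreate()

val tempDF = sparkSession.createDataFrame(Seq(
  ("rahul sharma", 32, "Patna", 20000, "Store Manager"),
  ("Joy don", 30, "NY", 23455, "Developer"),
  ("Steve boy", 42, "Delhi", 294884, "Developer")
)).toDF("name", "age", "city", "salary", "designation")

val getFistName = (name: String) => name.split(" ")(0)

// 1) Registered under a name: callable from SQL text and string expressions.
sparkSession.udf.register("fist_name", getFistName)
tempDF.createOrReplaceTempView("people") // illustrative view name
val viaSql = sparkSession.sql(
  "SELECT name, fist_name(name) AS first_name FROM people")

// 2) Wrapped with udf(): callable only through the DataFrame (Column) API.
val getFistNameUDF = udf(getFistName)
val viaColumn = tempDF.select(
  col("name"), getFistNameUDF(col("name")).as("first_name"))

viaSql.show()
viaColumn.show()
```

Both DataFrames should contain the same rows; the only difference is how the function is referenced.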
Why do we need to register UDFs?
We register the function so that Spark knows about it and can use it on all of our worker machines. Spark serializes the function on the driver and transfers it over the network to all executor processes.
Let's create one more function and register it.
// Returns "yes" when the designation contains "Manager", otherwise "No".
val isManager = (designation: String) => {
  if (designation.contains("Manager"))
    "yes"
  else
    "No"
}
val isManagerUDF = sparkSession.udf.register("is_manager", isManager)
Here, we created another function that returns "yes" if the person's designation contains "Manager" and "No" otherwise.
Step 4: Finally, we will use both UDFs with our DataFrame. Once registered, they can be called like normal SQL functions.
import org.apache.spark.sql.functions.col
val finalDF = tempDF
  .withColumn("first name", getFistNameUDF(col("name")))
  .withColumn("is_manager", isManagerUDF(col("designation")))
In the above code, both UDFs are called, and each withColumn call adds one extra column.
Output: two new columns, "first name" and "is_manager", have been added to the DataFrame.
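For reference, the four steps can be put together into one script. This is a sketch under the same assumptions as before (a local SparkSession created just for the experiment); everything else uses the names from the steps above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a throwaway local session for experimentation.
val sparkSession = SparkSession.builder()
  .appName("udf-end-to-end")
  .master("local[*]")
  .getOrCreate()

// Step 1: the input DataFrame.
val tempDF = sparkSession.createDataFrame(Seq(
  ("rahul sharma", 32, "Patna", 20000, "Store Manager"),
  ("Joy don", 30, "NY", 23455, "Developer"),
  ("Steve boy", 42, "Delhi", 294884, "Developer")
)).toDF("name", "age", "city", "salary", "designation")

// Step 2: plain Scala functions.
val getFistName = (name: String) => name.split(" ")(0)
val isManager = (designation: String) =>
  if (designation.contains("Manager")) "yes" else "No"

// Step 3: register them; register returns a UDF usable on columns.
val getFistNameUDF = sparkSession.udf.register("fist_name", getFistName)
val isManagerUDF = sparkSession.udf.register("is_manager", isManager)

// Step 4: add one column per UDF call.
val finalDF = tempDF
  .withColumn("first name", getFistNameUDF(col("name")))
  .withColumn("is_manager", isManagerUDF(col("designation")))

finalDF.show()
```

The resulting DataFrame has the five original columns plus the two derived ones.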
Thanks for reading!
Please share the article if you liked it. Any comments or suggestions are welcome!