Faster Java UDFs in PySpark
Using UDFs (User Defined Functions) in Spark is usually a last resort for building column-based data processing logic. A Spark UDF is an expensive operation and should be used only to extend or fill in functionality missing from Spark's built-in methods, or to call libraries and frameworks that do not have a Python wrapper. A UDF is typically a black box to Spark's Catalyst optimizer, so the logic inside the UDF is not considered when the execution plan is formed.
A UDF developed in Python runs outside the Spark JVM and is spun up as a separate Python process. This creates unnecessary overhead and SerDe (serialization/deserialization) operations, limiting PySpark's performance. For this reason, we need Java UDFs for faster processing.
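For context, here is a minimal sketch of the Python UDF path this article replaces (the session settings and names are illustrative). Every row is serialized to a Python worker process and the result is serialized back, which is exactly the overhead a Java UDF avoids.
# A plain Python UDF: rows are shipped to a separate Python worker process and back
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[1]").appName("PythonUDFBaseline").getOrCreate()

# Same logic as the Java UDF built later in this article: add a constant to a column
add_py = F.udf(lambda num: num + 100, IntegerType())

df = spark.createDataFrame([(35, "john"), (40, "joe"), (26, "Sam")], ["age", "Name"])
df.withColumn("age", add_py("age")).show()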
High level Steps
- Create the UDF in Java
- Package the Java UDF class into an uber (fat) .jar file
- Move the jar to the PySpark application environment
- Import the jar and register the Java class as a UDF
Step 1: Create UDF in Java
Using a Java IDE or a plain text editor, create the Java UDF. Implement the org.apache.spark.sql.api.java.UDF1 interface (Spark ships UDF0 through UDF22, one for each number of input arguments) so that the class can be invoked on Spark DataFrame columns.
// Java class that adds a constant number to the input
package com.example.jche88;

import org.apache.spark.sql.api.java.UDF1;

public class Add implements UDF1<Integer, Integer> {
    @Override
    public Integer call(Integer num) throws Exception {
        return num + 100;
    }
}
Step 2: Package the UDF in a jar file
Packaging and creating the jar file is done with Maven, a build automation tool used primarily for Java projects. Maven requires a pom.xml file in which all the dependencies and libraries are declared in XML format. Set up a pom.xml in the format below, or use a template from the Maven site. The dependencies and libraries listed in pom.xml are resolved when the jar is built. pom.xml also lets you specify the source and target compiler versions, the dependent libraries, the name of the jar file, and the name of the project.
-- pom.xml file used in the project. Feel free to create your own
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.jche88</groupId>
    <artifactId>JavaUDFProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <finalName>JavaUDFjarfile</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
</project>
Install Maven on your machine and set the PATH environment variable accordingly. Once the prerequisites are complete, open a command prompt, browse to the folder containing pom.xml, and run the command below. It uses pom.xml to download the required libraries and dependencies and packages the compiled class into a jar file.
Suggestion: use Maven commands rather than the IDE's jar-creation route.
mvn clean install
The jar file (an uber a.k.a. fat jar when dependencies are bundled in) is created in the project's target folder, with the name set by finalName in pom.xml. An uber jar carries all the dependent libraries, so the class can be executed in a different environment.
Step 3: Move the jar file to the PySpark application environment
Move the jar to the same server where the PySpark program runs. The target environment or server can be a Docker container, AWS Glue, Databricks, or a standalone PySpark environment. The jar file is self-contained and only requires Java in the target environment to execute the classes inside it. Ensure the file permissions are set correctly once the file is copied.
Step 4: Import the jar into the PySpark application
In order to use the jar file and the classes inside it, it must be imported into the PySpark environment. This can be done in three ways:
1. Cluster level - update the Spark config file (see the spark-defaults.conf snippet after this list).
2. Application level - use the spark.jars config option while creating the Spark session:
spark = SparkSession.builder.master("local[1]") \
.config('spark.jars', "/home/johnche88/JavaUDFProject.jar")\
.appName('JavaUDFApp') \
.getOrCreate()
3. Spark-submit level - import the jar into the PySpark application by passing the --jars option while submitting the Spark job:
spark-submit --jars /home/johnche/JavaUDFProject.jar Pyspark_app.py
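For the cluster-level option, a minimal sketch (assuming a default Spark install; the jar path is the same illustrative one used elsewhere in this article) is to append the jar to spark.jars in spark-defaults.conf:
# $SPARK_HOME/conf/spark-defaults.conf
spark.jars    /home/johnche88/JavaUDFProject.jar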
In some cloud-managed services, such as AWS Glue, you can set the jar path in the job details or at the application level. For others, use the spark-submit option or the Spark config file (cluster level).
# Spark code in Python
from pyspark.sql.types import IntegerType, LongType
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .config('spark.jars', "/home/johnche88/JavaUDFProject.jar") \
    .appName('JavaUDFApp') \
    .getOrCreate()
Register the Java function as a UDF using the SparkSession method registerJavaFunction, passing the fully qualified class name and the return type.
spark.udf.registerJavaFunction("add", "com.example.jche88.Add", IntegerType())
Once the registration is complete, try the function on a dummy DataFrame.
import pyspark.sql.functions as F

df = spark.createDataFrame([
    (35, "john"),
    (40, "joe"),
    (26, "Sam"),],
    ["age", "Name"])

df.withColumn("age", F.expr("add(age)")).show()
You will notice a significant improvement in UDF performance after the conversion to a Java UDF.
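To see the difference yourself, a rough timing sketch like the one below (the row count and helper names are illustrative, and exact numbers vary by environment) compares the registered Java UDF against an equivalent Python UDF.
# Rough timing comparison; assumes the Java UDF is registered as "add"
import time
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

big_df = spark.range(5000000).withColumn("age", F.col("id").cast("int"))

# Equivalent Python UDF, for comparison
add_py = F.udf(lambda num: num + 100, IntegerType())

start = time.time()
big_df.withColumn("age2", add_py("age")).count()      # count() forces evaluation
print("Python UDF:", round(time.time() - start, 2), "s")

start = time.time()
big_df.withColumn("age2", F.expr("add(age)")).count()
print("Java UDF:", round(time.time() - start, 2), "s")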