Ways to Look Up a Table in Spark Scala

sasirekha
May 30, 2020


Generally, tables in a relational data model fall into two broad types. A fact table holds data that grows in proportion to time and keeps expanding. A dimension table is generally fixed and small, with few columns. We commonly use dimension tables to look up details for the rows of the larger tables.

When we join two tables on a key, we normally have to face an unavoidable shuffle of data across the cluster. There are several ways to look up a table in Spark while also avoiding the shuffle, but they come with a condition: the right method depends on the data volume of the dimension table.
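To see why a key-based join shuffles, here is a simplified single-machine model in plain Scala (no Spark). Every row is routed to a partition computed from its join key, so rows with equal keys from both tables meet in the same partition and can be joined there. The non-negative modulo mirrors RDD `HashPartitioner`. Spark SQL's own shuffle uses a Murmur3-based hash instead. The case classes and sample rows are hypothetical.

```scala
// Simplified model of the shuffle behind a key-based join (not Spark's code).
final case class Emp(empID: Int, empName: String, deptID: Int)
final case class Dept(deptID: Int, deptName: String)

// Non-negative modulo of the key's hash, in the spirit of RDD HashPartitioner
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

val numPartitions = 4
val emps = Seq(Emp(1, "Asha", 10), Emp(2, "Ravi", 20), Emp(3, "Meena", 10))
val depts = Seq(Dept(10, "Sales"), Dept(20, "HR"))

// "Shuffle": both sides are grouped by the partition their deptID maps to
val empParts = emps.groupBy(e => partitionFor(e.deptID, numPartitions))
val deptParts = depts.groupBy(d => partitionFor(d.deptID, numPartitions))

// Each partition is joined on its own; no partition needs rows from another
val joined: Seq[(String, String)] = (0 until numPartitions).flatMap { p =>
  val deptsHere = deptParts.getOrElse(p, Seq.empty)
  for {
    e <- empParts.getOrElse(p, Seq.empty)
    d <- deptsHere if d.deptID == e.deptID
  } yield (e.empName, d.deptName)
}
```

The lookup methods below avoid exactly this step by giving every node its own copy of the small table.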

Lookup using RDD:

This approach should be used only when the dimension data is very small. In that case it makes more sense to collect the data to the driver and convert it into an ordinary Scala Map.

val empDF = spark.sql("select empID, empName, deptID from employee")
val deptDF = spark.sql("select deptID, deptName from department")

Let's say department is our lookup table, and we have to fetch the department name of each employee based on their department ID.

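import org.apache.spark.sql.functions.udf
import spark.implicits._ // enables the $"column" syntax
// Collect the small dimension table to the driver as a deptID -> deptName map
val lookup_DeptMap = deptDF.rdd.map(row => (row(0).toString, row(1).toString)).collectAsMap().toMap
// UDF that looks up each deptID in the map; IDs missing from the map become null
def lookup(lookupMap: Map[String, String]) = udf((deptID: String) => lookupMap.getOrElse(deptID, null))
val empDeptDF = empDF.withColumn("DeptName", lookup(lookup_DeptMap)($"deptID".cast("string")))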
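Here the map is captured in the UDF's closure and shipped as part of the job's serialized code. A common variant, useful when the map is reused across several queries, wraps it in an explicit broadcast variable, which each executor fetches once and caches. The sketch below assumes Spark on the classpath and a local session. The function name `lookupWithBroadcastVar` and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Looks up department names through an explicit broadcast variable.
// The sample rows are hypothetical stand-ins for the employee/department tables.
def lookupWithBroadcastVar(spark: SparkSession): Map[String, String] = {
  import spark.implicits._
  val empDF = Seq((1, "Asha", 10), (2, "Ravi", 20), (3, "Meena", 30)).toDF("empID", "empName", "deptID")
  val deptDF = Seq((10, "Sales"), (20, "HR")).toDF("deptID", "deptName")

  // Collect the small dimension table to the driver as deptID -> deptName
  val deptMap: Map[Int, String] = deptDF.as[(Int, String)].collect().toMap

  // Broadcast the map: each executor fetches and caches one read-only copy
  val deptMapBc = spark.sparkContext.broadcast(deptMap)

  // Unknown deptIDs (30 here) come back as null
  val lookupDeptName = udf((deptID: Int) => deptMapBc.value.getOrElse(deptID, null))

  empDF.withColumn("DeptName", lookupDeptName($"deptID"))
    .select("empName", "DeptName")
    .as[(String, String)]
    .collect()
    .toMap
}

val spark = SparkSession.builder().master("local[*]").appName("rdd-lookup-sketch").getOrCreate()
val empToDept = lookupWithBroadcastVar(spark)
```

The logic is kept inside a method so the UDF closure captures only the local broadcast handle.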

Lookup using Broadcast Join:

A broadcast join avoids shuffling: the smaller table is copied to every node, so each partition of the larger table can be joined locally. Running the command below shows the size limit up to which Spark broadcasts a table automatically.

spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

The default value is 10485760 bytes, i.e. 10 MB. It is better to broadcast only tables within this limit; otherwise, transmitting a redundant copy to every node can itself cost more than the shuffle.
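The size rule can be written down as a small plain-Scala model. This is an assumption-labeled simplification of the check Spark applies before auto-broadcasting a join side, not Spark's source: the estimated size must be non-negative and no larger than the threshold, so a threshold of -1 rules out every table. The function name `canAutoBroadcast` is hypothetical.

```scala
// Simplified model (not Spark's source) of the auto-broadcast size check.
val defaultThresholdBytes: Long = 10L * 1024 * 1024 // 10 MB = 10485760 bytes

def canAutoBroadcast(estimatedSizeBytes: Long, thresholdBytes: Long): Boolean =
  estimatedSizeBytes >= 0 && estimatedSizeBytes <= thresholdBytes

val smallDim = canAutoBroadcast(2L * 1024 * 1024, defaultThresholdBytes)   // 2 MB table
val largeDim = canAutoBroadcast(500L * 1024 * 1024, defaultThresholdBytes) // 500 MB table
val disabled = canAutoBroadcast(1024L, -1L) // a threshold of -1 switches auto-broadcast off
```

Spark compares against its own size estimate of the table, which can differ from the size on disk.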

import org.apache.spark.sql.functions.broadcast
// Hint Spark to send a copy of deptDF to every executor
val empDeptDF = empDF.join(broadcast(deptDF), "deptID")
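To confirm which strategy Spark picked, inspect the physical plan: a broadcast join shows up as a `BroadcastHashJoin` node. The sketch below assumes Spark on the classpath and a local session. The sample rows and the helper name `broadcastJoinPlan` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Returns the physical plan of an explicit broadcast join as text.
def broadcastJoinPlan(spark: SparkSession): String = {
  import spark.implicits._
  val empDF = Seq((1, "Asha", 10), (2, "Ravi", 20)).toDF("empID", "empName", "deptID")
  val deptDF = Seq((10, "Sales"), (20, "HR")).toDF("deptID", "deptName")
  val empDeptDF = empDF.join(broadcast(deptDF), "deptID")
  empDeptDF.explain() // prints the physical plan to stdout
  empDeptDF.queryExecution.sparkPlan.toString
}

val spark = SparkSession.builder().master("local[*]").appName("broadcast-plan").getOrCreate()
val plan = broadcastJoinPlan(spark)
```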

Lookup using Map-side Join:

Map-side join is widely used in Hive to join large data with a small lookup table. Spark SQL accepts the same MAPJOIN hint, and internally it is executed as a broadcast join.

// The table is aliased as d, so the hint must name the alias rather than "department"
val empDeptDF = spark.sql("select /*+ MAPJOIN(d) */ e.empID, e.empName, e.deptID, d.deptName from employee e join department d on e.deptID = d.deptID")
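The same hint is available from the DataFrame API through `Dataset.hint`. Spark treats MAPJOIN, BROADCASTJOIN and BROADCAST as one broadcast hint. In the sketch below, automatic broadcasting is switched off first, so any broadcast in the plan must come from the hint. When the table is aliased in SQL, the hint names the alias. The sketch assumes Spark 2.2 or later and a local session. The temp views, rows and the helper `mapJoinPlans` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Returns the physical plans of the MAPJOIN query and its DataFrame-API equivalent.
def mapJoinPlans(spark: SparkSession): (String, String) = {
  import spark.implicits._
  // Turn off automatic broadcasting so that only the hint can trigger it
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
  Seq((1, "Asha", 10), (2, "Ravi", 20)).toDF("empID", "empName", "deptID").createOrReplaceTempView("employee")
  Seq((10, "Sales"), (20, "HR")).toDF("deptID", "deptName").createOrReplaceTempView("department")

  // SQL form: the hint names the alias d used in the FROM clause
  val sqlDF = spark.sql(
    """select /*+ MAPJOIN(d) */ e.empID, e.empName, e.deptID, d.deptName
      |from employee e join department d on e.deptID = d.deptID""".stripMargin)

  // DataFrame form: hint the whole department Dataset
  val apiDF = spark.table("employee").join(spark.table("department").hint("mapjoin"), "deptID")

  (sqlDF.queryExecution.sparkPlan.toString, apiDF.queryExecution.sparkPlan.toString)
}

val spark = SparkSession.builder().master("local[*]").appName("mapjoin-sketch").getOrCreate()
val (sqlPlan, apiPlan) = mapJoinPlans(spark)
```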

Lookup using Standard SortMerge Join:

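When our lookup table is larger than the size limit of the joins above, we have to fall back to a normal sort-merge join. It works in two phases: both sides are sorted on the join key (after being shuffled so that equal keys share a partition), and then the sorted rows are merged.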

val empDeptDF = empDF.join(deptDF, "deptID")
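The two phases can be sketched on a single machine in plain Scala. This is an illustrative inner join, not Spark's implementation. Both inputs are sorted by key, then two cursors walk them in step. When the keys match, every left row with that key is paired with the whole run of matching right rows. The function name `sortMergeJoin` and the sample rows are hypothetical.

```scala
// Single-machine sketch of a sort-merge inner join (not Spark's code).
// Both inputs must already be sorted by key; duplicate keys are handled.
def sortMergeJoin[K, A, B](left: IndexedSeq[(K, A)], right: IndexedSeq[(K, B)])
                          (implicit ord: Ordering[K]): Vector[(K, A, B)] = {
  val out = Vector.newBuilder[(K, A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val cmp = ord.compare(left(i)._1, right(j)._1)
    if (cmp < 0) i += 1      // left key is smaller: advance left
    else if (cmp > 0) j += 1 // right key is smaller: advance right
    else {
      val key = left(i)._1
      // Find the end of the run of this key on the right side
      var jEnd = j
      while (jEnd < right.length && ord.equiv(right(jEnd)._1, key)) jEnd += 1
      // Pair every left row with this key against the whole right run
      while (i < left.length && ord.equiv(left(i)._1, key)) {
        var k = j
        while (k < jEnd) { out += ((key, left(i)._2, right(k)._2)); k += 1 }
        i += 1
      }
      j = jEnd
    }
  }
  out.result()
}

// Sort phase: order both sides by deptID
val employees = Vector((20, "Ravi"), (10, "Asha"), (10, "Meena"), (30, "Kiran")).sortBy(_._1)
val departments = Vector((10, "Sales"), (20, "HR")).sortBy(_._1)
// Merge phase; Kiran (deptID 30) has no match and is dropped by the inner join
val joined = sortMergeJoin(employees, departments)
```

Because both sides are sorted, each input is scanned once after sorting, and neither side has to fit in memory as a hash table.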

If we don't hint a broadcast join or another join explicitly, Spark estimates the data size of the two tables and performs the join accordingly. Spark performs this join selection internally, based on the logical plan; you can see Spark's join selection here. In some cases it is better to hint the join explicitly, since the size estimates are not always accurate.
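Besides the broadcast hint, Spark 3.0 and later also accept join-strategy hints such as MERGE and SHUFFLE_HASH. The sketch below assumes Spark 3.0+ and a local session. It asks for a sort-merge join on two tiny tables that would otherwise be broadcast automatically. The rows and the helper `mergeHintPlan` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumes Spark 3.0+, where the MERGE join-strategy hint is available.
def mergeHintPlan(spark: SparkSession): String = {
  import spark.implicits._
  val empDF = Seq((1, "Asha", 10), (2, "Ravi", 20)).toDF("empID", "empName", "deptID")
  val deptDF = Seq((10, "Sales"), (20, "HR")).toDF("deptID", "deptName")
  // Tiny tables would normally be auto-broadcast; the hint asks for sort-merge instead
  empDF.join(deptDF.hint("merge"), "deptID").queryExecution.sparkPlan.toString
}

val spark = SparkSession.builder().master("local[*]").appName("merge-hint").getOrCreate()
val plan = mergeHintPlan(spark)
```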

We can turn off automatic broadcast joins by setting the variable below, although it rarely makes sense to give up the advantages of broadcast joins on purpose. Explicit broadcast hints are still honored with this setting.

spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
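The same setting can be applied through `spark.conf.set`. The sketch below assumes Spark on the classpath and a local session. It compares two plans with automatic broadcasting off: the plain join falls back to a sort-merge join, while the explicit `broadcast()` hint still produces a broadcast join. The rows and the helper `plansWithAutoBroadcastOff` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Compares join plans with automatic broadcasting disabled.
def plansWithAutoBroadcastOff(spark: SparkSession): (String, String) = {
  import spark.implicits._
  // Same effect as spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
  val empDF = Seq((1, "Asha", 10), (2, "Ravi", 20)).toDF("empID", "empName", "deptID")
  val deptDF = Seq((10, "Sales"), (20, "HR")).toDF("deptID", "deptName")
  val noHint = empDF.join(deptDF, "deptID").queryExecution.sparkPlan.toString
  val withHint = empDF.join(broadcast(deptDF), "deptID").queryExecution.sparkPlan.toString
  (noHint, withHint)
}

val spark = SparkSession.builder().master("local[*]").appName("threshold-off").getOrCreate()
val (noHint, withHint) = plansWithAutoBroadcastOff(spark)
```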

That's it. Choose the join that suits your needs, and above all your data size.
