Feature engineering for machine learning models using Scala Spark


In this post we will explore how to do feature engineering with Scala Spark on the MovieLens dataset.


Code tested using IntelliJ IDEA on Ubuntu/Mac/Windows.


  • Java 8 installed and available as java on the command line.
  • IntelliJ IDEA for editing and running the program.
  • The MovieLens ml-latest-small dataset, downloaded from GroupLens.
  • Git installed and available as git on the command line; required only if you want to clone the repo from GitHub.


Following is the Scala source code for HandsOn2.scala. All the new steps below go inside def main(args: Array[String]).

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object HandsOn2 {
    def main(args: Array[String]): Unit = {
        // Get Spark
        val spark = getSpark()
        import spark.implicits._

        val moviesFile = "../../../Downloads/ml-latest-small/movies.csv"
        val tagsFile = "../../../Downloads/ml-latest-small/tags.csv"
        val ratingsFile = "../../../Downloads/ml-latest-small/ratings.csv"
        val moviesDF = loadDF(spark, moviesFile)
        val tagsDF = loadDF(spark, tagsFile)
        val ratingsDF = loadDF(spark, ratingsFile)

        // More code goes here.
    }

    // Method to load data from csv to dataframes
    def loadDF(spark: SparkSession, path: String): DataFrame = {
        spark.read
            .option("header", "true")      // first line holds the column names
            .option("inferSchema", "true") // detect integer/double columns instead of reading everything as string
            .csv(path)
    }

    // Method to create Spark Session.
    def getSpark(): SparkSession = {
        val spark = SparkSession.builder
            .appName("Simple Application")
            .master("local[*]") // assumed: local mode, needed when running straight from the IDE
            .getOrCreate()
        spark
    }
}

Right-click this file in the IDE and run it. If everything is set up correctly, you should see output similar to this:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/12/06 22:45:10 INFO SparkContext: Running Spark version 2.4.0
18/12/06 22:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/06 22:45:11 INFO SparkContext: Submitted application: Simple Application
18/12/06 22:45:11 INFO SecurityManager: Changing view acls to: sakthipriyan
18/12/06 22:45:11 INFO SecurityManager: Changing modify acls to: sakthipriyan
18/12/06 22:45:11 INFO SecurityManager: Changing view acls groups to: 
18/12/06 22:45:11 INFO SecurityManager: Changing modify acls groups to: 
18/12/06 22:45:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sakthipriyan); groups with view permissions: Set(); users  with modify permissions: Set(sakthipriyan); groups with modify permissions: Set()
18/12/06 22:45:11 INFO Utils: Successfully started service 'sparkDriver' on port 59646.
18/12/06 22:45:11 INFO SparkEnv: Registering MapOutputTracker
18/12/06 22:45:11 INFO SparkEnv: Registering BlockManagerMaster
18/12/06 22:45:11 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/12/06 22:45:11 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/12/06 22:45:11 INFO DiskBlockManager: Created local directory at /private/var/folders/cv/vnk7qg3n4lj1gwby6rcfk16r0000gn/T/blockmgr-b7be2feb-6fe2-4a1d-ad0d-0fb1af483b28
18/12/06 22:45:11 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
18/12/06 22:45:11 INFO SparkEnv: Registering OutputCommitCoordinator
18/12/06 22:45:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/12/06 22:45:12 INFO SparkUI: Bound SparkUI to, and started at http://sakthis-mbp:4040
18/12/06 22:45:12 INFO Executor: Starting executor ID driver on host localhost
18/12/06 22:45:12 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59647.
18/12/06 22:45:12 INFO NettyBlockTransferService: Server created on sakthis-mbp:59647
18/12/06 22:45:12 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/12/06 22:45:12 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, sakthis-mbp, 59647, None)
18/12/06 22:45:12 INFO BlockManagerMasterEndpoint: Registering block manager sakthis-mbp:59647 with 912.3 MB RAM, BlockManagerId(driver, sakthis-mbp, 59647, None)
18/12/06 22:45:12 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, sakthis-mbp, 59647, None)
18/12/06 22:45:12 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, sakthis-mbp, 59647, None)

Process finished with exit code 0

The run ends with "Process finished with exit code 0", which means the setup works.

Processing Movies Dataframe

Schema and first rows, as loaded into the dataframe:

|-- movieId: integer (nullable = true)
|-- title: string (nullable = true)
|-- genres: string (nullable = true)

|movieId|               title|              genres|
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
only showing top 5 rows

Let's fix the genres data type and derive the number of genres in each movie.

  1. In the source dataframe above, genres is a single string of pipe-separated values. We use the split function to create an Array[String] from it.
  2. We use the size function to count the genres of each movie.

withColumn adds a new column to the dataframe, or replaces an existing column with the same name (as happens to genres here).
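Spark's split treats its pattern argument as a Java regular expression, which is why the pipe is written as "\\|". The sketch below uses plain Scala strings rather than Spark (GenreSplit is a hypothetical helper; java.lang.String.split follows the same regex rules) to show what goes wrong without the escape:

```scala
// Plain-Scala sketch (not Spark code); GenreSplit is a hypothetical helper.
object GenreSplit {
  // "\\|" escapes the pipe so the regex matches it literally.
  def genres(raw: String): Array[String] = raw.split("\\|")

  // An unescaped "|" is an empty alternation: it matches the empty string
  // at every position, so the input is cut into single characters.
  def unescaped(raw: String): Array[String] = raw.split("|")
}

// Usage, with the Toy Story genres from movies.csv:
// GenreSplit.genres("Adventure|Animation|Children|Comedy|Fantasy").length == 5 (its genresCount)
// GenreSplit.unescaped("Comedy|Romance").length == 14
```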

val moviesProcessedDF = moviesDF
    .withColumn("genres", split($"genres", "\\|"))
    .withColumn("genresCount", size($"genres"))

|-- movieId: integer (nullable = true)
|-- title: string (nullable = true)
|-- genres: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- genresCount: integer (nullable = false)

|movieId|               title|              genres|genresCount|
|      1|    Toy Story (1995)|[Adventure, Anima...|          5|
|      2|      Jumanji (1995)|[Adventure, Child...|          3|
|      3|Grumpier Old Men ...|   [Comedy, Romance]|          2|
|      4|Waiting to Exhale...|[Comedy, Drama, R...|          3|
|      5|Father of the Bri...|            [Comedy]|          1|
only showing top 5 rows

Processing Tags Dataframe

Schema and first rows, as loaded into the dataframe:

|-- userId: integer (nullable = true)
|-- movieId: integer (nullable = true)
|-- tag: string (nullable = true)
|-- timestamp: integer (nullable = true)

|userId|movieId|            tag| timestamp|
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
only showing top 5 rows

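Let's collect the tags of each movie into an array and count them.

  1. We group the records by movieId and aggregate the tag column with collect_set, which gathers the distinct tags into an array named tags.
  2. We apply size to that array, as we did for genres.

After groupBy, aggregate functions such as avg, min, max, sum, count, collect_list and collect_set can be passed to agg. Some of them (avg, min, max, sum, and count of rows) can also be called directly on the grouped data.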
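To make the semantics concrete, here is a plain-Scala model of grouping by movieId followed by collect_set and size (not Spark code; TagAggregation and Tag are hypothetical names). The sample rows come from tags.csv as shown above, except the repeated "funny" tag from userId 3, which is made up to show that a set keeps each tag once, whereas collect_list would keep the duplicate:

```scala
// Plain-Scala model of groupBy(movieId) + collect_set(tag) + size (not Spark code).
object TagAggregation {
  case class Tag(userId: Int, movieId: Int, tag: String)

  // movieId -> (distinct tags, number of distinct tags)
  def tagsPerMovie(rows: Seq[Tag]): Map[Int, (Set[String], Int)] =
    rows.groupBy(_.movieId).map { case (movieId, group) =>
      val tags = group.map(_.tag).toSet // duplicates collapse, like collect_set
      (movieId, (tags, tags.size))
    }

  // Sample input: rows from tags.csv plus one hypothetical repeated tag (userId 3).
  val sample: Seq[Tag] = Seq(
    Tag(2, 60756, "funny"), Tag(2, 60756, "Highly quotable"), Tag(2, 60756, "will ferrell"),
    Tag(2, 89774, "Boxing story"), Tag(2, 89774, "MMA"),
    Tag(3, 60756, "funny")
  )
}

// TagAggregation.tagsPerMovie(TagAggregation.sample)(60756)._2 == 3
// TagAggregation.tagsPerMovie(TagAggregation.sample)(89774)._2 == 2
```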

val tagsProcessedDF = tagsDF
    .groupBy($"movieId")
    .agg(collect_set($"tag").as("tags"))   // distinct tags per movie as an array
    .withColumn("tagCount", size($"tags"))

|-- movieId: integer (nullable = true)
|-- tags: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- tagCount: integer (nullable = false)

|movieId|              tags|tagCount|
|    471|       [hula hoop]|       1|
|   1088|    [music, dance]|       2|
|   1580|          [aliens]|       1|
|   1645|         [lawyers]|       1|
|   1959|[adultery, Africa]|       2|
only showing top 5 rows

Processing Ratings Dataframe

Schema and first rows, as loaded into the dataframe:

|-- userId: integer (nullable = true)
|-- movieId: integer (nullable = true)
|-- rating: double (nullable = true)
|-- timestamp: integer (nullable = true)

|userId|movieId|rating|timestamp|
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
only showing top 5 rows

Let's find the average rating and the number of ratings for each movie.

  1. All we have to do is groupBy movieId and aggregate with agg.
  2. Use avg on the rating column and count on the rating column.

See the code below!

val ratingsProcessedDF = ratingsDF
    .groupBy($"movieId")
    .agg(avg($"rating").as("ratingAvg"), count($"rating").as("ratingCount"))

|-- movieId: integer (nullable = true)
|-- ratingAvg: double (nullable = true)
|-- ratingCount: long (nullable = false)

|movieId|        ratingAvg|ratingCount|
|   1580|3.487878787878788|        165|
|   2366|             3.64|         25|
|   3175|             3.58|         75|
|   1088|3.369047619047619|         42|
|  32460|             4.25|          4|
only showing top 5 rows
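The groupBy and agg step can likewise be modelled on plain Scala collections. In this sketch RatingStats and Rating are hypothetical names and the three ratings are made up; note that Spark's count on a column skips null values, which cannot occur here:

```scala
// Plain-Scala model of groupBy(movieId).agg(avg(rating), count(rating)) (not Spark code).
object RatingStats {
  case class Rating(userId: Int, movieId: Int, rating: Double)

  // movieId -> (ratingAvg, ratingCount)
  def perMovie(rows: Seq[Rating]): Map[Int, (Double, Long)] =
    rows.groupBy(_.movieId).map { case (movieId, group) =>
      val ratings = group.map(_.rating)
      (movieId, (ratings.sum / ratings.size, ratings.size.toLong))
    }
}

// Hypothetical ratings: movie 1 gets 4.0 and 5.0, movie 3 gets 4.0.
// RatingStats.perMovie(Seq(RatingStats.Rating(1, 1, 4.0), RatingStats.Rating(2, 1, 5.0),
//   RatingStats.Rating(1, 3, 4.0))) == Map(1 -> (4.5, 2L), 3 -> (4.0, 1L))
```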

Next, let's compute the average of ratingAvg and of ratingCount over all movies. These two values serve as thresholds for the classes below.

val row = ratingsProcessedDF.agg(avg($"ratingAvg"), avg($"ratingCount")).first
val avgRating = row.getDouble(0)
val avgCount = row.getDouble(1)
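Because this aggregation runs over ratingsProcessedDF, each movie counts once: avgRating is the mean of the per-movie averages, not the mean of all individual ratings. A small plain-Scala sketch with made-up numbers (Thresholds is a hypothetical name):

```scala
// Plain-Scala model of ratingsProcessedDF.agg(avg(ratingAvg), avg(ratingCount)) (not Spark code).
object Thresholds {
  // stats holds one (ratingAvg, ratingCount) pair per movie.
  def thresholds(stats: Seq[(Double, Long)]): (Double, Double) = {
    val avgRating = stats.map(_._1).sum / stats.size
    val avgCount = stats.map(_._2).sum.toDouble / stats.size
    (avgRating, avgCount)
  }
}

// Movie A: three ratings of 5.0 -> (5.0, 3); movie B: one rating of 1.0 -> (1.0, 1).
// Thresholds.thresholds(Seq((5.0, 3L), (1.0, 1L))) == (3.0, 2.0)
```

The mean of the four individual ratings in this example would be 4.0, while avgRating comes out as 3.0.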



Let's classify the movies into 4 classes

Based on the values of avgRating and avgCount, let's label (classify) the movies into 4 classes:

  1. Label 0.0: ratingAvg >= avgRating && ratingCount >= avgCount
  2. Label 1.0: ratingAvg < avgRating && ratingCount >= avgCount
  3. Label 2.0: ratingAvg >= avgRating && ratingCount < avgCount
  4. Label 3.0: ratingAvg < avgRating && ratingCount < avgCount
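The four classes can be written as a plain Scala function, which is handy for checking the boundaries by hand. MovieLabel is a hypothetical name, and the thresholds in the usage lines (avgRating = 3.0, avgCount = 10.0) are made up rather than the values computed from the dataset:

```scala
// Plain-Scala version of the 4-class labeling rule (not Spark code).
object MovieLabel {
  def label(ratingAvg: Double, ratingCount: Long, avgRating: Double, avgCount: Double): Double =
    if (ratingCount >= avgCount && ratingAvg >= avgRating) 0.0      // many ratings, high average
    else if (ratingCount >= avgCount && ratingAvg < avgRating) 1.0  // many ratings, low average
    else if (ratingCount < avgCount && ratingAvg >= avgRating) 2.0  // few ratings, high average
    else 3.0                                                        // few ratings, low average
}

// With hypothetical thresholds avgRating = 3.0 and avgCount = 10.0:
// MovieLabel.label(4.25, 4L, 3.0, 10.0) == 2.0
// MovieLabel.label(2.5, 5L, 3.0, 10.0) == 3.0
```

The comparisons are inclusive (>=), so a movie sitting exactly on a threshold falls into the higher group.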

We will use this label later to train a supervised machine learning model.

The label is created with chained when expressions, as shown below.

val moviesLabelDF = ratingsProcessedDF.select($"movieId",
    when($"ratingCount" >= avgCount && $"ratingAvg" >= avgRating, 0.0)
        .when($"ratingCount" >= avgCount && $"ratingAvg" < avgRating, 1.0)
        .when($"ratingCount" < avgCount && $"ratingAvg" >= avgRating, 2.0)
        .otherwise(3.0)   // remaining case: fewer ratings and lower average
        .as("label"))

|-- movieId: integer (nullable = true)
|-- label: double (nullable = false)

|movieId|label|
|   1580|  0.0|
|   2366|  0.0|
|   3175|  0.0|
|   1088|  0.0|
|  32460|  2.0|
only showing top 5 rows

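Time to join the moviesProcessedDF, tagsProcessedDF and moviesLabelDF dataframes.

We have preprocessed 3 datasets; now we join them on the movieId field. Passing just the column name performs an inner join, so only movies present in all three dataframes are kept (movieId 4, for instance, does not appear in the sample below).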
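Here is a plain-Scala model of that inner join on movieId (not Spark code). InnerJoin and the three tiny maps are hypothetical; a movie survives only if it has an entry in every input:

```scala
// Plain-Scala model of an inner join of three inputs on movieId (not Spark code).
object InnerJoin {
  def join(
      titles: Map[Int, String],
      tagCounts: Map[Int, Int],
      labels: Map[Int, Double]): Map[Int, (String, Int, Double)] =
    titles.flatMap { case (id, title) =>
      // Both lookups must succeed; a missing side drops the movie.
      for {
        tagCount <- tagCounts.get(id)
        label <- labels.get(id)
      } yield (id, (title, tagCount, label))
    }
}

// Movie 2 has no label and movie 3 has no tags, so only movie 1 remains:
// InnerJoin.join(Map(1 -> "A", 2 -> "B", 3 -> "C"), Map(1 -> 2, 2 -> 4), Map(1 -> 0.0, 3 -> 1.0))
//   == Map(1 -> ("A", 2, 0.0))
```

If movies without tags should be kept instead, Dataset.join also accepts a join type, for example join(tagsProcessedDF, Seq("movieId"), "left"); tagCount would then be null for those movies.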

val moviesConsolidatedDF = moviesProcessedDF
    .join(tagsProcessedDF, "movieId")
    .join(moviesLabelDF, "movieId")

|-- movieId: integer (nullable = true)
|-- title: string (nullable = true)
|-- genres: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- genresCount: integer (nullable = false)
|-- tags: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- tagCount: integer (nullable = false)
|-- label: double (nullable = false)

|movieId|               title|              genres|genresCount|                tags|tagCount|label|
|      1|    Toy Story (1995)|[Adventure, Anima...|          5|        [pixar, fun]|       2|  0.0|
|      2|      Jumanji (1995)|[Adventure, Child...|          3|[fantasy, game, m...|       4|  0.0|
|      3|Grumpier Old Men ...|   [Comedy, Romance]|          2|        [old, moldy]|       2|  1.0|
|      5|Father of the Bri...|            [Comedy]|          1| [pregnancy, remake]|       2|  1.0|
|      7|      Sabrina (1995)|   [Comedy, Romance]|          2|            [remake]|       1|  1.0|
only showing top 5 rows

Let's remove the array and make the table flat.

Convert Array[String] into Map[String, Double]

We use the following user defined function (udf) to convert it. Once an array is a map, each of its keys can easily be selected as a separate column.

// Create UDF converting Array[String] to Map[String, Double]:
// every element becomes a key with value 1.0
val arrayToMap = udf[Map[String, Double], Seq[String]] {
    elements => elements.map(key => (key, 1.0)).toMap
}

// Actual usage of the udf
val moviesMergedDF = moviesConsolidatedDF
    .withColumn("genres", arrayToMap($"genres"))
    .withColumn("tags", arrayToMap($"tags"))

|-- movieId: integer (nullable = true)
|-- title: string (nullable = true)
|-- genres: map (nullable = true)
|    |-- key: string
|    |-- value: double (valueContainsNull = false)
|-- genresCount: integer (nullable = false)
|-- tags: map (nullable = true)
|    |-- key: string
|    |-- value: double (valueContainsNull = false)
|-- tagCount: integer (nullable = false)
|-- label: double (nullable = false)

|movieId|               title|              genres|genresCount|                tags|tagCount|label|
|      1|    Toy Story (1995)|[Animation -> 1.0...|          5|[pixar -> 1.0, fu...|       2|  0.0|
|      2|      Jumanji (1995)|[Adventure -> 1.0...|          3|[fantasy -> 1.0, ...|       4|  0.0|
|      3|Grumpier Old Men ...|[Comedy -> 1.0, R...|          2|[old -> 1.0, mold...|       2|  1.0|
|      5|Father of the Bri...|     [Comedy -> 1.0]|          1|[pregnancy -> 1.0...|       2|  1.0|
|      7|      Sabrina (1995)|[Comedy -> 1.0, R...|          2|     [remake -> 1.0]|       1|  1.0|
only showing top 5 rows

As you can see, genres and tags are now maps built from the arrays by the udf.
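The function inside the arrayToMap udf is ordinary Scala, so it can be tried without Spark. In this sketch ArrayToMap is a hypothetical wrapper around the same lambda:

```scala
// The lambda wrapped by the arrayToMap udf, as a plain Scala function (not Spark code).
object ArrayToMap {
  // Each element becomes a key with value 1.0; repeated elements collapse into one key.
  def arrayToMap(elements: Seq[String]): Map[String, Double] =
    elements.map(key => (key, 1.0)).toMap
}

// Tags of Toy Story from the output above:
// ArrayToMap.arrayToMap(Seq("pixar", "fun")) == Map("pixar" -> 1.0, "fun" -> 1.0)
```

If an array column could contain null, Spark would pass null into the udf and this lambda would likely throw a NullPointerException; wrapping the input, e.g. Option(elements).getOrElse(Seq.empty), is one way to guard against that.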

Create a dataframe without any nesting.

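val moviesFeaturedDF = moviesMergedDF.select(
    $"movieId", $"title", $"label",
    $"genresCount".as("gCount"), $"tagCount".as("tCount"),
    // Each selected map key becomes its own column; a key absent from a movie's map yields null.
    $"genres.Drama", $"genres.Comedy", $"genres.Romance", $"genres.Thriller", // ... other genres
    $"tags.In Netflix queue",
    $"tags.visually appealing",
    $"tags.dark comedy",
    $"tags.twist ending",
    $"tags.mental illness"
    // ... other tag columns
).na.fill(0.0) // replace those nulls with 0.0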

A key that a movie's map does not contain is selected as null, so the dataframe function na.fill(0.0) is used to put zero in place of those missing values.
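Selecting map keys as columns and then calling na.fill(0.0) amounts to a lookup with a default of 0.0. A plain-Scala sketch (Flatten is a hypothetical name) using the Toy Story genres and the first six genre columns of the schema below:

```scala
// Plain-Scala model of select(map keys as columns) + na.fill(0.0) (not Spark code).
object Flatten {
  // One value per requested column; a key absent from the map becomes 0.0.
  def flatten(features: Map[String, Double], columns: Seq[String]): Seq[Double] =
    columns.map(c => features.getOrElse(c, 0.0))
}

// Toy Story genres: Adventure, Animation, Children, Comedy, Fantasy.
// Flatten.flatten(
//   Map("Adventure" -> 1.0, "Animation" -> 1.0, "Children" -> 1.0, "Comedy" -> 1.0, "Fantasy" -> 1.0),
//   Seq("Drama", "Comedy", "Romance", "Thriller", "Action", "Adventure"))
//   == Seq(0.0, 1.0, 0.0, 0.0, 0.0, 1.0), matching the first Toy Story row below
```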

|-- movieId: integer (nullable = true)
|-- title: string (nullable = true)
|-- label: double (nullable = false)
|-- gCount: integer (nullable = false)
|-- tCount: integer (nullable = false)
|-- Drama: double (nullable = false)
|-- Comedy: double (nullable = false)
|-- Romance: double (nullable = false)
|-- Thriller: double (nullable = false)
|-- Action: double (nullable = false)
|-- Adventure: double (nullable = false)
|-- Crime: double (nullable = false)
|-- Sci-Fi: double (nullable = false)
|-- Mystery: double (nullable = false)
|-- Fantasy: double (nullable = false)
|-- Children: double (nullable = false)
|-- Horror: double (nullable = false)
|-- Animation: double (nullable = false)
|-- Musical: double (nullable = false)
|-- War: double (nullable = false)
|-- Documentary: double (nullable = false)
|-- Film-Noir: double (nullable = false)
|-- IMAX: double (nullable = false)
|-- Western: double (nullable = false)
|-- In Netflix queue: double (nullable = false)
|-- atmospheric: double (nullable = false)
|-- superhero: double (nullable = false)
|-- Disney: double (nullable = false)
|-- religion: double (nullable = false)
|-- funny: double (nullable = false)
|-- quirky: double (nullable = false)
|-- surreal: double (nullable = false)
|-- psychology: double (nullable = false)
|-- thought-provoking: double (nullable = false)
|-- tCrime: double (nullable = false)
|-- suspense: double (nullable = false)
|-- politics: double (nullable = false)
|-- visually appealing: double (nullable = false)
|-- tSci-fi: double (nullable = false)
|-- dark comedy: double (nullable = false)
|-- twist ending: double (nullable = false)
|-- dark: double (nullable = false)
|-- mental illness: double (nullable = false)
|-- tComedy: double (nullable = false)

|movieId|               title|label|gCount|tCount|Drama|Comedy|Romance|Thriller|Action|Adventure|Crime|Sci-Fi|Mystery|Fantasy|Children|Horror|Animation|Musical|War|Documentary|Film-Noir|IMAX|Western|In Netflix queue|atmospheric|superhero|Disney|religion|funny|quirky|surreal|psychology|thought-provoking|tCrime|suspense|politics|visually appealing|tSci-fi|dark comedy|twist ending|dark|mental illness|tComedy|
|      1|    Toy Story (1995)|  0.0|     5|     2|  0.0|   1.0|    0.0|     0.0|   0.0|      1.0|  0.0|   0.0|    0.0|    1.0|     1.0|   0.0|      1.0|    0.0|0.0|        0.0|      0.0| 0.0|    0.0|             0.0|        0.0|      0.0|   0.0|     0.0|  0.0|   0.0|    0.0|       0.0|              0.0|   0.0|     0.0|     0.0|               0.0|    0.0|        0.0|         0.0| 0.0|           0.0|    0.0|
|      2|      Jumanji (1995)|  0.0|     3|     4|  0.0|   0.0|    0.0|     0.0|   0.0|      1.0|  0.0|   0.0|    0.0|    1.0|     1.0|   0.0|      0.0|    0.0|0.0|        0.0|      0.0| 0.0|    0.0|             0.0|        0.0|      0.0|   0.0|     0.0|  0.0|   0.0|    0.0|       0.0|              0.0|   0.0|     0.0|     0.0|               0.0|    0.0|        0.0|         0.0| 0.0|           0.0|    0.0|
|      3|Grumpier Old Men ...|  1.0|     2|     2|  0.0|   1.0|    1.0|     0.0|   0.0|      0.0|  0.0|   0.0|    0.0|    0.0|     0.0|   0.0|      0.0|    0.0|0.0|        0.0|      0.0| 0.0|    0.0|             0.0|        0.0|      0.0|   0.0|     0.0|  0.0|   0.0|    0.0|       0.0|              0.0|   0.0|     0.0|     0.0|               0.0|    0.0|        0.0|         0.0| 0.0|           0.0|    0.0|
|      5|Father of the Bri...|  1.0|     1|     2|  0.0|   1.0|    0.0|     0.0|   0.0|      0.0|  0.0|   0.0|    0.0|    0.0|     0.0|   0.0|      0.0|    0.0|0.0|        0.0|      0.0| 0.0|    0.0|             0.0|        0.0|      0.0|   0.0|     0.0|  0.0|   0.0|    0.0|       0.0|              0.0|   0.0|     0.0|     0.0|               0.0|    0.0|        0.0|         0.0| 0.0|           0.0|    0.0|
|      7|      Sabrina (1995)|  1.0|     2|     1|  0.0|   1.0|    1.0|     0.0|   0.0|      0.0|  0.0|   0.0|    0.0|    0.0|     0.0|   0.0|      0.0|    0.0|0.0|        0.0|      0.0| 0.0|    0.0|             0.0|        0.0|      0.0|   0.0|     0.0|  0.0|   0.0|    0.0|       0.0|              0.0|   0.0|     0.0|     0.0|               0.0|    0.0|        0.0|         0.0| 0.0|           0.0|    0.0|
only showing top 5 rows

Feature Engineered dataframe ready

  • We started with data in different formats.
  • We standardized the input fields.
  • We derived new values.
  • We merged multiple sources.
  • We created a flat table.
  • We handled null values.

Finally, we have a dataframe with no nesting and all values filled in, ready to be consumed by ML models.

Next: Machine Learning

We will use MultilayerPerceptronClassifier to train on this dataset, selecting features and tuning hyperparameters along the way. More details to follow.

Updated on Dec 25, 2018

Refer to Building machine learning models using scala spark for the Multilayer Perceptron Classifier implementation.

Previous: Scala Spark Basics

If you are just getting started with Spark, read this one first: https://sakthipriyan.com/2018/11/24/learning-scala-spark-basics.html




About Author

Sakthi Priyan H
Passionate Programmer

  • I am passionate about building excellent teams, processes and systems.
  • Primarily I use Java, Scala and Python for building various systems and tools.
  • Building API services, big data processing and machine learning systems at Crayon Data.
  • Also interested in Golang and in building web apps using the JavaScript ecosystem.
  • I wrote my first program in BASIC in 1998 and have been passionate about computers since then.