Create DataFrame from Nested JSON | Apache Spark DataFrame Practical Tutorial | Scala API | Part 4



Prerequisites

  • Apache Spark
  • IntelliJ IDEA Community Edition

Walk-through

In this article, I will walk you through how to create a Spark DataFrame from a nested (complex) JSON file in an Apache Spark application using IntelliJ IDEA Community Edition.

sample_nested_json_file.json

{
    "IFAM": "EQR",
    "KTM": 1548176931466,
    "COL": 21,
    "DATA": [{
        "MLrate": "31",
        "Crate": [{
            "key": "k1",
            "value": "v1"
        }, {
            "key": "k2",
            "value": "v2"
        }]
    }, {
        "MLrate": "33",
        "Crate": [{
            "key": "k3",
            "value": "v3"
        }, {
            "key": "k4",
            "value": "v4"
        }]
    }],
    "CHECK": {
        "Check1": 1,
        "Check2": "TWO"
    }
}
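When this file is read with the multiline option (as in the code below), Spark should infer a schema along these lines, with DATA as an array of structs and CHECK as a struct. This is a sketch of the expected `printSchema()` output; the exact nullable flags depend on Spark's inference:

```
root
 |-- CHECK: struct (nullable = true)
 |    |-- Check1: long (nullable = true)
 |    |-- Check2: string (nullable = true)
 |-- COL: long (nullable = true)
 |-- DATA: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Crate: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |-- MLrate: string (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)
```

Note that both levels of nesting (an array of structs containing another array of structs) are what the expansion loop below has to unwind.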

build.sbt

name := "apache_spark_dataframe_practical_tutorial"

version := "1.0"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

part_4_create_dataframe_from_nested_json_file.scala

package com.datamaking.apache.spark.dataframe

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StructType}
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.col

object part_4_create_dataframe_from_nested_json_file {

  def expand_nested_column(json_data_df_temp: DataFrame): DataFrame = {
    var json_data_df: DataFrame = json_data_df_temp
    var select_clause_list = List.empty[String]

    // Iterate over the columns to check whether any of them still contains nested JSON data
    for (column_name <- json_data_df.schema.names){
      println("Processing column: " + column_name)

      // If the column is an ArrayType, explode it into one row per array element
      if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]){
        println("Exploding ArrayType column: " + column_name)

        // Extract the nested elements using the explode function
        json_data_df = json_data_df.withColumn(column_name, explode(json_data_df(column_name)).alias(column_name))
        select_clause_list :+= column_name
      }
      else if (json_data_df.schema(column_name).dataType.isInstanceOf[StructType]){
        println("Expanding StructType column: " + column_name)
        for (field <- json_data_df.schema(column_name).dataType.asInstanceOf[StructType].fields){
          select_clause_list :+= column_name + "." + field.name
        }
      }
      else{
        select_clause_list :+= column_name
      }
    }

    val columnNames = select_clause_list.map(name => col(name).alias(name.replace('.', '_')))

    // Selecting columns using select_clause_list from dataframe: json_data_df
    json_data_df.select(columnNames:_*)
  }

  def main(args: Array[String]): Unit = {
    println("Apache Spark Application Started ...")

    val spark = SparkSession.builder()
      .appName("Create DataFrame from JSON File")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    //Code Block 1 Starts Here
    val json_file_path = "D:\\apache_spark_dataframe\\data\\json\\sample_nested_json_file.json"
    var json_data_df = spark.read.option("multiline", true).json(json_file_path)

    json_data_df.show(10, false)
    json_data_df.printSchema()

    // Process the Nested Structure
    var nested_column_count = 1
    // Loop until no nested (array/struct) columns remain
    while (nested_column_count != 0) {
      println("Printing nested_column_count: " + nested_column_count)

      var nested_column_count_temp = 0
      // Count the columns that still contain nested (array or struct) data
      for (column_name <- json_data_df.schema.names){
        println("Iterating DataFrame columns: " + column_name)
        // A column of ArrayType or StructType still needs to be expanded
        if (json_data_df.schema(column_name).dataType.isInstanceOf[ArrayType]
          || json_data_df.schema(column_name).dataType.isInstanceOf[StructType]){
          nested_column_count_temp += 1
        }
      }
      if (nested_column_count_temp != 0){
        json_data_df = expand_nested_column(json_data_df)
        json_data_df.show(10, false)
      }
      println("Printing nested_column_count_temp: " + nested_column_count_temp)
      nested_column_count = nested_column_count_temp
    }
    //Code Block 1 Ends Here

    json_data_df.show(10, false)
    json_data_df.printSchema()

    spark.stop()
    println("Apache Spark Application Completed.")
  }
}
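Once the while loop converges (no ArrayType or StructType columns remain), the final flattened DataFrame for the sample file above should contain one row per exploded Crate element, i.e. four rows (2 DATA elements x 2 Crate entries each). A sketch of the expected final schema, assuming the sample JSON above (exact types come from Spark's inference):

```
root
 |-- CHECK_Check1: long (nullable = true)
 |-- CHECK_Check2: string (nullable = true)
 |-- COL: long (nullable = true)
 |-- DATA_Crate_key: string (nullable = true)
 |-- DATA_Crate_value: string (nullable = true)
 |-- DATA_MLrate: string (nullable = true)
 |-- IFAM: string (nullable = true)
 |-- KTM: long (nullable = true)
```

Notice how each dotted path (for example `DATA.Crate.key`) ends up as a single flat column whose name joins the path segments with underscores, which is exactly what the `name.replace('.', '_')` aliasing in `expand_nested_column` produces.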


Summary

In this article, we learned how to create a Spark DataFrame from a nested (complex) JSON file in an Apache Spark application. Please go through these steps, share your feedback, and post any queries or doubts you have. Thank you.

Happy Learning !!!
