Monday, January 8, 2018

Read HAR to Spark DataFrame

When I use spark-streaming to pull JSON events from a Kafka topic and persist the data into HDFS, I have to handle a lot of tiny files because the volume of the Kafka topic is pretty small. Too many small files will cause bad performance of Hadoop NameNode. Usually you need to build Hadoop Archive (.har) to make the small files into one big archive file.
The problem is how to read the archive file (.har) into Spark DataFrame. Method text and json of Spark DataFrameReader won’t work for the path of an archive file. You have to use SparkContext#textFile and the file path needs to be ${har_path}/*.
Here is the example showing how to read the files in a HAR. DataFrameReader read nothing for all three path patterns. SparkContext.textFile successfully read the data for the patterns of dir and file.
val har = "har:///tmp/test-data/bwang/starring/tag-v1-1511170200-1511175600.har"

val paths = Map(
    "har" -> har,
    "dir" -> s"$har/tag-*",
    "file" -> s"$har/tag-*/part-*"
)

println("DataFrameReader different HAR paths")

paths.foreach {
    case (kind, path) =>
        val data = spark.read.text(path)
        println(s"--- Reading $kind using path $path.")
        data.show(2, false)
}

println("SparkContext#textFile different HAR paths")

paths.foreach {
    case (kind, path) =>
        try {
            val data = sc.textFile(path).toDF
            println(s"--- Reading $kind using path $path.")
            data.show(2, false)
        } catch {
            case e: java.io.IOException =>
                println(s" --   Failed. ${e.getMessage}")
        }
}

No comments:

Post a Comment