
Monday, January 8, 2018

Read HAR to Spark DataFrame

When I use Spark Streaming to pull JSON events from a Kafka topic and persist the data into HDFS, I have to handle a lot of tiny files because the volume of the Kafka topic is pretty small. Too many small files hurt the performance of the Hadoop NameNode, so usually you build a Hadoop Archive (.har) to pack the small files into one big archive file.
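For reference, the archive is built with the hadoop archive tool. The command below is a placeholder showing the syntax, not the exact invocation I ran:

hadoop archive -archiveName tag-v1-1511170200-1511175600.har \
    -p /tmp/test-data/bwang/starring tag-v1-1511170200-1511175600 \
    /tmp/test-data/bwang/starring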
The problem is how to read an archive file (.har) into a Spark DataFrame. The text and json methods of Spark's DataFrameReader won't work on the path of an archive file; you have to use SparkContext#textFile, and the file path needs to be ${har_path}/*.
Here is an example showing how to read the files in a HAR: DataFrameReader reads nothing for all three path patterns, while SparkContext#textFile successfully reads the data for the dir and file patterns.
val har = "har:///tmp/test-data/bwang/starring/tag-v1-1511170200-1511175600.har"

val paths = Map(
    "har" -> har,
    "dir" -> s"$har/tag-*",
    "file" -> s"$har/tag-*/part-*"
)

println("DataFrameReader different HAR paths")

paths.foreach {
    case (kind, path) =>
        // DataFrameReader resolves the path but returns no rows for
        // any of the three patterns.
        val data = spark.read.text(path)
        println(s"--- Reading $kind using path $path.")
        data.show(2, false)
}

println("SparkContext#textFile different HAR paths")

paths.foreach {
    case (kind, path) =>
        try {
            // textFile reads the data for the dir and file patterns;
            // the bare archive path fails with an IOException.
            val data = sc.textFile(path).toDF
            println(s"--- Reading $kind using path $path.")
            data.show(2, false)
        } catch {
            case e: java.io.IOException =>
                println(s" --   Failed. ${e.getMessage}")
        }
}
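Since the archived files hold JSON events, the lines read back via textFile can be parsed into a structured DataFrame. This is a sketch assuming Spark 2.2+, where spark.read.json accepts a Dataset[String] (in the shell, spark.implicits._ is already imported):

import spark.implicits._

// Read the raw JSON lines with the "file" pattern that works above,
// then let Spark infer the event schema.
val lines = sc.textFile(s"$har/tag-*/part-*")
val events = spark.read.json(lines.toDS)
events.printSchema()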

Wednesday, October 4, 2017

Spark SQL AnalysisException due to data type mismatch

If you run the following code, you will encounter an AnalysisException complaining about a data type mismatch.
spark.sql("create table test_table(id int, data struct<name: string, age:int>)")
spark.sql("insert overwrite table test_table select 1, struct('joe', 15)")
val d = spark.read.table("test_table")

case class UserData(name: String, age: Int)
case class User(id: Int, data: UserData)
val u = Seq(User(1, UserData("joe", 15)), User(2, UserData("mary", 10))).toDF

val missing = {
  u.join(
    d,
    u("id") === d("id")
    && u("data")("name") === d("data")("name")
    && u("data")("age") === d("data")("age"),
    "outer"
  )
  .where( u("id").isNull || d("id").isNull)
}

// missing.show(false) prints:
// +---+---------+----+----+
// |id |data     |id  |data|
// +---+---------+----+----+
// |2  |[mary,10]|null|null|
// +---+---------+----+----+
missing.show(false)

// Throws this error: org.apache.spark.sql.AnalysisException: cannot resolve '(`data` = test_table.`data`)'
// due to data type mismatch: differing types in '(`data` = test_table.`data`)'
// (struct<name:string,age:int> and struct<name:string,age:int>).;
val missing_2 = {
  u.join(d,
         u("id") === d("id") && u("data") === d("data"),
         "outer")
    .where(u("id").isNull || d("id").isNull)
}
Don't be fooled by (struct<name:string,age:int> and struct<name:string,age:int>). The real difference is nullability, which the error message does not show. The DataFrame created from the case classes has nullable=false for id and age because a Scala Int cannot be null, while the SQL DDL creates nullable fields. When you compare whole fields of a complex type (struct, array), Spark considers the differently-nullable types to be different, as shown by missing_2; if you compare field by field, there is no problem, as shown by missing.
scala> u.schema.treeString
res4: String =
"root
 |-- id: integer (nullable = false)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)
"

scala> d.schema.treeString
res5: String =
"root
 |-- id: integer (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)
"

Monday, May 9, 2016

Spark Cassandra Connector and DataFrame

When you write a DataFrame to a Cassandra table, be careful with SaveMode.Overwrite: in spark-cassandra-connector-1.6.0-M2, TRUNCATE $keyspace.$table will be called first. See the code in CassandraSourceRelation.scala.
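The relevant branch looks roughly like this (my paraphrase of the 1.6.0-M2 source, not a verbatim copy):

// CassandraSourceRelation.scala (paraphrased): on Overwrite, the target
// table is truncated before any rows are written.
if (overwrite) {
  connector.withSessionDo(session =>
    session.execute(s"TRUNCATE ${tableRef.keyspace}.${tableRef.table}"))
}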
I did observe something weird when I used the following code to write a DataFrame to a cluster of Cassandra 2.1.8:
df.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> table, "keyspace" -> keyspace))
  .save()
After the scheduled Spark job finishes, the table appears empty in CQLSH when running select * from keyspace.table limit 10. The results are the same if I change the consistency level to QUORUM, or even ALL. It might take some time before the query returns the results.
If I start the job manually from the command line, however, the query returns the results most of the time.
If you check the CQL documentation for TRUNCATE, setting the consistency level to ALL is required:
Note: The consistency level must be set to ALL prior to performing a TRUNCATE operation. All replicas must remove the data.
I don't think spark-cassandra-connector changes the consistency level before calling TRUNCATE $keyspace.$table. The default consistency level is LOCAL_QUORUM, and that might be the root cause.
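Given that, here is a sketch of one way to sidestep the problem, assuming upsert semantics are acceptable for the table and that the connector picks up spark.cassandra.* settings from the options map, as its DataFrames documentation describes:

// Append avoids the TRUNCATE call entirely; Cassandra upserts rows
// that share a primary key instead of duplicating them.
df.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map(
    "table" -> table,
    "keyspace" -> keyspace,
    // Assumption: raising the write consistency level helps readers
    // see the data immediately after the job finishes.
    "spark.cassandra.output.consistency.level" -> "ALL"
  ))
  .save()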