Showing posts with label Spark. Show all posts
Showing posts with label Spark. Show all posts

Monday, January 8, 2018

Read HAR to Spark DataFrame

When I use spark-streaming to pull JSON events from a Kafka topic and persist the data into HDFS, I have to handle a lot of tiny files because the volume of the Kafka topic is pretty small. Too many small files will cause bad performance of Hadoop NameNode. Usually you need to build Hadoop Archive (.har) to make the small files into one big archive file.
The problem is how to read the archive file (.har) into Spark DataFrame. Method text and json of Spark DataFrameReader won’t work for the path of an archive file. You have to use SparkContext#textFile and the file path needs to be ${har_path}/*.
Here is the example showing how to read the files in a HAR. DataFrameReader read nothing for all three path patterns. SparkContext.textFile successfully read the data for the patterns of dir and file.
val har = "har:///tmp/test-data/bwang/starring/tag-v1-1511170200-1511175600.har"

val paths = Map(
    "har" -> har,
    "dir" -> s"$har/tag-*",
    "file" -> s"$har/tag-*/part-*"
)

println("DataFrameReader different HAR paths")

paths.foreach {
    case (kind, path) =>
        val data = spark.read.text(path)
        println(s"--- Reading $kind using path $path.")
        data.show(2, false)
}

println("SparkContext#textFile different HAR paths")

paths.foreach {
    case (kind, path) =>
        try {
            val data = sc.textFile(path).toDF
            println(s"--- Reading $kind using path $path.")
            data.show(2, false)
        } catch {
            case e: java.io.IOException =>
                println(s" --   Failed. ${e.getMessage}")
        }
}

Wednesday, October 4, 2017

Spark SQL AnalysisException due to data type mismatch

If you run the following code, you will encounter AnalysisException with data type mismatch.
spark.sql("create table test_table(id int, data struct<name: string, age:int>)")
spark.sql("insert overwrite table test_table select 1, struct('joe', 15)")
val d = spark.read.table("test_table")

case class UserData(name: String, age: Int)
case class User(id: Int, data: UserData)
val u = Seq(User(1, UserData("joe", 15)), User(2, UserData("mary", 10))).toDF

val missing = {
  u.join(
    d,
    u("id") === d("id")
    && u("data")("name") === d("data")("name")
    && u("data")("age") === d("data")("age"),
    "outer"
  )
  .where( u("id").isNull || d("id").isNull)
}

// show this result
// +---+---------+----+----+                                                      
// |id |data     |id  |data|
// +---+---------+----+----+
// |2  |[mary,10]|null|null|
// +---+---------+----+----+
missing.show(false)

// Throws this error: org.apache.spark.sql.AnalysisException: cannot resolve '(`data` = test_table.`data`)'
// due to data type mismatch: differing types in '(`data` = test_table.`data`)'
// (struct<name:string,age:int> and struct<name:string,age:int>).;
val missing_2 = {
  u.join(d,
         u("id") === d("id") && u("data") === d("data"),
         "outer")
    .where(u("id").isNull || d("id").isNull)
}
Don’t be fooled by (struct<name:string,age:int> and struct<name:string,age:int>). The problem is caused by nullable, which is not shown in the error message. The DataFrame created from case classes has nullable=false for id and age because Scala Int cannot be null, while the SQL creates nullable fields. And if you compare a field with complex type (struct, array), Spark just thinks they are different as shown in missing_2. But if you compare field by field, there is no problem as shown in missing.
scala> u.schema.treeString
res4: String =
"root
 |-- id: integer (nullable = false)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)
"

scala> d.schema.treeString
res5: String =
"root
 |-- id: integer (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)
"

Monday, May 9, 2016

Spark Cassandra Connector and DataFrame

When you write a DataFrame to a cassandra table, be careful to use SaveMode.Overwrite. In spark-cassandra-connector-1.6.0-M2, TRUNCATE $keyspace.$table will be called. See the code in CassandraSourceRelation.scala.
I did observe something weird when I use the following code to write a data frame to a cluster of Cassandra 2.1.8:
df.write
  .format(""org.apache.spark.sql.cassandra")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> table, "keyspace" -> keyspace))
  .save
After the scheduled spark job finishes, in CQLSH, the table is empty when running select * from keyspace.table limit 10. The same results if I change consistency level to QUORUM, and even ALL. It might take some time, then the query returns the results.
If I start the job manually from the command line, however, most of time the query can return the results.
If you check the CQL document for TRUNCATE, setting consistency level to ALL is required.
Note: The consistency level must be set to ALL prior to performing a TRUNCATE operation. All replicas must remove the data.
I don’t think the consistency level is changed before calling TRUNCATE $keyspace.$table in spark-cassandra-connector. The default consistency level is LOCAL_QUORUM. That might be the root cause.

Tuesday, January 19, 2016

Using HiveContext to read Hive Tables

I just tried to use Spark HiveContext to use the tables in HiveMetastore. It is very simple, but there are still a lot of things not documented clearly yet.
Here is what I recorded to make it work. I compiled Spark 1.6.0 to work with CDH 5.3.2 in my cluster.
  • Create HiveContext instead of SQLContext
    val sqlContext = new HiveContext(sc)
    
  • Use the specific version of Hive. Spark 1.6.0 is compiled with Hive 1.2.1 by default. But you can use the specific version of Hive in your cluster without recompiling it. I’m using CDH 5.3.2, so I need to
    • specify the version spark.sql.hive.metastore.version=0.13.1
    • specify the hive jars. Unfortunately, you cannot simply list hive-metastore.jar because it depends on several jars and a lot of transitive dependencies. The simple way is to have all the jars in $HIVE_LIB_DIR/*like spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/*
    • In Spark’s document, it is said that you need hive-site.xml, but how to add it is not clear. Basically, you need to add it into the classpath.
      • For spark-shell, it is ok when you use spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/*
      • For spark-submit --master yarn-cluster, you can do like --files $HIVE_CONF_DIR/hive-site.xml. hive-site.xml will be copied to the container working directory for spark driver, and the working directory is in the classpath of spark driver.
    • There is a bug [SPARK-11702], so you need to add spark.driver.extraClassPath=$GUAVA_CLASSPATH. Otherwise, you will see an error like below, although the guava jar is already in $HIVE_LIB_DIR.
      java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: com/google/common/base/Predicate
      
  • Read a table as DataFrame. NOTE: you can give the database name.
    val df = sqlContext.read.table(“adhoc.hc_part_test”)
    
  • Write into parquet files like dynamic partitioned. You need to run alter table add partition by yourself.
    case class P (name: String, event_date:String)
    val data = Seq(P(“joe”, “2015-01-01”), P(“harry”, “2016-01-19”)).toDF 
    data.registerTempTable(“event”) 
    data.write.partitionBy(“event_date”).format(“parquet”).save(“/tmp/hc_part_test”)
    
  • Write into the table directly using dynamic partition. No need to manage the partitions yourself.
    sqlContext.sql(“set hive.exec.dynamic.partition.mode=nonstrict”)
    sqlContext.sql(“insert into table adhoc.hc_part_test partition (event_date) select * from event”)
    
I got this error when I tried to read a parqet table,
java.lang.RuntimeException: java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat.
It turns out that I created the table in Impala, and the table InputFormat/OutputFormat use parquet.hive.DeprecatedParquetInputFormat. Creating the table in Hive resolve this issue.
— created by Impala 
CREATE TABLE adhoc.hc_part_test( 
  name string COMMENT ‘’)
PARTITIONED BY ( 
  event_date string)
ROW FORMAT SERDE
  ‘parquet.hive.serde.ParquetHiveSerDe’
STORED AS 
  INPUTFORMAT ‘parquet.hive.DeprecatedParquetInputFormat’
  OUTPUTFORMAT ‘parquet.hive.DeprecatedParquetOutputFormat’
LOCATION ‘hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test’
TBLPROPERTIES ( ‘transient_lastDdlTime’=’1453223673’)

 — created by hive 
CREATE TABLE hc_part_test(
  name string)
PARTITIONED BY (
  event_date string)
ROW FORMAT SERDE
  ‘org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe’
STORED AS 
  INPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat’
  OUTPUTFORMAT ‘org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat’
LOCATION ‘hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test’
TBLPROPERTIES ( ‘transient_lastDdlTime’=’1453226294’)
When you start spark using spark-shell, the script look like:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
CDH_JARS=/opt/cloudera/parcels/CDH/jars

HIVE_LIB_DIR=/opt/cloudera/parcels/CDH/lib/hive/lib
hive_metastore_classpath=$HIVE_LIB_DIR/*

# SPARK-11702
GUAVA_CLASSPATH=$CDH_JARS/guava-15.0.jar

shell() {
  $SPARK_SHELL \
    --master yarn \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/* \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH "$@"
}

submit() {
  $SPARK_SUBMIT \
    --master yarn-cluster \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/* \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH \
    --files $HIVE_CONF_DIR/hive-site.xml "$@"
}

Tuesday, December 23, 2014

Spark and Parquet with large block size

One of issue when I run a Spark application in yarn cluster mode is that my executor container is killed because the memory exceeds memory limits. NodeManager's log shows the folowing messages:

2014-12-22 09:22:48,906 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26453 for container-id container_1418853356063_0165_01_000005_01: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Process tree for container: container_1418853356063_0165_01_000005_01 has processes older than 1 iteration running over the configured limit. Limit=47781511168, current usage = 49199300608
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=26453,containerID=container_1418853356063_0165_01_000005_01] is running beyond physical memory limits. Current usage: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_1418853356063_0165_01_000005_01 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 26526 26453 26453 26453 (java) 315870 8116 49777754112 12011233 /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms45056m -Xmx45056m -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23
        |- 26453 26450 26453 26453 (bash) 1 1 108658688 315 /bin/bash -c /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms45056m -Xmx45056m  -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23 1> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stdout 2> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stderr

If an executor is lost, Spark will spawn another executor to recover from this failure. Under this situation, any persisted or cached RDDs are lost too, Spark must re-process from the beginning. This usually leads to a lengthy processing time. In any case, you should avoid this situation happening. For example, one of my application can finish in 5 mins if no lost executor happens, otherwise it could take more than 40 mins.

Because Spark builds an in-memory hash when doing groupByKey or combineByKey, which needs a lot of memory. One of suggestion is to use a larger parallelism to make the reduce task smaller.

If your application read Parquet files, you must be careful to choose executor memory and cores by taking into account of the block size of parquet files because reading Parquet files may consume a lot of memory. For example, Impala writes Parquet files using 1GB HDFS blocksize by default. My application needs to read 136 parquet files output by Impala. The application runs on 4 nodes with 24 virtual cores each node using "--executor-memory 44G --executor-cores 23 --num-executors 4", and my executors will be killed. But if I use 10 cores per executor "--executor-cores 10", everything passes through without any executor is lost. The reason is when you read 136 parquet files, there are 23 running tasks in one executor at the same time, so 23GB are allocated for reading Parquet, and 23 tasks try to build hash in memory. By using 10 cores, there are more memory for one task, and each task has enough memory to build the hash. Even the job is a little bit slow, it actually save time because no re-computation due to the lost executors.