Tuesday, January 19, 2016

Using HiveContext to read Hive Tables

I just tried Spark's HiveContext to access the tables in the Hive metastore. It is quite simple, but a few things are still not clearly documented.
Here is what I recorded to make it work. I compiled Spark 1.6.0 to work with CDH 5.3.2 on my cluster.
  • Create HiveContext instead of SQLContext
    import org.apache.spark.sql.hive.HiveContext
    val sqlContext = new HiveContext(sc)
    
  • Use the Hive version running in your cluster. Spark 1.6.0 is compiled against Hive 1.2.1 by default, but you can point it at the Hive version in your cluster without recompiling Spark. I'm using CDH 5.3.2, so I need to:
    • specify the metastore version: spark.sql.hive.metastore.version=0.13.1
    • specify the Hive jars. Unfortunately, you cannot simply list hive-metastore.jar, because it depends on several jars and a lot of transitive dependencies. The simple way is to include everything under $HIVE_LIB_DIR, like spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/*
    • Spark's documentation says that you need hive-site.xml, but not how to add it. Basically, you need to put it on the classpath:
      • For spark-shell, it is enough to prepend the conf directory: spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/*
      • For spark-submit --master yarn-cluster, you can pass --files $HIVE_CONF_DIR/hive-site.xml. hive-site.xml will be copied into the Spark driver's container working directory, and that working directory is on the driver's classpath.
    • Because of a bug [SPARK-11702], you also need to add spark.driver.extraClassPath=$GUAVA_CLASSPATH. Otherwise, you will see an error like the one below, even though the Guava jar is already in $HIVE_LIB_DIR.
      java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: com/google/common/base/Predicate
      
  • Read a table as a DataFrame. NOTE: you can prefix the table name with the database name.
    val df = sqlContext.read.table("adhoc.hc_part_test")
    
  • Write Parquet files with a dynamic-partition-style directory layout. You need to run alter table ... add partition yourself; see the comments in the snippet below.
    import sqlContext.implicits._
    case class P(name: String, event_date: String)
    val data = Seq(P("joe", "2015-01-01"), P("harry", "2016-01-19")).toDF
    data.registerTempTable("event")
    data.write.partitionBy("event_date").format("parquet").save("/tmp/hc_part_test")
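    // NOTE (a sketch): partitionBy only writes directories like
    // event_date=2015-01-01 under the save path; the Hive metastore does not
    // learn about them automatically. Register each partition yourself,
    // pointing it at the directory just written:
    sqlContext.sql("alter table adhoc.hc_part_test add partition (event_date='2015-01-01') location '/tmp/hc_part_test/event_date=2015-01-01'")
    sqlContext.sql("alter table adhoc.hc_part_test add partition (event_date='2016-01-19') location '/tmp/hc_part_test/event_date=2016-01-19'")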
    
  • Or write into the table directly using dynamic partitioning. There is no need to manage the partitions yourself.
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    sqlContext.sql("insert into table adhoc.hc_part_test partition (event_date) select * from event")
    
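Putting the pieces together, a quick sanity check in spark-shell looks like this (a sketch, reusing the example table from above):

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

// Partitions registered via alter table or a dynamic-partition insert
// should all show up here.
sqlContext.sql("show partitions adhoc.hc_part_test").show()

// Read the table back and confirm the rows landed in the right partition.
val df = sqlContext.read.table("adhoc.hc_part_test")
df.where("event_date = '2016-01-19'").show()
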
I got this error when I tried to read a Parquet table:
java.lang.RuntimeException: java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat.
It turns out that I had created the table in Impala, and its InputFormat/OutputFormat use parquet.hive.DeprecatedParquetInputFormat. Recreating the table in Hive resolved the issue.
-- created by Impala
CREATE TABLE adhoc.hc_part_test(
  name string COMMENT '')
PARTITIONED BY (
  event_date string)
ROW FORMAT SERDE
  'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ('transient_lastDdlTime'='1453223673')

-- created by Hive
CREATE TABLE hc_part_test(
  name string)
PARTITIONED BY (
  event_date string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ('transient_lastDdlTime'='1453226294')
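
You can also check which InputFormat a table uses without leaving spark-shell; describe formatted prints the storage information (a sketch, reusing the HiveContext from above):

// Storage information (SerDe, InputFormat, OutputFormat, location) appears
// near the end of the output.
sqlContext.sql("describe formatted adhoc.hc_part_test").collect().foreach(println)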
When you start Spark using spark-shell, the launch script looks like this:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
CDH_JARS=/opt/cloudera/parcels/CDH/jars

HIVE_LIB_DIR=/opt/cloudera/parcels/CDH/lib/hive/lib
hive_metastore_classpath=$HIVE_LIB_DIR/*

# SPARK-11702: Guava must be added to the driver classpath explicitly
GUAVA_CLASSPATH=$CDH_JARS/guava-15.0.jar

shell() {
  $SPARK_SHELL \
    --master yarn \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf "spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$hive_metastore_classpath" \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH "$@"
}

submit() {
  $SPARK_SUBMIT \
    --master yarn-cluster \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf "spark.sql.hive.metastore.jars=$hive_metastore_classpath" \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH \
    --files $HIVE_CONF_DIR/hive-site.xml "$@"
}
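
# Example usage: SPARK_SHELL and SPARK_SUBMIT should point at your Spark
# 1.6.0 build, e.g. SPARK_SHELL=$SPARK_HOME/bin/spark-shell. The jar and
# main class below are placeholders:
#   shell
#   submit --class com.example.MyApp my-app.jar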

2 comments:

  1. Thanks Ben, it helped and saved my time.

  2. Thanks a lot, Ben. I almost lost a day trying to fix this one. Then I discovered your blog and everything was set.

    I am getting another error when trying to access Hive through the Hive JDBC driver. Could you please help me solve this one as well?

    df = sqlContext.read.format('jdbc').options(url='jdbc:hive2://localhost:10000/default', dbtable='movies').load()

    Py4JJavaError: An error occurred while calling o31.load.
    : java.sql.SQLException: Method not supported
    at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:135)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
    at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:744)
