I recently tried using Spark's HiveContext to access the tables in the Hive metastore. It is simple to do, but a lot of things are still not clearly documented.
Here is what I recorded to make it work. I compiled Spark 1.6.0 to work with CDH 5.3.2 on my cluster.
- Create HiveContext instead of SQLContext
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
- Use the specific version of Hive in your cluster. Spark 1.6.0 is compiled with Hive 1.2.1 by default, but you can point it at your cluster's Hive version without recompiling. I'm using CDH 5.3.2, so I need to
- specify the version
spark.sql.hive.metastore.version=0.13.1
- specify the hive jars. Unfortunately, you cannot simply list hive-metastore.jar because it depends on several jars and a lot of transitive dependencies. The simple way is to include all the jars in $HIVE_LIB_DIR/*, like
spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/*
- Spark's documentation says that you need hive-site.xml, but how to add it is not clear. Basically, you need to add it to the classpath.
- For spark-shell, it works when you use
spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/*
- For spark-submit --master yarn-cluster, you can pass
--files $HIVE_CONF_DIR/hive-site.xml
hive-site.xml will be copied to the container working directory for the Spark driver, and the working directory is in the classpath of the Spark driver.
- There is a bug [SPARK-11702], so you need to add
spark.driver.extraClassPath=$GUAVA_CLASSPATH
Otherwise, you will see an error like the one below, although the guava jar is already in $HIVE_LIB_DIR.
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: com/google/common/base/Predicate
- Read a table as a DataFrame. NOTE: you can qualify the table name with the database name.
val df = sqlContext.read.table("adhoc.hc_part_test")
- Write into parquet files as if dynamically partitioned. You need to run alter table ... add partition yourself (see the sketch after this list).
case class P(name: String, event_date: String)
val data = Seq(P("joe", "2015-01-01"), P("harry", "2016-01-19")).toDF
data.registerTempTable("event")
data.write.partitionBy("event_date").format("parquet").save("/tmp/hc_part_test")
- Write into the table directly using dynamic partitions. No need to manage the partitions yourself.
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
sqlContext.sql("insert into table adhoc.hc_part_test partition (event_date) select * from event")
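For the file-based write above, the metastore does not know about the new partition directories until you add them yourself. Here is a minimal sketch of what that manual step might look like, assuming adhoc.hc_part_test points at the /tmp/hc_part_test path used above and the partition values match the directories written by partitionBy:
// Hypothetical follow-up to the save()-based write: register the directories
// created by partitionBy() as partitions of the table in the metastore.
sqlContext.sql("alter table adhoc.hc_part_test add if not exists " +
  "partition (event_date='2015-01-01') location '/tmp/hc_part_test/event_date=2015-01-01'")
sqlContext.sql("alter table adhoc.hc_part_test add if not exists " +
  "partition (event_date='2016-01-19') location '/tmp/hc_part_test/event_date=2016-01-19'")
// Check which partitions the metastore now knows about.
sqlContext.sql("show partitions adhoc.hc_part_test").show()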
I got this error when I tried to read a parquet table:
java.lang.RuntimeException: java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat.
It turns out that I had created the table in Impala, and the table's InputFormat/OutputFormat use parquet.hive.DeprecatedParquetInputFormat. Creating the table in Hive resolves this issue.
-- created by Impala
CREATE TABLE adhoc.hc_part_test(
name string COMMENT '')
PARTITIONED BY (
event_date string)
ROW FORMAT SERDE
'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ('transient_lastDdlTime'='1453223673')
-- created by hive
CREATE TABLE hc_part_test(
name string)
PARTITIONED BY (
event_date string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ('transient_lastDdlTime'='1453226294')
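If you are not sure how a table was created, you can inspect its SerDe and InputFormat/OutputFormat from the HiveContext before reading it. A quick check, assuming the same adhoc.hc_part_test table as above (SHOW CREATE TABLE in Hive gives the full DDL, as in the listings):
// Print the table metadata; an Impala-created table shows the
// parquet.hive.Deprecated* input/output formats in the storage section.
sqlContext.sql("describe formatted adhoc.hc_part_test").collect().foreach(println)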
Here is the script I use to start spark-shell and spark-submit:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
CDH_JARS=/opt/cloudera/parcels/CDH/jars
HIVE_LIB_DIR=/opt/cloudera/parcels/CDH/lib/hive/lib
hive_metastore_classpath=$HIVE_LIB_DIR/*
# SPARK-11702
GUAVA_CLASSPATH=$CDH_JARS/guava-15.0.jar
shell() {
$SPARK_SHELL \
--master yarn \
--num-executors 8 \
--executor-cores 3 \
--executor-memory 6G \
--conf spark.sql.hive.metastore.version=0.13.1 \
--conf spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/* \
--conf spark.driver.extraClassPath=$GUAVA_CLASSPATH "$@"
}
submit() {
$SPARK_SUBMIT \
--master yarn-cluster \
--num-executors 8 \
--executor-cores 3 \
--executor-memory 6G \
--conf spark.sql.hive.metastore.version=0.13.1 \
--conf spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/* \
--conf spark.driver.extraClassPath=$GUAVA_CLASSPATH \
--files $HIVE_CONF_DIR/hive-site.xml "$@"
}
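Once spark-shell is started with this configuration, a quick way to confirm that the metastore connection works is to list the databases and tables. A minimal check, assuming the adhoc database from the earlier examples exists:
// sc is provided by spark-shell; create the HiveContext as described above.
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
// If the metastore configuration is correct, these show the databases and
// the tables registered in the adhoc database.
sqlContext.sql("show databases").show()
sqlContext.tables("adhoc").show()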
Thanks Ben, it helped and saved my time.
Thanks a lot, Ben. I almost lost one day trying to fix this one. I discovered your blog and then everything was all set.
I am getting another error trying to access Hive through the Hive JDBC driver. Could you please help me solve this one as well?
df = sqlContext.read.format('jdbc').options(url='jdbc:hive2://localhost:10000/default', dbtable='movies').load()
Py4JJavaError: An error occurred while calling o31.load.
: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:135)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:744)