Friday, January 29, 2016

Using Hadoop distcp to copy files from an SFTP server

Here are the steps I use to copy files from an SFTP server to HDFS with "hadoop distcp":
  • Clone hadoop-filesystem-sftp at https://github.com/wnagele/hadoop-filesystem-sftp.git
  • hadoop-filesystem-sftp has a bug that can prevent distcp from running correctly when file names contain special characters that need escaping, e.g. ":". The fix is simple: at line 331 of SFTPFileSystem.java, URL-encode sftpFile.filename.
      for (SFTPv3DirectoryEntry sftpFile : sftpFiles) {
        // Encode the file name so special characters such as ":" are escaped.
        String filename = URLEncoder.encode(sftpFile.filename, "UTF-8");
        if (!"..".equals(filename) && !".".equals(filename))
          fileStats.add(getFileStatus(sftpFile.attributes, new Path(path, filename).makeQualified(this)));
      }
    
  • Password authentication is the easy option, but your password to the SFTP server becomes public because it is shown in the MapReduce job configuration.
  • hadoop-filesystem-sftp uses ganymed-ssh-2, which only supports authentication by password or key file.
  • To set up passwordless SSH, you need permission to log on to the SFTP server. Create an SSH key pair with "ssh-keygen", making sure you don't overwrite your current key pair in $HOME/.ssh.
    $ ssh-keygen -f ${distcp_ssh}/id_rsa
    $ ssh-keygen -F sftp-server-name -f ${distcp_ssh}/known_hosts
    
  • Copy the public key to the SFTP server.
    • Copy ${distcp_ssh} to all data nodes and the client node. On the data nodes, make the directory readable only by yarn.
    • On the client node, make the directory readable by the user who runs "hadoop distcp".
    • The reason for this setup is that hadoop-filesystem-sftp uses ${user.home} as the default location for the key file and known_hosts. More importantly, on each data node the task runs as yarn rather than as the user who ran the command. Unfortunately, this means someone can write a MapReduce job that grabs your id_rsa key file and gains access to the SFTP server.
  • hadoop distcp -D fs.sftp.user=username -D fs.sftp.key.file=${distcp_ssh}/id_rsa -D fs.sftp.knownhosts=${distcp_ssh}/known_hosts -libjars hadoop-filesystem-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar sftp://sftp-server/src-path hdfs://namenode/target-path
  • WARNING: Don't use this method unless you have to.
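
The permission and command-line steps above can be sketched as a small shell helper. This is only a sketch under assumptions: the function names lock_down_key_dir and distcp_from_sftp, the DISTCP_SSH and SFTP_FS_JAR variables, and the DRY_RUN switch are all hypothetical, and the key file is assumed to be named id_rsa as in the distcp command above.

```shell
#!/usr/bin/env bash
# Sketch only: the helper names, DISTCP_SSH, SFTP_FS_JAR and DRY_RUN are
# illustrative and not part of Hadoop or hadoop-filesystem-sftp.

# Make the key directory readable by its owner only (r-x on the directory,
# r-- on the files). An optional second argument (e.g. yarn on the data
# nodes) changes the owner first; chown normally requires root.
lock_down_key_dir() {
  local dir="$1" owner="${2:-}"
  if [ -n "$owner" ]; then
    chown -R "$owner" "$dir" || return 1
  fi
  find "$dir" -type f -exec chmod 400 {} + || return 1
  chmod 500 "$dir"
}

# Assemble the distcp command from the post. With DRY_RUN=1 the command is
# printed instead of executed, which is handy for checking the -D options.
distcp_from_sftp() {
  local user="$1" src="$2" dst="$3"
  local key_dir="${DISTCP_SSH:?set DISTCP_SSH to the key directory}"
  local jar="${SFTP_FS_JAR:-hadoop-filesystem-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar}"
  set -- hadoop distcp \
    -D "fs.sftp.user=$user" \
    -D "fs.sftp.key.file=$key_dir/id_rsa" \
    -D "fs.sftp.knownhosts=$key_dir/known_hosts" \
    -libjars "$jar" \
    "$src" "$dst"
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}
```

On a data node you might run lock_down_key_dir "$DISTCP_SSH" yarn as root. On the client node, run it without the owner argument as the user who launches distcp, then call distcp_from_sftp username sftp://sftp-server/src-path hdfs://namenode/target-path.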

Tuesday, January 19, 2016

Using HiveContext to read Hive Tables

I just tried using Spark's HiveContext to work with the tables in the Hive metastore. It is very simple, but a lot of things are still not clearly documented.
Here is what I did to make it work. I compiled Spark 1.6.0 to work with CDH 5.3.2 on my cluster.
  • Create HiveContext instead of SQLContext
    val sqlContext = new HiveContext(sc)
    
  • Use a specific version of Hive. Spark 1.6.0 is compiled against Hive 1.2.1 by default, but you can use the Hive version in your cluster without recompiling Spark. I’m using CDH 5.3.2, so I need to
    • specify the version: spark.sql.hive.metastore.version=0.13.1
    • specify the Hive jars. Unfortunately, you cannot simply list hive-metastore.jar because it depends on several jars with many transitive dependencies. The simple way is to include all the jars in $HIVE_LIB_DIR/*, like spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/*
    • Spark’s documentation says you need hive-site.xml, but it is not clear how to add it. Basically, you need to put it on the classpath.
      • For spark-shell, it works when you use spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/*
      • For spark-submit --master yarn-cluster, pass --files $HIVE_CONF_DIR/hive-site.xml. hive-site.xml is copied to the working directory of the Spark driver's container, and that directory is on the driver's classpath.
    • Because of a bug [SPARK-11702], you need to add spark.driver.extraClassPath=$GUAVA_CLASSPATH. Otherwise you will see an error like the one below, even though the guava jar is already in $HIVE_LIB_DIR.
      java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: com/google/common/base/Predicate
      
  • Read a table as a DataFrame. NOTE: you can include the database name.
    val df = sqlContext.read.table("adhoc.hc_part_test")
    
  • Write Parquet files laid out like dynamic partitions. You need to run ALTER TABLE ... ADD PARTITION yourself.
    // Outside spark-shell, toDF also needs: import sqlContext.implicits._
    case class P(name: String, event_date: String)
    val data = Seq(P("joe", "2015-01-01"), P("harry", "2016-01-19")).toDF
    data.registerTempTable("event")
    data.write.partitionBy("event_date").format("parquet").save("/tmp/hc_part_test")
    
  • Write into the table directly using dynamic partition. No need to manage the partitions yourself.
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    sqlContext.sql("insert into table adhoc.hc_part_test partition (event_date) select * from event")
    
I got this error when I tried to read a Parquet table:
java.lang.RuntimeException: java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat.
It turned out that I had created the table in Impala, so the table's InputFormat/OutputFormat point to the parquet.hive.DeprecatedParquet* classes. Creating the table in Hive resolves this issue.
-- created by Impala
CREATE TABLE adhoc.hc_part_test(
  name string COMMENT '')
PARTITIONED BY (
  event_date string)
ROW FORMAT SERDE
  'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
  OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ( 'transient_lastDdlTime'='1453223673')

-- created by Hive
CREATE TABLE hc_part_test(
  name string)
PARTITIONED BY (
  event_date string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://nameservice-prod/user/hive/warehouse/adhoc.db/hc_part_test'
TBLPROPERTIES ( 'transient_lastDdlTime'='1453226294')
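
To spot affected tables before Spark fails on them, one option is to scan the output of SHOW CREATE TABLE for the old parquet.hive.* class names that appear in the Impala DDL above. A minimal sketch, assuming the DDL text is piped in on stdin; the function name uses_deprecated_parquet is hypothetical:

```shell
#!/usr/bin/env bash
# Sketch only: uses_deprecated_parquet is an illustrative helper name.

# Succeeds (exit 0) when the DDL read from stdin references one of the old
# parquet.hive.* classes; fails (exit 1) when none of them is found.
uses_deprecated_parquet() {
  grep -Eq "parquet\.hive\.(DeprecatedParquet(Input|Output)Format|serde\.ParquetHiveSerDe)"
}
```

For example: hive -e 'SHOW CREATE TABLE adhoc.hc_part_test' | uses_deprecated_parquet && echo "recreate this table in Hive"
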
The launcher script I use for spark-shell and spark-submit looks like this:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
CDH_JARS=/opt/cloudera/parcels/CDH/jars

HIVE_LIB_DIR=/opt/cloudera/parcels/CDH/lib/hive/lib
hive_metastore_classpath=$HIVE_LIB_DIR/*

# SPARK-11702
GUAVA_CLASSPATH=$CDH_JARS/guava-15.0.jar

shell() {
  $SPARK_SHELL \
    --master yarn \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf spark.sql.hive.metastore.jars=$HIVE_CONF_DIR:$HIVE_LIB_DIR/* \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH "$@"
}

submit() {
  $SPARK_SUBMIT \
    --master yarn-cluster \
    --num-executors 8 \
    --executor-cores 3 \
    --executor-memory 6G \
    --conf spark.sql.hive.metastore.version=0.13.1 \
    --conf spark.sql.hive.metastore.jars=$HIVE_LIB_DIR/* \
    --conf spark.driver.extraClassPath=$GUAVA_CLASSPATH \
    --files $HIVE_CONF_DIR/hive-site.xml "$@"
}
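
A missing hive-site.xml or Guava jar only shows up as a runtime error, so a small pre-flight check before launching can save a round trip. A minimal sketch, assuming the HIVE_CONF_DIR, HIVE_LIB_DIR and GUAVA_CLASSPATH variables from the script above are set; the function name check_hive_env is hypothetical:

```shell
#!/usr/bin/env bash
# Sketch only: check_hive_env is an illustrative helper, not part of Spark.

# Verify that the files and directories the launcher relies on exist.
# Prints one line per problem to stderr and returns non-zero if any is missing.
check_hive_env() {
  local status=0
  if [ ! -f "$HIVE_CONF_DIR/hive-site.xml" ]; then
    echo "missing hive-site.xml in HIVE_CONF_DIR ($HIVE_CONF_DIR)" >&2
    status=1
  fi
  if [ ! -d "$HIVE_LIB_DIR" ]; then
    echo "HIVE_LIB_DIR is not a directory ($HIVE_LIB_DIR)" >&2
    status=1
  fi
  if [ ! -f "$GUAVA_CLASSPATH" ]; then
    echo "guava jar not found ($GUAVA_CLASSPATH)" >&2
    status=1
  fi
  return $status
}
```

It could be chained in front of either function, for example check_hive_env && shell.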