Showing posts with label yarn. Show all posts
Showing posts with label yarn. Show all posts

Tuesday, May 3, 2016

How to resolve spark-cassandra-connector's Guava version conflict in spark-shell

In my blog How to resolve spark-cassandra-connector Guava version conflicts in Yarn cluster mode, I explained how to resolve Guava version issue in Yarn cluster mode. This blog covers how to do it in spark-shell.
The first thing is, when you start spark-shell with --master yarn, you actually run in yarn-client mode. Unfortunately my method for Yarn cluster mode won’t work. You may still get an exception as below:
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
What’s wrong? If you log on the data node, and check the launch_container.sh for you Yarn application, you will find guava-16.0.1.jar is in the first one in the classpath
export CLASSPATH="$PWD/guava-16.0.1.jar:$PWD:$PWD/__spark__.jar:$HADOOP_CLIENT_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/*:$HADOOP_COMMON_HOME/lib/*:$HADOOP_HDFS_HOME/*:$HADOOP_HDFS_HOME/lib/*:$HADOOP_YARN_HOME/*:$HADOOP_YARN_HOME/lib/*:$HADOOP_MAPRED_HOME/*:$HADOOP_MAPRED_HOME/lib/*:$MR2_CLASSPATH"
Here is the trick: you need to add Guava jar to --files in your command
spark-shell \
  --master yarn-cluster \
  --driver-class-path <local path of guava-16.0.1.jar> \
  --conf spark.executor.extraClassPath=./guava-16.0.1.jar \
  --jars <local path of guava-16.0.1.jar> \
  --files <local path of guava-16.0.1.jar> \
  ...
Sounds weird? You can do this test, then you will understand why. Run spark-shell command, when you see the prompt, don’t do anything, log on the data nodes where your application’s spark executors are running. Find the application’s cache, what will you find?
# ls /grid/0/yarn/nm/usercache/bwang/appcache/application_1459869234031_5503/container_e45_1459869234031_5503_01_000004/
container_tokens                       launch_container.sh
default_container_executor_session.sh  __spark__.jar
default_container_executor.sh          tmp
Where are those jars listed in —jars? The answer is those jars are copied until you start some action of RDD or DataFrame in the spark-shell. Unfortunately, the JVM of the executor is already started, and the JVM might use the older version of Guava already.
If you add Guava jar in --files, the jar will be copied to the executor’s container. And guava-16.0.1.jar will be chosen over the older version of Guava.
Updates
Adding --files is not necessary for Spark 2.0.1. For Spark 2.1, you are not able to start Spark shell if you keep it. All of jars are already distributed in Spark 2 when each executor’s JVM starts.

Friday, April 15, 2016

How to resolve spark-cassandra-connector Guava version conflicts in Yarn cluster mode

When you use spark-cassandra-connector, you will encounter this problem “Guava version conflicts” when you submit your job using Yarn cluster mode. spark-cassandra-connector usually use the latest Guava version 16.0.1, which has some new methods could not be found in the old version of Guava, e.g., 11.0.2. It is A BIG Headache when you try to resolve this problem.
Here is how you can resolve this without building something specially.
I think everyone might already have the idea: Put guava-16.0.1.jar before guava-11.0.2.jar in the classpath. But how can we achieve this when you run as YARN cluster mode?
You Hadoop cluster might already have the Guava jar. If you use CDH, try this
find -L /opt/cloudera/parcels/CDH -name "guava*.jar". If you like use that jar, you can resolve this problem by adding
spark-submit
  --master yarn-cluster
  --conf spark.driver.extraClassPath=<path of guava-16.0.1.jar>
  --conf spark.executor.extraClassPath=<path of guava-16.0.1.jar>
  ...
extraClassPath allow you prepend the jars in the class path.
If you could not find the version of Guava in you cluster, you can just include the jar by yourself
spark-submit
  --master yarn-cluster
  --conf spark.driver.extraClassPath=./guava-16.0.1.jar
  --conf spark.executor.extraClassPath=./guava-16.0.1.jar
  --jars <path of guava-16.0.1.jar>
  ...
In --jars, you actually tell spark how to find the jar, so you need to provide the full path of the jar. When spark starts in Yarn cluster mode, the jar will be shipped to the container in NodeManager, in that everything will the current directory where the executor starts, you only need to tell it is the current working directory in extraClassPath.
If you use CDH, all hadoop jars are automatically added when you run a Yarn application. Take a look of launch_container.sh when you job is running, you will see something like below.
export CLASSPATH="$PWD:$PWD/__spark__.jar:$HADOOP_CLIENT_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/*:$HADOOP_COMMON_HOME/lib/*:$HADOOP_HDFS_HOME/*:$HADOOP_HDFS_HOME/lib/*:$HADOOP_YARN_HOME/*:$HADOOP_YARN_HOME/lib/*:$HADOOP_MAPRED_HOME/*:$HADOOP_MAPRED_HOME/lib/*:$MR2_CLASSPATH"
Here is how you can find the launch_container.sh
  • Find the hose where one of the executors is running
  • Run this command find -L /yarn -path "*<app_id>*" -name "launch*"
There is a Yarn configuration yarn.application.classpath. If you like, you can prepend an entry for Guava.
  <property>
    <name>yarn.application.classpath</name>
    <value>$HADOOP_CLIENT_CONF_DIR,$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,$HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*</value>
  </property>
You know spark-submit have some messy convention:
  • --jars is separated by comma “,
  • extraClassPath is separated by column “:
  • --driver-class-path is separated by comma “,
I was in a hurry to write this blog. I might miss something or I assume you know a lot. If something is not clear, let me know and I will fix it.

Tuesday, December 23, 2014

Spark and Parquet with large block size

One of issue when I run a Spark application in yarn cluster mode is that my executor container is killed because the memory exceeds memory limits. NodeManager's log shows the folowing messages:

2014-12-22 09:22:48,906 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26453 for container-id container_1418853356063_0165_01_000005_01: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Process tree for container: container_1418853356063_0165_01_000005_01 has processes older than 1 iteration running over the configured limit. Limit=47781511168, current usage = 49199300608
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=26453,containerID=container_1418853356063_0165_01_000005_01] is running beyond physical memory limits. Current usage: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_1418853356063_0165_01_000005_01 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 26526 26453 26453 26453 (java) 315870 8116 49777754112 12011233 /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms45056m -Xmx45056m -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23
        |- 26453 26450 26453 26453 (bash) 1 1 108658688 315 /bin/bash -c /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms45056m -Xmx45056m  -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23 1> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stdout 2> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stderr

If an executor is lost, Spark will spawn another executor to recover from this failure. Under this situation, any persisted or cached RDDs are lost too, Spark must re-process from the beginning. This usually leads to a lengthy processing time. In any case, you should avoid this situation happening. For example, one of my application can finish in 5 mins if no lost executor happens, otherwise it could take more than 40 mins.

Because Spark builds an in-memory hash when doing groupByKey or combineByKey, which needs a lot of memory. One of suggestion is to use a larger parallelism to make the reduce task smaller.

If your application read Parquet files, you must be careful to choose executor memory and cores by taking into account of the block size of parquet files because reading Parquet files may consume a lot of memory. For example, Impala writes Parquet files using 1GB HDFS blocksize by default. My application needs to read 136 parquet files output by Impala. The application runs on 4 nodes with 24 virtual cores each node using "--executor-memory 44G --executor-cores 23 --num-executors 4", and my executors will be killed. But if I use 10 cores per executor "--executor-cores 10", everything passes through without any executor is lost. The reason is when you read 136 parquet files, there are 23 running tasks in one executor at the same time, so 23GB are allocated for reading Parquet, and 23 tasks try to build hash in memory. By using 10 cores, there are more memory for one task, and each task has enough memory to build the hash. Even the job is a little bit slow, it actually save time because no re-computation due to the lost executors.

Thursday, February 27, 2014

Yarn MapReduce Log Level

When you write a Pig or Hive UDF, a debug log may be very useful. You don't have to ask your administrator for help. Setting property 'mapreduce.map.log.level' or 'mapreduce.reduce.log.level' to 'DEBUG', that is it.
set mapreduce.map.log.level 'DEBUG'

register '/home/bwang/workspace/pig-scratch/target/pig-scratch-0.0.1-SNAPSHOT.jar';

define asByteArray com.mycompany.pig.SaveObjectAsByteArray();

d_val = load '/tmp/double.txt' as (id: chararray, val: double);

dba_val = foreach d_val generate flatten(asByteArray(*)), 0 as pos;

g = group dba_val all;

dump g;

But this is not perfect in that you will see a lot of Hadoop DEBUG log information too when you check the syslog of a map task.

Can I just output the DEBUG log for my own class? log4j definitely supports that, but it is not straight forward in Yarn MapReduce job as you think.

If you read MRApps.addLog4jSystemProperties, you will find that the log4j.configuration is actually hard coded to 'container-log4j.properties', which is packed in hadoop-yarn-server-nodemanager-2.0.0-cdh4.5.0.jar.

I found a way to fool NodeManager to achieve setting the log level for my UDF class:

  • Find container-log4j.properties in maven dependencies.
  • Copy the content to a property file, e.g., as_byte_array.log4j.properties.
  • Add 'log4j.logger.com.mycompany.pig=${com.mycompany.pig.logger}' into as_byte_array.log4j.properties.
  • Build a package and make sure as_byte_array.log4j.properties in your udf jar.
  • Change the pig script like this:
    set mapreduce.map.log.level 'INFO,CLA -Dlog4j.configuration=as_byte_array.log4j.properties -Dcom.mycompany.pig.logger=DEBUG'
    
    This is totally a HACK. If you check 'ps -ef | grep mycompany' when the map task is running, you will see something like this:
    /usr/java/jdk1.7.0_25/bin/java -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN -Xmx825955249 -Djava.io.tmpdir=/yarn/nm/usercache/bwang/appcache/application_1393480846083_0011/container_1393480846083_0011_01_000002/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.mapreduce.container.log.dir=/var/log/hadoop-yarn/container/application_1393480846083_0011/container_1393480846083_0011_01_000002 -Dyarn.app.mapreduce.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dlog4j.configuration=as_byte_array.log4j.properties -Dcom.mycompany.pig.logger=DEBUG,CLA org.apache.hadoop.mapred.YarnChild 127.0.0.1 60261 attempt_1393480846083_0011_m_000000_0 2
    
    Basically I inject a new log4j.configuration to point to my own log4j.properties, which overwrites container-log4j.properties because it appears behind it. And "-Dcom.mycompany.pig.logger=DEBUG" let me control the log level for my UDF.