Tuesday, December 23, 2014

Spark and Parquet with large block size

One issue I ran into when running a Spark application in yarn-cluster mode is that my executor containers get killed because their memory usage exceeds the container's memory limit. The NodeManager's log shows the following messages:

2014-12-22 09:22:48,906 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 26453 for container-id container_1418853356063_0165_01_000005_01: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Process tree for container: container_1418853356063_0165_01_000005_01 has processes older than 1 iteration running over the configured limit. Limit=47781511168, current usage = 49199300608
2014-12-22 09:22:48,907 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=26453,containerID=container_1418853356063_0165_01_000005_01] is running beyond physical memory limits. Current usage: 45.8 GB of 44.5 GB physical memory used; 46.5 GB of 93.4 GB virtual memory used. Killing container.
Dump of the process-tree for container_1418853356063_0165_01_000005_01 :
        |- 26526 26453 26453 26453 (java) 315870 8116 49777754112 12011233 /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms45056m -Xmx45056m -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23
        |- 26453 26450 26453 26453 (bash) 1 1 108658688 315 /bin/bash -c /usr/java/jdk1.7.0_67-cloudera/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms45056m -Xmx45056m  -Djava.io.tmpdir=/grid/2/yarn/nm/usercache/bwang/appcache/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/tmp org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@hadoop-data:48923/user/CoarseGrainedScheduler 3 hadoop-data 23 1> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stdout 2> /var/log/hadoop-yarn/container/application_1418853356063_0165/container_1418853356063_0165_01_000005_01/stderr
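The log reports the limit and the usage in raw bytes, which makes them hard to compare with the -Xmx45056m flag. The short Python sketch below only converts the figures copied from the log above into GiB/MiB (the variable names are my own). With these numbers, just 512 MiB of the container's allotment is left for everything outside the Java heap, and the executor's process tree ended up about 1.3 GiB over the limit. YARN checks the physical memory of the whole process tree, not just the heap, so a container can be killed this way even though the heap itself stays within -Xmx.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

# Figures copied verbatim from the NodeManager log above.
limit_bytes = 47781511168   # "Limit=47781511168"
usage_bytes = 49199300608   # "current usage = 49199300608"
heap_bytes = 45056 * MIB    # "-Xmx45056m" (max Java heap)

print(f"container limit : {limit_bytes / GIB:.2f} GiB")
print(f"actual usage    : {usage_bytes / GIB:.2f} GiB")
print(f"JVM max heap    : {heap_bytes / GIB:.2f} GiB")
# Whatever the container allows beyond the heap is all that is left for
# non-heap memory (thread stacks, native buffers, JVM internals, ...).
print(f"non-heap room   : {(limit_bytes - heap_bytes) / MIB:.0f} MiB")
print(f"over the limit  : {(usage_bytes - limit_bytes) / MIB:.0f} MiB")
```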

If an executor is lost, Spark spawns another executor to recover from the failure. In that case, any persisted or cached RDD partitions held by the lost executor are gone too, and Spark must recompute them from their lineage, which can mean re-processing from the beginning. This usually makes the processing time much longer, so it is a situation worth avoiding. For example, one of my applications finishes in 5 minutes when no executor is lost, but can take more than 40 minutes otherwise.

When doing groupByKey or combineByKey, Spark builds an in-memory hash map, which can need a lot of memory. One suggestion is to use a higher parallelism (more partitions) so that each reduce task is smaller.
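To illustrate why more partitions make each reduce task smaller, here is a minimal Python simulation of hash partitioning. It assumes Spark's default hash partitioning (key hash modulo the number of partitions) and uses Python's hash() on integer keys as a stand-in for Java's hashCode(); the function name and the input data are hypothetical. In Spark itself, the number of partitions can be passed as the numPartitions argument of groupByKey/combineByKey, or set globally through spark.default.parallelism.

```python
from collections import Counter


def partition_sizes(records, num_partitions):
    """Count how many records each reduce partition receives under hash partitioning."""
    sizes = Counter()
    for key, _value in records:
        # Python's % is non-negative for a positive modulus, mirroring how
        # a hash partitioner maps every key to a valid partition index.
        sizes[hash(key) % num_partitions] += 1
    return [sizes[p] for p in range(num_partitions)]


# Hypothetical input: 10,000 distinct integer keys with 3 values each.
records = [(k, v) for k in range(10_000) for v in range(3)]

for n in (10, 100, 1000):
    sizes = partition_sizes(records, n)
    print(f"{n:>5} partitions -> largest reduce task gets {max(sizes)} records")
```

Here each tenfold increase in partitions shrinks the largest task tenfold only because the keys are spread evenly. If a single key carries most of the records, that key still lands in one task no matter how many partitions there are.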

If your application reads Parquet files, you must choose executor memory and cores carefully, taking the block size of the Parquet files into account, because reading Parquet files can consume a lot of memory. For example, Impala writes Parquet files using a 1GB HDFS block size by default. My application needs to read 136 Parquet files written by Impala. It runs on 4 nodes with 24 virtual cores each, and with "--executor-memory 44G --executor-cores 23 --num-executors 4" my executors were killed. But with 10 cores per executor ("--executor-cores 10"), everything completes without losing a single executor.

The reason is that, while reading the 136 Parquet files, each executor runs 23 tasks at the same time; if each task holds roughly one 1GB block in memory, about 23GB goes to reading Parquet alone, while those same 23 tasks also try to build hash maps in memory. With 10 cores, each task gets a larger share of the executor's memory (about 4.4GB instead of about 1.9GB), which is enough to build its hash map. Even though the job runs a little slower with fewer concurrent tasks, it actually saves time overall because nothing has to be recomputed after lost executors.
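The back-of-the-envelope reasoning above can be written out as a small Python calculation. It assumes, like the explanation above, that every concurrently running read task buffers about one full 1GB Parquet block, and additionally that each of the 136 files becomes exactly one read task. It ignores JVM overhead and the fractions of the heap that Spark reserves for caching and shuffling, so the numbers are optimistic upper bounds, not measurements. memory_plan is a hypothetical helper.

```python
import math

BLOCK_GB = 1.0           # Impala's default Parquet block size, per the post
EXECUTOR_HEAP_GB = 44.0  # --executor-memory 44G
NUM_EXECUTORS = 4        # --num-executors 4
NUM_FILES = 136          # assumption: one read task per Parquet file


def memory_plan(executor_cores):
    """Rough per-executor budget, assuming each running task buffers one full block."""
    parquet_gb = executor_cores * BLOCK_GB
    left_for_hash_gb = EXECUTOR_HEAP_GB - parquet_gb
    return {
        "parquet_gb": parquet_gb,
        # Heap left after Parquet buffers, split evenly across concurrent tasks.
        "hash_gb_per_task": left_for_hash_gb / executor_cores,
        # Number of rounds needed to run all read tasks on the cluster's task slots.
        "waves": math.ceil(NUM_FILES / (executor_cores * NUM_EXECUTORS)),
    }


for cores in (23, 10):
    plan = memory_plan(cores)
    print(f"{cores} cores: {plan['parquet_gb']:.0f} GB for Parquet, "
          f"{plan['hash_gb_per_task']:.2f} GB per task for hashing, "
          f"{plan['waves']} wave(s) of read tasks")
```

With 23 cores, less than 1GB of heap per task remains for hashing; with 10 cores there is several times more, at the price of 4 waves of read tasks instead of 2. That trade-off is consistent with the job becoming a little slower but no longer losing executors.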