Apache Spark s3a committer - thread stack - out of memory issues

0

I really need some help here:

We are using Spark 3.1.2 on a standalone cluster. Since we started using the s3a directory committer, the stability and performance of our Spark jobs have improved significantly!

Lately, however, we have spent days troubleshooting an issue with the s3a directory committer and are completely baffled. Do you have any idea what is going on?

Our Spark jobs fail with a Java OOM (or rather, process limit) error:

 An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.

: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
    at java.base/java.lang.Thread.start0(Native Method)
    at java.base/java.lang.Thread.start(Thread.java:803)
    at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1343)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:714)
    at org.apache.spark.rpc.netty.DedicatedMessageLoop.$anonfun$new$1(MessageLoop.scala:174)
    at org.apache.spark.rpc.netty.DedicatedMessageLoop.$anonfun$new$1$adapted(MessageLoop.scala:173)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at org.apache.spark.rpc.netty.DedicatedMessageLoop.<init>(MessageLoop.scala:173)
    at org.apache.spark.rpc.netty.Dispatcher.liftedTree1$1(Dispatcher.scala:75)
    at org.apache.spark.rpc.netty.Dispatcher.registerRpcEndpoint(Dispatcher.scala:72)
    at org.apache.spark.rpc.netty.NettyRpcEnv.setupEndpoint(NettyRpcEnv.scala:136)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:231)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:394)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:189)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:458)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

The Spark thread dump shows over 5000 committer threads on the Spark driver! Here is an example:

Thread ID   Thread Name Thread State    Thread Locks
1047    s3-committer-pool-0 WAITING 
1449    s3-committer-pool-0 WAITING 
1468    s3-committer-pool-0 WAITING 
1485    s3-committer-pool-0 WAITING 
1505    s3-committer-pool-0 WAITING 
1524    s3-committer-pool-0 WAITING 
1529    s3-committer-pool-0 WAITING 
1544    s3-committer-pool-0 WAITING 
1549    s3-committer-pool-0 WAITING 
1809    s3-committer-pool-0 WAITING 
1972    s3-committer-pool-0 WAITING 
1998    s3-committer-pool-0 WAITING 
2022    s3-committer-pool-0 WAITING 
2043    s3-committer-pool-0 WAITING 
2416    s3-committer-pool-0 WAITING 
2453    s3-committer-pool-0 WAITING 
2470    s3-committer-pool-0 WAITING 
2517    s3-committer-pool-0 WAITING 
2534    s3-committer-pool-0 WAITING 
2551    s3-committer-pool-0 WAITING 
2580    s3-committer-pool-0 WAITING 
2597    s3-committer-pool-0 WAITING 
2614    s3-committer-pool-0 WAITING 
2631    s3-committer-pool-0 WAITING 
2726    s3-committer-pool-0 WAITING 
2743    s3-committer-pool-0 WAITING 
2763    s3-committer-pool-0 WAITING 
2780    s3-committer-pool-0 WAITING 
2819    s3-committer-pool-0 WAITING 
2841    s3-committer-pool-0 WAITING 
2858    s3-committer-pool-0 WAITING 
2875    s3-committer-pool-0 WAITING 
2925    s3-committer-pool-0 WAITING 
2942    s3-committer-pool-0 WAITING 
2963    s3-committer-pool-0 WAITING 
2980    s3-committer-pool-0 WAITING 
3020    s3-committer-pool-0 WAITING 
3037    s3-committer-pool-0 WAITING 
3055    s3-committer-pool-0 WAITING 
3072    s3-committer-pool-0 WAITING 
3127    s3-committer-pool-0 WAITING 
3144    s3-committer-pool-0 WAITING 
3163    s3-committer-pool-0 WAITING 
3180    s3-committer-pool-0 WAITING 
3222    s3-committer-pool-0 WAITING 
3242    s3-committer-pool-0 WAITING 
3259    s3-committer-pool-0 WAITING 
3278    s3-committer-pool-0 WAITING 
3418    s3-committer-pool-0 WAITING 
3435    s3-committer-pool-0 WAITING 
3452    s3-committer-pool-0 WAITING 
3469    s3-committer-pool-0 WAITING 
3486    s3-committer-pool-0 WAITING 
3491    s3-committer-pool-0 WAITING 
3501    s3-committer-pool-0 WAITING 
3508    s3-committer-pool-0 WAITING 
4029    s3-committer-pool-0 WAITING 
4093    s3-committer-pool-0 WAITING 
4658    s3-committer-pool-0 WAITING 
4666    s3-committer-pool-0 WAITING 
4907    s3-committer-pool-0 WAITING 
5102    s3-committer-pool-0 WAITING 
5119    s3-committer-pool-0 WAITING 
5158    s3-committer-pool-0 WAITING 
5175    s3-committer-pool-0 WAITING 
5192    s3-committer-pool-0 WAITING 
5209    s3-committer-pool-0 WAITING 
5226    s3-committer-pool-0 WAITING 
5395    s3-committer-pool-0 WAITING 
5634    s3-committer-pool-0 WAITING 
5651    s3-committer-pool-0 WAITING 
5668    s3-committer-pool-0 WAITING 
5685    s3-committer-pool-0 WAITING 
5702    s3-committer-pool-0 WAITING 
5722    s3-committer-pool-0 WAITING 
5739    s3-committer-pool-0 WAITING 
6144    s3-committer-pool-0 WAITING 
6167    s3-committer-pool-0 WAITING 
6289    s3-committer-pool-0 WAITING 
6588    s3-committer-pool-0 WAITING 
6628    s3-committer-pool-0 WAITING 
6645    s3-committer-pool-0 WAITING 
6662    s3-committer-pool-0 WAITING 
6675    s3-committer-pool-0 WAITING 
6692    s3-committer-pool-0 WAITING 
6709    s3-committer-pool-0 WAITING 
7049    s3-committer-pool-0 WAITING 

This is despite our settings not allowing more than 100 threads, unless we are misunderstanding something.
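
To check how the threads are distributed, a dump in the tabular form above can be tallied by thread name. The following is a minimal sketch, not part of the original post: the function name `count_threads_by_name` and the three-row sample are hypothetical, and it assumes each row starts with a numeric thread ID followed by the name and a Java thread state.

```python
from collections import Counter

# Java thread states, used to find where the thread name ends.
STATES = {"NEW", "RUNNABLE", "BLOCKED", "WAITING", "TIMED_WAITING", "TERMINATED"}


def count_threads_by_name(dump_text):
    """Tally thread names from rows of the form '<id> <name> <state> [locks]'."""
    counts = Counter()
    for line in dump_text.splitlines():
        parts = line.split()
        # Skip blank lines and the header row, whose first field is not numeric.
        if not parts or not parts[0].isdigit():
            continue
        # The name is everything between the ID and the first state token,
        # so names containing spaces are kept whole.
        state_index = next(
            (i for i, part in enumerate(parts) if i > 0 and part in STATES), None
        )
        if state_index is None or state_index < 2:
            continue
        counts[" ".join(parts[1:state_index])] += 1
    return counts


# Hypothetical sample: the header and the first three rows of the dump above.
sample = """Thread ID   Thread Name Thread State    Thread Locks
1047    s3-committer-pool-0 WAITING
1449    s3-committer-pool-0 WAITING
1468    s3-committer-pool-0 WAITING
"""

for name, count in count_threads_by_name(sample).most_common(5):
    print(f"{count:6d}  {name}")
```

Every row in the dump carries the same name, s3-committer-pool-0. That would be consistent with many separate pools each holding one idle thread, rather than one pool growing past its limit, although the dump alone does not prove it.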

Here are our configuration settings:

fs.s3a.threads.max                              100
fs.s3a.connection.maximum                       1000
fs.s3a.committer.threads                        16
fs.s3a.max.total.tasks                          5
fs.s3a.committer.name                           directory
fs.s3a.fast.upload.buffer                       disk
io.file.buffer.size                             1048576
mapreduce.outputcommitter.factory.scheme.s3a    org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
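
The post does not say how these options reach the job. One common route in Spark is to prefix each Hadoop option with `spark.hadoop.`, which Spark then copies into the Hadoop configuration. The sketch below only builds the prefixed key/value pairs from the settings listed above; the helper name `to_spark_conf` is hypothetical, and whether this matches the original setup is an assumption.

```python
def to_spark_conf(hadoop_settings):
    """Prefix Hadoop option names with 'spark.hadoop.' so Spark forwards them."""
    return {f"spark.hadoop.{key}": str(value) for key, value in hadoop_settings.items()}


# The values are the ones listed in the question.
hadoop_settings = {
    "fs.s3a.threads.max": 100,
    "fs.s3a.connection.maximum": 1000,
    "fs.s3a.committer.threads": 16,
    "fs.s3a.max.total.tasks": 5,
    "fs.s3a.committer.name": "directory",
    "fs.s3a.fast.upload.buffer": "disk",
    "io.file.buffer.size": 1048576,
    "mapreduce.outputcommitter.factory.scheme.s3a":
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
}

spark_conf = to_spark_conf(hadoop_settings)
for key, value in sorted(spark_conf.items()):
    print(f"{key}={value}")
```

Each resulting pair could then be passed to `SparkSession.builder.config(key, value)` before the session is created, or written to spark-defaults.conf.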

We have tried different versions of the spark-hadoop-cloud library, but the issue is consistently the same.

https://repository.cloudera.com/content/repositories/releases/org/apache/spark/spark-hadoop-cloud_2.11/2.4.0-cdh6.3.2/spark-hadoop-cloud_2.11-2.4.0-cdh6.3.2.jar

https://repository.cloudera.com/artifactory/libs-release-local/org/apache/spark/spark-hadoop-cloud_2.11/2.4.0.7.0.3.0-79/spark-hadoop-cloud_2.11-2.4.0.7.0.3.0-79.jar

https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.2.0/spark-hadoop-cloud_2.12-3.2.0.jar

https://repository.cloudera.com/artifactory/libs-release-local/org/apache/spark/spark-hadoop-cloud_2.12/3.1.2.7.2.12.0-291/spark-hadoop-cloud_2.12-3.1.2.7.2.12.0-291.jar

We'd really appreciate it if you could point us in the right direction.

amazon-s3 apache-spark hadoop java
2021-11-23 16:49:54
2

3

This will be HADOOP-16570, "S3A committers leak threads/raises OOM on job/task commit at scale".

Move up to the hadoop-3.3.0 binaries for the fix, ideally to 3.3.1, which fixes some other issues, especially duplicate job IDs coming from Spark. I am not sure how far back into the CDH releases that fix went; I could work it out if you really need to know. Certainly not CDH 6.x.
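
To see whether a given deployment is at or past that release, the Hadoop version string can be compared against 3.3.0. This is a rough sketch under stated assumptions: the names `FIX_VERSION`, `parse_version` and `has_committer_fix` are made up for illustration, and only the leading major.minor.patch is compared, so a vendor build that backported the patch to an older base version would still be reported as unfixed.

```python
import re

# Per the answer above, the HADOOP-16570 fix is in the hadoop-3.3.0 binaries.
FIX_VERSION = (3, 3, 0)


def parse_version(version):
    """Return the leading major.minor.patch of a Hadoop version string as a tuple."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised Hadoop version: {version!r}")
    return tuple(int(part) for part in match.groups())


def has_committer_fix(version):
    """True if the Apache release number is at or after FIX_VERSION."""
    return parse_version(version) >= FIX_VERSION


for candidate in ("3.2.0", "3.3.0", "3.3.1"):
    print(candidate, has_committer_fix(candidate))
```

In a running PySpark session, the version string can usually be read with `spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()`; note that `_jvm` is an internal attribute of the PySpark context rather than a public API.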

2021-11-28 12:13:50

Thank you so much! Indeed, nothing we tried helped.
Ofer Eliassaf

Our problem now is finding the correct drivers: where can we fetch the spark-cloud jar with the relevant version? Will this driver work for Spark 3.1.2: mvnrepository.com/artifact/org.apache.spark/… ?
Ofer Eliassaf

The spark cloud jar must be the one for your Spark distro; it's just a thin binding class.
stevel

Hi, we are trying to enable this on Spark 3.1.2, which comes bundled with Hadoop 3.2. Is it safe to provide spark-hadoop-cloud for 3.3.1 together with the Hadoop 3.2 libraries? (We tried the unbundled version and ran into difficulties with a mismatch between the ZooKeeper Curator libraries and Guava. A faster track would be to go with the bundled version and just add the committer from 3.3.1. Will that work?) Thanks!
Ofer Eliassaf

Use spark-hadoop-cloud-3.1.2; you will need a full set of hadoop-3.3.x jars in perfect sync, or backport only the committer patch to Hadoop 3.2.
stevel

This spark-hadoop-cloud-3.1.2 has this bug in it (and it was a nightmare to find, since it is not in the official Maven releases). What we ended up doing is Option 1 (our current setup): use the unbundled version of Spark 3.1.2, add the Hadoop 3.3.1 binaries, and add spark-hadoop-cloud-3.2.0 to resolve the committer issue. Option 2 (which we are considering): use the bundled Spark 3.1.2 (which comes with Hadoop 3.2) and add spark-hadoop-cloud-3.2.0 to fix the committer issue. Which is better in your opinion?
Ofer Eliassaf

The thread leakage is in hadoop-aws; it should only be hadoop-* which needs updating.
stevel
1

Check out this article on tuning for S3A.

Specifically, I'd look at this passage, even though it's categorized under array:

You may need to perform careful tuning to reduce the risk of running out of memory, especially if the data is buffered in memory. There are a number of parameters which can be tuned:

The total number of threads available in the filesystem for data uploads or any other queued filesystem operation. This is set in fs.s3a.threads.max.

The number of operations which can be queued for execution, awaiting a thread. This is set in fs.s3a.max.total.tasks.

The number of blocks which a single output stream can have active (that is, being uploaded by a thread or queued in the filesystem thread queue). This is set in fs.s3a.fast.upload.active.blocks.

The length of time that an idle thread can stay in the thread pool before it is retired. This is set in fs.s3a.threads.keepalivetime.

I think you might find that reducing the number of threads removes memory pressure.

I also suggest you tune fs.s3a.fast.upload.active.blocks, which will also alleviate memory pressure. I think reducing the thread count should be your first move, as 100 is a little aggressive. You are likely capped on bandwidth, and extra threads probably aren't doing anything but consuming memory.

2021-11-28 12:08:19

Do you know what the driver threads are for? If I understand correctly, the configured threads should be for the workers. What are the driver's threads for?
Ofer Eliassaf

Take a look at the S3 committer documentation (not 100% the same as this, but a good place to learn about S3 and Spark); link below. It raises the idea that files are communicated to the driver and committed there. We can't know this is the same for this file writing, but it seems logical, as the driver is the only one that's aware whether the job succeeded or failed. I didn't find a smoking gun confirming this, but it seems plausible. github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/…
Matt Andruff

This would also explain why using 100 threads per worker could end up with 5000 threads in the driver, and why you should consider using a number that adds less pressure on the driver (reduce your thread count).
Matt Andruff

Look at the answer by @Stevel: it is a known Hadoop bug in earlier versions of the committer!
Ofer Eliassaf

Thank you very much for pinging me.
Matt Andruff
