Anonymous CompletableFuture threads with burstable pods
Short-lived anonymous Threads
It all started when, opening a JFR recording, I noticed a very large and growing number of very short-lived anonymous threads. Was it bad coding, or something else?
However, when looking at the Java Thread Start or Java Thread End events, there is no stack trace. Fortunately these short-lived threads created objects, which triggered Allocation in new TLAB events.
As a reminder, the generational GCs in the JVM dedicate a slice of Eden (where objects are created) to each thread, the Thread-Local Allocation Buffer (TLAB); when new threads are created, they each require a new TLAB for their objects.
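To follow that trail programmatically, here is a minimal sketch using the jdk.jfr.consumer API to dump the stack traces of those allocation events (recording.jfr is a placeholder path, and JDK Mission Control shows the same information interactively):

import java.nio.file.Path;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;

public class TlabStackTraces {
    public static void main(String[] args) throws Exception {
        // "recording.jfr" is a placeholder for the actual recording file
        for (RecordedEvent event : RecordingFile.readAllEvents(Path.of("recording.jfr"))) {
            if ("jdk.ObjectAllocationInNewTLAB".equals(event.getEventType().getName())
                    && event.getStackTrace() != null) {
                // the allocation event carries the allocating thread and its stack trace,
                // which the Thread Start / Thread End events lack
                System.out.println(event.getThread().getJavaName());
                event.getStackTrace().getFrames().stream().limit(5).forEach(frame ->
                        System.out.println("  at " + frame.getMethod().getType().getName()
                                + "." + frame.getMethod().getName()));
            }
        }
    }
}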
Now that I know what those threads are executing, it will be easier to follow the trail to where they get created.
public CompletableFuture<Empty> saveOrUpdateAsync(String userId, String clientId) {
    return tryFindAsync(userId, clientId)
            .thenApply(t -> ...)
            .thenComposeAsync(
                    t -> manager.crud()                           (1)
                            .insert(t)                            (1)
                            .withLwtResultListener(error -> ...)  (1)
                            .executeAsync());                     (1)
}
1 | This is the code that shows up in the allocation events. |
I usually avoid CompletableFuture for asynchronous tasks as I find its API bulky and ambiguous, and I remember it ships with some surprises. That's a personal opinion, I know. Yet it's not unusual to find it used in a framework to avoid external dependencies.
Since I don't use it much, I wondered: CompletableFuture doesn't create a new Thread just for fun, right?
It turns out it is possible under the right conditions! Let's get back on the trail in the CompletableFuture code, with the thenComposeAsync method:
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(defaultExecutor(), fn); (1)
}
This method uses the default CompletableFuture pool. Note that the Javadoc of this method is located on CompletionStage.thenComposeAsync, and it does not say anything useful regarding threads.
/**
* Returns the default Executor used for async methods that do not
* specify an Executor. This class uses the {@link
* ForkJoinPool#commonPool()} if it supports more than one (1)
* parallel thread, or else an Executor using one thread per async
* task. This method may be overridden in subclasses to return
* an Executor that provides at least one independent thread.
*
* @return the executor
* @since 9
*/
public Executor defaultExecutor() {
return ASYNC_POOL;
}
1 | There it is: if the ForkJoinPool#commonPool() parallelism is 1, then a new thread will be created per async task. |
In particular the code looks like this:
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
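To see this fallback in action, here is a minimal sketch; forcing the parallelism to 1 with the system property mimics what the JVM would compute in a small container:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

// run with -Djava.util.concurrent.ForkJoinPool.common.parallelism=1
// to reproduce the conditions of a container where the JVM sees few processors
public class ThreadPerTaskDemo {
    public static void main(String[] args) {
        System.out.println("parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        for (int i = 0; i < 3; i++) {
            CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName())
                    .thenAccept(name -> System.out.println("async task ran on: " + name))
                    .join();
        }
    }
}

With a parallelism of 1, each iteration prints a different anonymous Thread-N name; with a higher parallelism, the tasks run on ForkJoinPool.commonPool-worker threads instead.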
Also, in my experience I haven't used the ForkJoinPool much, but I remember some criticism about pool sizing when using Collection.parallelStream(). This is exactly the case here, since this CompletableFuture API relies on the same default pool.
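As a quick illustration of that sharing, here is a minimal sketch (nothing application-specific, just the default behavior): parallel streams and executor-less async stages end up on the same common pool threads.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSharing {
    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        // parallelStream() schedules its work on the common pool
        List.of("a", "b", "c", "d").parallelStream()
                .forEach(s -> System.out.println(s + " -> " + Thread.currentThread().getName()));

        // so does CompletableFuture when no executor is passed (as long as parallelism > 1)
        CompletableFuture.runAsync(() ->
                System.out.println("async -> " + Thread.currentThread().getName())).join();
    }
}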
This has likely been mentioned by other bloggers running the JVM in containers. If the ForkJoinPool does not see more than one processor, then CompletableFuture will use a new Thread for each task. Let's see what the defaults are.
$ env -u JDK_JAVA_OPTIONS jshell -s - \
<<<'System.out.printf("fjp-parallelism: %d%n", java.util.concurrent.ForkJoinPool.getCommonPoolParallelism())'
fjp-parallelism: 1
In my case the container resources are configured with a request of 2 CPUs in the Kubernetes deployment descriptor:
resources:
limits:
memory: "5.5Gi"
requests:
cpu: 2 (1)
memory: "5.5Gi"
1 | The CPU request is set to 2 and there is no CPU limit, which makes the pod burstable. |
You can also check the cgroup if it's not a Kubernetes deployment (the request of 2 CPUs translates to 2048 CPU shares, at 1024 per CPU):
$ cat /sys/fs/cgroup/cpu/cpu.shares
2048
It's always possible to check what the runtime sees (since the JVM understands cgroups v1):
$ env -u JDK_JAVA_OPTIONS jshell -s - \
<<<'System.out.printf("procs: %d%n", Runtime.getRuntime().availableProcessors())'
procs: 2
Here’s how ForkJoinPool is coded to configure the parallelism:
/**
* Returns the targeted parallelism level of the common pool.
*
* @return the targeted parallelism level of the common pool
* @since 1.8
*/
public static int getCommonPoolParallelism() {
return COMMON_PARALLELISM;
}
static {
    // ...
    common = AccessController.doPrivileged(new PrivilegedAction<>() {
        public ForkJoinPool run() {
            return new ForkJoinPool((byte)0); }});
    COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
}
/**
* Constructor for common pool using parameters possibly
* overridden by system properties
*/
private ForkJoinPool(byte forCommonPoolOnly) {
    int parallelism = -1;
    // ...
    try { // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        // ...
    } catch (Exception ignore) {
    }
    // ...
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)  (1)
        parallelism = 1;                                                      (1)
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    // ...
    this.mode = parallelism;
    // ...
}
1 | Indeed, the ForkJoinPool subtracts one from the reported available processors; with the 2 CPUs seen here, that yields a parallelism of 1, hence the thread-per-task fallback. |
The constructor also initializes a lot of things, in particular the thread factory, the pool boundaries, the work queues, etc. We also see the few system properties that are looked up to override the defaults.
In particular, the parallelism value can be overridden with java.util.concurrent.ForkJoinPool.common.parallelism, as stated in the javadoc and in the constructor code.
Back to the main issue: there are two ways to fix this, either change the code to pass an explicit executor, or tell the default pool what the actual parallelism should be. These options are not mutually exclusive, and it's always possible that some other code relies on the common ForkJoinPool anyway.
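For the first option, here is a sketch of what passing a dedicated executor looks like; the pool, its size, and the names below are illustrative assumptions, not the actual application code:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutorSketch {
    // dedicated pool, sized for the workload (2 is an arbitrary example)
    private static final ExecutorService ASYNC_EXECUTOR = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture
                .supplyAsync(() -> "entity", ASYNC_EXECUTOR)
                .thenApply(String::toUpperCase)
                // every *Async method has an overload taking an Executor,
                // so the common ForkJoinPool is never involved
                .thenComposeAsync(
                        s -> CompletableFuture.completedFuture(s + "-saved"),
                        ASYNC_EXECUTOR);
        System.out.println(result.join());
        ASYNC_EXECUTOR.shutdown();
    }
}

For the second option, the parallelism of the common pool can be set with a system property at JVM startup: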
$ env JDK_JAVA_OPTIONS="-Djava.util.concurrent.ForkJoinPool.common.parallelism=3" \
jshell -s - \
<<<'System.out.printf("fjp-parallelism: %d%n", java.util.concurrent.ForkJoinPool.getCommonPoolParallelism())'
fjp-parallelism: 3
I changed this value with care: the tasks are really short and there are not too many of them, hence the low value of 3. Also, this works because the containers are not limited in CPU, so they won't be throttled, which would have been very bad for a runtime like the JVM.
End words
Again, the narrower walls of containers trick the JVM into choosing inadequate defaults (or runtime ergonomics) for the workload.