Anonymous CompletableFuture threads with burstable pods

Short-lived anonymous Threads

It all started when I opened a JFR recording and noticed a very large, and growing, number of very short-lived anonymous threads. Was it bad coding, or something else?

short threads

However, when looking at the Java Thread Start or Java Thread End events, there is no stack trace. Fortunately these short-lived threads created objects, which triggered Allocation in new TLAB events.

As a reminder, the generational GCs in the JVM dedicate a slice of the Eden space (where objects are created) to each thread, the Thread-Local Allocation Buffer; when these new threads are created they require a new TLAB for their new objects.
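Such a recording can also be mined programmatically. Here is a minimal sketch using the standard jdk.jfr.consumer API; the file name recording.jfr and the Thread-<n> name filter are assumptions for illustration:

reading the allocation events
import java.nio.file.Path;
import jdk.jfr.consumer.RecordedClass;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordingFile;

public class ShortLivedThreadAllocations {
    public static void main(String[] args) throws Exception {
        // recording.jfr is a placeholder for the actual recording file
        for (RecordedEvent event : RecordingFile.readAllEvents(Path.of("recording.jfr"))) {
            if (!"jdk.ObjectAllocationInNewTLAB".equals(event.getEventType().getName())
                    || event.getThread() == null) {
                continue;
            }
            String threadName = event.getThread().getJavaName();
            if (threadName != null && threadName.startsWith("Thread-")) { // anonymous threads
                RecordedClass allocated = event.getValue("objectClass");
                System.out.println(threadName + " allocated " + allocated.getName());
                System.out.println(event.getStackTrace()); // the trail to the creating code
            }
        }
    }
}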

short threads flamegraph

Now that I know what those threads are executing, it will be easier to follow the trail to where they get created.

Short tasks code
public CompletableFuture<Empty> saveOrUpdateAsync(String userId, String clientId) {
    return tryFindAsync(userId, clientId)
            .thenApply(t -> ...)
            .thenComposeAsync(
                    t -> manager.crud() (1)
                                .insert(t) (1)
                                .withLwtResultListener(error -> ...) (1)
                                .executeAsync()); (1)
}
1 This is the code that shows up in the allocation events

I usually avoid CompletableFuture for asynchronous tasks, as I find the API bulky and ambiguous, and I remember that it ships with some surprises. That’s a personal opinion, I know. Yet it’s not unusual to find CompletableFuture used in a framework to avoid external dependencies.

As I don’t use them much, I wondered: CompletableFutures don’t create a new Thread just for fun, right? It turns out it’s possible, under the right conditions! Let’s pick up the trail in the CompletableFuture code with the thenComposeAsync method:

CompletableFuture.thenComposeAsync(…​)
public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(defaultExecutor(), fn); (1)
}

1 This method uses the default CompletableFuture pool.

Note the Javadoc of this method is located on CompletionStage.thenComposeAsync, and does not say anything useful regarding threads.

CompletableFuture.defaultExecutor()
/**
 * Returns the default Executor used for async methods that do not
 * specify an Executor. This class uses the {@link
 * ForkJoinPool#commonPool()} if it supports more than one (1)
 * parallel thread, or else an Executor using one thread per async
 * task.  This method may be overridden in subclasses to return
 * an Executor that provides at least one independent thread.
 *
 * @return the executor
 * @since 9
 */
public Executor defaultExecutor() {
    return ASYNC_POOL;
}
1 There it is: if the ForkJoinPool#commonPool() parallelism is 1, then a new thread will be created for each async task.

In particular the code looks like this:

CompletableFuture
private static final boolean USE_COMMON_POOL =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
}
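This behavior is easy to reproduce outside a container by forcing the parallelism down. A small sketch (the class name and the loop are mine, only for the demonstration):

thread-per-task repro
// Run with: java -Djava.util.concurrent.ForkJoinPool.common.parallelism=1 ThreadPerTaskRepro.java
import java.util.concurrent.CompletableFuture;

public class ThreadPerTaskRepro {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            CompletableFuture.completedFuture(i)
                    .thenApplyAsync(x -> Thread.currentThread().getName()) // dispatched to ASYNC_POOL
                    .thenAccept(System.out::println)
                    .join();
        }
        // With a common pool parallelism above 1 this prints ForkJoinPool.commonPool-worker-* names;
        // with a parallelism of 1 each async stage runs on a fresh anonymous "Thread-<n>".
    }
}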

Also, in my experience I haven’t used the ForkJoinPool much, but I remember some criticism about the common pool sizing when Collection.parallelStream() started being used. This is exactly the case here, since this CompletableFuture API relies on the same default.

This has likely been mentioned by other bloggers running the JVM in containers: if the ForkJoinPool does not see more than one processor, then CompletableFuture will use a new Thread for each task. Let’s see what the defaults are.

FJP parallelism
$ env -u JDK_JAVA_OPTIONS jshell -s - \
    <<<'System.out.printf("fjp-parallelism: %d%n", java.util.concurrent.ForkJoinPool.getCommonPoolParallelism())'
fjp-parallelism: 1

In my case the container resources are configured with 2 CPUs in the Kubernetes deployment descriptor:

container resources
resources:
  limits:
    memory: "5.5Gi"
  requests:
    cpu: 2 (1)
    memory: "5.5Gi"
1 More on this later

You can check the cgroup too if it’s not a Kubernetes deployment:

cgroup cpu shares
$ cat /sys/fs/cgroup/cpu/cpu.shares
2048

It’s always possible to check what the runtime sees (since the JVM understands cgroups (v1)):

availableProcessors()
$ env -u JDK_JAVA_OPTIONS jshell -s - \
    <<<'System.out.printf("procs: %d%n", Runtime.getRuntime().availableProcessors())'
procs: 2

Here’s how ForkJoinPool is coded to configure the parallelism:

ForkJoinPool
/**
 * Returns the targeted parallelism level of the common pool.
 *
 * @return the targeted parallelism level of the common pool
 * @since 1.8
 */
public static int getCommonPoolParallelism() {
    return COMMON_PARALLELISM;
}

static {
    // ...
    common = AccessController.doPrivileged(new PrivilegedAction<>() {
        public ForkJoinPool run() {
            return new ForkJoinPool((byte)0); }});

    COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
}

/**
 * Constructor for common pool using parameters possibly
 * overridden by system properties
 */
private ForkJoinPool(byte forCommonPoolOnly) {
    int parallelism = -1;
    // ...
    try {  // ignore exceptions in accessing/parsing properties
        String pp = System.getProperty
            ("java.util.concurrent.ForkJoinPool.common.parallelism");
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        // ...
    } catch (Exception ignore) {
    }

    // ...
    if (parallelism < 0 && // default 1 less than #cores
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) (1)
        parallelism = 1; (1)
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;

    // ...

    this.mode = parallelism;
    // ...
}
1 Indeed, the ForkJoinPool subtracts one from the reported available processors.

The constructor also initializes a lot of things, in particular the Thread factory, the pool boundaries, the work queues, etc. Also, we see the few properties that are looked up to override the defaults.

In particular the parallelism value can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property, as stated in the Javadoc and in the constructor code.

Back to the main issue: there are two ways to fix this, either change the code to pass an explicit executor, or tell the default pool what the actual parallelism should be. These options are not mutually exclusive, and there may always be some other code relying on the common ForkJoinPool.
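The first option, sketched below with a hypothetical bounded pool (the name CRUD_POOL and the size of 3 are mine, not the original code), uses the executor overloads of the *Async methods:

with an explicit executor
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutor {
    // hypothetical bounded pool, sized for these short CRUD tasks
    private static final ExecutorService CRUD_POOL =
            Executors.newFixedThreadPool(3, r -> new Thread(r, "crud-async"));

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf = CompletableFuture
                .supplyAsync(() -> "row", CRUD_POOL)
                .thenComposeAsync(
                        row -> CompletableFuture.completedFuture(row + "-saved"),
                        CRUD_POOL); // executor overload: the common pool is never consulted
        System.out.println(cf.get());
        CRUD_POOL.shutdown();
    }
}

The other option is to set the parallelism of the common pool directly: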

with the system property
$ env JDK_JAVA_OPTIONS="-Djava.util.concurrent.ForkJoinPool.common.parallelism=3" \
    jshell -s - \
    <<<'System.out.printf("fjp-parallelism: %d%n", java.util.concurrent.ForkJoinPool.getCommonPoolParallelism())'
fjp-parallelism: 3

I changed this value with care: the short tasks are really short, and there are not too many of them, hence the low value of 3. Also, this works because the containers are not CPU-limited, so they won’t be throttled, which would have been very bad for a runtime like the JVM.

CPU shares and CPU quotas

In Kubernetes, the CPU resources have different meanings in the cgroup:

  • requests are configured as CPU shares,

  • and the limits are configured as CPU Quotas (hence the term milli-cpu or milli-core).

CPU shares

When making a pod burstable in Kubernetes, the CPU limit is not set.

container resources
resources:
  limits:
    memory: "5.5Gi"
  requests:
    cpu: 2 (1)
    memory: "5.5Gi"
1 This number means there will be 2048 (2 x 1024) CPU shares.
cgroup cpu shares
$ cat /sys/fs/cgroup/cpu/cpu.shares
2048

The JVM will use the CPU shares when no quota is defined.

Beware: when the CPU request is 1, the CPU shares of the cgroup will be 1024, and as it happens the default CPU share of a cgroup is also 1024. In this case the JVM cannot know whether a CPU share has been set for this cgroup (source), which means it will not use this value and will instead use the number of processors of the host machine.
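To illustrate the arithmetic, here is a sketch that reads the cgroup v1 file shown above and divides by 1024, which is roughly how the JVM maps shares to processors (the class name is mine):

cpu shares to processors
import java.nio.file.Files;
import java.nio.file.Path;

public class SharesToCpus {
    public static void main(String[] args) throws Exception {
        // cgroup v1 path, as in the cat command above
        long shares = Long.parseLong(
                Files.readString(Path.of("/sys/fs/cgroup/cpu/cpu.shares")).trim());
        if (shares == 1024) {
            // 1024 is also the cgroup default value, so the JVM cannot tell
            // "requests: cpu: 1" apart from "no request at all"
            System.out.println("shares = 1024, falling back to the host processor count");
        } else {
            // requests: cpu: 2  ->  2048 shares  ->  2 CPUs
            System.out.printf("share-based cpus: %d%n", (long) Math.ceil(shares / 1024.0));
        }
    }
}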

CPU quotas

Kubernetes sets the CPU limit as a CPU quota. CPU quotas are used by the Linux Completely Fair Scheduler to limit a process’s CPU usage when it runs in a cgroup.

Setting quotas may throttle your process and may result in disastrous responsiveness. I believe the current best practice is to set the CPU request to help Kubernetes schedule the pod on the available hardware, but let the process burst if more CPU is required for a brief time, like during a GC.

The JVM will use the quota in combination with the period (the period is not currently configurable in a Kubernetes deployment descriptor).

container resources
resources:
  limits:
    cpu: 3 (1)
    memory: "5.5Gi"
  requests:
    cpu: 2
    memory: "5.5Gi"
1 3 means a quota of 300000, i.e. 3 x 100000, which is 3 times the default period (100 ms); the docker arguments would then be either --cpus=3 or --cpu-quota=300000

If a CFS quota is in place then it’s possible to know its parameters by looking at the cgroup values. The values below are equivalent to --cpus=3 or --cpu-quota=300000, assuming the period is the default one (100000).

cgroup cfs parameters
$ cat /sys/fs/cgroup/cpu/cpu.cfs_period_us
100000
$ cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us
300000
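
The same arithmetic as a sketch, reading the cgroup v1 files above; the rounding up mirrors what --cpus does (the class name is mine):

cfs quota to processors
import java.nio.file.Files;
import java.nio.file.Path;

public class QuotaToCpus {
    public static void main(String[] args) throws Exception {
        // cgroup v1 paths, as in the cat commands above
        long quota  = readLong("/sys/fs/cgroup/cpu/cpu.cfs_quota_us");   // -1 when no limit is set
        long period = readLong("/sys/fs/cgroup/cpu/cpu.cfs_period_us");  // 100000 by default
        if (quota > 0 && period > 0) {
            // --cpus=3  <=>  quota = 3 * period  =>  300000 / 100000 = 3
            System.out.printf("quota-based cpus: %d%n", (long) Math.ceil((double) quota / period));
        } else {
            System.out.println("no CFS quota, the pod can burst");
        }
    }

    static long readLong(String file) throws Exception {
        return Long.parseLong(Files.readString(Path.of(file)).trim());
    }
}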

If you are caught with a throttled process, it’s possible to see it via the cpu.stat special file.

$ cat /sys/fs/cgroup/cpu/cpu.stat
nr_periods 0
nr_throttled 0
throttled_time 0

Tell the JVM to prefer CPU shares

If you are adventurous, it’s possible to tell the JVM to look up the CPU shares instead of the CPU quota with -XX:-PreferContainerQuotaForCPUCount (it is true by default).

Table 1. JVM container option (source)

Option (default): -XX:+PreferContainerQuotaForCPUCount (true)

Description: Calculate the container CPU availability based on the value of quotas (if set), when true. Otherwise, use the CPU shares value, provided it is less than quota. When the cpu share is 1024, the quota will be used as well.

The above is unlikely to be useful in the real world, but who knows.

End words

Again, the narrower walls of containers trick the JVM into choosing inadequate defaults (or runtime ergonomics) for the workload.