1. Introduction
Groovy adds parallel collection methods directly onto Collection,
enabling data parallelism with familiar Groovy idioms. These methods
use Java parallel streams under the hood, with pool isolation via
ForkJoinPool to prevent interference with the common pool.
All methods use java.util.function types (Consumer, Function,
Predicate, etc.), and Groovy closures are automatically
converted via SAM coercion.
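SAM coercion means a Groovy closure can stand in for any single-abstract-method interface. A minimal, self-contained illustration using plain java.util.function types and a plain parallel stream (no groovy.concurrent API involved):

```groovy
import java.util.function.Function
import java.util.function.Predicate

// A closure is coerced to a SAM type on assignment...
Function<Integer, Integer> doubler = { it * 2 }
Predicate<String> nonEmpty = { !it.isEmpty() }
assert doubler.apply(21) == 42
assert nonEmpty.test('groovy')

// ...and when passed directly to a method expecting a SAM type,
// which is how the *Parallel methods can accept closures:
def tens = (1..5).toList().parallelStream().map { it * 10 }.toList()
assert tens == [10, 20, 30, 40, 50]   // ordered stream, so order is kept
```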
2. Getting Started
Parallel methods are available on any Collection without imports:
def result = (1..1000).toList().collectParallel { it * 2 }
assert result.sort() == (2..2000).step(2).toList()
By default, operations run on ForkJoinPool.commonPool(). For
pool isolation, use ParallelScope.withPool:
import groovy.concurrent.ParallelScope
import groovy.concurrent.Pool
ParallelScope.withPool(Pool.cpu()) { scope ->
def bigList = (1..100_000).toList()
def sum = bigList.injectParallel(0) { a, b -> a + b }
println "Sum: $sum"
}
3. Pool and ParallelScope
3.1. Pool
Pool is a managed thread pool extending Executor and AutoCloseable:
import groovy.concurrent.Pool
def pool = Pool.cpu() // Sized to available processors (ForkJoinPool)
def pool2 = Pool.fixed(8) // Fixed-size ForkJoinPool
def pool3 = Pool.io() // Virtual threads on JDK 21+, sized pool otherwise
def pool4 = Pool.virtual() // Virtual-thread-per-task (JDK 21+)
Pool.cpu() and Pool.fixed() create ForkJoinPool-backed pools,
ideal for CPU-bound parallel work. Pool.io() and Pool.virtual()
create virtual thread pools for I/O-bound tasks.
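The mapping to JDK pools can be sketched with a plain ForkJoinPool (a hypothetical approximation of what Pool.cpu() and Pool.fixed() might create; the real factories may differ). The sketch also demonstrates the isolation trick mentioned in the introduction: a parallel stream started from inside a ForkJoinPool task runs on that pool, not on the common pool:

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ForkJoinPool

// Roughly what Pool.cpu() / Pool.fixed(8) might build under the hood:
def cpuPool = new ForkJoinPool(Runtime.runtime.availableProcessors())
def fixedPool = new ForkJoinPool(8)

// A parallel stream submitted as a task to a custom ForkJoinPool
// executes on that pool's workers, isolating it from the common pool.
def sum = cpuPool.submit({ ->
    (1..100).parallelStream().mapToInt { it }.sum()
} as Callable).get()

cpuPool.shutdown()
fixedPool.shutdown()
assert sum == 5050
assert fixedPool.parallelism == 8
```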
3.2. ParallelScope
ParallelScope.withPool binds a pool for the duration of a block.
Parallel collection methods inside the block automatically use it:
import groovy.concurrent.ParallelScope
import groovy.concurrent.Pool
ParallelScope.withPool(4) { scope ->
// All *Parallel methods use a 4-thread ForkJoinPool
def results = bigList.collectParallel { transform(it) }
def filtered = results.findAllParallel { it > threshold }
filtered.eachParallel { process(it) }
}
// Pool is shut down automatically
3.3. ConcurrentConfig
Global defaults are managed by ConcurrentConfig:
import groovy.concurrent.ConcurrentConfig
// Default parallelism: processors + 1 (GPars convention)
println ConcurrentConfig.defaultParallelism
// Override via system property: -Dgroovy.concurrent.poolsize=16
// Or programmatically:
ConcurrentConfig.defaultParallelism = 16
4. Parallel Collection Methods
4.1. Transformation
| Method | Description |
|---|---|
| `collectParallel` | Transforms each element (parallel map) |
| `collectManyParallel` | Transforms and flattens (parallel flatMap) |
def names = people.collectParallel { it.name }
def allTags = articles.collectManyParallel { it.tags }
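Since these methods are described as delegating to parallel streams, their behaviour can be approximated with plain stream calls (the data below is illustrative, not from the examples above):

```groovy
// collectParallel ~ parallel map
def words = ['alpha', 'beta', 'gamma']
def upper = words.parallelStream().map { it.toUpperCase() }.toList()
assert upper == ['ALPHA', 'BETA', 'GAMMA']

// collectManyParallel ~ parallel flatMap
def nested = [[1, 2], [3], [4, 5]]
def flat = nested.parallelStream().flatMap { it.stream() }.toList()
assert flat == [1, 2, 3, 4, 5]
```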
4.2. Filtering
| Method | Description |
|---|---|
| `findAllParallel` | Elements matching predicate |
| `findParallel` | First element matching predicate |
| `findAnyParallel` | Any element matching (may be faster, no order guarantee) |
| `grepParallel` | Pattern match via `isCase` |
| `splitParallel` | Partition into [matching, non-matching] |
def adults = people.findAllParallel { it.age >= 18 }
def firstMatch = data.findParallel { it.isValid() }
def strings = mixed.grepParallel(String)
def (evens, odds) = numbers.splitParallel { it % 2 == 0 }
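A plain-stream sketch of the same filtering operations, assuming delegation to parallel streams; one plausible way to express splitParallel is Collectors.partitioningBy:

```groovy
import java.util.stream.Collectors

def numbers = (1..10).toList()

// findAllParallel ~ parallel filter
def evens = numbers.parallelStream().filter { it % 2 == 0 }.toList()
assert evens == [2, 4, 6, 8, 10]

// splitParallel ~ partitioningBy: map of true -> matching, false -> rest
def parts = numbers.parallelStream()
        .collect(Collectors.partitioningBy { it % 2 == 0 })
assert parts[true] == [2, 4, 6, 8, 10]
assert parts[false] == [1, 3, 5, 7, 9]
```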
4.3. Iteration
| Method | Description |
|---|---|
| `eachParallel` | Side-effect iteration |
| `eachWithIndexParallel` | Iteration with index |
files.eachParallel { process(it) }
items.eachWithIndexParallel { item, idx -> println "$idx: $item" }
4.4. Predicates
| Method | Description |
|---|---|
| `anyParallel` | True if any element matches |
| `everyParallel` | True if all elements match |
| `countParallel` | Count of matching elements |
if (orders.anyParallel { it.isPriority() }) { ... }
assert positiveNumbers.everyParallel { it > 0 }
def errorCount = logs.countParallel { it.level == 'ERROR' }
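The parallel-stream equivalents (anyMatch, allMatch, a filtered count) behave the same way and can serve as a mental model for these methods:

```groovy
def nums = (1..100).toList()

// anyParallel ~ anyMatch (short-circuits as soon as a match is found)
assert nums.parallelStream().anyMatch { it > 99 }

// everyParallel ~ allMatch (short-circuits on the first failure)
assert nums.parallelStream().allMatch { it > 0 }

// countParallel ~ filter + count
assert nums.parallelStream().filter { it % 10 == 0 }.count() == 10
```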
4.5. Aggregation
| Method | Description |
|---|---|
| `sumParallel` | Reduce with binary operator |
| `injectParallel` | Reduce with seed (accumulator must be associative) |
| `minParallel` | Minimum by comparator |
| `maxParallel` | Maximum by comparator |
| `groupByParallel` | Group into map by classifier |
def total = amounts.sumParallel { a, b -> a + b }
def product = numbers.injectParallel(1) { a, b -> a * b }
def youngest = people.minParallel { a, b -> a.age <=> b.age }
def byDept = employees.groupByParallel { it.department }
injectParallel requires an associative accumulator for correct
parallel results. Non-associative accumulators produce undefined results.
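To see why associativity matters: addition is associative, so a parallel reduction returns the same answer no matter how the range is split across threads; subtraction is not, so a parallel reduce over it can return a different value on every run. A quick check with a plain parallel stream:

```groovy
def nums = (1..1000).toList()

// Associative accumulator: result is deterministic in parallel.
def parallelSum = nums.parallelStream().reduce(0) { a, b -> a + b }
assert parallelSum == 500500

// Subtraction is NOT associative: (a - b) - c != a - (b - c).
// A parallel reduce over { a, b -> a - b } is therefore undefined:
// its value depends on how the runtime happens to split the work.
```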
5. @Parallel for loops
The @Parallel annotation converts a for-in loop into a parallel
operation with structured completion — all iterations finish before
the next statement executes:
import groovy.transform.Parallel
import java.util.concurrent.CopyOnWriteArrayList
def results = new CopyOnWriteArrayList()
@Parallel
for (item in [1, 2, 3, 4, 5]) {
results << item * 10
}
// All iterations complete before this line
assert results.sort() == [10, 20, 30, 40, 50]
Inside a ParallelScope.withPool block, @Parallel uses the
bound pool:
ParallelScope.withPool(Pool.cpu()) { scope ->
@Parallel
for (n in data) {
process(n)
}
}
Under the hood, @Parallel rewrites the for loop body into a
closure passed to eachParallel. This means the loop body is no
longer a traditional loop — it is a closure invocation. The loop
variable is also internally renamed (e.g., item becomes
$parallel_item) to avoid scope conflicts in the generated code.
Because the body becomes a closure:
- `break` and `continue` are not supported — they are not valid inside closures and will cause a compile error.
- `return` inside the body returns from the closure (skipping the current iteration), not from the enclosing method.
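The return behaviour is ordinary Groovy closure semantics, which can be seen with a plain sequential each:

```groovy
// `return` inside a closure exits only the current invocation,
// acting like `continue` in a traditional loop:
def kept = []
[1, 2, 3, 4].each { n ->
    if (n == 3) return   // skips 3; does NOT exit the enclosing method
    kept << n
}
assert kept == [1, 2, 4]
```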
Shared mutable state inside @Parallel loops must be
thread-safe (e.g., AtomicInteger, CopyOnWriteArrayList,
ConcurrentHashMap).
If you need the best debugging experience (e.g., stepping
through loop bodies or inspecting variable names), consider using
the *Parallel collection methods directly (such as eachParallel or
collectParallel) instead of @Parallel, or be aware of the internal
variable renaming described above.
6. Best Practices
6.1. Choosing between parallel collections and async/await
Parallel collections and @Parallel run on a ForkJoinPool
with a small number of worker threads (typically matching your
CPU core count). This is ideal for CPU-bound work — computation,
transformation, aggregation — where each task keeps the CPU busy.
For I/O-bound or latency-bound work (network calls, database
queries, sleeps, delays), use async/await with virtual threads
instead. Virtual threads can scale to millions of concurrent tasks
because blocking one doesn’t consume an OS thread.
| Workload | Right tool | Why |
|---|---|---|
| CPU-bound (computation, parsing, math) | Parallel collections / `@Parallel` | ForkJoinPool work-stealing maximises CPU utilisation |
| I/O-bound (HTTP calls, DB queries, file I/O) | async/await (virtual threads) | Millions of concurrent waits without blocking OS threads |
| Mixed | async/await | Virtual threads handle both compute and I/O |
To illustrate, compare CPU-bound work with work where each of 100 items involves a short delay (simulating I/O or latency):
// CPU-bound work — @Parallel is the right tool
@Parallel
for (item in bigList) {
results << expensiveComputation(item) // keeps CPU busy, no blocking
}
// I/O-bound work — async/await is the right tool
AsyncScope.withScope { scope ->
urls.each { url ->
scope.async { fetchUrl(url) } // virtual thread blocks on I/O (free)
}
}
// All fetches complete here — structured concurrency
// Anti-pattern — blocking inside parallel collections
@Parallel
for (item in 1..100) {
Thread.sleep(50) // blocks a ForkJoinPool worker!
process(item)
}
// Only ~8 items run concurrently (pool size), rest queue behind.
// Total time: ~625ms instead of ~50ms with virtual threads.
The anti-pattern looks parallel, but the ForkJoinPool has only
~8 workers. Each Thread.sleep ties up a worker, so at most
~8 items sleep concurrently; the remaining 92 wait in the queue.
With 100 items across 8 workers, that is 100 / 8 = 12.5 waves of
50 ms each, hence the ~625 ms total.
await Awaitable.delay(ms) inside a parallel collection body
has the same problem — await blocks the calling ForkJoinPool
worker thread while waiting for the delay to complete. Use
async/await outside parallel collections for delay-based work.
|
6.2. Use thread-safe collections for shared state
The parallel body runs on multiple threads simultaneously. Use concurrent collections or atomics for any shared mutable state:
// Bad — ArrayList is not thread-safe
def results = []
items.eachParallel { results << transform(it) } // race condition!
// Good — use CopyOnWriteArrayList or collectParallel
def results = items.collectParallel { transform(it) }
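When a side-effecting body really is required, atomics are a safe alternative to unsynchronised variables. A small sketch using a plain parallel stream (the same rule applies inside eachParallel or @Parallel bodies):

```groovy
import java.util.concurrent.atomic.AtomicInteger

// AtomicInteger makes the accumulation safe across worker threads:
def counter = new AtomicInteger()
(1..1000).toList().parallelStream().forEach { counter.addAndGet(it) }
assert counter.get() == 500500   // no updates lost to races
```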