Java Spliterator 的并行处理
上次修改:2025年5月8日
java.util.Spliterator 是 Java 8 中引入的一个强大的类似迭代器的接口,旨在高效地遍历和分割数据源的元素。它最重要的能力是并行处理,能够将大型数据集分割成更小的片段,这些片段可以由多个线程同时处理。与仅支持顺序迭代的传统 Iterator 不同,Spliterator 可以动态地分割数据,使其成为 Java 并行 Streams API 的一个基础组件。
Spliterator 使用特征提供有关底层数据源的元数据,这些特征有助于优化并行执行。 这些特征会影响任务在线程之间的分配方式,从而确保有效的工作负载分配。 常见的特征包括:
ORDERED:以定义的顺序访问元素。DISTINCT:确保所有元素都是唯一的,避免冗余计算。SORTED:保证特定的顺序,从而实现优化的并行排序。SIZED:提供准确的元素计数,有助于平衡工作负载分配。NONNULL:防止空值中断并行执行。IMMUTABLE:通过防止结构修改来确保线程安全。CONCURRENT:允许安全的并行修改,而不会出现竞争条件。SUBSIZED:表明拆分的部分保留可靠的大小估计。
这些特征对于并行计算至关重要,允许 Java 运行时做出关于任务分区和资源分配的明智决策。 通过利用 trySplit,Spliterator 能够在多个线程之间分配工作负载,从而确保计算效率随着可用处理能力的提高而扩展。
Java 的 Fork/Join 框架在内部依赖于 Spliterator 来管理并行流,动态地分配任务以最大化性能。开发人员可以使用并行流来实现自动并发,或者使用显式多线程来对执行进行细粒度控制。 了解 Spliterator 如何促进并行处理对于构建可扩展、高性能的应用程序至关重要。
基本 Spliterator 遍历(顺序)
此示例演示了使用 Spliterator 的基本顺序遍历。 我们从字符串的 List 中获取 Spliterator。 然后,我们在循环中使用 tryAdvance 方法来处理每个元素。 tryAdvance 接受一个 Consumer,它指定要对元素执行的操作。 当没有更多元素可用时,它返回 false。
此示例展示了 Spliterator 的基本迭代能力。 它在迭代之前打印特征和估计大小。 迭代本身是顺序的,在主线程中一次处理一个元素。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
public class Main {
public static void main(String[] args) {
List<String> names = List.of("John", "Jane", "Mike", "Sarah", "Tom");
Spliterator<String> spliterator = names.spliterator();
System.out.println("Characteristics: " + spliterator.characteristics());
System.out.println("Estimated size: " + spliterator.estimateSize());
System.out.println("Elements (sequentially):");
boolean hasNextElement;
do {
hasNextElement = spliterator.tryAdvance(name -> {
System.out.println("Processing: " + name + " by " + Thread.currentThread().getName());
// Simulate some work
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} while (hasNextElement); // Continue until no more elements remain
System.out.println("Finished processing.");
}
}
输出显示特征和初始估计大小。 每个元素都由主线程处理。 这构成了在深入研究并行处理之前了解 Spliterator 如何工作的基础。
理解 trySplit
trySplit 方法是 Spliterator 在并行处理中的作用的基础。 它尝试将源元素分成两个。 如果成功,它将返回一个新的 Spliterator,涵盖元素的前导部分,而原始 Spliterator 涵盖剩余部分。 此示例演示了拆分,但仍然是顺序处理。
必须了解的是,单独调用 trySplit 不会启动并行执行。 它只是通过分割来准备数据。 必须由单独的线程处理生成的 Spliterator 才能实现并行性,如后面的示例所示。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
public class Main {
public static void main(String[] args) {
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Spliterator<Integer> spliterator1 = numbers.spliterator();
System.out.println("Original spliterator size: " + spliterator1.estimateSize());
// Attempt to split the spliterator
Spliterator<Integer> spliterator2 = spliterator1.trySplit();
if (spliterator2 != null) {
System.out.println("First split size: " + spliterator1.estimateSize());
System.out.println("Second split size: " + spliterator2.estimateSize());
System.out.println("\nProcessing first split (sequentially):");
spliterator1.forEachRemaining(num -> System.out.println(num + " by " + Thread.currentThread().getName()));
System.out.println("\nProcessing second split (sequentially):");
spliterator2.forEachRemaining(num -> System.out.println(num + " by " + Thread.currentThread().getName()));
} else {
System.out.println("Could not split the spliterator. Processing all elements:");
spliterator1.forEachRemaining(System.out::println);
}
}
}
输出表明原始列表已分割。 但是,这两个部分都由主线程按顺序处理。 此示例阐明了 trySplit 是一种用于分区的机制,而不是直接的并行执行。
使用 ExecutorService 和 Spliterator 进行并行执行
此示例演示了如何使用带有 ExecutorService 的 Spliterator 拆分来实现真正的并行处理。 拆分 Spliterator 后,我们将任务提交给 ExecutorService。 每个任务处理一个拆分。 这允许数据的不同部分由不同的线程同时处理。
我们创建一个固定大小的线程池。 每个 Spliterator(原始的和 trySplit 返回的)都由提交给池的单独任务处理。 Thread.currentThread().getName() 调用有助于识别哪个线程处理每个元素。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws InterruptedException {
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
Spliterator<Integer> spliterator1 = numbers.spliterator();
Spliterator<Integer> spliterator2 = spliterator1.trySplit(); // s1 is now roughly the second half
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
System.out.println("Submitting tasks for parallel processing...");
// Task for the first split (which is now spliterator2)
if (spliterator2 != null) {
executor.submit(() -> {
System.out.println("Processing second half (split part) by " + Thread.currentThread().getName());
spliterator2.forEachRemaining(num -> {
System.out.println("S2: " + num + " by " + Thread.currentThread().getName());
try { Thread.sleep(100); } catch (InterruptedException e) { /*ignore*/ }
});
});
}
// Task for the remaining part of the original spliterator (spliterator1)
executor.submit(() -> {
System.out.println("Processing first half (original part) by " + Thread.currentThread().getName());
spliterator1.forEachRemaining(num -> {
System.out.println("S1: " + num + " by " + Thread.currentThread().getName());
try { Thread.sleep(100); } catch (InterruptedException e) { /*ignore*/ }
});
});
executor.shutdown();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println("All tasks completed.");
}
}
}
输出将显示元素由池中的不同线程处理(例如,“pool-1-thread-1”、“pool-1-thread-2”)。 这证实了两个数据分区的处理是并行发生的。 这种手动方法可以对并行执行进行细粒度控制。
利用带有 Spliterator 的并行流
Java Streams 提供了一个高级 API,用于处理元素序列。 并行流在内部使用 Spliterator 在多个线程之间分配工作。 此示例显示了获取并行流的两种方法:collection.parallelStream 和 StreamSupport.stream(spliterator, true)。
集合上的 parallelStream 方法直接返回一个并行流。 或者,StreamSupport.stream 可以从现有 Spliterator 创建一个流。 将其第二个参数设置为 true 会使生成的流并行。 Java Fork/Join 框架在后台管理并行性。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Main {
public static void main(String[] args) {
List<String> words = List.of("apple", "banana", "cherry", "date",
"elderberry", "fig", "grape", "honeydew");
System.out.println("Processing with collection.parallelStream():");
words.parallelStream().forEach(word ->
System.out.println(word + " processed by " + Thread.currentThread().getName())
);
System.out.println("\nProcessing with StreamSupport.stream(spliterator, true):");
Spliterator<String> spliterator = words.spliterator();
Stream<String> parallelStreamFromSpliterator = StreamSupport.stream(spliterator, true);
parallelStreamFromSpliterator.forEach(word ->
System.out.println(word + " processed by " + Thread.currentThread().getName())
);
System.out.println("Finished processing with parallel streams.");
}
}
输出揭示了多个线程如何同时执行任务,从而有效地将工作负载分配到可用的处理器内核上。 通过利用 Java 的 Fork/Join 框架,Streams API 无缝地处理并行执行,从而确保最佳的资源利用率,而无需手动线程管理。 此示例说明了 Spliterator 如何促进数据分区,从而使并行流能够独立处理元素,同时提高大规模计算中的性能。
原始数据类型的并行处理
Java 为原始类型(如 int、long 和 double)提供了专门的 Spliterator(例如,Spliterator.OfInt)。 这些避免了将原始类型装箱/拆箱到其包装类中的开销。 此示例使用从 int 数组中获得的 Spliterator.OfInt。
我们首先演示如何使用 StreamSupport.intStream(spliterator, true) 创建并行 IntStream 来计算总和。 然后,我们通过拆分 Spliterator.OfInt 并使用 ExecutorService 来展示手动并行处理。 每个任务都使用 forEachRemaining(IntConsumer) 来实现高效的原始处理。
package com.zetcode;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
public class Main {
public static void main(String[] args) throws InterruptedException {
int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
// Method 1: Using parallel IntStream from Spliterator.OfInt
Spliterator.OfInt spliteratorForStream = Arrays.spliterator(numbers);
IntStream parallelIntStream = StreamSupport.intStream(spliteratorForStream, true);
long sum1 = parallelIntStream.sum();
System.out.println("Sum using parallel IntStream: " + sum1);
// Method 2: Manual parallel processing with ExecutorService
System.out.println("\nManual parallel processing of primitive array:");
Spliterator.OfInt s1 = Arrays.spliterator(numbers);
Spliterator.OfInt s2 = s1.trySplit();
LongAdder partialSum1 = new LongAdder();
LongAdder partialSum2 = new LongAdder();
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
if (s2 != null) {
executor.submit(() -> {
s2.forEachRemaining((int val) -> {
partialSum2.add(val);
});
});
}
executor.submit(() -> {
s1.forEachRemaining((int val) -> {
partialSum1.add(val);
});
});
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
}
long totalSum = partialSum1.sum() + partialSum2.sum();
System.out.println("Sum using manual parallel processing: " + totalSum);
}
}
此示例展示了并行处理原始数组的两种方法。 使用并行流通常更简洁。 使用 ExecutorService 的手动控制提供了灵活性,尤其是在与现有线程模型集成或需要细粒度任务管理时。 这两种方法都利用了 Spliterator.OfInt 的拆分功能。
使用 Spliterator 进行数字的并行求和
此示例演示了并行求和数字列表。 我们从 List<Integer> 中获取 Spliterator,将其拆分,然后使用 ExecutorService 来并发计算部分和。 每个任务都是一个 Callable,它返回其部分和。 然后,使用 Future 对象收集这些部分和,并将它们组合以获得总和。
这种模式对于“分而治之”的并行算法很常见。 求和的工作在线程之间分配,结果被聚合。 它突出了 Spliterator 如何促进分解计算以进行并行执行。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Main {
public static void main(String[] args) throws Exception {
List<Integer> numbers = IntStream.rangeClosed(1, 10000)
.boxed()
.collect(Collectors.toList());
Spliterator<Integer> spliterator1 = numbers.spliterator();
Spliterator<Integer> spliterator2 = spliterator1.trySplit(); // s1 is now the second half
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
// Task for the first part (which is spliterator2)
Callable<Long> sumTask1 = () -> {
long sum = 0;
if (spliterator2 != null) {
SummingConsumer consumer = new SummingConsumer();
spliterator2.forEachRemaining(consumer);
sum = consumer.getTotal();
}
System.out.println("Sum from task 1 (thread " + Thread.currentThread().getName() + "): " + sum);
return sum;
};
// Task for the second part (remaining of spliterator1)
Callable<Long> sumTask2 = () -> {
SummingConsumer consumer = new SummingConsumer();
spliterator1.forEachRemaining(consumer);
long sum = consumer.getTotal();
System.out.println("Sum from task 2 (thread " + Thread.currentThread().getName() + "): " + sum);
return sum;
};
Future<Long> future1 = executor.submit(sumTask1);
Future<Long> future2 = executor.submit(sumTask2);
long totalSum = future1.get() + future2.get();
System.out.println("Total sum calculated in parallel: " + totalSum);
long expectedSum = numbers.stream().mapToLong(Integer::longValue).sum();
System.out.println("Expected sum (sequential stream): " + expectedSum);
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
}
}
// Helper class for summing, as lambda variable for sum must be effectively final
static class SummingConsumer implements Consumer<Integer> {
private long total = 0;
@Override
public void accept(Integer value) {
total += value;
}
public long getTotal() {
return total;
}
}
}
拆分机制是通过 trySplit 实现的,它将原始 Spliterator 分成两部分。 第一个 Spliterator (spliterator1) 保留数据的一部分,而第二个 Spliterator (spliterator2) 处理另一部分。 这允许独立地并行处理每个子集。
为了执行并行求和,使用大小为 2 的固定线程池来使用 ExecutorService。 每个拆分由一个单独的线程处理,从而确保优化的资源利用率。 这些任务作为 Callable<Long> 函数提交,这些函数异步返回计算的总和。
辅助类 SummingConsumer 用于在遍历带有 forEachRemaining 的元素时累积总和。 这种方法是必要的,因为 lambda 表达式中的变量必须是有效的 final,从而防止直接就地修改。
一旦两个任务都完成执行,它们的结果就会合并以并行获得总和。 然后,将计算结果与通过标准流操作获得的顺序总和进行验证。 这确保了正确性,并提供了对并行执行的性能优势的洞察。
使用 Spliterator 进行并行数据转换
此示例演示了使用 Spliterator 和 ExecutorService 进行并行处理。 该程序从预定义的单词列表开始,并使用 trySplit 将工作负载拆分为两个单独的任务。 每个任务将其分配的单词转换为大写,并附加线程标识以说明实际的并发。
这对于大型数据集上的 CPU 密集型转换任务非常有用。 拆分数据允许多个内核同时处理转换,从而可能显着加快整个过程。
package com.zetcode;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws Exception {
List<String> words = List.of("alpha", "bravo", "charlie", "delta", "echo",
"foxtrot", "golf", "hotel", "india", "juliett");
Spliterator<String> s1 = words.spliterator();
Spliterator<String> s2 = s1.trySplit(); // s1 is now the second half
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
Callable<List<String>> transformTask1 = () -> {
List<String> result = new ArrayList<>();
if (s2 != null) { // s2 is the first half
s2.forEachRemaining(word -> {
result.add(word.toUpperCase() + " (processed by " + Thread.currentThread().getName() + ")");
});
}
return result;
};
Callable<List<String>> transformTask2 = () -> {
List<String> result = new ArrayList<>();
s1.forEachRemaining(word -> { // s1 is the second half
result.add(word.toUpperCase() + " (processed by " + Thread.currentThread().getName() + ")");
});
return result;
};
Future<List<String>> future1 = executor.submit(transformTask1);
Future<List<String>> future2 = executor.submit(transformTask2);
List<String> combinedResult = new ArrayList<>();
combinedResult.addAll(future1.get());
combinedResult.addAll(future2.get());
System.out.println("Transformed words in parallel:");
combinedResult.forEach(System.out::println);
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
}
System.out.println("\nSequentially transformed for order reference:");
words.stream().map(String::toUpperCase).forEach(System.out::println);
}
}
拆分机制将原始 Spliterator 分为两个部分。 前半部分 (s2) 和剩余部分 (s1) 然后由单独的线程独立处理。 这种方法通过在多个线程之间分配计算任务来优化 CPU 利用率。
为了促进真正的并行执行,使用了一个固定线程池 (ExecutorService),从而确保每个拆分都被并发处理。 这些任务定义为 Callable<List<String>>,允许异步执行,并在完成后返回转换后的结果。
每个任务都使用 forEachRemaining 来处理其拆分中的元素,从而确保有效地遍历单词,而无需显式迭代。 一旦两个任务都完成执行,它们的结果将合并到组合列表中以进行最终输出。
重要的是要注意,除非明确管理,否则不能保证订单一致性。 由于处理发生在多个线程中,因此最终合并的结果可能与原始序列不同。 为了进行参考,与并行输出一起显示了使用标准流的顺序转换。
最后,ExecutorService 被正常关闭,从而确保有效的资源清理。 如果任务超出其预期的执行时间,紧急关闭将防止不必要的资源消耗。
使用 Spliterator 进行并行数据聚合(例如,计数)
此示例演示了并行数据聚合,特别是计算符合特定标准的元素。 我们使用字符串列表(模拟文件中的行)。 目标是计算有多少个字符串包含特定的关键字。 Spliterator 被拆分,每个部分由 ExecutorService 中的任务处理。 每个任务计算其段中匹配的字符串,主线程对这些部分计数求和。
这种方法对于大型数据集上的过滤和计数等操作有效。 通过划分数据集并同时处理部分,我们通常可以获得比纯顺序方法更好的性能。
package com.zetcode;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static void main(String[] args) throws Exception {
List<String> lines = List.of(
"The quick brown fox",
"jumps over the lazy dog",
"A B C D E F G",
"Another line with fox here",
"Spliterators are useful for parallel fox processing",
"The lazy fox is quick"
);
String keyword = "fox";
Spliterator<String> s1 = lines.spliterator();
Spliterator<String> s2 = s1.trySplit(); // s1 is now the second half
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
Callable<Integer> countTask1 = () -> {
AtomicInteger count = new AtomicInteger(0);
if (s2 != null) { // s2 is the first half
s2.forEachRemaining(line -> {
if (line.contains(keyword)) {
count.incrementAndGet();
}
});
}
System.out.println("Count from task 1 (thread " + Thread.currentThread().getName() + "): " + count.get());
return count.get();
};
Callable<Integer> countTask2 = () -> {
AtomicInteger count = new AtomicInteger(0);
s1.forEachRemaining(line -> { // s1 is the second half
if (line.contains(keyword)) {
count.incrementAndGet();
}
});
System.out.println("Count from task 2 (thread " + Thread.currentThread().getName() + "): " + count.get());
return count.get();
};
Future<Integer> future1 = executor.submit(countTask1);
Future<Integer> future2 = executor.submit(countTask2);
int totalCount = future1.get() + future2.get();
System.out.println("Total lines containing '" + keyword + "': " + totalCount);
long expectedCount = lines.stream().filter(line -> line.contains(keyword)).count();
System.out.println("Expected count (sequential stream): " + expectedCount);
executor.shutdown();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
executor.shutdownNow();
}
}
}
}
在此示例中,并行处理字符串列表以计算单词“fox”的出现次数。 如果 lambda 表达式更复杂,每个任务都使用一个 AtomicInteger 进行本地计数,以确保线程安全,尽管此处简单的局部变量也可以工作。 部分计数通过 Future 对象检索并求和。
此实现中的一个关键组件是使用 AtomicInteger,它保证了在计算目标关键字的出现次数时进行线程安全增量。 这种方法确保了在 lambda 表达式中修改共享变量时的正确性 - 尽管对于简单情况,局部变量就足够了。
一旦两个任务都完成执行,它们的部分结果将通过 Future 对象检索并求和,以并行计算总关键字出现次数。 然后,通过将其与基于顺序流的计数进行比较来验证输出,从而证明通过并发处理实现的准确性和效率提升。
来源
本教程介绍了 Java Spliterator 接口,重点介绍了它在并行处理中的使用。 我们探讨了基本遍历、拆分、特征、自定义 Spliterator 以及与并行流和 ExecutorService 的集成。 了解 Spliterator 对于编写高效、可扩展的 Java 代码至关重要,这些代码可以利用多核处理器来处理数据密集型任务。
作者
列出所有Java教程。