ZetCode

Java Spliterator 的并行处理

上次修改:2025年5月8日

java.util.Spliterator 是 Java 8 中引入的一个强大的类似迭代器的接口,旨在高效地遍历和分割数据源的元素。它最重要的能力是并行处理,能够将大型数据集分割成更小的片段,这些片段可以由多个线程同时处理。与仅支持顺序迭代的传统 Iterator 不同,Spliterator 可以动态地分割数据,使其成为 Java 并行 Streams API 的一个基础组件。

Spliterator 使用特征提供有关底层数据源的元数据,这些特征有助于优化并行执行。 这些特征会影响任务在线程之间的分配方式,从而确保有效的工作负载分配。 常见的特征包括:

这些特征对于并行计算至关重要,允许 Java 运行时做出关于任务分区和资源分配的明智决策。 通过利用 trySplitSpliterator 能够在多个线程之间分配工作负载,从而确保计算效率随着可用处理能力的提高而扩展。

Java 的 Fork/Join 框架在内部依赖于 Spliterator 来管理并行流,动态地分配任务以最大化性能。开发人员可以使用并行流来实现自动并发,或者使用显式多线程来对执行进行细粒度控制。 了解 Spliterator 如何促进并行处理对于构建可扩展、高性能的应用程序至关重要。

基本 Spliterator 遍历(顺序)

此示例演示了使用 Spliterator 的基本顺序遍历。 我们从字符串的 List 中获取 Spliterator。 然后,我们在循环中使用 tryAdvance 方法来处理每个元素。 tryAdvance 接受一个 Consumer,它指定要对元素执行的操作。 当没有更多元素可用时,它返回 false

此示例展示了 Spliterator 的基本迭代能力。 它在迭代之前打印特征和估计大小。 迭代本身是顺序的,在主线程中一次处理一个元素。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;

public class Main {

    public static void main(String[] args) {

        List<String> names = List.of("John", "Jane", "Mike", "Sarah", "Tom");

        Spliterator<String> spliterator = names.spliterator();

        System.out.println("Characteristics: " + spliterator.characteristics());
        System.out.println("Estimated size: " + spliterator.estimateSize());
        System.out.println("Elements (sequentially):");

        boolean hasNextElement;
        do {
            hasNextElement = spliterator.tryAdvance(name -> {
                System.out.println("Processing: " + name + " by " + Thread.currentThread().getName());
                // Simulate some work
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        } while (hasNextElement); // Continue until no more elements remain

        System.out.println("Finished processing.");
    }
}

输出显示特征和初始估计大小。 每个元素都由主线程处理。 这构成了在深入研究并行处理之前了解 Spliterator 如何工作的基础。

理解 trySplit

trySplit 方法是 Spliterator 在并行处理中的作用的基础。 它尝试将源元素分成两个。 如果成功,它将返回一个新的 Spliterator,涵盖元素的前导部分,而原始 Spliterator 涵盖剩余部分。 此示例演示了拆分,但仍然是顺序处理。

必须了解的是,单独调用 trySplit 不会启动并行执行。 它只是通过分割来准备数据。 必须由单独的线程处理生成的 Spliterator 才能实现并行性,如后面的示例所示。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;

public class Main {

    public static void main(String[] args) {

        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Spliterator<Integer> spliterator1 = numbers.spliterator();

        System.out.println("Original spliterator size: " + spliterator1.estimateSize());

        // Attempt to split the spliterator
        Spliterator<Integer> spliterator2 = spliterator1.trySplit();

        if (spliterator2 != null) {
            System.out.println("First split size: " + spliterator1.estimateSize());
            System.out.println("Second split size: " + spliterator2.estimateSize());

            System.out.println("\nProcessing first split (sequentially):");
            spliterator1.forEachRemaining(num -> System.out.println(num + " by " + Thread.currentThread().getName()));

            System.out.println("\nProcessing second split (sequentially):");
            spliterator2.forEachRemaining(num -> System.out.println(num + " by " + Thread.currentThread().getName()));
        } else {
            System.out.println("Could not split the spliterator. Processing all elements:");
            spliterator1.forEachRemaining(System.out::println);
        }
    }
}

输出表明原始列表已分割。 但是,这两个部分都由主线程按顺序处理。 此示例阐明了 trySplit 是一种用于分区的机制,而不是直接的并行执行。

使用 ExecutorService 和 Spliterator 进行并行执行

此示例演示了如何使用带有 ExecutorService 的 Spliterator 拆分来实现真正的并行处理。 拆分 Spliterator 后,我们将任务提交给 ExecutorService。 每个任务处理一个拆分。 这允许数据的不同部分由不同的线程同时处理。

我们创建一个固定大小的线程池。 每个 Spliterator(原始的和 trySplit 返回的)都由提交给池的单独任务处理。 Thread.currentThread().getName() 调用有助于识别哪个线程处理每个元素。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {

        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
        Spliterator<Integer> spliterator1 = numbers.spliterator();
        Spliterator<Integer> spliterator2 = spliterator1.trySplit(); // s1 is now roughly the second half

        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {

            System.out.println("Submitting tasks for parallel processing...");

            // Task for the first split (which is now spliterator2)
            if (spliterator2 != null) {
                executor.submit(() -> {
                    System.out.println("Processing second half (split part) by " + Thread.currentThread().getName());
                    spliterator2.forEachRemaining(num -> {
                        System.out.println("S2: " + num + " by " + Thread.currentThread().getName());
                        try { Thread.sleep(100); } catch (InterruptedException e) { /*ignore*/ }
                    });
                });
            }
            
            // Task for the remaining part of the original spliterator (spliterator1)
            executor.submit(() -> {
                System.out.println("Processing first half (original part) by " + Thread.currentThread().getName());
                spliterator1.forEachRemaining(num -> {
                    System.out.println("S1: " + num + " by " + Thread.currentThread().getName());
                    try { Thread.sleep(100); } catch (InterruptedException e) { /*ignore*/ }
                });
            });

            executor.shutdown();
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
            System.out.println("All tasks completed.");
        }
    }
}

输出将显示元素由池中的不同线程处理(例如,“pool-1-thread-1”、“pool-1-thread-2”)。 这证实了两个数据分区的处理是并行发生的。 这种手动方法可以对并行执行进行细粒度控制。

利用带有 Spliterator 的并行流

Java Streams 提供了一个高级 API,用于处理元素序列。 并行流在内部使用 Spliterator 在多个线程之间分配工作。 此示例显示了获取并行流的两种方法:collection.parallelStreamStreamSupport.stream(spliterator, true)

集合上的 parallelStream 方法直接返回一个并行流。 或者,StreamSupport.stream 可以从现有 Spliterator 创建一个流。 将其第二个参数设置为 true 会使生成的流并行。 Java Fork/Join 框架在后台管理并行性。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String[] args) {

        List<String> words = List.of("apple", "banana", "cherry", "date", 
                                     "elderberry", "fig", "grape", "honeydew");

        System.out.println("Processing with collection.parallelStream():");
        words.parallelStream().forEach(word -> 
            System.out.println(word + " processed by " + Thread.currentThread().getName())
        );

        System.out.println("\nProcessing with StreamSupport.stream(spliterator, true):");
        Spliterator<String> spliterator = words.spliterator();
        Stream<String> parallelStreamFromSpliterator = StreamSupport.stream(spliterator, true);
        
        parallelStreamFromSpliterator.forEach(word -> 
            System.out.println(word + " processed by " + Thread.currentThread().getName())
        );

        System.out.println("Finished processing with parallel streams.");
    }
}

输出揭示了多个线程如何同时执行任务,从而有效地将工作负载分配到可用的处理器内核上。 通过利用 Java 的 Fork/Join 框架,Streams API 无缝地处理并行执行,从而确保最佳的资源利用率,而无需手动线程管理。 此示例说明了 Spliterator 如何促进数据分区,从而使并行流能够独立处理元素,同时提高大规模计算中的性能。

原始数据类型的并行处理

Java 为原始类型(如 intlongdouble)提供了专门的 Spliterator(例如,Spliterator.OfInt)。 这些避免了将原始类型装箱/拆箱到其包装类中的开销。 此示例使用从 int 数组中获得的 Spliterator.OfInt

我们首先演示如何使用 StreamSupport.intStream(spliterator, true) 创建并行 IntStream 来计算总和。 然后,我们通过拆分 Spliterator.OfInt 并使用 ExecutorService 来展示手动并行处理。 每个任务都使用 forEachRemaining(IntConsumer) 来实现高效的原始处理。

Main.java
package com.zetcode;

import java.util.Arrays;
import java.util.Spliterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String[] args) throws InterruptedException {

        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};

        // Method 1: Using parallel IntStream from Spliterator.OfInt
        Spliterator.OfInt spliteratorForStream = Arrays.spliterator(numbers);
        IntStream parallelIntStream = StreamSupport.intStream(spliteratorForStream, true);
        long sum1 = parallelIntStream.sum();
        System.out.println("Sum using parallel IntStream: " + sum1);

        // Method 2: Manual parallel processing with ExecutorService
        System.out.println("\nManual parallel processing of primitive array:");
        Spliterator.OfInt s1 = Arrays.spliterator(numbers);
        Spliterator.OfInt s2 = s1.trySplit();

        LongAdder partialSum1 = new LongAdder();
        LongAdder partialSum2 = new LongAdder();

        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            if (s2 != null) {
                executor.submit(() -> {
                    s2.forEachRemaining((int val) -> {
                        partialSum2.add(val);
                    });
                });
            }
            executor.submit(() -> {
                s1.forEachRemaining((int val) -> {
                    partialSum1.add(val);
                });
            });

            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }

        long totalSum = partialSum1.sum() + partialSum2.sum();
        System.out.println("Sum using manual parallel processing: " + totalSum);
    }
}

此示例展示了并行处理原始数组的两种方法。 使用并行流通常更简洁。 使用 ExecutorService 的手动控制提供了灵活性,尤其是在与现有线程模型集成或需要细粒度任务管理时。 这两种方法都利用了 Spliterator.OfInt 的拆分功能。

使用 Spliterator 进行数字的并行求和

此示例演示了并行求和数字列表。 我们从 List<Integer> 中获取 Spliterator,将其拆分,然后使用 ExecutorService 来并发计算部分和。 每个任务都是一个 Callable,它返回其部分和。 然后,使用 Future 对象收集这些部分和,并将它们组合以获得总和。

这种模式对于“分而治之”的并行算法很常见。 求和的工作在线程之间分配,结果被聚合。 它突出了 Spliterator 如何促进分解计算以进行并行执行。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;

import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws Exception {
        List<Integer> numbers = IntStream.rangeClosed(1, 10000)
                .boxed()
                .collect(Collectors.toList());

        Spliterator<Integer> spliterator1 = numbers.spliterator();
        Spliterator<Integer> spliterator2 = spliterator1.trySplit(); // s1 is now the second half

        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {

            // Task for the first part (which is spliterator2)
            Callable<Long> sumTask1 = () -> {
                long sum = 0;
                if (spliterator2 != null) {
                    SummingConsumer consumer = new SummingConsumer();
                    spliterator2.forEachRemaining(consumer);
                    sum = consumer.getTotal();
                }
                System.out.println("Sum from task 1 (thread " + Thread.currentThread().getName() + "): " + sum);
                return sum;
            };

            // Task for the second part (remaining of spliterator1)
            Callable<Long> sumTask2 = () -> {
                SummingConsumer consumer = new SummingConsumer();
                spliterator1.forEachRemaining(consumer);
                long sum = consumer.getTotal();
                System.out.println("Sum from task 2 (thread " + Thread.currentThread().getName() + "): " + sum);
                return sum;
            };

            Future<Long> future1 = executor.submit(sumTask1);
            Future<Long> future2 = executor.submit(sumTask2);

            long totalSum = future1.get() + future2.get();
            System.out.println("Total sum calculated in parallel: " + totalSum);

            long expectedSum = numbers.stream().mapToLong(Integer::longValue).sum();
            System.out.println("Expected sum (sequential stream): " + expectedSum);

            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }
    }

    // Helper class for summing, as lambda variable for sum must be effectively final
    static class SummingConsumer implements Consumer<Integer> {
        private long total = 0;
        @Override
        public void accept(Integer value) {
            total += value;
        }
        public long getTotal() {
            return total;
        }
    }
}

拆分机制是通过 trySplit 实现的,它将原始 Spliterator 分成两部分。 第一个 Spliterator (spliterator1) 保留数据的一部分,而第二个 Spliterator (spliterator2) 处理另一部分。 这允许独立地并行处理每个子集。

为了执行并行求和,使用大小为 2 的固定线程池来使用 ExecutorService。 每个拆分由一个单独的线程处理,从而确保优化的资源利用率。 这些任务作为 Callable<Long> 函数提交,这些函数异步返回计算的总和。

辅助类 SummingConsumer 用于在遍历带有 forEachRemaining 的元素时累积总和。 这种方法是必要的,因为 lambda 表达式中的变量必须是有效的 final,从而防止直接就地修改。

一旦两个任务都完成执行,它们的结果就会合并以并行获得总和。 然后,将计算结果与通过标准流操作获得的顺序总和进行验证。 这确保了正确性,并提供了对并行执行的性能优势的洞察。

使用 Spliterator 进行并行数据转换

此示例演示了使用 SpliteratorExecutorService 进行并行处理。 该程序从预定义的单词列表开始,并使用 trySplit 将工作负载拆分为两个单独的任务。 每个任务将其分配的单词转换为大写,并附加线程标识以说明实际的并发。

这对于大型数据集上的 CPU 密集型转换任务非常有用。 拆分数据允许多个内核同时处理转换,从而可能显着加快整个过程。

Main.java
package com.zetcode;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {
        List<String> words = List.of("alpha", "bravo", "charlie", "delta", "echo",
                "foxtrot", "golf", "hotel", "india", "juliett");

        Spliterator<String> s1 = words.spliterator();
        Spliterator<String> s2 = s1.trySplit(); // s1 is now the second half

        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {

            Callable<List<String>> transformTask1 = () -> {
                List<String> result = new ArrayList<>();
                if (s2 != null) { // s2 is the first half
                    s2.forEachRemaining(word -> {
                        result.add(word.toUpperCase() + " (processed by " + Thread.currentThread().getName() + ")");
                    });
                }
                return result;
            };

            Callable<List<String>> transformTask2 = () -> {
                List<String> result = new ArrayList<>();
                s1.forEachRemaining(word -> { // s1 is the second half
                    result.add(word.toUpperCase() + " (processed by " + Thread.currentThread().getName() + ")");
                });
                return result;
            };

            Future<List<String>> future1 = executor.submit(transformTask1);
            Future<List<String>> future2 = executor.submit(transformTask2);

            List<String> combinedResult = new ArrayList<>();
            combinedResult.addAll(future1.get());
            combinedResult.addAll(future2.get());

            System.out.println("Transformed words in parallel:");
            combinedResult.forEach(System.out::println);

            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }

        System.out.println("\nSequentially transformed for order reference:");
        words.stream().map(String::toUpperCase).forEach(System.out::println);
    }
}

拆分机制将原始 Spliterator 分为两个部分。 前半部分 (s2) 和剩余部分 (s1) 然后由单独的线程独立处理。 这种方法通过在多个线程之间分配计算任务来优化 CPU 利用率。

为了促进真正的并行执行,使用了一个固定线程池 (ExecutorService),从而确保每个拆分都被并发处理。 这些任务定义为 Callable<List<String>>,允许异步执行,并在完成后返回转换后的结果。

每个任务都使用 forEachRemaining 来处理其拆分中的元素,从而确保有效地遍历单词,而无需显式迭代。 一旦两个任务都完成执行,它们的结果将合并到组合列表中以进行最终输出。

重要的是要注意,除非明确管理,否则不能保证订单一致性。 由于处理发生在多个线程中,因此最终合并的结果可能与原始序列不同。 为了进行参考,与并行输出一起显示了使用标准流的顺序转换。

最后,ExecutorService 被正常关闭,从而确保有效的资源清理。 如果任务超出其预期的执行时间,紧急关闭将防止不必要的资源消耗。

使用 Spliterator 进行并行数据聚合(例如,计数)

此示例演示了并行数据聚合,特别是计算符合特定标准的元素。 我们使用字符串列表(模拟文件中的行)。 目标是计算有多少个字符串包含特定的关键字。 Spliterator 被拆分,每个部分由 ExecutorService 中的任务处理。 每个任务计算其段中匹配的字符串,主线程对这些部分计数求和。

这种方法对于大型数据集上的过滤和计数等操作有效。 通过划分数据集并同时处理部分,我们通常可以获得比纯顺序方法更好的性能。

Main.java
package com.zetcode;

import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    public static void main(String[] args) throws Exception {
        List<String> lines = List.of(
                "The quick brown fox",
                "jumps over the lazy dog",
                "A B C D E F G",
                "Another line with fox here",
                "Spliterators are useful for parallel fox processing",
                "The lazy fox is quick"
        );
        String keyword = "fox";

        Spliterator<String> s1 = lines.spliterator();
        Spliterator<String> s2 = s1.trySplit(); // s1 is now the second half

        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {

            Callable<Integer> countTask1 = () -> {
                AtomicInteger count = new AtomicInteger(0);
                if (s2 != null) { // s2 is the first half
                    s2.forEachRemaining(line -> {
                        if (line.contains(keyword)) {
                            count.incrementAndGet();
                        }
                    });
                }
                System.out.println("Count from task 1 (thread " + Thread.currentThread().getName() + "): " + count.get());
                return count.get();
            };

            Callable<Integer> countTask2 = () -> {
                AtomicInteger count = new AtomicInteger(0);
                s1.forEachRemaining(line -> { // s1 is the second half
                    if (line.contains(keyword)) {
                        count.incrementAndGet();
                    }
                });
                System.out.println("Count from task 2 (thread " + Thread.currentThread().getName() + "): " + count.get());
                return count.get();
            };

            Future<Integer> future1 = executor.submit(countTask1);
            Future<Integer> future2 = executor.submit(countTask2);

            int totalCount = future1.get() + future2.get();
            System.out.println("Total lines containing '" + keyword + "': " + totalCount);

            long expectedCount = lines.stream().filter(line -> line.contains(keyword)).count();
            System.out.println("Expected count (sequential stream): " + expectedCount);

            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }
    }
}

在此示例中,并行处理字符串列表以计算单词“fox”的出现次数。 如果 lambda 表达式更复杂,每个任务都使用一个 AtomicInteger 进行本地计数,以确保线程安全,尽管此处简单的局部变量也可以工作。 部分计数通过 Future 对象检索并求和。

此实现中的一个关键组件是使用 AtomicInteger,它保证了在计算目标关键字的出现次数时进行线程安全增量。 这种方法确保了在 lambda 表达式中修改共享变量时的正确性 - 尽管对于简单情况,局部变量就足够了。

一旦两个任务都完成执行,它们的部分结果将通过 Future 对象检索并求和,以并行计算总关键字出现次数。 然后,通过将其与基于顺序流的计数进行比较来验证输出,从而证明通过并发处理实现的准确性和效率提升。

来源

Java Spliterator 文档

本教程介绍了 Java Spliterator 接口,重点介绍了它在并行处理中的使用。 我们探讨了基本遍历、拆分、特征、自定义 Spliterator 以及与并行流和 ExecutorService 的集成。 了解 Spliterator 对于编写高效、可扩展的 Java 代码至关重要,这些代码可以利用多核处理器来处理数据密集型任务。

作者

我叫 Jan Bodnar,是一位经验丰富的专注程序员。我于 2007 年开始撰写编程文章,至今已创作了 1,400 多篇文章和八本电子书。 凭借超过八年的教学经验,我致力于分享我的知识并帮助他人掌握编程概念。

列出所有Java教程