Dart Pipe
最后修改于 2025 年 4 月 4 日
Dart 中的 Pipe
类提供了一种连接流以进行高效数据处理的方法。它对于链接流转换很有用。
Pipe 管理流之间的连接,处理数据流和错误。它是 Dart dart:async
库中用于异步编程的一部分。
基本定义
Pipe
是流之间的连接器,负责将事件从源转发到目标。它同时实现了 Stream 和 StreamSink 接口。
主要功能包括自动事件转发、错误传播和流生命周期管理。它简化了复杂的流管道。
基本 Pipe 用法
此示例展示了两个控制器之间的基本流管道。
main.dart
import 'dart:async'; void main() async { var source = StreamController<int>(); var destination = StreamController<int>(); var pipe = Pipe(source.stream, destination.sink); destination.stream.listen((data) { print('Received: $data'); }); source.add(1); source.add(2); source.add(3); await source.close(); }
我们创建一个 Pipe,连接一个源流到一个目标 sink。数据通过管道自动从源流向目标流。
$ dart main.dart Received: 1 Received: 2 Received: 3
使用 Pipe 转换数据
此示例演示了向 pipe 添加转换器。
main.dart
import 'dart:async'; void main() async { var source = StreamController<int>(); var destination = StreamController<String>(); var pipe = Pipe( source.stream, destination.sink, transform: (stream) => stream.map((n) => 'Number $n'), ); destination.stream.listen(print); source.add(1); source.add(2); source.add(3); await source.close(); }
我们添加一个转换函数来修改数据在通过 pipe 时的样子。转换器在转发之前将数字转换为格式化的字符串。
$ dart main.dart Number 1 Number 2 Number 3
Pipe 中的错误处理
此示例展示了 Pipe
如何处理和转发错误。
main.dart
import 'dart:async'; void main() async { var source = StreamController<int>(); var destination = StreamController<int>(); var pipe = Pipe(source.stream, destination.sink); destination.stream.listen( print, onError: (e) => print('Error: $e'), onDone: () => print('Done'), ); source.add(1); source.addError('Test Error'); source.add(2); await source.close(); }
源流中的错误会自动转发到目标。Pipe 在整个流管道中维护错误传播。
$ dart main.dart 1 Error: Test Error 2 Done
带有广播流的 Pipe
此示例演示了将 Pipe 与广播流一起使用。
main.dart
import 'dart:async'; void main() async { var source = StreamController<int>.broadcast(); var destination = StreamController<int>.broadcast(); var pipe = Pipe(source.stream, destination.sink); // First listener destination.stream.listen((data) { print('Listener 1: $data'); }); // Second listener destination.stream.listen((data) { print('Listener 2: $data'); }); source.add(1); source.add(2); await source.close(); }
Pipe 可以与广播流一起使用,将事件转发给所有监听器。每个目标监听器都会收到源的所有事件。
$ dart main.dart Listener 1: 1 Listener 2: 1 Listener 1: 2 Listener 2: 2
关闭 Pipe 连接
此示例展示了 pipe 连接的正确清理。
main.dart
import 'dart:async'; void main() async { var source = StreamController<int>(); var destination = StreamController<int>(); var pipe = Pipe(source.stream, destination.sink); destination.stream.listen(print); source.add(1); source.add(2); // Proper cleanup await pipe.close(); await source.close(); await destination.close(); print('All streams closed'); }
关闭 pipe 可确保所有资源都得到正确释放。完成时关闭所有控制器和 pipe 是个好习惯。
$ dart main.dart 1 2 All streams closed
最佳实践
- 资源清理:始终关闭 pipes 和 controllers
- 错误处理:在目标流中处理错误
- 转换器:用于管道中的数据修改
- 广播:对于多个监听器,请使用广播流
来源
本教程通过实际示例介绍了 Dart 的 Pipe 类,展示了基本用法、错误处理和流转换。
作者
列出 所有 Dart 教程。