Dart Pipe
最后修改于 2025 年 4 月 4 日
Dart 中的 Pipe 类提供了一种连接流以进行高效数据处理的方法。它对于链接流转换很有用。
Pipe 管理流之间的连接,处理数据流和错误。它是 Dart dart:async 库中用于异步编程的一部分。
基本定义
Pipe 是流之间的连接器,负责将事件从源转发到目标。它同时实现了 Stream 和 StreamSink 接口。
主要功能包括自动事件转发、错误传播和流生命周期管理。它简化了复杂的流管道。
基本 Pipe 用法
此示例展示了两个控制器之间的基本流管道。
main.dart
import 'dart:async';
void main() async {
var source = StreamController<int>();
var destination = StreamController<int>();
var pipe = Pipe(source.stream, destination.sink);
destination.stream.listen((data) {
print('Received: $data');
});
source.add(1);
source.add(2);
source.add(3);
await source.close();
}
我们创建一个 Pipe,连接一个源流到一个目标 sink。数据通过管道自动从源流向目标流。
$ dart main.dart Received: 1 Received: 2 Received: 3
使用 Pipe 转换数据
此示例演示了向 pipe 添加转换器。
main.dart
import 'dart:async';
void main() async {
var source = StreamController<int>();
var destination = StreamController<String>();
var pipe = Pipe(
source.stream,
destination.sink,
transform: (stream) => stream.map((n) => 'Number $n'),
);
destination.stream.listen(print);
source.add(1);
source.add(2);
source.add(3);
await source.close();
}
我们添加一个转换函数来修改数据在通过 pipe 时的样子。转换器在转发之前将数字转换为格式化的字符串。
$ dart main.dart Number 1 Number 2 Number 3
Pipe 中的错误处理
此示例展示了 Pipe 如何处理和转发错误。
main.dart
import 'dart:async';
void main() async {
var source = StreamController<int>();
var destination = StreamController<int>();
var pipe = Pipe(source.stream, destination.sink);
destination.stream.listen(
print,
onError: (e) => print('Error: $e'),
onDone: () => print('Done'),
);
source.add(1);
source.addError('Test Error');
source.add(2);
await source.close();
}
源流中的错误会自动转发到目标。Pipe 在整个流管道中维护错误传播。
$ dart main.dart 1 Error: Test Error 2 Done
带有广播流的 Pipe
此示例演示了将 Pipe 与广播流一起使用。
main.dart
import 'dart:async';
void main() async {
var source = StreamController<int>.broadcast();
var destination = StreamController<int>.broadcast();
var pipe = Pipe(source.stream, destination.sink);
// First listener
destination.stream.listen((data) {
print('Listener 1: $data');
});
// Second listener
destination.stream.listen((data) {
print('Listener 2: $data');
});
source.add(1);
source.add(2);
await source.close();
}
Pipe 可以与广播流一起使用,将事件转发给所有监听器。每个目标监听器都会收到源的所有事件。
$ dart main.dart Listener 1: 1 Listener 2: 1 Listener 1: 2 Listener 2: 2
关闭 Pipe 连接
此示例展示了 pipe 连接的正确清理。
main.dart
import 'dart:async';
void main() async {
var source = StreamController<int>();
var destination = StreamController<int>();
var pipe = Pipe(source.stream, destination.sink);
destination.stream.listen(print);
source.add(1);
source.add(2);
// Proper cleanup
await pipe.close();
await source.close();
await destination.close();
print('All streams closed');
}
关闭 pipe 可确保所有资源都得到正确释放。完成时关闭所有控制器和 pipe 是个好习惯。
$ dart main.dart 1 2 All streams closed
最佳实践
- 资源清理:始终关闭 pipes 和 controllers
- 错误处理:在目标流中处理错误
- 转换器:用于管道中的数据修改
- 广播:对于多个监听器,请使用广播流
来源
本教程通过实际示例介绍了 Dart 的 Pipe 类,展示了基本用法、错误处理和流转换。
作者
列出 所有 Dart 教程。