ZetCode

Dart Pipe

最后修改于 2025 年 4 月 4 日

Dart 中的 Pipe 类提供了一种连接流以进行高效数据处理的方法。它对于链接流转换很有用。

Pipe 管理流之间的连接,处理数据流和错误。它是 Dart dart:async 库中用于异步编程的一部分。

基本定义

Pipe 是流之间的连接器,负责将事件从源转发到目标。它同时实现了 Stream 和 StreamSink 接口。

主要功能包括自动事件转发、错误传播和流生命周期管理。它简化了复杂的流管道。

基本 Pipe 用法

此示例展示了两个控制器之间的基本流管道。

main.dart
import 'dart:async';

void main() async {
  var source = StreamController<int>();
  var destination = StreamController<int>();
  var pipe = Pipe(source.stream, destination.sink);

  destination.stream.listen((data) {
    print('Received: $data');
  });

  source.add(1);
  source.add(2);
  source.add(3);
  
  await source.close();
}

我们创建一个 Pipe,连接一个源流到一个目标 sink。数据通过管道自动从源流向目标流。

$ dart main.dart
Received: 1
Received: 2
Received: 3

使用 Pipe 转换数据

此示例演示了向 pipe 添加转换器。

main.dart
import 'dart:async';

void main() async {
  var source = StreamController<int>();
  var destination = StreamController<String>();
  
  var pipe = Pipe(
    source.stream,
    destination.sink,
    transform: (stream) => stream.map((n) => 'Number $n'),
  );

  destination.stream.listen(print);

  source.add(1);
  source.add(2);
  source.add(3);
  
  await source.close();
}

我们添加一个转换函数来修改数据在通过 pipe 时的样子。转换器在转发之前将数字转换为格式化的字符串。

$ dart main.dart
Number 1
Number 2
Number 3

Pipe 中的错误处理

此示例展示了 Pipe 如何处理和转发错误。

main.dart
import 'dart:async';

void main() async {
  var source = StreamController<int>();
  var destination = StreamController<int>();
  var pipe = Pipe(source.stream, destination.sink);

  destination.stream.listen(
    print,
    onError: (e) => print('Error: $e'),
    onDone: () => print('Done'),
  );

  source.add(1);
  source.addError('Test Error');
  source.add(2);
  
  await source.close();
}

源流中的错误会自动转发到目标。Pipe 在整个流管道中维护错误传播。

$ dart main.dart
1
Error: Test Error
2
Done

带有广播流的 Pipe

此示例演示了将 Pipe 与广播流一起使用。

main.dart
import 'dart:async';

void main() async {
  var source = StreamController<int>.broadcast();
  var destination = StreamController<int>.broadcast();
  var pipe = Pipe(source.stream, destination.sink);

  // First listener
  destination.stream.listen((data) {
    print('Listener 1: $data');
  });

  // Second listener
  destination.stream.listen((data) {
    print('Listener 2: $data');
  });

  source.add(1);
  source.add(2);
  
  await source.close();
}

Pipe 可以与广播流一起使用,将事件转发给所有监听器。每个目标监听器都会收到源的所有事件。

$ dart main.dart
Listener 1: 1
Listener 2: 1
Listener 1: 2
Listener 2: 2

关闭 Pipe 连接

此示例展示了 pipe 连接的正确清理。

main.dart
import 'dart:async';

void main() async {
  var source = StreamController<int>();
  var destination = StreamController<int>();
  var pipe = Pipe(source.stream, destination.sink);

  destination.stream.listen(print);

  source.add(1);
  source.add(2);
  
  // Proper cleanup
  await pipe.close();
  await source.close();
  await destination.close();
  
  print('All streams closed');
}

关闭 pipe 可确保所有资源都得到正确释放。完成时关闭所有控制器和 pipe 是个好习惯。

$ dart main.dart
1
2
All streams closed

最佳实践

来源

Dart Pipe 文档

本教程通过实际示例介绍了 Dart 的 Pipe 类,展示了基本用法、错误处理和流转换。

作者

我叫 Jan Bodnar,是一名充满热情的程序员,拥有丰富的编程经验。我自 2007 年以来一直在撰写编程文章。至今,我已撰写了 1,400 多篇文章和 8 本电子书。我在编程教学方面拥有超过十年的经验。

列出 所有 Dart 教程