Go 管道
最后修改于 2025 年 4 月 22 日
本教程演示了如何在 Golang 中利用管道实现无缝的进程间通信。
管道
管道是一种强大的机制,用于在进程之间重定向数据,通过单向通道实现高效的进程间通信。管道通常用于需要在生产者和消费者之间无缝流动数据,或者进程协作以完成共同任务的场景。
Go 中的 `io.Pipe` 函数创建一个同步的内存管道,它在 `io.Reader` 和 `io.Writer` 之间提供直接连接。当您需要在应用程序组件之间流式传输数据时,此函数特别有用,例如从源读取数据同时写入到接收器。
由于 `io.Pipe` 是同步操作的,写入管道的操作会阻塞,直到发生相应的读取操作,从而确保数据在没有缓冲的情况下进行一致传输。这使得 `io.Pipe` 成为需要实时数据处理或组件之间紧密耦合通信的应用程序的绝佳选择。
$ go version go version go1.22.2 linux/amd64
本教程中的所有示例均使用 Go 版本 1.22.2。
Go 管道简单示例
此示例演示了 `io.Pipe` 函数在数据传输中的基本用法。
package main import ( "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { fmt.Fprint(w, "Hello there\n") w.Close() }() _, err := io.Copy(os.Stdout, r) if err != nil { log.Fatal(err) } }
此程序使用 `io.Pipe` 创建一个管道,在一个 goroutine 中向管道的 writer 写入数据,并使用 `io.Copy` 将数据从管道的 reader 复制到标准输出。
go func() { fmt.Fprint(w, "Hello there\n") w.Close() }()
在一个 goroutine 中,数据被写入 `PipeWriter`。写入会阻塞,直到数据被一个或多个来自 `PipeReader` 的读取操作完全消耗。
$ go run simple.go Hello there
Go cmd StdoutPipe
`command` 的 `StdoutPipe` 方法提供了一个在命令启动后连接到该命令标准输出的管道。
package main import ( "bufio" "fmt" "log" "os" "os/exec" ) func main() { cmd := exec.Command("ping", "webcode.me") stdout, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) } cmd.Start() buf := bufio.NewReader(stdout) num := 0 for { line, _, _ := buf.ReadLine() if num > 3 { os.Exit(0) } num += 1 fmt.Println(string(line)) } }
此示例执行 `ping` 命令并读取其输出的前四行,将它们显示在控制台上。
cmd := exec.Command("ping", "webcode.me")
创建一个命令来运行 `ping`,以测试 `webcode.me` 网站的可访问性。
stdout, err := cmd.StdoutPipe()
`StdoutPipe` 方法获取命令标准输出流的管道。
buf := bufio.NewReader(stdout)
创建一个缓冲读取器,以便有效地从命令的标准输出读取数据。
for { line, _, _ := buf.ReadLine() if num > 3 { os.Exit(0) } num += 1 fmt.Println(string(line)) }
一个循环从输出中读取四行,将每一行打印到控制台,然后终止程序。
$ go run pingcmd.go PING webcode.me (46.101.248.126) 56(84) bytes of data. 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=1 ttl=54 time=29.7 ms 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=2 ttl=54 time=35.9 ms 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=3 ttl=54 time=37.4 ms
Go 管道 POST JSON 数据
此示例演示了如何使用管道将 JSON 数据 POST 到 `https://httpbin.org/post`。
package main import ( "encoding/json" "fmt" "io" "io/ioutil" "log" "net/http" ) type PayLoad struct { Content string } func main() { r, w := io.Pipe() go func() { defer w.Close() err := json.NewEncoder(w).Encode(&PayLoad{Content: "Hello there!"}) if err != nil { log.Fatal(err) } }() resp, err := http.Post("https://httpbin.org/post", "application/json", r) if err != nil { log.Fatal(err) } body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body)) }
此程序通过管道将 JSON 有效负载发送到 Web 服务器,并打印响应正文,展示了通过管道进行的网络通信。
go func() { defer w.Close() err := json.NewEncoder(w).Encode(&PayLoad{Content: "Hello there!"}) if err != nil { log.Fatal(err) } }()
JSON 有效负载被编码并在 goroutine 中写入 `PipeWriter`,确保在编码后正确关闭 writer。
resp, err := http.Post("https://httpbin.org/post", "application/json", r)
`http.Post` 函数使用 `PipeReader` 作为请求正文发送 JSON 数据,并将内容类型指定为 JSON。
body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body))
响应正文被读取并转换为字符串,然后打印到控制台以显示服务器的响应。
$ go run post_json.go { "args": {}, "data": "{\"Content\":\"Hello there!\"}\n", "files": {}, "form": {}, "headers": { ...
Go 通过管道读取标准输入
此示例创建了一个 Go 程序,该程序通过管道从标准输入读取数据,对其进行处理,然后显示结果。
package main import ( "bufio" "fmt" "io" "log" "os" ) func main() { nBytes, nChunks := int64(0), int64(0) r := bufio.NewReader(os.Stdin) buf := make([]byte, 0, 4*1024) for { n, err := r.Read(buf[:cap(buf)]) buf = buf[:n] if n == 0 { if err == nil { continue } if err == io.EOF { break } log.Fatal(err) } nChunks++ nBytes += int64(len(buf)) fmt.Println(string(buf)) if err != nil && err != io.EOF { log.Fatal(err) } } fmt.Println("Bytes:", nBytes, "Chunks:", nChunks) }
此程序从标准输入读取数据,打印它,并跟踪处理的字节数和块数,演示了基于管道的输入处理。
r := bufio.NewReader(os.Stdin)
创建一个缓冲读取器,以便从标准输入高效地读取数据,通常是通过另一个命令管道传输过来的。
buf := make([]byte, 0, 4*1024)
分配一个 4KB 的缓冲区来存储从标准输入读取的数据,从而优化输入处理的内存使用。
n, err := r.Read(buf[:cap(buf)]) buf = buf[:n]
数据被读入缓冲区,并根据读取的字节数对缓冲区进行切片,确保准确的数据处理。
nChunks++ nBytes += int64(len(buf))
程序递增块计数器并累积读取的总字节数,跟踪输入统计信息。
fmt.Println(string(buf))
缓冲区的内容被转换为字符串并打印到控制台,显示管道输入数据。
$ date | go run read_stdin.go Sun 15 Nov 2020 01:08:13 PM CET Bytes: 32 Chunks: 1
`date` 命令的输出被管道传输到程序,程序读取、显示并报告字节数和块计数。
Go Stat
`Stat` 函数返回一个描述文件的 FileInfo 结构,可用于检测标准输入上的管道输入。
package main import ( "bufio" "fmt" "log" "os" ) func main() { stat, _ := os.Stdin.Stat() if (stat.Mode() & os.ModeCharDevice) == 0 { var buf []byte scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { buf = append(buf, scanner.Bytes()...) } if err := scanner.Err(); err != nil { log.Fatal(err) } fmt.Printf("Hello %s!\n", buf) } else { fmt.Print("Enter your name: ") var name string fmt.Scanf("%s", &name) fmt.Printf("Hello %s!\n", name) } }
此程序通过管道或用户提示接受输入,并根据输入源向用户致以问候。
stat, _ := os.Stdin.Stat()
`Stat` 函数检索有关标准输入元数据,指示它是通过管道传输还是来自终端。
if (stat.Mode() & os.ModeCharDevice) == 0 {
此条件检查标准输入是否为管道传输,而不是来自终端或字符设备。
var buf []byte scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { buf = append(buf, scanner.Bytes()...) }
一个扫描器逐行读取管道输入,并将每一行追加到一个字节切片中以供进一步处理。
} else { fmt.Print("Enter your name: ") var name string fmt.Scanf("%s", &name) fmt.Printf("Hello %s!\n", name) }
如果没有检测到管道输入,程序会提示用户输入一个名字并打印问候语。
$ echo "Peter" | go run hello.go Hello Peter! $ go run hello.go Enter your name: Peter Hello Peter!
程序处理管道输入(通过 `echo`)和交互式提示输入,展示了灵活的输入处理。
Go HTTP 处理程序中的管道
此示例在 HTTP 处理程序中使用管道将命令输出流式传输到 Web 客户端。
package main import ( "fmt" "io" "net/http" "os/exec" ) func handler(w http.ResponseWriter, r *http.Request) { cmd := exec.Command("date") pr, pw := io.Pipe() defer pw.Close() cmd.Stdout = pw cmd.Stderr = pw go io.Copy(w, pr) cmd.Run() } func main() { http.HandleFunc("/", handler) fmt.Println("server started on port 8080") http.ListenAndServe(":8080", nil) }
此程序运行一个 Web 服务器,该服务器执行 `date` 命令并通过管道将其输出流式传输到客户端。
cmd := exec.Command("date")
定义了一个命令来执行 `date` 系统命令,该命令输出当前日期和时间。
pr, pw := io.Pipe() defer pw.Close()
使用 `io.Pipe` 创建一个管道,并推迟 writer 的关闭,以确保在使用后正确关闭。
cmd.Stdout = pw cmd.Stderr = pw
命令的标准输出和错误流被定向到 `PipeWriter` 进行流式传输。
go io.Copy(w, pr)
在 goroutine 中,`PipeReader` 将数据流式传输到 `http.ResponseWriter`,将其发送到客户端。
cmd.Run()
`Run` 方法执行命令,生成通过管道传输到客户端的输出。
$ go run handler.go server started on port 8080
服务器启动,监听 8080 端口上的 HTTP 请求。
$ curl localhost:8080 Sun 15 Nov 2020 02:18:07 PM CET
一个 `curl` 请求获取 date 命令的输出,该输出通过服务器的管道进行流式传输。
Go 管道与并发写入器
此示例演示了向单个管道使用多个并发写入器,展示了来自多个 goroutine 的同步数据聚合。
package main import ( "fmt" "io" "log" "os" "sync" ) func main() { r, w := io.Pipe() var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Fprintf(w, "Message from writer %d\n", id) }(i) } go func() { wg.Wait() w.Close() }() _, err := io.Copy(os.Stdout, r) if err != nil { log.Fatal(err) } }
此程序创建一个管道并启动三个 goroutine,每个 goroutine 向 `PipeWriter` 写入一条消息。`WaitGroup` 确保在所有 goroutine 完成后才关闭 writer,并将数据读取到 stdout。
$ go run concurrent_writers.go Message from writer 0 Message from writer 1 Message from writer 2
Go 管道用于文件流式处理
此示例使用管道将数据从文件流式传输到处理函数,演示了在不将大文件完全加载到内存中的情况下进行高效处理。
package main import ( "bufio" "io" "log" "os" "strings" ) func main() { r, w := io.Pipe() go func() { defer w.Close() file, err := os.Open("input.txt") if err != nil { log.Fatal(err) } defer file.Close() io.Copy(w, file) }() scanner := bufio.NewScanner(r) for scanner.Scan() { line := strings.ToUpper(scanner.Text()) os.Stdout.WriteString(line + "\n") } if err := scanner.Err(); err != nil { log.Fatal(err) } }
此程序通过管道读取文件,将其内容流式传输到将每行转换为大写并输出的扫描器。管道使得在不将整个文件加载到内存中的情况下进行处理成为可能。
$ echo -e "hello\nworld" > input.txt $ go run file_stream.go HELLO WORLD
Go 管道与错误处理
此示例展示了如何在基于管道的工作流中处理错误,确保 writer 和 reader 之间通信的健壮性,并具有正确的错误传播。
package main import ( "errors" "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { defer w.Close() _, err := fmt.Fprint(w, "Valid data\n") if err != nil { w.CloseWithError(err) return } w.CloseWithError(errors.New("simulated writer error")) }() buf := make([]byte, 1024) n, err := r.Read(buf) if err != nil && err != io.EOF { log.Fatal(err) } fmt.Print(string(buf[:n])) _, err = r.Read(buf) if err != nil { fmt.Println("Reader caught error:", err) } }
此程序使用 `CloseWithError` 模拟 writer 错误。reader 处理了初始有效数据,然后检测到错误,演示了管道通信中的健壮错误处理。
$ go run error_handling.go Valid data Reader caught error: simulated writer error
Go 管道与链式处理
此示例说明了一个管道,其中数据通过管道流经多个处理阶段,展示了数据转换的模块化方法。
package main import ( "bufio" "io" "log" "os" "strings" ) func main() { r1, w1 := io.Pipe() r2, w2 := io.Pipe() go func() { defer w1.Close() w1.Write([]byte("hello there!\n")) }() go func() { defer w2.Close() scanner := bufio.NewScanner(r1) for scanner.Scan() { w2.Write([]byte(strings.ToUpper(scanner.Text()) + "\n")) } if err := scanner.Err(); err != nil { log.Fatal(err) } }() io.Copy(os.Stdout, r2) }
此程序创建了一个两阶段管道:第一阶段将数据写入管道,第二阶段读取数据,将其转换为大写,然后写入另一个管道。最终输出被流式传输到 stdout。
$ go run chained_pipeline.go HELLO THERE!
Go 管道与实时日志聚合
此示例演示了如何使用管道实时聚合来自多个源的日志,模拟具有并发输入的集中式日志记录系统。
package main import ( "fmt" "io" "log" "os" "sync" "time" ) func main() { r, w := io.Pipe() var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 3; j++ { fmt.Fprintf(w, "[Source %d] Log entry %d at %v\n", id, j, time.Now()) time.Sleep(100 * time.Millisecond) } }(i) } go func() { wg.Wait() w.Close() }() scanner := bufio.NewScanner(r) for scanner.Scan() { fmt.Println(scanner.Text()) } if err := scanner.Err(); err != nil { log.Fatal(err) } }
此程序模拟了三个日志源,每个源在并发 goroutine 中将条目写入管道。reader 扫描管道并实时打印日志,演示了集中式日志聚合。
Go 管道与压缩数据流
此示例展示了如何使用管道实时流式传输和压缩数据,说明了高效处理具有压缩功能的大型数据集。
package main import ( "compress/gzip" "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { defer w.Close() gw := gzip.NewWriter(w) defer gw.Close() for i := 0; i < 5; i++ { fmt.Fprintf(gw, "Data chunk %d\n", i) } }() file, err := os.Create("output.gz") if err != nil { log.Fatal(err) } defer file.Close() _, err = io.Copy(file, r) if err != nil { log.Fatal(err) } }
此程序通过管道流式传输数据,并在 goroutine 中使用 gzip 进行压缩。压缩后的数据被写入文件,展示了实时压缩以实现高效存储或传输。
$ go run compress_stream.go $ gunzip -c output.gz Data chunk 0 Data chunk 1 Data chunk 2 Data chunk 3 Data chunk 4
来源
本文探讨了在 Go 中使用管道进行进程间通信和数据流式传输。