ZetCode

Java PipedOutputStream 类

最后修改时间:2025 年 4 月 16 日

java.io.PipedOutputStream 类用于在生产者-消费者场景中向管道写入数据。它必须连接到 PipedInputStream 才能在线程之间创建通信通道。写入输出流的数据可以从连接的输入流中读取。

PipedOutputStream 通常用于线程间通信。管道具有有限的缓冲区大小,如果缓冲区已满,写入操作将被阻塞。管道的两端必须位于同一进程中,并且通常位于同一 JVM 中。

PipedOutputStream 类概述

PipedOutputStream 继承了 OutputStream 并提供了基本的管道写入功能。主要方法包括连接到输入流、写入数据和关闭流。该类对于并发写入不是线程安全的。

public class PipedOutputStream extends OutputStream {
    public PipedOutputStream();
    public PipedOutputStream(PipedInputStream snk);
    public void connect(PipedInputStream snk);
    public void write(int b);
    public void write(byte[] b, int off, int len);
    public void flush();
    public void close();
}

上面的代码展示了 PipedOutputStream 提供的关键方法。这些方法允许将数据写入管道并管理连接。在写入数据之前,必须正确连接流。

创建 PipedOutputStream

可以使用两种方式创建 PipedOutputStream:带或不带现有的 PipedInputStream。如果创建时不带,则必须稍后连接。连接建立了线程之间的通信通道。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        try {
            // Create unconnected output stream
            PipedOutputStream pos1 = new PipedOutputStream();
            System.out.println("Created unconnected PipedOutputStream");
            
            // Create and connect to existing input stream
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos2 = new PipedOutputStream(pis);
            System.out.println("Created connected PipedOutputStream");
            
            pos1.close();
            pos2.close();
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

此示例演示了创建 PipedOutputStream 的不同方法。第一个是未连接的,需要稍后显式连接。第二个是预先连接到现有的 PipedInputStream。完成后务必关闭流。

将 PipedOutputStream 连接到 PipedInputStream

输出流和输入流之间的连接可以在创建期间建立,也可以稍后使用 connect 方法建立。每个流只允许一个连接。尝试重新连接会抛出 IOException。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream();
            
            // Connect the streams
            pos.connect(pis);
            System.out.println("Streams connected successfully");
            
            // Verify connection
            if (pis.available() == 0) {
                System.out.println("No data in pipe yet");
            }
            
            pos.close();
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

此示例展示了如何在创建后将 PipedOutputStream 连接到 PipedInputStream。必须在写入数据之前建立连接。available 方法检查输入流中是否存在数据。

将数据写入 PipedOutputStream

连接后,可以将数据写入输出流并从输入流中读取。如果管道缓冲区已满,写入操作将被阻塞。管道具有默认的缓冲区大小,但可以在创建输入流时指定。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);
            
            // Write data to the output stream
            String message = "Hello from PipedOutputStream!";
            pos.write(message.getBytes());
            System.out.println("Data written to pipe");
            
            // Read from input stream
            byte[] buffer = new byte[1024];
            int bytesRead = pis.read(buffer);
            System.out.println("Read from pipe: " + 
                new String(buffer, 0, bytesRead));
            
            pos.close();
            pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

此示例演示了将数据写入 PipedOutputStream 并从连接的 PipedInputStream 中读取它。write 方法接受字节数组用于批量数据传输。读取操作会阻塞,直到数据可用。

使用管道进行线程间通信

管道流通常用于线程之间的通信。一个线程写入输出流,而另一个线程从输入流读取。这提供了一种线程安全的数据交换方式。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);
            
            // Writer thread
            Thread writer = new Thread(() -> {
                try {
                    for (int i = 1; i <= 5; i++) {
                        String msg = "Message " + i + "\n";
                        pos.write(msg.getBytes());
                        Thread.sleep(500);
                    }
                    pos.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            
            // Reader thread
            Thread reader = new Thread(() -> {
                try {
                    int data;
                    while ((data = pis.read()) != -1) {
                        System.out.print((char) data);
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            
            writer.start();
            reader.start();
            
            writer.join();
            reader.join();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

此示例展示了使用管道流的线程间通信。写入线程发送消息,而读取线程显示它们。管道自动处理线程之间的同步。

使用 PipedOutputStream 处理大型数据

在处理大型数据时,管理管道缓冲区大小并在生产者和消费者之间进行协调非常重要。管道的容量有限,因此如果消费者读取速度不够快,生产者可能会被阻塞。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        try {
            // Create pipe with larger buffer (10KB)
            PipedInputStream pis = new PipedInputStream(10240);
            PipedOutputStream pos = new PipedOutputStream(pis);
            
            Thread producer = new Thread(() -> {
                try {
                    byte[] data = new byte[4096];
                    for (int i = 0; i < 10; i++) {
                        // Fill buffer with pattern
                        for (int j = 0; j < data.length; j++) {
                            data[j] = (byte) (i + j);
                        }
                        pos.write(data);
                        System.out.println("Produced block " + i);
                    }
                    pos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            
            Thread consumer = new Thread(() -> {
                try {
                    byte[] buffer = new byte[1024];
                    int bytesRead;
                    int total = 0;
                    while ((bytesRead = pis.read(buffer)) != -1) {
                        total += bytesRead;
                        System.out.println("Consumed " + bytesRead + 
                            " bytes (total: " + total + ")");
                    }
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            
            producer.start();
            consumer.start();
            
            producer.join();
            consumer.join();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

此示例演示了如何使用自定义缓冲区大小处理更大的数据量。生产者创建 4KB 的块,而消费者以 1KB 的块读取。更大的管道缓冲区有助于防止数据传输期间的阻塞。

使用 PipedOutputStream 进行错误处理

在使用管道流时,正确的错误处理至关重要。常见问题包括管道损坏、连接失败和线程中断。始终在 finally 块中关闭流或使用 try-with-resources。

Main.java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Main {

    public static void main(String[] args) {
        PipedInputStream pis = null;
        PipedOutputStream pos = null;
        
        try {
            pis = new PipedInputStream();
            pos = new PipedOutputStream();
            
            // Attempt to write before connecting
            try {
                pos.write("Test".getBytes());
            } catch (IOException e) {
                System.out.println("Expected error: " + e.getMessage());
            }
            
            // Connect properly
            pos.connect(pis);
            
            // Start reader thread
            Thread reader = new Thread(() -> {
                try {
                    byte[] buffer = new byte[100];
                    int bytes = pis.read(buffer);
                    System.out.println("Read: " + new String(buffer, 0, bytes));
                } catch (IOException e) {
                    System.out.println("Reader error: " + e.getMessage());
                }
            });
            reader.start();
            
            // Write data
            pos.write("Successful communication".getBytes());
            pos.flush();
            
            reader.join();
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (pos != null) pos.close();
                if (pis != null) pis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

此示例演示了使用管道流进行的正确错误处理。它显示了写入未连接的管道时会发生什么,并在 finally 块中包含正确的清理操作。读取线程处理读取期间潜在的 IOExceptions。

来源

Java PipedOutputStream 类文档

在本文中,我们介绍了 Java PipedOutputStream 类的基本方法和功能。理解这些概念对于在 Java 应用程序中使用线程间通信至关重要。

作者

我叫 Jan Bodnar,是一位经验丰富的专业程序员。我于 2007 年开始撰写编程文章,至今已创作超过 1400 篇文章和 8 本电子书。凭借超过 8 年的教学经验,我致力于分享我的知识并帮助他人掌握编程概念。

列出所有Java教程