Java PipedOutputStream 类
最后修改时间:2025 年 4 月 16 日
java.io.PipedOutputStream 类用于在生产者-消费者场景中向管道写入数据。它必须连接到 PipedInputStream 才能在线程之间创建通信通道。写入输出流的数据可以从连接的输入流中读取。
PipedOutputStream 通常用于线程间通信。管道具有有限的缓冲区大小,如果缓冲区已满,写入操作将被阻塞。管道的两端必须位于同一进程中,并且通常位于同一 JVM 中。
PipedOutputStream 类概述
PipedOutputStream 继承了 OutputStream 并提供了基本的管道写入功能。主要方法包括连接到输入流、写入数据和关闭流。该类对于并发写入不是线程安全的。
public class PipedOutputStream extends OutputStream {
public PipedOutputStream();
public PipedOutputStream(PipedInputStream snk);
public void connect(PipedInputStream snk);
public void write(int b);
public void write(byte[] b, int off, int len);
public void flush();
public void close();
}
上面的代码展示了 PipedOutputStream 提供的关键方法。这些方法允许将数据写入管道并管理连接。在写入数据之前,必须正确连接流。
创建 PipedOutputStream
可以使用两种方式创建 PipedOutputStream:带或不带现有的 PipedInputStream。如果创建时不带,则必须稍后连接。连接建立了线程之间的通信通道。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
try {
// Create unconnected output stream
PipedOutputStream pos1 = new PipedOutputStream();
System.out.println("Created unconnected PipedOutputStream");
// Create and connect to existing input stream
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos2 = new PipedOutputStream(pis);
System.out.println("Created connected PipedOutputStream");
pos1.close();
pos2.close();
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
此示例演示了创建 PipedOutputStream 的不同方法。第一个是未连接的,需要稍后显式连接。第二个是预先连接到现有的 PipedInputStream。完成后务必关闭流。
将 PipedOutputStream 连接到 PipedInputStream
输出流和输入流之间的连接可以在创建期间建立,也可以稍后使用 connect 方法建立。每个流只允许一个连接。尝试重新连接会抛出 IOException。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
try {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
// Connect the streams
pos.connect(pis);
System.out.println("Streams connected successfully");
// Verify connection
if (pis.available() == 0) {
System.out.println("No data in pipe yet");
}
pos.close();
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
此示例展示了如何在创建后将 PipedOutputStream 连接到 PipedInputStream。必须在写入数据之前建立连接。available 方法检查输入流中是否存在数据。
将数据写入 PipedOutputStream
连接后,可以将数据写入输出流并从输入流中读取。如果管道缓冲区已满,写入操作将被阻塞。管道具有默认的缓冲区大小,但可以在创建输入流时指定。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
try {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
// Write data to the output stream
String message = "Hello from PipedOutputStream!";
pos.write(message.getBytes());
System.out.println("Data written to pipe");
// Read from input stream
byte[] buffer = new byte[1024];
int bytesRead = pis.read(buffer);
System.out.println("Read from pipe: " +
new String(buffer, 0, bytesRead));
pos.close();
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
此示例演示了将数据写入 PipedOutputStream 并从连接的 PipedInputStream 中读取它。write 方法接受字节数组用于批量数据传输。读取操作会阻塞,直到数据可用。
使用管道进行线程间通信
管道流通常用于线程之间的通信。一个线程写入输出流,而另一个线程从输入流读取。这提供了一种线程安全的数据交换方式。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
try {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
// Writer thread
Thread writer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String msg = "Message " + i + "\n";
pos.write(msg.getBytes());
Thread.sleep(500);
}
pos.close();
} catch (Exception e) {
e.printStackTrace();
}
});
// Reader thread
Thread reader = new Thread(() -> {
try {
int data;
while ((data = pis.read()) != -1) {
System.out.print((char) data);
}
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
});
writer.start();
reader.start();
writer.join();
reader.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
此示例展示了使用管道流的线程间通信。写入线程发送消息,而读取线程显示它们。管道自动处理线程之间的同步。
使用 PipedOutputStream 处理大型数据
在处理大型数据时,管理管道缓冲区大小并在生产者和消费者之间进行协调非常重要。管道的容量有限,因此如果消费者读取速度不够快,生产者可能会被阻塞。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
try {
// Create pipe with larger buffer (10KB)
PipedInputStream pis = new PipedInputStream(10240);
PipedOutputStream pos = new PipedOutputStream(pis);
Thread producer = new Thread(() -> {
try {
byte[] data = new byte[4096];
for (int i = 0; i < 10; i++) {
// Fill buffer with pattern
for (int j = 0; j < data.length; j++) {
data[j] = (byte) (i + j);
}
pos.write(data);
System.out.println("Produced block " + i);
}
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
byte[] buffer = new byte[1024];
int bytesRead;
int total = 0;
while ((bytesRead = pis.read(buffer)) != -1) {
total += bytesRead;
System.out.println("Consumed " + bytesRead +
" bytes (total: " + total + ")");
}
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
此示例演示了如何使用自定义缓冲区大小处理更大的数据量。生产者创建 4KB 的块,而消费者以 1KB 的块读取。更大的管道缓冲区有助于防止数据传输期间的阻塞。
使用 PipedOutputStream 进行错误处理
在使用管道流时,正确的错误处理至关重要。常见问题包括管道损坏、连接失败和线程中断。始终在 finally 块中关闭流或使用 try-with-resources。
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Main {
public static void main(String[] args) {
PipedInputStream pis = null;
PipedOutputStream pos = null;
try {
pis = new PipedInputStream();
pos = new PipedOutputStream();
// Attempt to write before connecting
try {
pos.write("Test".getBytes());
} catch (IOException e) {
System.out.println("Expected error: " + e.getMessage());
}
// Connect properly
pos.connect(pis);
// Start reader thread
Thread reader = new Thread(() -> {
try {
byte[] buffer = new byte[100];
int bytes = pis.read(buffer);
System.out.println("Read: " + new String(buffer, 0, bytes));
} catch (IOException e) {
System.out.println("Reader error: " + e.getMessage());
}
});
reader.start();
// Write data
pos.write("Successful communication".getBytes());
pos.flush();
reader.join();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (pos != null) pos.close();
if (pis != null) pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
此示例演示了使用管道流进行的正确错误处理。它显示了写入未连接的管道时会发生什么,并在 finally 块中包含正确的清理操作。读取线程处理读取期间潜在的 IOExceptions。
来源
在本文中,我们介绍了 Java PipedOutputStream 类的基本方法和功能。理解这些概念对于在 Java 应用程序中使用线程间通信至关重要。
作者
列出所有Java教程。