This error occurs when both sides of a Node.js duplex stream (readable and writable) fail or close prematurely before completing data transfer. It commonly happens due to improper error handling, missing event listeners, or abrupt connection terminations in network streams, transform streams, or bidirectional pipes.
A duplex stream error indicates that a bidirectional stream closed unexpectedly on both the readable and writable sides before data transfer completed. Duplex streams maintain two independent internal buffers for reading and writing, allowing simultaneous bidirectional communication like TCP sockets or HTTP/2 connections. When both sides of a duplex stream fail, it typically means an unhandled error propagated through the stream chain, a connection was abruptly terminated, or proper cleanup wasn't performed. Unlike readable-only or writable-only streams, duplex streams require careful coordination between both directions to prevent premature termination. This error is particularly common when piping streams together without proper error handling, when network connections drop unexpectedly, or when transform streams encounter errors during data processing. Understanding duplex stream lifecycle management and event-driven error handling is essential for building robust streaming applications.
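As a minimal sketch of how this error surfaces (the chunk content is illustrative), stream.finished() reports a premature close when a duplex stream is destroyed before both sides complete:

const { Duplex, finished } = require('stream');

// A trivial duplex whose read/write sides do nothing interesting
const duplex = new Duplex({
  read() {}, // no-op readable side
  write(chunk, encoding, callback) { callback(); }
});

// finished() watches both sides of the stream
finished(duplex, (err) => {
  // Destroying the stream before end()/push(null) completes both
  // sides triggers a premature-close error here
  console.error('Stream finished with:', err && err.code);
});

duplex.write('partial data');
duplex.destroy(); // abrupt termination: neither side completed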
Always attach error event listeners to duplex and transform streams:
const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    try {
      // Transform logic here
      const result = processData(chunk);
      callback(null, result);
    } catch (err) {
      callback(err);
    }
  }
});

// Critical: listen for errors
transformStream.on('error', (err) => {
  console.error('Transform stream error:', err);
  // Handle error gracefully
});

transformStream.on('end', () => {
  console.log('Stream ended successfully');
});

transformStream.on('close', () => {
  console.log('Stream closed');
});

Without error listeners, unhandled errors will cause both sides of the duplex stream to fail.
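To see why the listener matters: streams are EventEmitters, and an 'error' event with no listener attached is thrown as an uncaught exception, tearing down both sides and, by default, the process. A minimal sketch:

const { Transform } = require('stream');

const failing = new Transform({
  transform(chunk, encoding, callback) {
    callback(new Error('boom')); // propagate a failure
  }
});

// No 'error' listener attached: writing a chunk makes the stream
// emit 'error', which Node throws as an uncaught exception.
failing.write('data');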
Replace manual pipe() calls with pipeline() for automatic error propagation:
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

// Bad: no error handling
// readStream.pipe(gzip).pipe(writeStream);

// Good: automatic error handling
async function processFiles() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      zlib.createGzip(),
      fs.createWriteStream('output.txt.gz')
    );
    console.log('Pipeline completed successfully');
  } catch (err) {
    console.error('Pipeline failed:', err);
    // All streams are properly cleaned up
  }
}

processFiles();

pipeline() automatically destroys all streams when any stream fails, preventing resource leaks.
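When you cannot restructure existing code around pipeline(), finished() from stream/promises gives the same premature-close detection for a single stream. A sketch (the function name and server port are illustrative):

const { finished } = require('stream/promises');
const net = require('net');

async function watchSocket() {
  const socket = net.createConnection({ port: 8080 });
  try {
    // Resolves when both sides finish; rejects with
    // ERR_STREAM_PREMATURE_CLOSE if the duplex is destroyed early
    await finished(socket);
    console.log('Socket fully closed');
  } catch (err) {
    console.error('Socket ended prematurely:', err.code);
  }
}

watchSocket();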
Respect the return value of write() to handle backpressure correctly:
const { Duplex } = require('stream');

const duplexStream = new Duplex({
  write(chunk, encoding, callback) {
    // Process write
    this.processWrite(chunk);
    callback();
  },
  read(size) {
    // Provide data for reading
    this.push(this.getData());
  }
});

// Handle backpressure when writing
function writeData(stream, data) {
  const canContinue = stream.write(data);
  if (!canContinue) {
    console.log('Backpressure detected, waiting for drain...');
    stream.once('drain', () => {
      console.log('Stream drained, can continue writing');
    });
  }
}

// Check if stream is writable before writing
if (duplexStream.writable) {
  writeData(duplexStream, 'some data');
}

Ignoring backpressure can cause memory buildup and stream failures.
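In practice, the cleanest way to respect backpressure is an async loop that awaits 'drain' whenever write() returns false. A sketch using the built-in events.once helper (the writeAll name is illustrative):

const { once } = require('events');

// Write an array of chunks, pausing whenever the internal
// buffer exceeds highWaterMark
async function writeAll(stream, chunks) {
  for (const chunk of chunks) {
    if (!stream.write(chunk)) {
      // Buffer is full: wait until the stream flushes it
      await once(stream, 'drain');
    }
  }
  stream.end(); // signal no more writes
}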
Verify stream readiness before reading or writing:
function safeWrite(stream, data) {
  // Check if stream is still writable
  if (!stream.writable || stream.destroyed) {
    console.error('Stream is not writable');
    return false;
  }
  // Check if stream has ended
  if (stream.writableEnded) {
    console.error('Stream has already ended');
    return false;
  }
  return stream.write(data);
}

function safeRead(stream) {
  // Check if stream is still readable
  if (!stream.readable || stream.destroyed) {
    console.error('Stream is not readable');
    return null;
  }
  // Check if stream has ended
  if (stream.readableEnded) {
    console.error('Stream has already ended');
    return null;
  }
  return stream.read();
}

This prevents operations on closed or destroyed streams.
Properly clean up streams when errors occur or operations complete:
const { Duplex } = require('stream');

class CustomDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.resources = [];
  }

  _construct(callback) {
    // Initialize resources
    this.setupResources();
    callback();
  }

  _destroy(error, callback) {
    // Clean up resources
    console.log('Cleaning up duplex stream resources');
    this.resources.forEach((resource) => resource.close());
    this.resources = [];
    // Call callback with error if present
    callback(error);
  }

  write(chunk, encoding, callback) {
    // Check state before writing
    if (this.destroyed) {
      const err = new Error('Cannot write to destroyed stream');
      if (callback) callback(err);
      return false;
    }
    return super.write(chunk, encoding, callback);
  }
}

// Usage with proper error handling
const stream = new CustomDuplex();
stream.on('error', (err) => {
  console.error('Stream error:', err);
  stream.destroy(err); // Ensure cleanup happens
});

// Graceful shutdown
process.on('SIGTERM', () => {
  stream.destroy(); // Clean shutdown
});

Proper cleanup prevents resource leaks and cascading errors.
Creating Duplex Streams from Separate Readable and Writable Streams
You can create a duplex stream from independent readable and writable streams:
const { Duplex, Readable, Writable } = require('stream');

const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']);
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received:', chunk.toString());
    callback();
  }
});

// Combine into duplex stream
const duplex = Duplex.from({
  readable,
  writable
});

// Handle errors from both sides
duplex.on('error', (err) => {
  console.error('Duplex error:', err);
});

Duplex Stream Backpressure Management
When implementing custom duplex streams, handle backpressure on both sides:
const { Duplex } = require('stream');

const duplexStream = new Duplex({
  highWaterMark: 16384, // Buffer size threshold for backpressure
  read(size) {
    // Called when the consumer is ready for more data
    if (this.hasMoreData()) {
      this.push(this.getNextChunk());
    } else {
      // Signal end of readable side
      this.push(null);
    }
  },
  write(chunk, encoding, callback) {
    // Process the incoming data
    this.processChunk(chunk)
      .then(() => callback())
      .catch((err) => callback(err));
  },
  final(callback) {
    // Called when the writable side is ending
    this.flushBuffers()
      .then(() => callback())
      .catch((err) => callback(err));
  }
});

Network Socket Error Handling
For TCP duplex streams (sockets), implement robust error recovery:
const net = require('net');

const client = net.createConnection({ port: 8080 });

client.on('connect', () => {
  console.log('Connected to server');
  client.write('Hello server');
});

client.on('data', (data) => {
  console.log('Received:', data.toString());
});

client.on('error', (err) => {
  console.error('Socket error:', err);
  // Don't crash; handle gracefully
});

client.on('end', () => {
  console.log('Server closed connection');
});

client.on('close', (hadError) => {
  console.log(`Socket closed, error: ${hadError}`);
  // Implement reconnection logic if needed
});

// Timeout handling
client.setTimeout(30000);
client.on('timeout', () => {
  console.log('Socket timeout');
  client.destroy(new Error('Connection timeout'));
});

Transform Stream Error Propagation
Ensure errors in transform operations are properly handled:
const { Transform } = require('stream');

class SafeTransform extends Transform {
  constructor(options) {
    super(options);
    this.errorCount = 0;
    this.maxErrors = 5;
  }

  _transform(chunk, encoding, callback) {
    try {
      const result = this.processChunk(chunk);
      callback(null, result);
    } catch (err) {
      this.errorCount++;
      if (this.errorCount >= this.maxErrors) {
        // Too many errors, destroy the stream
        callback(new Error(`Too many errors: ${err.message}`));
      } else {
        // Log but continue processing
        console.error('Transform error:', err);
        callback(); // Skip this chunk
      }
    }
  }

  _flush(callback) {
    // Called before the stream ends
    if (this.errorCount > 0) {
      console.warn(`Stream completed with ${this.errorCount} errors`);
    }
    callback();
  }
}

Important Considerations
- Both sides of a duplex stream operate independently; an error on one side doesn't automatically propagate to the other
- Always use destroy(err) rather than end() when handling errors, to ensure proper cleanup
- The 'finish' event fires when the writable side completes; the 'end' event fires when the readable side completes (see the sketch after this list)
- Use the 'close' event for final cleanup, since it fires regardless of how the stream ended
- Be cautious with object mode enabled on only one side (readableObjectMode vs. writableObjectMode); mismatched chunk types between the two sides can cause type errors
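A quick sketch illustrating the event order described above, using a PassThrough stream: 'finish' for the writable side, 'end' for the readable side, and 'close' last once both sides are done:

const { PassThrough } = require('stream');

const stream = new PassThrough();

stream.on('finish', () => console.log('writable side done (finish)'));
stream.on('end', () => console.log('readable side done (end)'));
stream.on('close', () => console.log('stream closed (close)'));

stream.write('hello');
stream.end();    // finishes the writable side
stream.resume(); // drain the readable side so 'end' can fire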