2 dòng:nối hai (hoặc n) suối
Với thể đọc được streams
stream1
vàstream2
, những gì là một thành ngữ (ngắn gọn) cách để nhận được một dòng chứastream1
vàstream2
nối?Tôi không thể làm
stream1.pipe(outStream); stream2.pipe(outStream)
, bởi vì sau đó nội dung luồng được xáo trộn với nhau.n suối:
Với một EventEmitter mà phát ra một số không xác định của suối, ví dụ
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
gì là một thành ngữ (ngắn gọn) cách để nhận được một dòng với tất cả các con suối nối với nhau?
Trả lời
Gói combined-stream ghép các luồng. Ví dụ từ README:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
Tôi tin rằng bạn phải nối thêm tất cả các luồng cùng một lúc. Nếu hàng đợi trống, thì combinedStream
sẽ tự động kết thúc. Xem issue #5.
Thư viện stream-stream là phương án thay thế có rõ ràng .end
, nhưng ít phổ biến hơn và có lẽ không được kiểm tra kỹ lưỡng. Nó sử dụng API stream2 của Nút 0.10 (xem this discussion).
Bạn có thể có thể làm cho nó ngắn gọn hơn, nhưng đây là một trong những hoạt động:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream){
stream.pause();
streams.push(stream);
ensureState();
});
streamStream.on('end', function() {
streamsEnded = true;
ensureState();
});
var ensureState = function() {
if(isStreaming) return;
if(streams.length == 0) {
if(streamsEnded)
that.emit('end');
return;
}
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
};
var onData = function(data) {
that.emit('data', data);
};
var onEnd = function() {
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
};
}
util.inherits(ConcatStream, EventEmitter);
Chúng tôi theo dõi các trạng thái với streams
(hàng đợi của suối; push
để lưng và shift
từ mặt trước), isStreaming
và streamsEnded
. Khi chúng tôi có một luồng mới, chúng tôi sẽ phát trực tuyến và khi luồng kết thúc, chúng tôi sẽ ngừng nghe và thay đổi. Khi luồng luồng kết thúc, chúng tôi đặt streamsEnded
.
Trên mỗi sự kiện này, chúng tôi kiểm tra trạng thái của chúng tôi. Nếu chúng tôi đã phát trực tuyến (đang truyền luồng), chúng tôi sẽ không làm gì cả. Nếu hàng đợi trống và streamsEnded
được đặt, chúng tôi phát ra sự kiện end
. Nếu có thứ gì đó trong hàng đợi, chúng tôi sẽ tiếp tục và lắng nghe các sự kiện của nó.
* Lưu ý rằng pause
và resume
là cố vấn, vì vậy, một số luồng có thể không hoạt động chính xác và sẽ yêu cầu lưu vào bộ đệm. Bài tập này được để lại cho người đọc.
Sau khi thực hiện tất cả những điều này, tôi sẽ làm các trường hợp n=2
bằng cách xây dựng một EventEmitter
, tạo ra một ConcatStream
với nó, và phát ra hai stream
sự kiện tiếp theo là một sự kiện end
. Tôi chắc rằng nó có thể được thực hiện chính xác hơn, nhưng chúng tôi cũng có thể sử dụng những gì chúng tôi có.
streamee.js là tập hợp các máy biến áp dòng và nhà soạn nhạc dựa trên nút 1.0+ suối và bao gồm một phương pháp tiếp nhau:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
Cảm ơn, tôi sẽ kiểm tra. Đó là Node 0.10 tôi giả sử? –
Có nút 0.10, nhưng bạn có thể bao bọc các luồng kiểu cũ thành 0.10+ luồng như được viết trong README – atamborrino
https://github.com/joepie91/node-combined-stream2 là một sự thay thế thả trong Streams2 tương thích cho các mô-đun kết hợp dòng (được mô tả ở trên.) Nó tự động kết thúc tốt đẹp Streams1 suối.
Ví dụ mã cho hỗn hợp stream2:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
này có thể được thực hiện với vani nodejs
import { PassThrough } from 'stream'
const merge = (...streams) => {
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams) {
pass = stream.pipe(pass, {end: false})
stream.once('end',() => --waiting === 0 && pass.emit('end'))
}
return pass
}
Một reduce thao tác đơn giản nên được tốt trong nodejs!
const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
s.pipe(pt, {end: false})
s.once('end',() => a.every(s => s.ended) && pt.emit('end'))
return pt
}, new PassThrough())
Cheers;)
Bạn có nên trả lại thứ gì đó từ việc giảm bớt không? Điều này có vẻ như 'đã tham gia' sẽ không được xác định. –
đã được sửa. thanks;) @Mark_M – Ivo
CẢNH BÁO: Điều này sẽ làm cho tất cả các luồng được nối vào luồng PassThrough song song, mà không có liên quan đến thứ tự của dữ liệu, nhiều khả năng làm hỏng dữ liệu của bạn. –
Cảm ơn Aaron! Tôi đã hy vọng sẽ có một số thư viện hiện có để tôi có thể giải quyết nó trong ba dòng. Nếu không có, tôi nghĩ tôi có thể giải nén giải pháp của bạn thành một gói. Tôi có thể sử dụng mã của bạn theo giấy phép MIT không? –
Ah, đã tìm thấy thư viện truyền trực tuyến. Xem câu trả lời của tôi. –
@JoLiss Tôi cũng đã tìm kiếm thứ gì đó đầu tiên, nhưng tôi không thể tìm thấy tùy chọn đó. Bạn chắc chắn có thể sử dụng mã của tôi trong thư viện nếu bạn vẫn muốn. –