2012-06-18 10 views
6

Giả sử bạn có một rất lớn (> 1GB) CSV của id kỷ lục:Làm thế nào để chạy một chức năng async cho mỗi dòng một (> 1GB) tập tin rất lớn trong Node.js

655453 
4930285 
493029 
4930301 
493031 
... 

Và đối với mỗi id bạn muốn thực hiện cuộc gọi REST API để tìm nạp dữ liệu bản ghi, chuyển đổi nó cục bộ và chèn nó vào một cơ sở dữ liệu cục bộ.

Bạn làm như thế nào với Node.js 'Readable Stream?

Câu hỏi của tôi về cơ bản là: Làm thế nào để bạn đọc một tệp rất lớn, từng dòng một, chạy một hàm async cho mỗi dòng và [tùy chọn] có thể bắt đầu đọc tệp từ một dòng cụ thể?

Từ câu hỏi Quora sau tôi bắt đầu học cách sử dụng fs.createReadStream:

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

var fs = require('fs'); 
var lazy = require('lazy'); 

var stream = fs.createReadStream(path, { 
    flags: 'r', 
    encoding: 'utf-8' 
}); 

new lazy(stream).lines.forEach(function(line) { 
    var id = line.toString(); 
    // pause stream 
    stream.pause(); 
    // make async API call... 
    makeAPICall(id, function() { 
    // then resume to process next id 
    stream.resume(); 
    }); 
}); 

Nhưng, giả đó không làm việc, bởi vì lazy module buộc bạn phải đọc toàn bộ tập tin (dưới dạng luồng, nhưng không có tạm dừng). Cách tiếp cận đó dường như không hoạt động.

Một điều nữa là, tôi muốn có thể bắt đầu xử lý tệp này từ một dòng cụ thể. Lý do cho việc này là, xử lý mỗi id (thực hiện cuộc gọi api, làm sạch dữ liệu, v.v.) có thể mất đến nửa giây cho mỗi bản ghi vì vậy tôi không muốn phải bắt đầu từ đầu tệp mỗi lần . Cách tiếp cận ngây thơ tôi đang nghĩ đến việc sử dụng là chỉ cần nắm bắt số dòng của id cuối cùng được xử lý và lưu lại. Sau đó, khi bạn phân tích cú pháp tệp một lần nữa, bạn truyền qua tất cả các id, từng dòng một, cho đến khi bạn tìm thấy số dòng bạn đã dừng lại, và sau đó bạn thực hiện công việc kinh doanh makeAPICall. Một cách tiếp cận ngây thơ khác là viết các tệp nhỏ (nói 100 id) và xử lý từng tệp một lúc (bộ dữ liệu đủ nhỏ để thực hiện mọi thứ trong bộ nhớ mà không có luồng IO). Có cách nào tốt hơn để làm điều này?

tôi có thể thấy cách này được khôn lanh (và nơi node-lazy do thỏa thuận hợp) vì chunk trong stream.on('data', function(chunk) {}); có thể chứa chỉ phần của một dòng (nếu BUFFERSIZE là nhỏ, mỗi đoạn có thể 10 dòng nhưng vì id là độ dài thay đổi, nó chỉ có thể là 9,5 dòng hoặc bất cứ điều gì). Đây là lý do tại sao tôi tự hỏi cách tiếp cận tốt nhất là gì đối với câu hỏi trên.

+0

đoán đây là những gì redis và việc làm nền dành cho ... –

+0

bắt đầu nhìn đầy hứa hẹn: https://gist.github.com/2947293 –

+0

tôi đăng một giải pháp cho một câu hỏi tương tự cho phân tích rất lớn tệp, sử dụng luồng, đồng bộ. xem: http://stackoverflow.com/questions/16010915/parsing-huge-logfiles-in-node-js-read-in-line-by-line/23695940#23695940 – Gerard

Trả lời

1

Tôi đoán bạn không cần sử dụng node-lazy. Đây là những gì tôi tìm thấy trong Node docs:

Event: data

function (data) { } 

Sự kiện data phát ra hoặc là một Buffer (theo mặc định) hoặc một string nếu setEncoding() được sử dụng.

Vì vậy, điều đó có nghĩa là bạn gọi số setEncoding() trên luồng của mình thì cuộc gọi lại sự kiện data của bạn sẽ chấp nhận tham số chuỗi. Sau đó, trong cuộc gọi lại này, bạn có thể gọi các phương thức .pause().resume().

Mã giả sẽ trông như thế này:

stream.setEncoding('utf8'); 
stream.addListener('data', function (line) { 
    // pause stream 
    stream.pause(); 
    // make async API call... 
    makeAPICall(line, function() { 
     // then resume to process next line 
     stream.resume(); 
    }); 
}) 

Mặc dù các tài liệu không chỉ định rõ ràng dòng được đọc từng dòng tôi cho rằng đó là trường hợp cho con suối tập tin. Ít nhất là trong các ngôn ngữ và nền tảng khác, luồng văn bản hoạt động theo cách đó và tôi không thấy lý do nào cho luồng Node khác nhau.

+1

luồng không được xếp theo bộ đệm, nó cung cấp cho bạn một đoạn dữ liệu có thể hoặc không thể kết thúc trên một dòng mới. – BCoates

0

liên quan đến Andrew Андрей Листочкин của câu trả lời:

Bạn có thể sử dụng một mô-đun như byline để có được một data sự kiện riêng biệt cho mỗi dòng. Đó là luồng biến đổi xung quanh dòng phim gốc, tạo ra sự kiện data cho mỗi đoạn. Điều này cho phép bạn tạm dừng sau mỗi dòng.

byline sẽ không đọc toàn bộ tệp vào bộ nhớ như lazy.

var fs = require('fs'); 
var byline = require('byline'); 

var stream = fs.createReadStream('bigFile.txt'); 
stream.setEncoding('utf8'); 

// Comment out this line to see what the transform stream changes. 
stream = byline.createStream(stream); 

// Write each line to the console with a delay. 
stream.on('data', function(line) { 
    // Pause until we're done processing this line. 
    stream.pause(); 

    setTimeout(() => { 
     console.log(line); 

     // Resume processing. 
     stream.resume(); 
    }, 200); 
});