When processing a large file (e.g. 20,000 lines) with frequent pause/resume calls, lines are dropped from the end of the file (e.g. about 4,000 of them).
I'm using pause/resume to limit the number of lines being processed at once. Each line is pushed onto a queue. When the queue reaches a certain size, the input stream is paused, and when the queue drains to 0, the input is resumed. This works well when there are only a few pause/resume calls. As they become more frequent (i.e. as I lower the maximum queue size), more data is lost.
Here is a rough code sample (the input file is not included):
var Asynchronous = require('async');
var ByLine = require('byline');
var FileSystem = require('fs');
var Readline = require('readline');
var Utilities = require('util');
// Number of lines pushed onto the queue whose completion callback has not run yet.
var countOfTasks = 0;
// Number of lines whose queue task has completed.
var countOfProcessed = 0;

// Readline interface used only to draw a single-line progress display on stdout.
var output = Readline.createInterface({
  input: process.stdin,
  output: process.stdout
});
output.setPrompt('');

// Format the current counters for the progress display.
function formatProgress() {
  return Utilities.format('tasks:%d processed:%d ', countOfTasks, countOfProcessed);
}

output.write(formatProgress());

// Every 250 ms, simulate a Ctrl+U keypress through readline (which deletes the
// text written on the current line) and redraw the counters. Cleared by the
// final waterfall callback.
var interval = setInterval(function() {
  output.write(null, {
    ctrl: true,
    name: 'u'
  });
  output.write(formatProgress());
}, 250);
Asynchronous.waterfall([
  // Read the 'lines' file line by line and push each line onto a single-worker
  // queue. The input is paused once 5 or more tasks are outstanding and resumed
  // once they have all completed. Calls `callback` exactly once: with an error
  // if a stream or task fails, or with no arguments once the input has ended
  // and every pushed task has completed.
  function(callback) {
    var isFinished = false;
    var isInputEnded = false;

    // Guard so the waterfall callback runs at most once, even if both the
    // source stream and the line stream report the same error.
    function finish(error) {
      if (isFinished)
        return;
      isFinished = true;
      callback(error);
    }

    var queue = Asynchronous.queue(function(task, callback) {
      FileSystem.appendFileSync('debug-processing-preTimeout', Utilities.format('%s\n', task.line));
      setTimeout(function() {
        FileSystem.appendFileSync('debug-processing-inTimeout', Utilities.format('%s\n', task.line));
        callback();
      }, 0);
    }, 1);

    var source = FileSystem.createReadStream('lines');
    // pipe() does not forward errors; listen on the file stream directly so
    // errors such as a missing input file reach the final callback.
    source.on('error', finish);

    var lines = ByLine.createStream(source);
    lines
      .on('data', function(line) {
        countOfTasks++;
        var task = {
          line: line
        };
        FileSystem.appendFileSync('debug-pushing', Utilities.format('%s\n', line));
        queue.push(task, function(error) {
          FileSystem.appendFileSync('debug-processed', Utilities.format('%s\n', line));
          countOfProcessed++;
          countOfTasks--;
          if (error)
            return finish(error);
          if (countOfTasks === 0) {
            // All outstanding work is done: either we are finished, or the
            // input was paused and should start flowing again.
            if (isInputEnded)
              return finish();
            lines.resume();
          }
        });
        if (countOfTasks >= 5)
          lines.pause();
      })
      .on('end', function() {
        isInputEnded = true;
        // If no tasks are outstanding, no push callback will run again to
        // notice the end of input, so finish here.
        if (countOfTasks === 0)
          finish();
      })
      .on('error', finish);
  }
], function(error) {
  clearInterval(interval);
  if (error) {
    output.write(null, {
      ctrl: true,
      name: 'u'
    });
    output.write(Utilities.format('Error ... %s\n\n', error.message));
  }
  output.write(null, {
    ctrl: true,
    name: 'u'
  });
  output.write(Utilities.format('Done ... processed:%d\n\n', countOfProcessed));
  output.close();
});
Notice that the input file (`lines`) contains more lines than the debug output files (e.g. `debug-pushing`). The missing lines all seem to come from the end of the input.