NightSkyJeff 17mo ago

NightSkyJeff – 00-14 Mar 21

I am attempting to split one gzipped file (stream) into two gzipped files (streams). I have a readable stream that I pipe through zlib.createGunzip() and then into a readline function, which reads each line and writes it to one of two output streams, each of which is piped through zlib.createGzip(). The problem is that the two resulting files are NOT gzipped.
const stream = GetSomeReadableGzippedStream();
const dest = 'out.%partition%.txt.gz';
const partitions = 2;
const outs = [...Array(partitions)].map((_, i) =>
  zlib.createGzip().pipe(createWriteStream(dest.replace('%partition%', i+1)))
);
let lineNum = 0;

const readline = async function* (stream) {
  const reader = ReadStreamLineByLine(stream);
  for await (const line of reader) {
    outs[lineNum++ % partitions].write(line);
  }
};

await pipeline(
  stream,
  zlib.createGunzip(),
  readline,
);
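A likely cause, for anyone landing here: `readable.pipe(dest)` returns `dest`, not the stream it was called on. So each entry in `outs` above would be the file write stream rather than the gzip stream, and `write(line)` would go straight to the file without compression. A minimal sketch of that behaviour, using a `PassThrough` as a stand-in for the file stream (an assumption for illustration only):

```javascript
const zlib = require('node:zlib');
const { PassThrough } = require('node:stream');

const gzip = zlib.createGzip();
// Stand-in for createWriteStream(...); hypothetical, for illustration only.
const sink = new PassThrough();

// pipe() returns its argument (the destination), not the source stream.
const returned = gzip.pipe(sink);

console.log(returned === sink); // true
console.log(returned === gzip); // false

// To get compressed output, keep a reference to `gzip` and write to that,
// rather than writing to whatever pipe() returned.
```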
2 Replies
NightSkyJeff 17mo ago
I figured it out. I created multiple pipelines:
const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
const pipelines = [...Array(partitions)].map((_, i) => [
  createGzip(),
  createWriteStream(dest.replace('%partition%', i+1)),
]);
let lineNum = 0;

const readline = async function* (stream) {
  const reader = streamReadline(stream);

  for await (const line of reader) {
    const idx = lineNum++ % partitions;
    pipelines[idx][0].write(line);
  }

  pipelines.forEach((pipe) => pipe[0].end());
};

await Promise.all([
  pipeline(
    stream,
    createGunzip(),
    readline,
  ),
  ...pipelines.map((pipe) => pipeline(...pipe)),
]);
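One thing the loop above does not do is check the return value of `write()`, so a fast reader could buffer a lot of data in memory before the gzip streams catch up. Below is a hedged sketch of the same round-robin write loop with backpressure handling. `distributeLines` is a hypothetical helper name, and appending `'\n'` assumes the line reader strips line endings (drop it if yours keeps them):

```javascript
const { once } = require('node:events');

// Hypothetical helper: write each line to one of the gzip streams in
// round-robin order, pausing when a stream signals that its buffer is full.
async function distributeLines(lines, gzips) {
  let lineNum = 0;
  for await (const line of lines) {
    const gz = gzips[lineNum++ % gzips.length];
    // Assumption: the reader strips newlines, so one is added back here.
    if (!gz.write(line + '\n')) {
      // write() returned false: wait for 'drain' before sending more.
      await once(gz, 'drain');
    }
  }
  // Signal end-of-input so each gzip stream can flush its trailer.
  for (const gz of gzips) gz.end();
}
```

In the code above, this could stand in for the body of `readline`, e.g. `await distributeLines(streamReadline(stream), pipelines.map((p) => p[0]))`. Each gzip stream must already be consumed by its own pipeline, otherwise `'drain'` would never fire.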
reactibot 17mo ago
This thread hasn’t had any activity in 12 hours, so it’s now locked. Threads are closed automatically after 12 hours. If you have a followup question, you may want to reply to this thread so other members know they're related. https://discord.com/channels/102860784329052160/565213527673929729/1087529608640999555