NightSkyJeff
NightSkyJeff
RReactiflux
Created by NightSkyJeff on 1/10/2024 in #help-js
nightskyjeff – 01-27 Jan 10
I think that's the problem 🙂
4 replies
RReactiflux
Created by NightSkyJeff on 1/10/2024 in #help-js
nightskyjeff – 01-27 Jan 10
UGH
The process.stderr and process.stdout Writable streams are never closed until the Node.js process exits, regardless of the specified options.
4 replies
RReactiflux
Created by NightSkyJeff on 1/10/2024 in #help-js
nightskyjeff – 01-27 Jan 10
If I use for await (const chunk of feed) { ... } instead of piping to stdout, the feed does indeed end as expected. But I can't get it to end with just the pipe()
4 replies
RReactiflux
Created by NightSkyJeff on 8/1/2023 in #help-js
nightskyjeff – 03-10 Aug 1
await ssh.get(
filenameCsv,
transform
.pipe(csvParser)
.pipe(jsonToCsv)
.pipe(
bucket.file(filenameCsv).createWriteStream({
resumable: false,
validation: false,
contentType: "auto",
})
)
);
await ssh.get(
filenameCsv,
transform
.pipe(csvParser)
.pipe(jsonToCsv)
.pipe(
bucket.file(filenameCsv).createWriteStream({
resumable: false,
validation: false,
contentType: "auto",
})
)
);
3 replies
RReactiflux
Created by NightSkyJeff on 8/1/2023 in #help-js
nightskyjeff – 03-10 Aug 1
I think I figured it out, with Transform and .pipe()
3 replies
RReactiflux
Created by NightSkyJeff on 3/21/2023 in #help-js
NightSkyJeff – 00-14 Mar 21
3 replies
RReactiflux
Created by NightSkyJeff on 3/21/2023 in #help-js
NightSkyJeff – 00-14 Mar 21
I figured it out. I created multiple pipelines:
const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
const pipelines = [...Array(partitions)].map((_, i) => [ createGzip(), createWriteStream(dest.replace('%partition%', i+1)) ]);
let lineNum = 0;

const readline = async function* (stream) {
const reader = streamReadline(stream);

for await (const line of reader) {
const idx = lineNum++ % partitions;
pipelines[idx][0].write(line);
}

pipelines.forEach((pipe) => pipe[0].end());
};

await Promise.all([
pipeline(
stream,
createGunzip(),
readline,
),
...pipelines.map((pipe) => pipeline(...pipe)),
]);
const stream = getFileInBucket(parseGsSpec(src)).createReadStream();
const pipelines = [...Array(partitions)].map((_, i) => [ createGzip(), createWriteStream(dest.replace('%partition%', i+1)) ]);
let lineNum = 0;

const readline = async function* (stream) {
const reader = streamReadline(stream);

for await (const line of reader) {
const idx = lineNum++ % partitions;
pipelines[idx][0].write(line);
}

pipelines.forEach((pipe) => pipe[0].end());
};

await Promise.all([
pipeline(
stream,
createGunzip(),
readline,
),
...pipelines.map((pipe) => pipeline(...pipe)),
]);
3 replies