如何将大流转换为 gzipped base64 字符串
How to convert a large stream to a gzipped base64 string
我正在构建一个分析平台,我想在将 ETL(提取转换加载)作业存储到我的数据库之前对其进行压缩。在我开始编写代码之前,我想知道是否有经验的人可以告诉我如何正确地完成它。我想 gzip 数据,然后将其转换为 base64 字符串。我是简单地 gzip,然后转换为 base64 还是不起作用?
这是我目前对这些大型数据集使用的过程。
var streamObj = athenaClient.execute('my query').toStream()
var data = [];
redis.set('Some Dashboard Data', '[')
streamObj.on('data', function(record) {
// TODO gzip record then convert to base64
if (data.length === 500) {
let tempData = JSON.stringify(data);
data = []
redis.append('Some Dashboard Data', tempData.slice(1, tempData.length - 1) + ',')
}
data.push(record);
})
}
如果这不可能,有没有办法存储压缩后的字符串?
让 node.js 环境通过使用流提供的背压来控制内存。
我会考虑这个解决方案:
inputStream
.pipe(zlib)
.pipe(transformToBase64Stream)
.pipe(redisCli);
zlib 是原生的,因此不会造成任何问题。
要转换为 base64,您可以编写转换流或在管道模式下使用外部 tools. To pipe results into redis by stream, you could spawn child process redis-cli。正如 mass insertion 和 redis cli 文章中提到的,建议用于大数据,但你必须自己处理 redis 协议。阅读提供的文章,让我知道它是否有助于解决您的问题。
只是为了进一步详细说明 Zilvinas 的回答。我将向您展示我是如何让它工作的。
const athena = require('./athena')
const redis = require('./redis')
const zlib = require('zlib')
const Stream = require('stream')
exports.persistStream = (config, query, name, transform) => {
return new Promise((resolve, reject) => {
let recordCount = 0
var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true})
transformStream._transform = function (chunk, encoding, done) {
recordCount++
if (transform) chunk = transform(chunk)
let jsonChunk = JSON.stringify([chunk])
switch (true) {
case recordCount === 1:
jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1); break
default:
jsonChunk = ',' + jsonChunk.slice(1, jsonChunk.length - 1); break
}
this.push(jsonChunk)
done();
};
transformStream._final = function (done) {
this.push(']')
done()
}
const gzip = zlib.createGzip()
let buffers = []
var stream = athena.execute(query)
.toStream()
.pipe(transformStream)
.pipe(gzip)
gzip.on('data', (chunk) => {
buffers.push(chunk)
})
gzip.on('end', function () {
let buffer = Buffer.concat(buffers)
redis.set(name, buffer.toString('base64'), (err, response) => {
zlib.gzip(config, (err, buff) => {
redis.set(name + ' Config', buff.toString('base64'), (err, response) => {
if (err) {
console.log(err)
reject()
} else {
console.log(name + ' succeeded')
resolve()
}
})
})
})
})
stream.on('error', (err) => {
console.log(err)
reject()
})
})
}
我正在构建一个分析平台,我想在将 ETL(提取转换加载)作业存储到我的数据库之前对其进行压缩。在我开始编写代码之前,我想知道是否有经验的人可以告诉我如何正确地完成它。我想 gzip 数据,然后将其转换为 base64 字符串。我是简单地 gzip,然后转换为 base64 还是不起作用?
这是我目前对这些大型数据集使用的过程。
var streamObj = athenaClient.execute('my query').toStream()
var data = [];
redis.set('Some Dashboard Data', '[')
streamObj.on('data', function(record) {
// TODO gzip record then convert to base64
if (data.length === 500) {
let tempData = JSON.stringify(data);
data = []
redis.append('Some Dashboard Data', tempData.slice(1, tempData.length - 1) + ',')
}
data.push(record);
})
}
如果这不可能,有没有办法存储压缩后的字符串?
让 node.js 环境通过使用流提供的背压来控制内存。
我会考虑这个解决方案:
inputStream
.pipe(zlib)
.pipe(transformToBase64Stream)
.pipe(redisCli);
zlib 是原生的,因此不会造成任何问题。 要转换为 base64,您可以编写转换流或在管道模式下使用外部 tools. To pipe results into redis by stream, you could spawn child process redis-cli。正如 mass insertion 和 redis cli 文章中提到的,建议用于大数据,但你必须自己处理 redis 协议。阅读提供的文章,让我知道它是否有助于解决您的问题。
只是为了进一步详细说明 Zilvinas 的回答。我将向您展示我是如何让它工作的。
const athena = require('./athena')
const redis = require('./redis')
const zlib = require('zlib')
const Stream = require('stream')
exports.persistStream = (config, query, name, transform) => {
return new Promise((resolve, reject) => {
let recordCount = 0
var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true})
transformStream._transform = function (chunk, encoding, done) {
recordCount++
if (transform) chunk = transform(chunk)
let jsonChunk = JSON.stringify([chunk])
switch (true) {
case recordCount === 1:
jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1); break
default:
jsonChunk = ',' + jsonChunk.slice(1, jsonChunk.length - 1); break
}
this.push(jsonChunk)
done();
};
transformStream._final = function (done) {
this.push(']')
done()
}
const gzip = zlib.createGzip()
let buffers = []
var stream = athena.execute(query)
.toStream()
.pipe(transformStream)
.pipe(gzip)
gzip.on('data', (chunk) => {
buffers.push(chunk)
})
gzip.on('end', function () {
let buffer = Buffer.concat(buffers)
redis.set(name, buffer.toString('base64'), (err, response) => {
zlib.gzip(config, (err, buff) => {
redis.set(name + ' Config', buff.toString('base64'), (err, response) => {
if (err) {
console.log(err)
reject()
} else {
console.log(name + ' succeeded')
resolve()
}
})
})
})
})
stream.on('error', (err) => {
console.log(err)
reject()
})
})
}