字符串到缓冲流并不总是写入数据

string to bufferstream not always writing data

我有一个云函数在 pubsub 主题中接收 json 字符串。 目标是将一些数据提取到新的 json 字符串中。 接下来将其解析为 JSONL。 最后将其流式传输到 Google Cloud Storage。

我注意到有时文件似乎包含数据,有时却没有。 pubsub 工作正常,数据进入这个云功能也很好。

我尝试在我认为可能合适的地方添加一些异步等待,但我担心它与缓冲流有关。这两个主题都是关于我难以理解的地方。

可能是什么问题?

const stream = require('stream');
const { Storage } = require('@google-cloud/storage');

// Initiate the source
const bufferStream = new stream.PassThrough();

// Creates a client
const storage = new Storage();

// save stream to bucket
const toBucket = (message, filename) => {
  // Write your buffer
  bufferStream.end(Buffer.from(message));

  const myBucket = storage.bucket(process.env.BUCKET);
  const file = myBucket.file(filename);
  // Pipe the 'bufferStream' into a 'file.createWriteStream' method.
  bufferStream.pipe(file.createWriteStream({
    validation: 'md5',
  }))
    .on('error', (err) => { console.error(err); })
    .on('finish', () => {
      // The file upload is complete.
      console.log(`${filename} is uploaded`);
    });
};

// extract correct fields
const extract = (entry) => ({
  id: entry.id,
  status: entry.status,
  date_created: entry.date_created,
  discount_total: entry.discount_total,
  discount_tax: entry.discount_tax,
  shipping_total: entry.shipping_total,
  shipping_tax: entry.shipping_tax,
  total: entry.total,
  total_tax: entry.total_tax,
  customer_id: entry.customer_id,
  payment_method: entry.payment_method,
  payment_method_title: entry.payment_method_title,
  transaction_id: entry.transaction_id,
  date_completed: entry.date_completed,
  billing_city: entry.billing.city,
  billing_state: entry.billing.state,
  billing_postcode: entry.billing.postcode,
  coupon_lines_id: entry.coupon_lines.id,
  coupon_lines_code: entry.coupon_lines.code,
  coupon_lines_discount: entry.coupon_lines.discount,
  coupon_lines_discount_tax: entry.coupon_lines.discount_tax,
});

// format json to jsonl
const format = async (message) => {
  let jsonl;
  try {
    // extract only the necessary

    const jsonMessage = await JSON.parse(message);

    const rows = await jsonMessage.map((row) => {
      const extractedRow = extract(row);
      return `${JSON.stringify(extractedRow)}\n`;
    });
    // join all lines as one string with no join symbol
    jsonl = rows.join('');

    console.log(jsonl);
  } catch (e) {
    console.error('jsonl conversion failed');
  }
  return jsonl;
};

exports.jsonToBq = async (event, context) => {
  const message = Buffer.from(event.data, 'base64').toString();
  const { filename } = event.attributes;

  console.log(filename);

  const jsonl = await format(message, filename);
  toBucket(jsonl, filename);
};

通过将 bufferstream const 移动到 tobucket 函数中解决了这个问题。