从存储中读取 JSON 数组并发送到 GCP PubSub

Reading JSON Array from Storage and send to GCP PubSub

我在 Google 云存储中有多个文件,每个文件包含 JSON 数组,如下所示 -

{
  "Data": [
    {
      "Country": "IN",
      "Order": "1033616591",
      "Method": "LCD zone E same day",
      "WorkOrderNo": "1033616591",
      "Zipcode": "6020",
      "OriginalTimeSlot": "2019-05-29 14:00-18:00",
      "CurrentTimeSlot": "2019-05-29 14:00-18:00",
      "Shipment": "98:2",
      "WOCreationDate": "2019-05-27T18:21:15Z",
      "ModifactionDate": "2020-01-17T16:50:58Z",
      "Dispatch": {
        "Status": "00",
        "DispatchUnit": []
      },
      "Parcels": {
        "Parcel": [
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648048"
          },
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648049"
          },
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648050"
          },
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648051"
          },
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648052"
          },
          {
            "Active": true,
            "Weight": 29.087833333333332,
            "Volume": 0.27791899999999997,
            "Trackingstatus": "",
            "Number": "704648053"
          }
        ]
      },
      "TimeSlotId": "d2916acd-1f36-4604-98dc-0d11014a045c"
    },
    {
      "Country": "IN",
      "Order": "1049968941",
      "Method": "LCD zone A",
      "WorkOrderNo": "1049968941",
      "Zipcode": "6020",
      "OriginalTimeSlot": "2019-09-26 06:00-10:00",
      "CurrentTimeSlot": "2019-09-26 06:00-10:00",
      "Shipment": "98:2",
      "WOCreationDate": "2019-09-02T16:17:13Z",
      "ModifactionDate": "2020-01-17T16:40:18Z",
      "Dispatch": {
        "Status": "00",
        "DispatchUnit": []
      },
      "Parcels": {
        "Parcel": [
          {
            "Active": true,
            "Weight": 44.5,
            "Volume": 1.147163,
            "Trackingstatus": "",
            "Number": "704987779"
          }
        ]
      },
      "TimeSlotId": "3c3da1d2-000d-402a-856d-0d89013a6961"
    }
  ]
}

现在我正在尝试从存储中读取每个文件,根据 "Country" 字段将每个 JSON 分开并将其发布到 Google PubSub。低于我的尝试 -

const express = require('express')
const app = express()
const port = 8080
const { PubSub } = require('@google-cloud/pubsub');

const projectId = 'my_project_id';
const keyFilename = 'myjson.json';
const pubsub = new PubSub({ projectId, keyFilename });
const topicName = 'pubsub_topic_name';
const subscriptionName = 'pubsub_subscription_name';
const { Storage } = require('@google-cloud/storage');
const storage = new Storage();
const timeout = 60;

const subscription = pubsub.subscription(subscriptionName);
let messageCount = 0;
const bucketName = 'temp-shipment';

app.get('/', async function (req, res) {
    var messageIds = "";
    console.log('Line 1');
    const [files] = await storage.bucket('bucketname').getFiles();
    console.log('Line 2');
    var bkt = '';
    var i = 0;
    files.forEach(file => {
        console.log('name of file' + file.name);
        var archivo = file.createReadStream();
        bkt = '';
        console.log('---- bkt value:' + bkt);
        archivo.on('data', async function (d) {
            console.log('---- bkt value 2:' + bkt);
            bkt += d;
            i = i + 1;
            console.log('---- bkt value 3:' + bkt);
        }).on('end', async function () {

            console.log('---- bkt value 4:' + bkt);
            console.log(">>>>END CALLED i" + i)
            console.log("bky:" + bkt.replace(/(?:\[rn])+/g, '').trim().toString());

            try {
                var kktrim = bkt.replace(/(?:\[rn])+/g, '').trim();
                var kk = JSON.parse(kktrim);

                for (var v of kk.Data) {

                    var myJsonObject = { message: JSON.stringify(v) }
                    const data = JSON.stringify(myJsonObject);
                    console.log("Data: " + data);
                    const dataBuffer = Buffer.from(data);
                   // console.log("buffer" + dataBuffer);
                    const messageId = await pubsub.topic(topicName).publish(dataBuffer);
                    console.log("MessageId>>>>>>>>>>>>>>>>>>>" + messageId);
                    //messageIds += ":" + messageId;
                    //console.log(messageIds);

                }
                bkt = '';
            } catch (ex) {
                console.log('error' + ex + 'in ' + file.name);
            }


        })


    });

    res.send('Message successfully sent!\nTopic: messages' + messageIds);
});
app.listen(port, () => console.log(`Example app listening on port ${port}!`))

此代码读取文件,但代码实际上转换一个文件并为其他文件提供错误。对于其他文件,它会抛出此错误 -

textPayload: "errorSyntaxError: Unexpected token { in JSON at position 1696in IN.json" 

需要一些帮助来解决这个问题。我无法理解我在这里缺少什么。

问题是因为正在附加 JSON 文件。解析的第一个 JSON 是:

{
  "Data": 
    ...
}

但是,第二个 JSON 试图解析的是:

{
  "Data": 
    ...
}
{
  "Data": 
    ...
}

因此出现 "unexpected token '{' was found in ..." 错误。

这是由变量 bkt 引起的,该变量用于解析 JSON:

var kktrim = bkt.replace(/(?:\[rn])+/g, '').trim();
var kk = JSON.parse(kktrim);

但是变量是在遍历文件之前声明的:

var bkt = '';
var i = 0;
files.forEach(file => {
    // More code
});

您可以移动变量声明以保持 JSON 文件分开并解决问题:

var i = 0;
files.forEach(file => {
    var bkt = '';
    // More code
});