FIWARE Cygnus -> cartodb sinks.NGSISink: Persistence error, 400 Bad request

I'm trying to connect Cygnus (1.4.0_SNAPSHOT) to CartoDB. I'm running it locally and using a script to send a notification to Cygnus. The script runs fine, but Cygnus reports:

ERROR sinks.NGSISink: Persistence error (The query 'INSERT INTO jcarneroatos.x002fpeoplelocation (recvtime,fiwareservicepath,entityid,entitytype,the_geom) VALUES ('2016-10-31T19:04:00.994Z','/peoplelocation','Person:1','Person',ST_SetSRID(ST_MakePoint({"coordinates":[-4.423032856,36.721290055]), 4326))' could not be executed. CartoDB response: 400 Bad Request)

Does anyone know what could be happening? I've included my configuration files below for reference. Thanks!

My username in CARTO is "jcarneroatos" and the domain is https://jcarneroatos.carto.com. This is the script I use to emulate a notification from Orion Context Broker:

#!/bin/bash
HOST=localhost
PORT=5050
SERVICE=jcarneroatos
SUBSERVICE=/peoplelocation

#send notification
NOTIFICATION=$(\
curl http://$HOST:$PORT/notify \
    -v -s -S \
    --header "Content-Type: application/json; charset=utf-8" \
    --header 'Accept: application/json' \
    --header "Fiware-Service: $SERVICE" \
    --header "Fiware-ServicePath: $SUBSERVICE" \
    -d '
    {
        "contextResponses": [
            {
                "contextElement": {
                    "attributes": [
                        {
                            "metadatas": [
                                {
                                    "name": "location",
                                    "type": "string",
                                    "value": "WGS84"
                                }
                            ],
                            "name": "location",
                            "type": "geo:json",
                            "value": {
                                "coordinates": [
                                    -4.423032856,
                                    36.721290055
                                ],
                                "type": "Point"
                            }
                        }
                    ],
                    "id": "Person:1",
                    "isPattern": "false",
                    "type": "Person"
                },
                "statusCode": {
                    "code": "200",
                    "reasonPhrase": "OK"
                }
            }
        ],
        "originator": "localhost",
        "subscriptionId": "58178396634ded66caac35b2"
    }')
if [ -z "$NOTIFICATION" ]; then
    echo "Ok"
else
    echo "$NOTIFICATION"
fi

This is the structure of the CartoDB dataset:

x002fpeoplelocation
cartodb_id | the_geom | entityid | entitytype | fiwareservicepath | recvtime
  number   | geometry |  string  |   string   |      string       |   date

This is the Cygnus configuration file:

cygnusagent.sources = http-source
cygnusagent.sinks = cartodb-sink
cygnusagent.channels = cartodb-channel

cygnusagent.sources.http-source.channels = cartodb-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.default_service = jcarneroatos
cygnusagent.sources.http-source.handler.default_service_path = /peoplelocation
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /home/cygnus/APACHE_FLUME_HOME/conf/grouping_rules.conf

cygnusagent.sinks.cartodb-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnusagent.sinks.cartodb-sink.channel = cartodb-channel
cygnusagent.sinks.cartodb-sink.enable_grouping = false
cygnusagent.sinks.cartodb-sink.enable_name_mappings = false
cygnusagent.sinks.cartodb-sink.enable_lowercase = false
cygnusagent.sinks.cartodb-sink.data_model = dm-by-service-path
cygnusagent.sinks.cartodb-sink.keys_conf_file = /home/cygnus/APACHE_FLUME_HOME/conf/cartodb_keys.conf
cygnusagent.sinks.cartodb-sink.flip_coordinates = false
cygnusagent.sinks.cartodb-sink.enable_raw = true
cygnusagent.sinks.cartodb-sink.enable_distance = false
cygnusagent.sinks.cartodb-sink.batch_size = 100
cygnusagent.sinks.cartodb-sink.batch_timeout = 30
cygnusagent.sinks.cartodb-sink.batch_ttl = 10
cygnusagent.sinks.cartodb-sink.backend.max_conns = 500
cygnusagent.sinks.cartodb-sink.backend.max_conns_per_route = 100

cygnusagent.channels.cartodb-channel.type = memory
cygnusagent.channels.cartodb-channel.capacity = 1000
cygnusagent.channels.cartodb-channel.transactionCapacity = 100

And finally, the cartodb_keys.conf file (with the key removed):

{
   "cartodb_keys": [
      {
         "username": "jcarneroatos",
         "endpoint": "https://jcarneroatos.carto.com",
         "key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"   
      }
   ]
}

UPDATE: After running Cygnus in DEBUG mode and checking the logs, it seems that CARTO is returning:

{"error":["syntax error at or near \"{\""]}

Here is the full log: http://pastebin.com/p9VyUU8n

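The problem is that the geo:json type is not currently supported by NGSICartoDBSink. As the failing query shows, the raw JSON value ended up inside ST_MakePoint(...), which is what triggers CARTO's syntax error at or near "{". In accordance with the Orion Context Broker specification, this sink understands only the following ways of notifying geolocated attributes: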

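  • Use the geo:point type and send the coordinates in the value field, formatted as "latitude, longitude".
  • Use a location metadata of type string and value WGS84, and send the coordinates in the value field, formatted as "latitude, longitude".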

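Please note:

  • The options above are mutually exclusive, i.e. they cannot be used at the same time.
  • The location metadata is deprecated in Orion, but it can still be used.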
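
As a rough illustration of the two options above, this is how the notified attribute could be built in each case. It is only a sketch: the helper names are made up for this example, and the "coords" attribute type used with the location metadata is an assumption, since the sink only looks at the metadata in that case.

```python
import json


def geo_point_attribute(name, latitude, longitude):
    """Option 1: attribute of type geo:point, value "latitude, longitude"."""
    return {
        "name": name,
        "type": "geo:point",
        "value": f"{latitude}, {longitude}",
    }


def location_metadata_attribute(name, latitude, longitude):
    """Option 2: (deprecated) location metadata of type string, value WGS84."""
    return {
        "name": name,
        "type": "coords",  # assumption: any attribute type, the metadata marks it
        "value": f"{latitude}, {longitude}",
        "metadatas": [
            {"name": "location", "type": "string", "value": "WGS84"}
        ],
    }


if __name__ == "__main__":
    # Coordinates taken from the question (latitude first, then longitude).
    attribute = geo_point_attribute("location", 36.721290055, -4.423032856)
    print(json.dumps(attribute, indent=2))
```

Either dictionary would replace the geo:json attribute inside "attributes" in the notification; only one of the two should be used at a time.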

Until geo:json is supported (I'll start working on it; it will probably be ready during this sprint/month), I recommend that you use the geo:point type.
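
If the data already comes as a GeoJSON Point, it can be turned into a geo:point value before it is notified. A minimal sketch follows, assuming a plain Point; the function name is hypothetical. The one thing to watch is the order: GeoJSON stores [longitude, latitude], while geo:point expects "latitude, longitude".

```python
def geojson_point_to_geo_point(value):
    """Convert a GeoJSON Point dict into a geo:point "latitude, longitude" string."""
    if value.get("type") != "Point":
        raise ValueError("only GeoJSON Point values are handled in this sketch")
    # GeoJSON coordinate order is [longitude, latitude].
    longitude, latitude = value["coordinates"][:2]
    return f"{latitude}, {longitude}"


if __name__ == "__main__":
    # The geo:json value from the question.
    point = {"type": "Point", "coordinates": [-4.423032856, 36.721290055]}
    print(geojson_point_to_geo_point(point))
```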

EDIT 1

I'm adding here an example of a Cygnus execution when a notification involving a geolocated attribute (of type geo:point) is received.

Cygnus version:

1.6.0

Cygnus configuration:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = raw-sink
cygnus-ngsi.channels = raw-channel

cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.channels = raw-channel
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = default
cygnus-ngsi.sources.http-source.handler.default_service_path = /
cygnus-ngsi.sources.http-source.interceptors = ts
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp

cygnus-ngsi.sinks.raw-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.raw-sink.channel = raw-channel
cygnus-ngsi.sinks.raw-sink.enable_grouping = false
cygnus-ngsi.sinks.raw-sink.keys_conf_file = /usr/cygnus/conf/cartodb_keys.conf
cygnus-ngsi.sinks.raw-sink.swap_coordinates = false
cygnus-ngsi.sinks.raw-sink.enable_raw = true
cygnus-ngsi.sinks.raw-sink.enable_distance = false
cygnus-ngsi.sinks.raw-sink.enable_raw_snapshot = false
cygnus-ngsi.sinks.raw-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.raw-sink.batch_size = 50
cygnus-ngsi.sinks.raw-sink.batch_timeout = 10
cygnus-ngsi.sinks.raw-sink.batch_ttl = 0
cygnus-ngsi.sinks.raw-sink.batch_retries = 5000

cygnus-ngsi.channels.raw-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.raw-channel.capacity = 1000
cygnus-ngsi.channels.raw-channel.transactionCapacity = 100

Table creation:

$ curl -X GET -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<my_key>" --data-urlencode "q=CREATE TABLE x002ftestxffffx0043ar1xffffx0043ar (recvTime text, fiwareServicePath text, entityId text, entityType text, speed float, speed_md text, the_geom geometry(POINT,4326))"
{"rows":[],"time":0.005,"fields":{},"total_rows":0}
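
For anyone scripting this instead of using curl, the same kind of SQL API URL can be assembled as below. This is just a sketch that builds the URL and sends nothing; the function name is hypothetical, and the host pattern and the api_key and q parameters are copied from the curl command above.

```python
from urllib.parse import urlencode


def build_sql_api_url(user, api_key, query):
    """Build a SQL API URL with the API key and the form-encoded query."""
    params = urlencode({"api_key": api_key, "q": query})
    return f"https://{user}.cartodb.com/api/v2/sql?{params}"


if __name__ == "__main__":
    # Placeholders, as in the curl command: replace with real credentials.
    print(build_sql_api_url("my_user", "my_key", "select * from my_table"))
```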

Script simulating the notification:

$ cat notification.sh
#!/bin/sh

# Usage: ./notification.sh <url> <service> <service_path> <latitude> <longitude> <speed>
# An empty service or service path falls back to the defaults below.
URL=$1

if [ "$2" != "" ]
then
   SERVICE=$2
else
   SERVICE=default
fi

if [ "$3" != "" ]
then
   SERVICE_PATH=$3
else
   SERVICE_PATH=/
fi

curl "$URL" -v -s -S --header 'Content-Type: application/json; charset=utf-8' --header 'Accept: application/json' --header 'User-Agent: orion/0.10.0' --header "Fiware-Service: $SERVICE" --header "Fiware-ServicePath: $SERVICE_PATH" -d @- <<EOF
{
  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",
  "originator" : "localhost",
  "contextResponses" : [
    {
      "contextElement" : {
        "attributes" : [
          {
            "name" : "speed",
            "type" : "float",
            "value" : "$6"
          },
          {
            "name" : "the_geom",
            "type" : "geo:point",
            "value" : "$4, $5"
          }
        ],
        "type" : "Car",
        "isPattern" : "false",
        "id" : "Car1"
      },
      "statusCode" : {
        "code" : "200",
        "reasonPhrase" : "OK"
      }
    }
  ]
}
EOF

Script execution:

$ ./notification.sh http://localhost:5050/notify <my_user> /test 40.40 -3.4 120
*   Trying ::1...
* Connected to localhost (::1) port 5050 (#0)
> POST /notify HTTP/1.1
> Host: localhost:5050
> Content-Type: application/json; charset=utf-8
> Accept: application/json
> User-Agent: orion/0.10.0
> Fiware-Service: <my_user>
> Fiware-ServicePath: /test
> Content-Length: 569
> 
* upload completely sent off: 569 out of 569 bytes
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< Server: Jetty(6.1.26)
< 
* Connection #0 to host localhost left intact

Cygnus logs upon receiving the notification:

time=2016-12-02T13:48:27.310UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[282] : [NGSIRestHandler] Starting internal transaction (eb73b8d5-af9b-48ea-8ce7-ff21edc957f3)
time=2016-12-02T13:48:27.312UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[299] : [NGSIRestHandler] Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "speed",            "type" : "float",            "value" : "120"          },          {            "name" : "the_geom",            "type" : "geo:point",            "value" : "40.40, -3.4"          }        ],        "type" : "Car",        "isPattern" : "false",        "id" : "Car1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
time=2016-12-02T13:48:36.404UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=persistRawAggregation | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[553] : [raw-sink] Persisting data at NGSICartoDBSink. Schema (<my_user>), Table (x002ftestxffffx0043ar1xffffx0043ar), Data (('2016-12-02T13:48:27.381Z','/test','Car1','Car',ST_SetSRID(ST_MakePoint(40.40,-3.4), 4326),'120','[]'))
time=2016-12-02T13:48:38.237UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[514] : Finishing internal transaction (eb73b8d5-af9b-48ea-8ce7-ff21edc957f3)

Retrieving the data:

$ curl -X GET -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<my_key>" --data-urlencode "q=select * from x002ftestxffffx0043ar1xffffx0043ar"
{"rows":[{"recvtime":"2016-12-02T13:48:27.381Z","fiwareservicepath":"/test","entityid":"Car1","entitytype":"Car","speed":120,"speed_md":"[]","the_geom":"0101000020E610000033333333333344403333333333330BC0"}],"time":0.001,"fields":{"recvtime":{"type":"string"},"fiwareservicepath":{"type":"string"},"entityid":{"type":"string"},"entitytype":{"type":"string"},"speed":{"type":"number"},"speed_md":{"type":"string"},"the_geom":{"type":"geometry"}},"total_rows":1}
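
The the_geom field comes back as a hex string in PostGIS EWKB format. To check that it really holds the notified point, it can be decoded with a few lines like the ones below. This is a sketch that only handles 2D points, and the function name is hypothetical.

```python
import struct

EWKB_SRID_FLAG = 0x20000000  # set when an SRID follows the geometry type
EWKB_ZM_FLAGS = 0xC0000000   # set when Z and/or M ordinates are present


def decode_ewkb_point(hex_ewkb):
    """Decode a hex-encoded 2D EWKB point into a (srid, x, y) tuple."""
    raw = bytes.fromhex(hex_ewkb)
    # First byte is the byte order: 1 = little-endian, 0 = big-endian.
    order = "<" if raw[0] == 1 else ">"
    (geom_type,) = struct.unpack_from(order + "I", raw, 1)
    if (geom_type & 0xFF) != 1 or (geom_type & EWKB_ZM_FLAGS):
        raise ValueError("only 2D points are handled in this sketch")
    offset = 5
    srid = None
    if geom_type & EWKB_SRID_FLAG:
        (srid,) = struct.unpack_from(order + "I", raw, offset)
        offset += 4
    x, y = struct.unpack_from(order + "dd", raw, offset)
    return srid, x, y


if __name__ == "__main__":
    print(decode_ewkb_point("0101000020E610000033333333333344403333333333330BC0"))
```

For the row above this gives SRID 4326 with x = 40.4 and y = -3.4, i.e. the two values in the same order as they were passed to ST_MakePoint in the Cygnus log.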

EDIT 2

This answer is only valid if you have an "enterprise" Carto account. Please see my other answer to this question.

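Finally, after an offline discussion with @Javi Carnero, we found a bug in the Cygnus code related to how Carto differentiates between "enterprise" and "personal" accounts. The former have one PostgreSQL schema per user under the enterprise organization, while the latter have a "hardcoded" schema named public. Since the notified FIWARE service was being used as the schema name, Cygnus did not work properly for "personal" accounts.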
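
To make the difference concrete, the schema choice described above boils down to something like the following. This is not the actual Cygnus code, just a sketch of the rule; the function name and the account type strings are hypothetical.

```python
def carto_schema(account_type, fiware_service):
    """Pick the PostgreSQL schema for a Carto account (illustrative only)."""
    if account_type == "enterprise":
        # One schema per user: the notified FIWARE service names it.
        return fiware_service
    if account_type == "personal":
        # Personal accounts always use the schema named public.
        return "public"
    raise ValueError(f"unknown account type: {account_type}")


if __name__ == "__main__":
    print(carto_schema("enterprise", "jcarneroatos"))
    print(carto_schema("personal", "jcarneroatos"))
```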

This bug has been fixed. At the time of writing, the fix is in the master branch; Cygnus 1.7.0, to be released at the end of the sprint/month, will include it.

Please note that my previous answer is fully valid if you have an "enterprise" account. In any case, I'll edit it to explain this.