航点匹配查询

Waypoint matching query

我们有collection如下。每个文档代表driver的一次行程,loc属性包含way-points,time属性包含way-points对应的时间。例如,在行程 A 中,Driver 会在 tripA.time[0]

时位于 GeoLocation tripA.loc.coordinates[0]
{
    tripId : "Trip A",
    time : [
        "2015-03-08T04:47:43.589Z",
        "2015-03-08T04:48:43.589Z",
        "2015-03-08T04:49:43.589Z",
        "2015-03-08T04:50:43.589Z",
    ],
    loc: {
        type: "MultiPoint",
        coordinates: [
            [ -73.9580, 40.8003 ],
            [ -73.9498, 40.7968 ],
            [ -73.9737, 40.7648 ],
            [ -73.9814, 40.7681 ]
        ]
    }
}
{
    tripId : "Trip B",
    time : [
        "2015-03-08T04:47:43.589Z",
        "2015-03-08T04:48:43.589Z",
        "2015-03-08T04:49:43.589Z",
        "2015-03-08T04:50:43.589Z",
    ],
    loc: {
        type: "MultiPoint",
        coordinates: [
            [ -72.9580, 41.8003 ],
            [ -72.9498, 41.7968 ],
            [ -72.9737, 41.7648 ],
            [ -72.9814, 41.7681 ]
        ]
    }
}

我们想查询在时间 t(+-10 分钟)附近(1 公里)位置“[long1,lat1]”开始并在 [long2,lat2] 结束的行程。

是否有简单有效的方法来为 MongoDB 或 Elasticsearch 制定上述查询?

如果可以,请提出问题。在 MongoDB 或 Elasticsearch 中。 (MongoDB 更可取)

这确实是作为评论开始的,但显然已经很长了。所以这是对局限性和方法的长篇解释。

您在这里要求实现的底线实际上是一个 "union query",它通常被定义为两个单独的查询,其中最终结果是每个结果的 "set intersection"。更简而言之,从您的 "origin" 查询中选择的 "trips" 与您在 "destination" 查询中找到的结果相匹配。

在一般数据库术语中,我们将 "union" 操作称为 "join" 或至少是一种条件,其中一组条件的选择 "and" 另一组条件的选择必须同时遇到一个共同的基本分组标识符。

我认为 MongoDB 中的基点也适用于弹性搜索索引,这两种数据存储机制都不支持来自直接单一查询的 "join" 的概念。

这里还有另一个 MongoDB 原则,考虑到您提出的或现有的建模,即使使用 "array" 条款中指定的项目,也无法实现 "and" 条件对坐标进行地理空间搜索,并且还考虑到您选择建模为 GeoJSON "MultiPoint" 查询无法 "choose" 该对象的哪个元素与 "nearest" 匹配。因此在考虑 "nearest match".

时会考虑 "all points"

你解释的意思很明确。因此我们可以看到 "origin" 在文档结构中本质上被标记为 "two arrays" 并在其中表示为每个数组中的 "first" 元素。 "trip"中每个进步"waypoint"的代表性数据是"location"和"time"。当然,考虑到数据点是 "paired".

,自然会在每个数组的末尾元素以 "destination" 结尾

我认为这是一种很好的存储方式,但它不遵循您在此处提到的任何一种存储解决方案的允许查询模式。

正如我已经提到的,这确实是一个 "union" 的意图,所以当我看到导致设计的想法时,最好存储这样的东西:

{
    "tripId" : "Trip A",
    "time" : ISODate("2015-03-08T04:47:43.589Z"),
    "loc": {
        "type": "Point",
        "coordinates": [ -73.9580, 40.8003 ]
    },
    "seq": 0
},
{
    "tripId" : "Trip A",
    "time" : ISODate("2015-03-08T04:48:43.589Z"),
    "loc": {
        "type": "Point",
        "coordinates": [ -73.9498, 40.7968 ]
    },
    "seq": 1
},
{
    "tripId" : "Trip A",
    "time" : ISODate("2015-03-08T04:49:43.589Z"),
    "loc": {
        "type": "Point",
        "coordinates": [ -73.9737, 40.7648 ]
    },
    "seq": 2
},
{
    "tripId" : "Trip A",
    "time" : ISODate("2015-03-08T04:50:43.589Z"),
    "loc": {
        "type": "Point",
        "coordinates": [ -73.9814, 40.7681 ]
    },
    "seq": 3,
    "isEnd": true
}

在示例中,我只是将这些文档插入到名为 "geojunk" 的集合中,然后为 "loc" 字段发出 2dsphere 索引:

db.geojunk.ensureIndex({ "loc": "2dsphere" })

然后使用 "two" .aggregate() 查询完成此处理。 .aggregate() 的原因是因为您想在每种情况下匹配 "first" 文档 "per trip"。这表示查询找到的每次旅行的最近航路点。然后基本上你想 "merge" 这些结果到某种 "hash" 结构由 "tripId".

键入

最后的逻辑是,如果 "origin" 和 "destination" 都符合给定 "trip" 的查询条件,那么这对您的整体查询来说是有效的结果。

我在这里给出的代码是一个任意的 nodejs 实现。主要是因为它是表示在 "parallel" 中发出查询以获得最佳性能的良好基础,还因为我选择使用 nedb 作为 "hash" 的示例,再加上一点 "Mongolike"语法:

var async = require('async'),
    MongoClient = require("mongodb").MongoClient;
    DataStore = require('nedb');


// Common stream upsert handler
function streamProcess(stream,store,callback) {

  stream.on("data",function(data) {
    // Clean "_id" to keep nedb happy
    data.trip = data._id;
    delete data._id;


    // Upsert to store
    store.update(
      { "trip": data.trip },
      {
        "$push": {
          "time": data.time,
          "loc": data.loc
        }
      },
      { "upsert": true },
      function(err,num) {
        if (err) callback(err);
      }
    );

  });

  stream.on("err",callback)

  stream.on("end",callback);

}

MongoClient.connect('mongodb://localhost/test',function(err,db) {
  if (err) throw err;

  db.collection('geojunk',function(err,collection) {
    if (err) throw err;

  var store = new DataStore();

    // Parallel execution
    async.parallel(
      [
        // Match origin trips
        function(callback) {
          var stream = collection.aggregate(
            [
              { "$geoNear": {
                "near": {
                  "type": "Point",
                  "coordinates": [ -73.9580, 40.8003 ],
                },
                "query": {
                  "time": {
                    "$gte": new Date("2015-03-08T04:40:00.000Z"),
                    "$lte": new Date("2015-03-08T04:50:00.000Z")
                  },
                  "seq": 0
                },
                "maxDistance": 1000,
                "distanceField": "distance",
                "spherical": true
              }},
              { "$group": {
                "_id": "$tripId",
                "time": { "$first": "$time" },
                "loc": { "$first": "$loc" }
              }}
            ],
            { "cursor": { "batchSize": 1 } }
          );
          streamProcess(stream,store,callback);
        },

        // Match destination trips
        function(callback) {
          var stream = collection.aggregate(
            [
              { "$geoNear": {
                "near": {
                  "type": "Point",
                  "coordinates": [ -73.9814, 40.7681 ]
                },
                "query": { "isEnd": true },
                "maxDistance": 1000,
                "distanceField": "distance",
                "spherical": true
              }},
              { "$group": {
                "_id": "$tripId",
                "time": { "$first": "$time" },
                "loc": { "$first": "$loc" }
              }}
            ],
            { "cursor": { "batchSize": 25 } }
          );
          streamProcess(stream,store,callback);
        }

      ],
      function(err) {
        if (err) throw err;

        // Just documents that matched origin and destination
        store.find({ "loc": { "$size": 2 }},{ "_id": 0 },function(err,result) {
          if (err) throw err;
          console.log( JSON.stringify( result, undefined, 2 ) );
          db.close();
        });
      }
    );

  });

});

在我列出的示例数据中,这将 return:

[
  {
    "trip": "Trip A",
    "time": [
      "2015-03-08T04:47:43.589Z",
      "2015-03-08T04:50:43.589Z"
    ],
    "loc": [
      {
        "type": "Point",
        "coordinates": [
          -73.958,
          40.8003
        ]
      },
      {
        "type": "Point",
        "coordinates": [
          -73.9814,
          40.7681
        ]
      }
    ]
  }
]

所以它找到了距离查询位置最近的起点和终点,也是在要求的时间内"origin"和定义为终点的东西,即"isEnd"。

因此 $geoNear operation does the matching with the returned results being the documents nearest to the point and other conditions. The $group stage is required because other documents in the same trip could "possibly" match the conditions,so it's just a way of making sure. The $first operator makes sure that the already "sorted" results will contain only one result per "trip". If you are really "sure" that will not happen with the conditions, then you could just use a standard $nearSphere 改为在聚合之外查询。所以我在这里谨慎行事。

有一点需要注意,即使在 "nedb" 中包含在内,虽然它确实支持将输出转储到磁盘,但数据仍会累积在内存中。如果您希望获得较大的结果,而不是这种类型的 "hash table" 实现,您需要以类似于显示给另一个 mongodb 集合的方式输出,并从那里检索匹配的结果。

但这并没有改变整体逻辑,这也是使用 "nedb" 进行演示的另一个原因,因为您将 "upsert" 以相同的方式访问结果集合中的文档。