Spring 数据 MongoDB 使用管道聚合查找
Spring Data MongoDB Lookup with Pipeline Aggregation
我如何将以下 MongoDB 查询转换为供我的 Java Spring 应用程序使用的查询?我找不到通过提供的 lookup 方法使用 pipeline
的方法。
这是我要转换的查询。我还想指出,我没有使用 $unwind
,因为我希望 deliveryZipCodeTimings
保留为 return 对象中的分组集合。
db.getCollection('fulfillmentChannel').aggregate([
{
$match: {
"dayOfWeek": "SOME_VARIABLE_STRING_1"
}
},
{
$lookup: {
from: "deliveryZipCodeTiming",
let: { location_id: "$fulfillmentLocationId" },
pipeline: [{
$match: {
$expr: {
$and: [
{$eq: ["$fulfillmentLocationId", "$$location_id"]},
{$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
]
}
}
},
{
$project: { _id: 0, zipCode: 1, cutoffTime: 1 }
}],
as: "deliveryZipCodeTimings"
}
},
{
$match: {
"deliveryZipCodeTimings": {$ne: []}
}
}
])
驱动程序几乎总是落后于 MongoDB 提供的当前语言功能 - 因此一些最新和最强大的功能还不能通过 API 很好地访问。恐怕这是其中一种情况,您需要求助于使用字符串。有点像(未经测试):
AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne([]));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
Aggregation.newAggregation(match, (DBObject) JSON.parse(query), match2);
根据@dnickless 提供的信息,我解决了这个问题。我将 post 完整的解决方案,希望它能在将来帮助其他人。
我正在使用 mongodb-driver:3.6.4
首先,我必须创建一个自定义聚合操作 class,以便我可以传入自定义 JSON mongodb 查询以用于聚合操作。这将允许我在我使用的驱动程序版本不支持的 $lookup
中使用 pipeline
。
public class CustomProjectAggregationOperation implements AggregationOperation {
private String jsonOperation;
public CustomProjectAggregationOperation(String jsonOperation) {
this.jsonOperation = jsonOperation;
}
@Override
public Document toDocument(AggregationOperationContext aggregationOperationContext) {
return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
}
}
现在我们可以将自定义 JSON 查询传递到我们的 mongodb spring 实现中,剩下的就是将这些值插入 TypedAggregation查询。
public List<FulfillmentChannel> getFulfillmentChannels(
String SOME_VARIABLE_STRING_1,
String SOME_VARIABLE_STRING_2) {
AggregationOperation match = Aggregation.match(
Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
AggregationOperation match2 = Aggregation.match(
Criteria.where("deliveryZipCodeTimings").ne(Collections.EMPTY_LIST));
String query =
"{ $lookup: { " +
"from: 'deliveryZipCodeTiming'," +
"let: { location_id: '$fulfillmentLocationId' }," +
"pipeline: [{" +
"$match: {$expr: {$and: [" +
"{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
"{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
"{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
"as: 'deliveryZipCodeTimings'}}";
TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
FulfillmentChannel.class,
match,
new CustomProjectAggregationOperation(query),
match2
);
AggregationResults<FulfillmentChannel> results =
mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
return results.getMappedResults();
}
我想添加这个我的解决方案,它在某些方面重复了之前 posted 的解决方案。
Mongo driver v3.x
对于Mongodriverv3.x我得出以下解决方案:
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
public JsonOperation(String json) {
Object root = JSON.parse(json);
documents = root instanceof BasicDBObject
? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
: ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
}
然后假设在某些资源中给出了聚合步骤 aggregations.json
:
[
{
$match: {
"userId": "..."
}
},
{
$lookup: {
let: {
...
},
from: "another_collection",
pipeline: [
...
],
as: "things"
}
},
{
$sort: {
"date": 1
}
}
]
上面class可以使用如下:
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
Collection<ResultDao> results = mongoTemplate.aggregate(newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))), "some_collection", ResultDao.class).getMappedResults();
Mongo driver v4.x
由于 JSON
class 从 Mongo v4 中删除,我重写了 class 如下:
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
private static final String DUMMY_KEY = "dummy";
public JsonOperation(String json) {
documents = parseJson(json);
}
static final List<Document> parseJson(String json) {
return (json.startsWith("["))
? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
: Collections.singletonList(Document.parse(json));
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
@Override
public String getOperator() {
return documents.iterator().next().keySet().iterator().next();
}
}
但由于字符串操作,现在的实现有点难看。如果有人对如何以更优雅的方式解析 object 数组有更好的想法,请编辑此 post 或发表评论。理想情况下,Mongo 核心中应该有一些方法允许解析 JSON object 或列表(returns BasicDBObject
/BasicDBList
或 Document
/List<Document>
).
另请注意,我已经跳过了在 toPipelineStages()
方法中转换 Document
实例的步骤,因为在我的情况下没有必要:
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}
当我使用接受的答案中解释的方式时,我遇到了一些 JSON 解析异常,所以我深入挖掘了默认的 MongoDB java 驱动程序(版本 3)文档 class 构建聚合查询,发现任何聚合查询都可以构建如下,
按如下方式替换 mongo 控制台查询中的每个元素
- 大括号({) -> 新文档()
- 参数名称相同
- 冒号(:) -> 冒号(,)
- Coma(,) -> .append()
- 方括号([) -> Arrays.asList()
AggregationOperation customLookupOperation = new AggregationOperation() {
@Override
public Document toDocument(AggregationOperationContext context) {
return new Document(
"$lookup",
new Document("from", "deliveryZipCodeTiming")
.append("let",new Document("location_id", "$fulfillmentLocationId"))
.append("pipeline", Arrays.<Object> asList(
new Document("$match", new Document("$expr", new Document("$and",
Arrays.<Object>asList(
new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
)))),
new Document("$project", new Document("_id",0).append("zipCode", 1)
.append("cutoffTime", 1)
)
))
.append("as", "deliveryZipCodeTimings")
);
}
};
终于可以在聚合管道中使用聚合操作了,
Aggregation aggregation = Aggregation.newAggregation(matchOperation,customLookupOperation,matchOperation2);
我如何将以下 MongoDB 查询转换为供我的 Java Spring 应用程序使用的查询?我找不到通过提供的 lookup 方法使用 pipeline
的方法。
这是我要转换的查询。我还想指出,我没有使用 $unwind
,因为我希望 deliveryZipCodeTimings
保留为 return 对象中的分组集合。
db.getCollection('fulfillmentChannel').aggregate([
{
$match: {
"dayOfWeek": "SOME_VARIABLE_STRING_1"
}
},
{
$lookup: {
from: "deliveryZipCodeTiming",
let: { location_id: "$fulfillmentLocationId" },
pipeline: [{
$match: {
$expr: {
$and: [
{$eq: ["$fulfillmentLocationId", "$$location_id"]},
{$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
]
}
}
},
{
$project: { _id: 0, zipCode: 1, cutoffTime: 1 }
}],
as: "deliveryZipCodeTimings"
}
},
{
$match: {
"deliveryZipCodeTimings": {$ne: []}
}
}
])
驱动程序几乎总是落后于 MongoDB 提供的当前语言功能 - 因此一些最新和最强大的功能还不能通过 API 很好地访问。恐怕这是其中一种情况,您需要求助于使用字符串。有点像(未经测试):
AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne([]));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
Aggregation.newAggregation(match, (DBObject) JSON.parse(query), match2);
根据@dnickless 提供的信息,我解决了这个问题。我将 post 完整的解决方案,希望它能在将来帮助其他人。
我正在使用 mongodb-driver:3.6.4
首先,我必须创建一个自定义聚合操作 class,以便我可以传入自定义 JSON mongodb 查询以用于聚合操作。这将允许我在我使用的驱动程序版本不支持的 $lookup
中使用 pipeline
。
public class CustomProjectAggregationOperation implements AggregationOperation {
private String jsonOperation;
public CustomProjectAggregationOperation(String jsonOperation) {
this.jsonOperation = jsonOperation;
}
@Override
public Document toDocument(AggregationOperationContext aggregationOperationContext) {
return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
}
}
现在我们可以将自定义 JSON 查询传递到我们的 mongodb spring 实现中,剩下的就是将这些值插入 TypedAggregation查询。
public List<FulfillmentChannel> getFulfillmentChannels(
String SOME_VARIABLE_STRING_1,
String SOME_VARIABLE_STRING_2) {
AggregationOperation match = Aggregation.match(
Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
AggregationOperation match2 = Aggregation.match(
Criteria.where("deliveryZipCodeTimings").ne(Collections.EMPTY_LIST));
String query =
"{ $lookup: { " +
"from: 'deliveryZipCodeTiming'," +
"let: { location_id: '$fulfillmentLocationId' }," +
"pipeline: [{" +
"$match: {$expr: {$and: [" +
"{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
"{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
"{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
"as: 'deliveryZipCodeTimings'}}";
TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
FulfillmentChannel.class,
match,
new CustomProjectAggregationOperation(query),
match2
);
AggregationResults<FulfillmentChannel> results =
mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
return results.getMappedResults();
}
我想添加这个我的解决方案,它在某些方面重复了之前 posted 的解决方案。
Mongo driver v3.x
对于Mongodriverv3.x我得出以下解决方案:
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
public JsonOperation(String json) {
Object root = JSON.parse(json);
documents = root instanceof BasicDBObject
? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
: ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
}
然后假设在某些资源中给出了聚合步骤 aggregations.json
:
[
{
$match: {
"userId": "..."
}
},
{
$lookup: {
let: {
...
},
from: "another_collection",
pipeline: [
...
],
as: "things"
}
},
{
$sort: {
"date": 1
}
}
]
上面class可以使用如下:
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
Collection<ResultDao> results = mongoTemplate.aggregate(newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))), "some_collection", ResultDao.class).getMappedResults();
Mongo driver v4.x
由于 JSON
class 从 Mongo v4 中删除,我重写了 class 如下:
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
private static final String DUMMY_KEY = "dummy";
public JsonOperation(String json) {
documents = parseJson(json);
}
static final List<Document> parseJson(String json) {
return (json.startsWith("["))
? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
: Collections.singletonList(Document.parse(json));
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
@Override
public String getOperator() {
return documents.iterator().next().keySet().iterator().next();
}
}
但由于字符串操作,现在的实现有点难看。如果有人对如何以更优雅的方式解析 object 数组有更好的想法,请编辑此 post 或发表评论。理想情况下,Mongo 核心中应该有一些方法允许解析 JSON object 或列表(returns BasicDBObject
/BasicDBList
或 Document
/List<Document>
).
另请注意,我已经跳过了在 toPipelineStages()
方法中转换 Document
实例的步骤,因为在我的情况下没有必要:
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}
当我使用接受的答案中解释的方式时,我遇到了一些 JSON 解析异常,所以我深入挖掘了默认的 MongoDB java 驱动程序(版本 3)文档 class 构建聚合查询,发现任何聚合查询都可以构建如下,
按如下方式替换 mongo 控制台查询中的每个元素
- 大括号({) -> 新文档()
- 参数名称相同
- 冒号(:) -> 冒号(,)
- Coma(,) -> .append()
- 方括号([) -> Arrays.asList()
AggregationOperation customLookupOperation = new AggregationOperation() {
@Override
public Document toDocument(AggregationOperationContext context) {
return new Document(
"$lookup",
new Document("from", "deliveryZipCodeTiming")
.append("let",new Document("location_id", "$fulfillmentLocationId"))
.append("pipeline", Arrays.<Object> asList(
new Document("$match", new Document("$expr", new Document("$and",
Arrays.<Object>asList(
new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
)))),
new Document("$project", new Document("_id",0).append("zipCode", 1)
.append("cutoffTime", 1)
)
))
.append("as", "deliveryZipCodeTimings")
);
}
};
终于可以在聚合管道中使用聚合操作了,
Aggregation aggregation = Aggregation.newAggregation(matchOperation,customLookupOperation,matchOperation2);