Hazelcast 无法在可选字段上与 SqlPredicate 和 Index 一起正常工作

Hazelcast not working correctly with SqlPredicate and Index on optional field

我们在 Hazelcast 地图中存储复杂对象,并且需要能够不仅根据键而且根据这些复杂对象的内容来搜索对象。为了不对性能造成太大影响,我们在这些搜索词上使用了索引。

我们还使用 spring-data-hazelcast,它提供了允许我们使用 findByAbcXyz() 类型语义查询的存储库。对于一些更复杂的查询,我们使用 @Query 注释(spring-data-hazelcast 在内部转换为 SqlPredicates)。

我们现在遇到了一个问题,在某些情况下,这些基于 @Query 的搜索方法没有 return 任何值,即使我们可以验证搜索到的对象确实存在于地图中。

我已经设法用核心 hazelcast 重现了这个问题(即不使用 spring-data-hazelcast)。

这是我们的对象结构:

BetriebspunktKey.java

public class BetriebspunktKey implements Serializable {
  private Integer uicLand;
  private Integer nummer;

  public BetriebspunktKey(final Integer uicLand, final Integer nummer) {
    this.uicLand = uicLand;
    this.nummer = nummer;
  }

  public Integer getUicLand() {
    return uicLand;
  }

  public Integer getNummer() {
    return nummer;
  }
}

Betriebspunkt.java

public class Betriebspunkt implements Serializable {
  private BetriebspunktKey key;
  private List<BetriebspunktVersion> versionen;

  public Betriebspunkt(final BetriebspunktKey key, final List<BetriebspunktVersion> versionen) {
    this.key = key;
    this.versionen = versionen;
  }

  public BetriebspunktKey getKey() {
    return key;
  }
}

BetriebspunktVersion.java

public class BetriebspunktVersion implements Serializable {
  private List<BetriebspunktKey> zusatzbetriebspunkte;

  public BetriebspunktVersion(final List<BetriebspunktKey> zusatzbetriebspunkte) {
    this.zusatzbetriebspunkte = zusatzbetriebspunkte;
  }
}

在我的主文件中,我现在正在设置 hazelcast:

Config config = new Config();
final MapConfig mapConfig = config.getMapConfig("points");
mapConfig.addMapIndexConfig(new MapIndexConfig("versionen[any].zusatzbetriebspunkte[any].nummer", false));

HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);

IMap<BetriebspunktKey, Betriebspunkt> map = instance.getMap("points");

我也在准备以后的搜索条件:

Predicate equalPredicate = Predicates.equal("versionen[any].zusatzbetriebspunkte[any].nummer", 53090);
Predicate sqlPredicate = new SqlPredicate("versionen[any].zusatzbetriebspunkte[any].nummer=53090");

接下来,我正在创建两个对象,一个包含 "full depth" 信息,另一个不包含任何 "zusatzbetriebspunkte":

final Betriebspunkt abc = new Betriebspunkt(
        new BetriebspunktKey(80, 166),
        Collections.singletonList(new BetriebspunktVersion(
            Collections.singletonList(new BetriebspunktKey(80, 53090))
        ))
    );

    final Betriebspunkt def = new Betriebspunkt(
        new BetriebspunktKey(83, 141),
        Collections.singletonList(new BetriebspunktVersion(
            Collections.emptyList()
        ))
    );

这就是事情变得有趣的地方。如果我首先将 "full" 对象插入地图,则同时使用 EqualPredicate 和 SqlPredicate 的搜索有效:

map.put(abc.getKey(), abc);
map.put(def.getKey(), def);

Collection<Betriebspunkt> equalResults = map.values(equalPredicate);
Collection<Betriebspunkt> sqlResults = map.values(sqlPredicate);

assertEquals(1, equalResults.size()); // contains "abc"
assertEquals(1, sqlResults.size());   // contains "abc"

但是,如果我以相反的顺序将对象插入到我的地图中(即首先是 "partial" 对象,然后是 "full" 对象),只有 EqualPredicate 工作正常,SqlPredicate return 是一个空列表,无论地图的内容或搜索条件如何。

map.put(abc.getKey(), abc);
map.put(def.getKey(), def);

Collection<Betriebspunkt> equalResults = map.values(equalPredicate);
Collection<Betriebspunkt> sqlResults = map.values(sqlPredicate);

assertEquals(1, equalResults.size()); // contains "abc"
assertEquals(1, sqlResults.size());   // --> this fails, it returns en empty list

这种行为的原因是什么?它看起来像是 hazelcast 代码中的错误。

失败原因

经过大量调试,我找到了这个问题的原因。原因确实可以在hazelcast代码中找到。

将值放入 hazelcast 映射时调用 DefaultRecordStore.putInternal。在这个方法的末尾调用 DefaultRecordStore.saveIndex 找到相应的索引然后调用 Indexes.saveEntryIndex。此方法遍历每个索引并调用 InternalIndex.saveEntryIndex(或者更确切地说是其实现 IndexImpl.saveEntryIndex。该方法的有趣部分是以下几行:

if (this.converter == null || this.converter == TypeConverters.NULL_CONVERTER) {
      this.converter = entry.getConverter(this.attributeName);
}

显然,当第一个元素放入映射时,每个索引都会存储一个转换器 class。查看 QueryableEntry.getConverter 解释发生了什么:

  TypeConverter getConverter(String attributeName) {
    Object attribute = this.getAttributeValue(attributeName);
    if (attribute == null) {
      return TypeConverters.NULL_CONVERTER;
    } else {
      AttributeType attributeType = this.extractAttributeType(attributeName, attribute);
      return attributeType == null ? TypeConverters.IDENTITY_CONVERTER : attributeType.getConverter();
    }
  }

第一次插入"full"对象时,extractAttributeType()会跟在我们索引定义"versionen[any].zusatzbetriebspunkte[any].nummer"的"path"后面,发现nummer是一个整数类型,因此 TypeConverters.IntegerConverter 将被 return 编辑并存储。

第一次插入"partial"对象时,"zusatzbetriebspunkte[any]"是空的,extractAttributeType没办法知道nummer是什么类型,因此returns null 表示使用 TypeConverters.IdentityConverter。

此外,每当插入 "full" 元素时,都会使用 nummer 作为键将条目写入索引映射,即索引映射的类型为 Map。

写到地图上就到此为止。现在让我们看看如何从地图中读取数据。当调用 map.values(predicate) 时,我们最终会到达 QueryRunner.runUsingGlobalIndexSafely,其中包含一行:

Collection<QueryableEntry> entries = indexes.query(predicate);

这将在一些样板代码调用之后依次进行

Set<QueryableEntry> result = indexAwarePredicate.filter(queryContext);

对于我们的两个谓词,我们最终会得到 IndexImpl.getRecords(),如下所示:

  public Set<QueryableEntry> getRecords(Comparable attributeValue) {
    long timestamp = this.stats.makeTimestamp();
    if (this.converter == null) {
      this.stats.onIndexHit(timestamp, 0L);
      return new SingleResultSet((Map)null);
    } else {
      Set<QueryableEntry> result = this.indexStore.getRecords(this.convert(attributeValue));
      this.stats.onIndexHit(timestamp, (long)result.size());
      return result;
    }
  }

关键调用是 this.convert(attributeValue),其中 attributeValue 是谓词的 value

如果我们比较两个谓词,我们可以看到 EqualPredicate 有两个成员:

attributeName = "versionen[any].zusatzbetriebspunkte[any].nummer"
value = {Integer} 53090

SqlPredicate 包含初始字符串(我们将其传递给它的构造函数),但在构造时它也被解析并映射到内部 EqualPredicate(在评估谓词时最终使用并传递给上面的 getRecords()):

sql = "versionen[any].zusatzbetriebspunkte[any].nummer=53090"
predicate = {EqualPredicate}
  attributeName = "versionen[any].zusatzbetriebspunkte[any].nummer"
  value = {String} "53090"

这解释了为什么手动创建的 EqualPredicate 在这两种情况下都有效:它的值是一个整数。当传递给转换器时,它是 IntegerConverter 还是 IdentityConverter 都没有关系,因为两者都会 return 整数,然后可以将其用作索引映射中的键(使用整数作为键)。

然而,对于 SqlPredicate,该值是一个字符串。如果将其传递给 IntegerConverter,它会转换为其相应的整数值并访问索引映射。如果将其传递给 IdentityConverter,则字符串会被转换 return 并尝试使用字符串访问索引映射将永远找不到任何结果。

可能的解决方案

我们如何解决这个问题?我看到了几种可能性:

  • 在启动期间将一个 "fully built" 虚拟值插入到我们的映射中,以确保正确初始化转换器。虽然这行得通,但它很丑而且不便于维护
  • 避免使用 SqlPredicate 并使用基于整数的 EqualPredicate。在使用 spring-data-hazelcast 时,这不是一个选项,因为它总是将基于 @Query 的搜索转换为 SqlPredicates。我们当然可以直接使用 hazelcast 并绕过 spring-data 包装器,但是虽然这样做可行,但这意味着有两种访问 hazelcast 的方法,这也不是很容易维护
  • 使用 hazelcast 的 ValueExtractor class。这是一个优雅的解决方案,既可以在本地工作,也可以使用 spring-data-hazelcast。我将概述它的外观:

首先我们需要实现一个值提取器,它 return 以适合我们的形式包含我们 Betriebspunkt 的所有 zusatzbetriebspunkte

public class BetriebspunktExtractor extends ValueExtractor<Betriebspunkt, String> implements Serializable {
  @Override
  public void extract(final Betriebspunkt betriebspunkt, final String argument, final ValueCollector valueCollector) {
    betriebspunkt.getVersionen().stream()
                 .map(BetriebspunktVersion::getZusatzbetriebspunkte)
                 .flatMap(List::stream)
                 .map(zbp -> zbp.getUicLand() + "_" + zbp.getNummer())
                 .forEach(valueCollector::addObject);
  }
}

你会注意到我不仅 returning nummer 字段而且还包括 uicLand 字段这是我们真正想要但无法使用的东西“...[任何]...”符号。如果我们想要与上面概述的完全相同的行为,我们当然只能 return 数字。

现在我们需要稍微修改我们的 hazelcast 配置:

Config config = new Config();
final MapConfig mapConfig = config.getMapConfig("points");
//mapConfig.addMapIndexConfig(new MapIndexConfig("versionen[any].zusatzbetriebspunkte[any].nummer", false));
mapConfig.addMapIndexConfig(new MapIndexConfig("zusatzbetriebspunkt", false));
mapConfig.addMapAttributeConfig(new MapAttributeConfig("zusatzbetriebspunkt", BetriebspunktExtractor.class.getName()));

您会注意到不再需要使用“...[any]...”符号的 "long" 索引定义。

现在我们可以使用这个 "pseudo attribute" 来查询我们的值,对象添加到地图的顺序无关紧要:

Predicate keyPredicate = Predicates.equal("zusatzbetriebspunkt", "80_53090");
Collection<Betriebspunkt> keyResults = map.values(keyPredicate);
assertEquals(1, keyResults.size()); // always contains "abc"

在我们的 spring-data-hazelcast 存储库中,我们现在可以这样做:

@Query("zusatzbetriebspunkt=%d_%d")
List<StammdatenBetriebspunkt> findByZusatzbetriebspunkt(Integer uicLand, Integer nummer);

如果您不需要使用 spring-data-hazelcast,而不是 return 将字符串发送到 ValueCollector,您可以直接 return BetriebspunktKey 然后使用它在谓词中也是如此。那将是最干净的解决方案:

public class BetriebspunktExtractor extends ValueExtractor<Betriebspunkt, String> implements Serializable {
  @Override
  public void extract(final Betriebspunkt betriebspunkt, final String argument, final ValueCollector valueCollector) {
    betriebspunkt.getVersionen().stream()
                 .map(BetriebspunktVersion::getZusatzbetriebspunkte)
                 .flatMap(List::stream)
                 //.map(zbp -> zbp.getUicLand() + "_" + zbp.getNummer())
                 .forEach(valueCollector::addObject);
  }
}

然后

Predicate keyPredicate = Predicates.equal("zusatzbetriebspunkt", new BetriebspunktKey(80, 53090));

但是,要实现这一点,BetriebspunktKey 需要实现 Comparable 并且还必须提供自己的 equalshashCode 方法。