单元测试具有外部依赖性的 apache beam 有状态管道
Unit tests apache beam stateful pipeline with external dependencies
我有一个 apache beam 管道,它从 pubsub 读取数据,使用 Redis 丰富数据,最后写入 pubsub。我正在尝试编写测试来测试作为有状态 DoFn 的丰富 Dofn。这里内部状态充当近缓存以减少对 Redis 的调用。为了实例化我的 Redis 客户端,我使用了在 PipelineOptions 中声明的工厂,例如
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
理论上,上面的client应该是每个worker一个单例。在我的单元测试中,我试图模拟 redis 客户端中的一些东西。我的测试看起来像这样 -
//setup pipeline
TestStream<MetricsInstance> inputStream =
TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));
CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);
当我尝试运行这个测试时,我收到了这样的错误
java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'
为了使框架不尝试序列化客户端,我可以在选项 class 中的 getRedisClient()
上添加 @JsonIgnore
。但这会导致 Redis 实例在某个时候被重新创建,并且我所有的模拟和存根都丢失了。我想知道测试此类场景的最佳方法是什么。
在 Apache Beam 邮件列表上进行了一些讨论之后,我能够让这个东西工作。诀窍是以一种方式设置 RedisClientFactory,它使用管道选项中的另一个字段,该字段公开 RedisClient class.
的名称
因此选项将如下所示 -
@Default.Class(RedisClientImpl.class)
Class<? extends RedisClient> getRedisClientClass();
void setRedisClientClass(Class<? extends RedisClient> redisClientClass);
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
Factory是这样实现的-
public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
@Override
public RedisClient create(PipelineOptions options) {
CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
return InstanceBuilder.ofType(RedisClient.class)
.fromClass(pipelineOptions.getRedisClientClass())
.fromFactoryMethod("fromOptions")
.withArg(PipelineOptions.class, options)
.build();
}
}
此工厂正在使用 class RedisClientImpl
中称为 fromOptions
的方法来构建客户端。
public static RedisClientImpl fromOptions(PipelineOptions options) {
return new RedisClientImpl(options.as(CommonPipelineOptions.class));
}
使用此设置,我现在可以在我的单元测试中创建 RedisClient 的模拟实例。
options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// setup fake data in the FakeRedisClient by calling static methods
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);
我们还需要确保 FakeRedisClient class 也公开了一个名为 fromOptions
的方法
public static FakeRedisClient fromOptions(PipelineOptions options) {
return new FakeRedisClient();
}
我有一个 apache beam 管道,它从 pubsub 读取数据,使用 Redis 丰富数据,最后写入 pubsub。我正在尝试编写测试来测试作为有状态 DoFn 的丰富 Dofn。这里内部状态充当近缓存以减少对 Redis 的调用。为了实例化我的 Redis 客户端,我使用了在 PipelineOptions 中声明的工厂,例如
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
理论上,上面的client应该是每个worker一个单例。在我的单元测试中,我试图模拟 redis 客户端中的一些东西。我的测试看起来像这样 -
//setup pipeline
TestStream<MetricsInstance> inputStream =
TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));
CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);
当我尝试运行这个测试时,我收到了这样的错误
java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'
为了使框架不尝试序列化客户端,我可以在选项 class 中的 getRedisClient()
上添加 @JsonIgnore
。但这会导致 Redis 实例在某个时候被重新创建,并且我所有的模拟和存根都丢失了。我想知道测试此类场景的最佳方法是什么。
在 Apache Beam 邮件列表上进行了一些讨论之后,我能够让这个东西工作。诀窍是以一种方式设置 RedisClientFactory,它使用管道选项中的另一个字段,该字段公开 RedisClient class.
的名称因此选项将如下所示 -
@Default.Class(RedisClientImpl.class)
Class<? extends RedisClient> getRedisClientClass();
void setRedisClientClass(Class<? extends RedisClient> redisClientClass);
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
Factory是这样实现的-
public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
@Override
public RedisClient create(PipelineOptions options) {
CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
return InstanceBuilder.ofType(RedisClient.class)
.fromClass(pipelineOptions.getRedisClientClass())
.fromFactoryMethod("fromOptions")
.withArg(PipelineOptions.class, options)
.build();
}
}
此工厂正在使用 class RedisClientImpl
中称为 fromOptions
的方法来构建客户端。
public static RedisClientImpl fromOptions(PipelineOptions options) {
return new RedisClientImpl(options.as(CommonPipelineOptions.class));
}
使用此设置,我现在可以在我的单元测试中创建 RedisClient 的模拟实例。
options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// setup fake data in the FakeRedisClient by calling static methods
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);
我们还需要确保 FakeRedisClient class 也公开了一个名为 fromOptions
的方法 public static FakeRedisClient fromOptions(PipelineOptions options) {
return new FakeRedisClient();
}