在嵌入式独立(standalone)环境中测试 Apache Pulsar Functions
Test apache pulsar functions in an embedded standalone environment
为了进行测试,我已经成功运行了一个嵌入式的独立(standalone)Pulsar 服务器和客户端,也可以发送和接收消息。但我实际上想对 Pulsar Functions(即实现了 org.apache.pulsar.functions.api.Function 的类)进行集成测试。如何在嵌入式环境中注册函数?
package kic.data.stream.pulsar
import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification
import java.util.concurrent.TimeUnit
/**
 * Spock specification that starts an embedded standalone Pulsar instance once for the
 * whole spec and round-trips a batch of string messages through a single topic.
 */
@Log
class PulsarEmbeddedTest extends Specification {
    static final String TOPIC = "hello"
    static final int NUM_OF_MESSAGES = 100

    static PulsarStandalone standalone
    static PulsarService pulsarService

    /** Boots the standalone instance using the broker.conf found on the classpath. */
    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
            .withConfig(conf)
            .withNoStreamStorage(true)
            .build()
        standalone.configFile = configFile
        standalone.start()
        // NOTE(review): this PulsarService is constructed from the same configuration but is
        // never started; it is only used below to read the broker service URL. Confirm that
        // this URL matches the broker actually started by `standalone`.
        pulsarService = new PulsarService(conf)
    }

    /** Publishes NUM_OF_MESSAGES messages and verifies that all of them are received in order. */
    def test() {
        given:
        PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarService.brokerServiceUrl)
            .build()
        Producer<String> producer = null
        Consumer<String> consumer = null
        List<String> received = []

        when:
        try {
            producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()
            // No message listener is registered here: the messages are pulled synchronously
            // with receive() below, and the original `.messageListener(Mesa)` referenced an
            // undefined symbol.
            consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe()

            for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
                producer.send("Hello_" + i)
            }

            for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
                // Waits at most one second; a null result means the timeout elapsed.
                Message<String> message = consumer.receive(1, TimeUnit.SECONDS)
                if (message == null) {
                    break
                }
                println("Message received : ${message.messageId}:${message.value}")
                consumer.acknowledge(message)
                received << message.value
            }
        } finally {
            // Release client resources even when sending or receiving fails.
            producer?.close()
            consumer?.close()
            client.close()
        }

        then:
        received == (1..NUM_OF_MESSAGES).collect { "Hello_" + it }
    }

    /** Stops the embedded instance; tolerates a setupSpec that failed before assignment. */
    def cleanupSpec() {
        standalone?.close()
    }
}
您应该能够像在普通 Pulsar 集群中一样,通过 Pulsar Admin API 创建 Pulsar Functions:
// Registers a Java function through the Pulsar admin HTTP endpoint.
// The URL below is the one used in this example; adjust it to the web service
// address of your embedded broker.
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build()) {
    FunctionConfig functionConfig = new FunctionConfig();
    functionConfig.setName("exclamation");
    functionConfig.setInputs(Collections.singleton("input"));
    functionConfig.setClassName(ExclamationFunction.class.getName());
    functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
    functionConfig.setOutput("output");
    // Path of the jar that contains the function class.
    functionConfig.setJar("/tmp/my-jar.jar");
    pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());
}
Apache Pulsar 项目中也有不少用于测试 Pulsar Functions 的集成测试:既有基于 Docker 的真正的集成测试,也有单进程的“集成”测试。下面是一个单进程“集成”测试的例子,可供参考:
为了进行测试,我已经成功运行了一个嵌入式的独立(standalone)Pulsar 服务器和客户端,也可以发送和接收消息。但我实际上想对 Pulsar Functions(即实现了 org.apache.pulsar.functions.api.Function 的类)进行集成测试。如何在嵌入式环境中注册函数?
package kic.data.stream.pulsar
import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification
import java.util.concurrent.TimeUnit
/**
 * Spock specification that starts an embedded standalone Pulsar instance once for the
 * whole spec and round-trips a batch of string messages through a single topic.
 */
@Log
class PulsarEmbeddedTest extends Specification {
    static final String TOPIC = "hello"
    static final int NUM_OF_MESSAGES = 100

    static PulsarStandalone standalone
    static PulsarService pulsarService

    /** Boots the standalone instance using the broker.conf found on the classpath. */
    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
            .withConfig(conf)
            .withNoStreamStorage(true)
            .build()
        standalone.configFile = configFile
        standalone.start()
        // NOTE(review): this PulsarService is constructed from the same configuration but is
        // never started; it is only used below to read the broker service URL. Confirm that
        // this URL matches the broker actually started by `standalone`.
        pulsarService = new PulsarService(conf)
    }

    /** Publishes NUM_OF_MESSAGES messages and verifies that all of them are received in order. */
    def test() {
        given:
        PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarService.brokerServiceUrl)
            .build()
        Producer<String> producer = null
        Consumer<String> consumer = null
        List<String> received = []

        when:
        try {
            producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()
            // No message listener is registered here: the messages are pulled synchronously
            // with receive() below, and the original `.messageListener(Mesa)` referenced an
            // undefined symbol.
            consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe()

            for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
                producer.send("Hello_" + i)
            }

            for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
                // Waits at most one second; a null result means the timeout elapsed.
                Message<String> message = consumer.receive(1, TimeUnit.SECONDS)
                if (message == null) {
                    break
                }
                println("Message received : ${message.messageId}:${message.value}")
                consumer.acknowledge(message)
                received << message.value
            }
        } finally {
            // Release client resources even when sending or receiving fails.
            producer?.close()
            consumer?.close()
            client.close()
        }

        then:
        received == (1..NUM_OF_MESSAGES).collect { "Hello_" + it }
    }

    /** Stops the embedded instance; tolerates a setupSpec that failed before assignment. */
    def cleanupSpec() {
        standalone?.close()
    }
}
您应该能够像在普通 Pulsar 集群中一样,通过 Pulsar Admin API 创建 Pulsar Functions:
// Registers a Java function through the Pulsar admin HTTP endpoint.
// The URL below is the one used in this example; adjust it to the web service
// address of your embedded broker.
try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build()) {
    FunctionConfig functionConfig = new FunctionConfig();
    functionConfig.setName("exclamation");
    functionConfig.setInputs(Collections.singleton("input"));
    functionConfig.setClassName(ExclamationFunction.class.getName());
    functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
    functionConfig.setOutput("output");
    // Path of the jar that contains the function class.
    functionConfig.setJar("/tmp/my-jar.jar");
    pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());
}
Apache Pulsar 项目中也有不少用于测试 Pulsar Functions 的集成测试:既有基于 Docker 的真正的集成测试,也有单进程的“集成”测试。下面是一个单进程“集成”测试的例子,可供参考: