Test Apache Pulsar functions in an embedded standalone environment

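For testing, I have managed to run an embedded standalone Pulsar server and client, and I can send and receive messages. What I actually want to (integration-)test, however, are functions (classes implementing org.apache.pulsar.functions.api.Function). How can I register a function in the embedded setup?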

package kic.data.stream.pulsar

import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification

import java.util.concurrent.TimeUnit

@Log
class PulsarEmbeddedTest extends Specification {

    static final String TOPIC = "hello";
    static final int NUM_OF_MESSAGES = 100;
    static PulsarStandalone standalone
    static PulsarService pulsarService

    def setupSpec() {
        def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
        def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
        log.info("${PulsarStandalone.properties}")
        standalone = PulsarStandaloneBuilder.instance()
                                            .withConfig(conf)
                                            .withNoStreamStorage(true)
                                            .build()
        standalone.configFile = configFile
        standalone.start()
        pulsarService = new PulsarService(conf)
    }

    def test() {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarService.brokerServiceUrl)
                .build()

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .enableBatching(false)
                .create()

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                //.subscriptionInitialPosition()
                .subscriptionName("test-subs-1")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                // no messageListener here: receive() cannot be used on a consumer that has a listener
                .subscribe()



        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            producer.send("Hello_" + i)
        }


        Message<String> message
        for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
            // Blocks for up to 1 second; returns null if no message arrives in time.
            message = consumer.receive(1, TimeUnit.SECONDS)
            //log.info("Message received : ${message.getValue()}")
            println("Message received : ${message.messageId}:${message.value}")

            consumer.acknowledge(message)
        }

        producer.close()
        consumer.close()
        client.close()

        expect:
        1==1

    }

    def cleanupSpec() {
        standalone.close()
    }

}

You should be able to create Pulsar Functions through the Pulsar Admin API, just as you would against a regular Pulsar cluster:

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar");

pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());
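The snippet above refers to an ExclamationFunction class without showing it. As a minimal sketch of what such a class might look like, the version below assumes Pulsar's language-native Java interface, where a function is a plain java.util.function.Function. The class body is an illustration, not code from the question. A function written against org.apache.pulsar.functions.api.Function, as in the question, would implement process(input, context) instead of apply(input).

```java
import java.util.function.Function;

// Hypothetical example function: appends "!" to every incoming message.
// Assumes the language-native interface (java.util.function.Function),
// so it compiles without any Pulsar dependency.
public class ExclamationFunction implements Function<String, String> {

    @Override
    public String apply(String input) {
        return input + "!";
    }

    // Quick local check of the logic, independent of any broker.
    public static void main(String[] args) {
        System.out.println(new ExclamationFunction().apply("hello")); // prints "hello!"
    }
}
```

Because the class has no broker dependencies, its logic can be checked in a plain unit test before it is packaged into the jar passed to createFunction.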

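The Apache Pulsar project itself also contains quite a few integration tests for Pulsar Functions. Some are real Docker-based integration tests, and others are single-process "integration" tests. Here is an example of a single-process "integration" test that you can use as a reference: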

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java