Spring Cloud Stream 中的错误处理 - Kafka Binder

Error Handling in Spring Cloud Stream - Kafka Binder

我正在尝试使用 Kafka Binder 为 SCS 实现错误处理功能,但我目前在将错误放入错误主题时遇到问题。

1) 是否需要为 .yml 文件中的错误指定任何特定内容,例如组或内容类型

2) msg​​流入Kafka topic时如何重试?

谢谢。

详情如下:-

1) 每隔几秒生成 JSON 的生产者:-

@EnableBinding(Source.class)
public class LoggingProducer {

      @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "7000", maxMessagesPerPoll = "1"))
        public LoggingObject pumpSource() {

          LoggingObject loggingObject = new LoggingObject();

          String loggingNumber = UUID.randomUUID().toString().toUpperCase().replaceAll("-", "");

          System.out.println(loggingNumber);

          loggingObject.setLoggingId(loggingNumber);
          Random rand = new Random();
          int randint = rand.nextInt(100000);

          if      (randint % 3 == 0) {
              loggingObject.setLoggingMessageStatus("SENT");
          }
          else if (randint % 4 == 0) {
              loggingObject.setLoggingMessageStatus("REVIEW");
          }
          else {
              loggingObject.setLoggingMessageStatus("ERROR");
          }

            System.out.println(loggingObject.toString());

            return loggingObject;
        }   
}

2) application.yml

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: Processortopic
          group: myGroup
          producer:
            header-mode: embeddedHeaders
          content-type: application/json

3) 消费者应用程序:-

@EnableBinding(Sink.class)
@Configuration
public class LoggingObjectProcessor {


    @StreamListener(Sink.INPUT) // destination name 'input.myGroup'
    public void handle(LoggingObject loggingObject) {
        System.out.println("In the Consumer---->>>>><<<<<<");
        throw new RuntimeException("BOOM!");
    }

    /*@ServiceActivator(inputChannel = "Sourcetopic.myGroup.errors")
    public void error(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }*/

    @StreamListener("errorChannel")
    public void errorGlobal(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }


}

4) 消费者Application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: Processortopic
          group: myGroup
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json
        error:
          destination: myErrors
          content-type: application/json

5) LoggingObject POJO

public class LoggingObject {

    private String loggingId;
    private String loggingMessageStatus;



                public String getLoggingId() {
                    return loggingId;
                }
                public void setLoggingId(String loggingId) {
                    this.loggingId = loggingId;
                }
                public LoggingObject() {

                }
                public String getLoggingMessageStatus() {
                    return loggingMessageStatus;
                }
                public void setLoggingMessageStatus(String loggingMessageStatus) {
                    this.loggingMessageStatus = loggingMessageStatus;
                }
}

6) 这是 POM

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

更新了消费者应用程序和日志

@ServiceActivator(inputChannel = "Processortopic.myGroup.errors")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}

日志:-

2018-05-23 10:21:36.178  INFO 76939 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
Handling ERROR: ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.LoggingObjectProcessor#handle[1 args]; nested exception is java.lang.RuntimeException: BOOM!, failedMessage=GenericMessage [payload=byte[191], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, id=feda8595-5ef6-35dd-b43f-4940a90017ba, kafka_receivedPartitionId=0, contentType=application/json;charset=UTF-8, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269, timestamp=1527088896173}], headers={kafka_data=ConsumerRecord(topic = Processortopic, partition = 0, offset = 4721, CreateTime = 1526991171269, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@538f1f04), id=37e72560-5e63-db90-27f3-2ff2e04e1778, timestamp=1527088896174}] for original GenericMessage [payload=byte[277], headers={kafka_offset=4721, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@29a9cb9d, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=Processortopic, kafka_receivedTimestamp=1526991171269}]
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<
In the Consumer---->>>>><<<<<<

我马上看到的一个问题是您正在为全局 'errorChannel' 使用 @StreamListener。这是行不通的。如文档所述:

The use of @StreamListener annotation is intended specifically to define bindings that bridge internal channels and external destinations. Given that the destination specific error channel does NOT have an associated external destination, such channel is a prerogative of Spring Integration (SI). This means that the handler for such destination must be defined using one of the SI handler annotations (i.e., @ServiceActivator, @Transformer etc.).

请修复并告知我们。