org.springframework.amqp.AmqpIllegalStateException:未指定 'queue'。检查 RabbitTemplate 的配置

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate

我正在制作一个从 rabbitmq 读取消息并写入 oracle 数据库的应用程序。我使用 spring 启动批处理来读取消息,但它以错误 "No 'queue' specified. Check configuration of RabbitTemplate."

结尾

RabbitConfig.java

@Configuration
public class RabbitMQConfig {

//  @Value("${conveh.rabbitmq.queue}")
    public String queueName ="hello2";

//  @Value("${conveh.rabbitmq.exchange}")
    public String exchange="hello_exchage2";

    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setQueue(queueName);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("host");
        connectionFactory.setPort(123);
        connectionFactory.setVirtualHost("/xxx");
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");
        return connectionFactory;
    }
}

BatchConfig.java

@Configuration
@EnableBatchProcessing
public class BatchMqListener {
//  private final Logger logger = LoggerFactory.getLogger(ImportJobConfig.class);

      @Autowired
      JobBuilderFactory jobBuilderFactory;
      @Autowired
      StepBuilderFactory stepBuilderFactory;
      @Autowired
      RabbitTemplate rabbitTemplate;

      @Bean
      public Job importJob() {
        return jobBuilderFactory.get("importJob")
            .listener(new JobExecutionListener() {
              @Override
              public void beforeJob(JobExecution jobExecution) {
//              logger.info("Ready to start the job");
                  System.out.println("Ready to start the job");
              }

              @Override
              public void afterJob(JobExecution jobExecution) {
//              logger.info("Job successfully executed.");
                  System.out.println("Job successfully executed.");
              }
            })
            .incrementer(new RunIdIncrementer())
            .flow(stepBuilderFactory.get("importStep")
                .<VehicleEvent, VehicleEvent>chunk(2)
                .reader(new AmqpItemReader<>(rabbitTemplate))
                .listener(new QueueListener<VehicleEvent>())
                .processor(customProcessor())
                .writer(writer())
                .build())
            .end()
            .build();
      }

      public ItemProcessor<VehicleEvent, VehicleEvent> customProcessor(){
          return null;
      }

      @Bean
      public FlatFileItemWriter<VehicleEvent> writer() {
//      log.info("writer called");
        FlatFileItemWriter<VehicleEvent> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("output/item.all.csv"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<VehicleEvent>() {{
          setDelimiter(",");
          setFieldExtractor(new BeanWrapperFieldExtractor<VehicleEvent>() {{
            setNames(new String[]{"id", "itemName"});
          }});
        }});
        return writer;
      }
}

我缺少正确给出队列名称的内容。

我为 rabbitTemplate() 方法使用了错误的 return 类型。应该是 RabbitTemplate.

 @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setQueue(queueName);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }