将 CSV 文件转换为 JSON 并将其发送到 ActiveMQ 队列

Converting CSV file to JSON and send it to ActiveMQ queue

我的目标是读取 CSV 文件,将其转换为 JSON 并将生成的 JSON 一个一个地发送到 ActiveMQ 队列。我的代码如下:

final BindyCsvDataFormat bindy=new BindyCsvDataFormat(camelproject.EquityFeeds.class);
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        CamelContext _ctx = new DefaultCamelContext(); 
        _ctx.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        _ctx.addRoutes(new RouteBuilder() {

            public void configure() throws Exception {
                from("file:src/main/resources?fileName=data-sample.csv")
                .unmarshal(bindy)
                .marshal()
                .json(JsonLibrary.Jackson).log("${body}")
                .to("file:src/main/resources/?fileName=emp.json");
            }

        });

EquityFeeds 是我上面代码中的 POJO class。

问题:

  1. 没有产生输出。 "emp.json" 文件未在给定位置生成。
  2. 另外,我如何将生成的 JSON 拆分为单独的 JSON 并将其发送到 ActiveMQ 队列,就像我为 XML 所做的那样,如下所示:

    .split(body().tokenizeXML("equityFeeds", null)).streaming().to("jms:queue:xml.upstream.queue");

EquityFeeds (POJO):

    @CsvRecord(separator = ",",skipFirstLine = true)
    public class EquityFeeds {

    @DataField(pos = 1) 
    private String externalTransactionId;

    @DataField(pos = 2)
    private String clientId;

    @DataField(pos = 3)
    private String securityId;

    @DataField(pos = 4)
    private String transactionType;

    @DataField(pos = 5)
    private Date transactionDate;

    @DataField(pos = 6)
    private float marketValue; 

    @DataField(pos = 7)
    private String priorityFlag;

        // getters and setters... 
    }

请帮忙。请告诉我哪里出错了。迫切需要帮助。卡在这个问题上无法前进。任何帮助将不胜感激。我真的很努力,搜索 Google 并尝试了各种选项,但没有任何效果。

请注意: 我评论了 .marshal() 和 .json() 以检查 .unmarshal() 是否正常工作但是由于未创建 "emp.json",解组也无法正常工作。

如果在启动路由时什么也没有发生,那么很可能是由于您传递给文件组件的相对路径。可能您的 Java 进程的执行目录不在您认为的位置,并且找不到该文件。为了简化事情,我建议您从绝对路径开始。一旦其他一切正常,找出正确的相对路径(你的基础应该是 user.dir 系统 属性 的值)。

关于拆分内容的问题:已在 documentation 中回答。

这对我有用(Camel 3.1):

public class CsvRouteBuilder extends EndpointRouteBuilder {
    @Override
    public void configure() {
        DataFormat bindy = new BindyCsvDataFormat(BindyModel.class);

        from(file("/tmp?fileName=simpsons.csv"))
            .unmarshal(bindy)
            .split(body())
            .log("Unmarshalled model: ${body}")
            .marshal().json()
            .log("Marshalled to JSON: ${body}")
            // Unique file name for the JSON output
            .setHeader(Exchange.FILE_NAME, () -> UUID.randomUUID().toString() + ".json")
            .to(file("/tmp"));
    }
}

// Use lombok to generate all the boilerplate stuff
@ToString
@Getter
@Setter
@NoArgsConstructor
// Bindy record definition
@CsvRecord(separator = ";", skipFirstLine = true, crlf = "UNIX")
public static class BindyModel {

    @DataField(pos = 1)
    private String firstName;
    @DataField(pos = 2)
    private String middleName;
    @DataField(pos = 3)
    private String lastName;

}

/tmp/simpsons.csv

中给出此输入
firstname;middlename;lastname
Homer;Jay;Simpson
Marge;Jacqueline;Simpson

日志输出如下所示

Unmarshalled model: RestRouteBuilder.BindyModel(firstName=Homer, middleName=Jay, lastName=Simpson)
Marshalled to JSON: {"firstName":"Homer","middleName":"Jay","lastName":"Simpson"}
Unmarshalled model: RestRouteBuilder.BindyModel(firstName=Marge, middleName=Jacqueline, lastName=Simpson)
Marshalled to JSON: {"firstName":"Marge","middleName":"Jacqueline","lastName":"Simpson"}

和两个json文件写在/tmp.