Reading data from Kafka into Storm in Java

I wrote a Kafka producer that reads data from MySQL, and a Kafka consumer that retrieves the data from the producer.

It works fine. Here is my code: [Kafka Producer]

import java.util.Properties;
import java.sql.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        ProducerConfig config = new ProducerConfig(props);
        // Parameterize the producer with key/value types instead of using raw types
        Producer<String, String> producer = new Producer<>(config);
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection(
                    "jdbc:mysql://172.18.67.8:3306/big_data", "root", "root");
            Statement stmt = con.createStatement();
            ResultSet rs = stmt.executeQuery("select * from content_log");
            // Send each row as one space-separated message on the "lamtest" topic
            while (rs.next()) {
                StringBuilder row = new StringBuilder(rs.getString(1));
                // Columns 16 and 20 are skipped, as in the original code
                for (int col : new int[] {2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 21, 22}) {
                    row.append(" ").append(rs.getString(col));
                }
                producer.send(new KeyedMessage<String, String>("lamtest", row.toString()));
            }
            con.close();
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            producer.close();
        }
    }
}

[Kafka Consumer]

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimpleHLConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        // Ask for a single stream for the topic
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // With the default consumer.timeout.ms (-1), hasNext() blocks indefinitely
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        consumer.shutdown();
    }

    public static void main(String[] args) {
        String topic = "lamtest";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

Now I want to integrate Kafka and Storm, reading the data consumed from Kafka into a Storm spout. Please help me. Thanks.

There is a storm-kafka library that provides a Storm spout for Kafka:

http://storm.apache.org/releases/1.0.1/storm-kafka.html

https://mvnrepository.com/artifact/org.apache.storm/storm-kafka/1.0.1
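
As a minimal sketch of that older storm-kafka spout, reusing the ZooKeeper address and topic from the question's code (the zkRoot "/lamtest", the spout id, and the random consumer id are arbitrary choices, not values from the question):

import java.util.UUID;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class OldKafkaSpoutTopology {
    public static void main(String[] args) {
        // Point the spout at the same ZooKeeper the producer/consumer above use
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        // zkRoot "/lamtest" is where the spout stores its offsets in ZooKeeper
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "lamtest", "/lamtest", UUID.randomUUID().toString());
        // Decode each Kafka message as a single string field (named "str")
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // Attach bolts to "kafka-spout" here, then submit the topology
    }
}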

Storm 1.1.x provides an external storm-kafka-client module that we can use to build Storm topologies. Note that it supports Kafka 0.10 and later. To use it:

1 - Add the following dependency to your pom.xml:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.1.1</version>
</dependency>

2 - The topology's Kafka spout is configured with KafkaSpoutConfig. Below is a sample creation of the configuration object:

// UNCOMMITTED_LATEST comes from a static import of
// org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy
KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig.builder(bootStrapServers, topic)
        .setGroupId(consumerGroupId)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
        .setMaxUncommittedOffsets(1000000)
        .setRetry(kafkaSpoutRetryService)
        .setRecordTranslator(new TupleBuilder(), outputFields, topic)
        .build();
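
TupleBuilder, outputFields, and kafkaSpoutRetryService above are placeholders for user code, not library classes. A hypothetical TupleBuilder that turns each Kafka record into a two-field tuple might look like this (assuming string keys and values):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.tuple.Values;

// Hypothetical translator: emits the Kafka key and value as one tuple per record
public class TupleBuilder implements Func<ConsumerRecord<String, String>, List<Object>> {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return new Values(record.key(), record.value());
    }
}

Here outputFields would be new Fields("key", "value"), matching the two values emitted, and kafkaSpoutRetryService could be built with the library's KafkaSpoutRetryExponentialBackoff.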

3 - Once the above is done, the topology can include the spoutConf created above, as follows:

TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
conf.setNumWorkers(1);
// KAFKA_SPOUT is the spout's component id (a String constant)
builder.setSpout(KAFKA_SPOUT, new KafkaSpout<>(spoutConf), 1);
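
To make this runnable end to end, a hedged sketch of finishing and submitting the topology; PrintBolt and the topology name are hypothetical stand-ins for whatever processing the topology actually needs:

// Wire a downstream bolt to the spout and submit; PrintBolt is a stand-in
// for a user-defined bolt that processes or logs each tuple.
builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping(KAFKA_SPOUT);

// Run on a cluster...
StormSubmitter.submitTopology("kafka-storm-topology", conf, builder.createTopology());

// ...or locally while testing:
// new LocalCluster().submitTopology("kafka-storm-topology", conf, builder.createTopology());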

More details here.