Reading data from Kafka into Storm in Java
I wrote a Kafka producer that reads data from MySQL, and a Kafka consumer that retrieves the data from the producer.
It works well. Here is my code:
[Kafka Producer]
import java.util.Properties;
import java.sql.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        // configuration for the (0.8.x) Kafka producer API
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<>(config);
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection(
                    "jdbc:mysql://172.18.67.8:3306/big_data", "root", "root");
            Statement stmt = con.createStatement();
            ResultSet rs = stmt.executeQuery("select * from content_log");
            // publish every row of content_log to the "lamtest" topic as one
            // space-separated message (columns 16 and 20 are skipped, as in the original)
            while (rs.next()) {
                StringBuilder message = new StringBuilder(rs.getString(1));
                for (int col : new int[] {2, 3, 4, 5, 6, 7, 8, 9, 10,
                        11, 12, 13, 14, 15, 17, 18, 19, 21, 22}) {
                    message.append(" ").append(rs.getString(col));
                }
                producer.send(new KeyedMessage<String, String>("lamtest", message.toString()));
            }
            con.close();
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            producer.close();
        }
    }
}
[Kafka Consumer]
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        // configuration for the (0.8.x) high-level consumer API
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        // request a single stream for the topic and print every message it delivers
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "lamtest";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}
Now I want to integrate Kafka and Storm: I want to read the data from the Kafka consumer into a Storm spout.
Please help me. Thanks.
There is a storm-kafka library that provides a Storm spout for Kafka:
http://storm.apache.org/releases/1.0.1/storm-kafka.html
https://mvnrepository.com/artifact/org.apache.storm/storm-kafka/1.0.1
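As a minimal sketch of how that spout could be wired up (assuming storm-kafka 1.0.x; the topic name and ZooKeeper address come from the question, while the ZooKeeper root path, spout id, and component name are placeholders):

import java.util.UUID;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutTopology {
    public static void main(String[] args) {
        // ZooKeeper connection used to discover the Kafka brokers
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        // topic, ZooKeeper root path for offset storage, and a unique spout id
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "lamtest", "/lamtest", UUID.randomUUID().toString());
        // deserialize each Kafka message into a single string field named "str"
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // attach bolts with builder.setBolt(...) and submit the topology as usual
    }
}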
Storm 1.1.x provides an external storm-kafka-client module that we can use to build the Storm topology. Note that it supports Kafka 0.10 and later. To use it:
1 - Add the following dependency to your pom.xml:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.1.1-SNAPSHOT</version>
</dependency>
2 - The topology's Kafka spout is configured through KafkaSpoutConfig. A sample creation of the configuration object is shown below (a sketch of the record translator follows the snippet).
KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder(bootStrapServers, topic)
        .setGroupId(consumerGroupId)
        .setOffsetCommitPeriodMs(10_000)
        .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
        .setMaxUncommittedOffsets(1000000)
        .setRetry(kafkaSpoutRetryService)
        .setRecordTranslator(new TupleBuilder(), outputFields, topic)
        .build();
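The TupleBuilder and outputFields referenced above are not defined in the answer. A plausible sketch, assuming the Func interface from storm-kafka-client (the class name and field names are illustrative):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// hypothetical translator: turns each Kafka record into a (topic, key, value) tuple
public class TupleBuilder implements Func<ConsumerRecord<String, String>, List<Object>> {
    // field names matching the tuple built in apply()
    public static final Fields OUTPUT_FIELDS = new Fields("topic", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return new Values(record.topic(), record.key(), record.value());
    }
}

With this sketch, outputFields in the builder call would be TupleBuilder.OUTPUT_FIELDS.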
3 - After the above steps, the topology can include the spoutConf created above, as follows.
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
conf.setNumWorkers(1);
builder.setSpout(KAFKA_SPOUT, new KafkaSpout(spoutConf), 1);
More details here.
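To round the example out, a minimal bolt that prints each message, roughly mirroring what SimpleHLConsumer above does, might look like this (the class name is illustrative):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// illustrative bolt: prints every tuple emitted by the Kafka spout, then acks it
public class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("Message from Kafka spout: " + tuple.getValues());
        collector.ack(tuple); // ack so the spout can commit the offset back to Kafka
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}

It would be attached to the topology with builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping(KAFKA_SPOUT);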