<repositories>
...
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
In the dependencies section of the pom.xml, include storm-kafka-0.8-plus (which provides KafkaSpout, a spout that acts as a Kafka consumer inside the Storm cluster) and storm-core:
<dependencies>
...
<dependency>
<groupId>net.wurstmeister.storm</groupId>
<artifactId>storm-kafka-0.8-plus</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.0.1</version>
</dependency>
<!--Utility dependencies-->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>
</dependencies>
In the build/plugins section of the pom.xml, include the maven-assembly-plugin [version: 2.2.1], which packages the Storm topology project into a jar that can be submitted to the Storm cluster (instructions can be found at this link: http://czcodezone.blogspot.sg/2014/11/maven-pom-configuration-for-maven-build.html), and the exec-maven-plugin [version: 1.2.1], which executes a Java program from Maven (instructions can be found at this link: http://czcodezone.blogspot.sg/2014/11/maven-add-plugin-to-pom-configuration.html).
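As a sketch, the build/plugins section might look like the snippet below, using the versions stated above. The ${main.class} property is an assumption chosen to match the -Dmain.class flag in the exec:java command used later in this post; adjust it to your own pom setup.

```xml
<build>
  <plugins>
    <!-- Packages the topology as a jar-with-dependencies for submission to the Storm cluster -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.2.1</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass>${main.class}</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <!-- Runs the topology's main class directly via mvn exec:java -->
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <configuration>
        <mainClass>${main.class}</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
```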
Now we are ready to create a Storm topology that pulls data from the Kafka messaging system. The topology will be very simple: a KafkaSpout reads messages from the Kafka broker and emits each message as a tuple to a very simple bolt that prints the tuple's content. The simple bolt, PrinterBolt, has the following implementation:
package com.memeanalytics.kafka_consumer_storm;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        // The KafkaSpout emits each message as a single-field tuple
        String word = input.getString(0);
        if (StringUtils.isBlank(word)) {
            return;
        }
        System.out.println("Word: " + word);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a terminal sink and emits no fields
    }
}
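The bolt's guard against blank messages can be illustrated in isolation. The sketch below has no Storm dependencies and replaces commons-lang's StringUtils.isBlank with a plain trim-based check; the class and method names here are hypothetical, not part of the project:

```java
// Standalone sketch of PrinterBolt's filtering logic (no Storm dependencies).
public class WordFilterSketch {
    // Mirrors StringUtils.isBlank: null, empty, or whitespace-only strings are blank.
    static boolean isPrintable(String word) {
        return word != null && !word.trim().isEmpty();
    }

    public static void main(String[] args) {
        String[] words = {"hello", "   ", null, "storm"};
        for (String w : words) {
            if (isPrintable(w)) {
                System.out.println("Word: " + w); // only "hello" and "storm" pass the filter
            }
        }
    }
}
```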
Below is the implementation of the main class com.memeanalytics.kafka_consumer_storm.App, which creates a KafkaSpout that emits tuples to the PrinterBolt above (LocalCluster is used here, but it would be changed to StormSubmitter in a production environment):
package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // ZooKeeper host used by the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("192.168.2.4:2181");
        String topic_name = "test-topic";
        String consumer_group_id = "id7";
        String zookeeper_root = "";
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
        // Deserialize each Kafka message as a plain string
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start consuming from the beginning of the topic
        kafkaConfig.forceFromStart = true;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());

        // Let the topology run for 60 seconds, then tear it down
        try {
            Thread.sleep(60000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        cluster.killTopology("KafkaConsumerTopology");
        cluster.shutdown();
    }
}
As can be seen, the main work consists of creating a SpoutConfig to configure the KafkaSpout object. The SpoutConfig in this case specifies the following information:
--zookeeper 192.168.2.4:2181
--topic test-topic
--consumer-group-id id7
--from-beginning
--string-scheme
The complete source codes can be downloaded from the link below:
https://dl.dropboxusercontent.com/u/113201788/storm/kafka-consumer-storm.zip
Now, before executing the project, we must have the Kafka cluster running (follow the instructions from this link: http://czcodezone.blogspot.sg/2014/11/setup-kafka-in-cluster.html) and a Kafka producer running (follow the instructions from this link: http://czcodezone.blogspot.sg/2014/11/write-and-test-simple-kafka-producer.html). Then compile and run the kafka-consumer-storm project by running the following command from the project's root directory in the terminal:
> mvn clean compile exec:java -Dmain.class=com.memeanalytics.kafka_consumer_storm.App
You will now see the words produced by the Kafka producer being printed by Storm's PrinterBolt.