Saturday, November 22, 2014

Integrate Kafka with Storm

Create a Maven project (for example, with groupId = "com.memeanalytics" and artifactId = "kafka-consumer-storm"), and then modify the pom.xml file to include the following repository:
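The repository block did not survive in this post; a typical snippet, assuming the storm-kafka-0.8-plus artifact is hosted on the Clojars repository, would look like:

```xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>https://clojars.org/repo</url>
  </repository>
</repositories>
```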


In the dependencies section of the pom.xml, include storm-kafka-0.8-plus (which contains KafkaSpout, a spout in the Storm cluster that acts as a consumer to Kafka) and storm-core:




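The dependency block is missing from the post as well; a sketch of what it might contain is shown below. The version numbers here are assumptions (artifacts from that era of the storm-kafka-0.8-plus project), so adjust them to match your setup:

```xml
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>
```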
In the build/plugins section of the pom.xml, include the maven-assembly-plugin [version: 2.2.1] (the Maven plugin for packaging the Storm topology project into a jar that can be submitted to the Storm cluster; instructions can be found at this link: ) and the exec-maven-plugin [version: 1.2.1] (the Maven plugin used to execute a Java program; instructions can be found at this link: ).
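The plugin configuration itself is not shown in the post; a minimal sketch, assuming the main class is passed in through a `main.class` property (which is what makes the `-Dmain.class=...` flag used later work), could look like:

```xml
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.2.1</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <configuration>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>
```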

Now we are ready to create a Storm topology which pulls data from the Kafka messaging system. The topology will be very simple: it uses a KafkaSpout which reads messages from the Kafka broker and emits a tuple containing each message to a very simple bolt that prints the content of the tuple. The simple bolt, PrinterBolt, has the following implementation:

package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
 private static final long serialVersionUID = 1L;

 public void execute(Tuple input, BasicOutputCollector collector) {
  // Print the first field of the incoming tuple
  String word = input.getString(0);
  System.out.println("Word: " + word);
 }

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // This bolt emits nothing, so there are no output fields to declare
 }
}

Below is the implementation of the main class com.memeanalytics.kafka_consumer_storm.App, which creates a KafkaSpout that emits tuples to the PrinterBolt above (LocalCluster is used here but would be changed to StormSubmitter in a production environment):

package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        // ZooKeeper host list for the Kafka cluster (e.g. "localhost:2181")
        ZkHosts zkHosts=new ZkHosts("");
        String topic_name="test-topic";
        String consumer_group_id="id7";
        String zookeeper_root="";
        SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
        // Decode each Kafka message as a plain string tuple
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");

        Config config=new Config();
        LocalCluster cluster=new LocalCluster();
        cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());

        try {
            // Let the topology run for a while before shutting down
            Thread.sleep(10000);
        }catch(InterruptedException ex){
            Thread.currentThread().interrupt();
        }
        cluster.shutdown();
    }
}

As can be seen, the main work consists of creating a SpoutConfig which configures the KafkaSpout object. The SpoutConfig in this case specifies the following information:

--topic: test-topic
--consumer-group-id: id7

The complete source code can be downloaded from the link below:

Now, before executing the project, we must have the Kafka cluster running (follow the instructions from this link: ) and a Kafka producer running (follow the instructions from this link: ). Now compile and run the kafka-consumer-storm project by running the following command from the project's root directory in the terminal:

> mvn clean compile exec:java -Dmain.class=com.memeanalytics.kafka_consumer_storm.App
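As for the prerequisite Kafka processes mentioned above, they can be started roughly as follows. This is a sketch assuming a Kafka 0.8.x installation and that the commands are run from the Kafka installation directory; the broker address localhost:9092 is an assumption for a default single-node setup:

```shell
# Start ZooKeeper (if not already running)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker in a second terminal
bin/kafka-server-start.sh config/server.properties

# Start a console producer in a third terminal; each typed line
# is sent as a message to test-topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
```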

You will now see the words produced by the Kafka producer being printed by Storm's PrinterBolt.
