Saturday, November 22, 2014

Integrate Kafka with Storm

Create a Maven project (for example, with groupId = "com.memeanalytics" and artifactId = "kafka-consumer-storm"), and then modify the pom.xml file to include the following repositories:

<repositories>
...
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

In the dependencies section of the pom.xml, include storm-kafka-0.8-plus (which contains KafkaSpout, a spout that runs in the Storm cluster and acts as a consumer of Kafka) and storm-core:

<dependencies>
...
<dependency>
<groupId>net.wurstmeister.storm</groupId>
<artifactId>storm-kafka-0.8-plus</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.0.1</version>
</dependency>

<!--Utility dependencies-->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>15.0</version>
</dependency>

</dependencies>

In the build/plugins section of the pom.xml, include the maven-assembly-plugin [version: 2.2.1], which packages the Storm topology project as a jar that can be submitted to a Storm cluster (instructions can be found at this link: http://czcodezone.blogspot.sg/2014/11/maven-pom-configuration-for-maven-build.html), and the exec-maven-plugin [version: 1.2.1], which lets Maven execute a Java program (instructions can be found at this link: http://czcodezone.blogspot.sg/2014/11/maven-add-plugin-to-pom-configuration.html).
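As a rough sketch, the two plugin entries could look like the following (the jar-with-dependencies descriptor and the ${main.class} property are assumptions here; they match the mvn command used later in this post, but see the linked instructions for the full configuration):

```xml
<build>
  <plugins>
    <!-- Packages the topology plus its dependencies into one jar -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.2.1</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
    <!-- Runs the class passed on the command line via -Dmain.class=... -->
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <configuration>
        <mainClass>${main.class}</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
```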

Now we are ready to create a Storm topology that pulls data from the Kafka messaging system. The topology is very simple: a KafkaSpout reads messages from the Kafka broker and emits each message as a tuple to a very simple bolt that prints the content of the tuple. The simple bolt, PrinterBolt, has the following implementation:

package com.memeanalytics.kafka_consumer_storm;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
 private static final long serialVersionUID = 1L;

 @Override
 public void execute(Tuple input, BasicOutputCollector collector) {
  // The tuple's single field is the Kafka message decoded as a string
  String word=input.getString(0);
  if(StringUtils.isBlank(word))
  {
   return;
  }
  
  System.out.println("Word: "+word);
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // This bolt only prints; it emits no tuples and declares no fields
 }
}

Below is the implementation of the main class com.memeanalytics.kafka_consumer_storm.App, which creates a KafkaSpout that emits tuples to the PrinterBolt above (LocalCluster is used here, but would be changed to StormSubmitter in a production environment):

package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        // Zookeeper host(s) used by the Kafka cluster
        ZkHosts zkHosts=new ZkHosts("192.168.2.4:2181");
        
        String topic_name="test-topic";
        String consumer_group_id="id7";
        String zookeeper_root="";
        SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
        
        // Deserialize each raw Kafka message as a one-field string tuple
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
        // Read the topic from the beginning rather than the latest offset
        kafkaConfig.forceFromStart=true;
        
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");
        
        Config config=new Config();
        
        LocalCluster cluster=new LocalCluster();
        
        cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
        
        try{
         Thread.sleep(60000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        cluster.killTopology("KafkaConsumerTopology");
        cluster.shutdown();
    }
}

As can be seen, the main component consists of creating a SpoutConfig, which configures the KafkaSpout object. The SpoutConfig in this case specifies the following information:

--zookeeper 192.168.2.4:2181
--topic test-topic
--consumer-group-id id7
--from-beginning
--string-scheme
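The string-scheme setting means KafkaSpout decodes each raw Kafka message into a one-field tuple holding a UTF-8 string, which is why PrinterBolt can read it with input.getString(0). A minimal, standard-library-only sketch of that decoding step (the class and method names here are hypothetical, for illustration only):

```java
import java.nio.charset.StandardCharsets;

// Illustration only: mimics what StringScheme does to each raw
// Kafka message before KafkaSpout emits it as a one-field tuple.
public class StringSchemeSketch {
    // Decode the raw message bytes as a UTF-8 string.
    public static String deserialize(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello storm".getBytes(StandardCharsets.UTF_8);
        // PrinterBolt would receive this value via input.getString(0)
        System.out.println(deserialize(raw));
    }
}
```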

The complete source codes can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/kafka-consumer-storm.zip

Now, before executing the project, we must have the Kafka cluster running (follow the instructions from this link: http://czcodezone.blogspot.sg/2014/11/setup-kafka-in-cluster.html) and a Kafka producer running (follow the instructions from this link: http://czcodezone.blogspot.sg/2014/11/write-and-test-simple-kafka-producer.html). Now compile and run the kafka-consumer-storm project by running the following command from the project's root directory in the terminal:

> mvn clean compile exec:java -Dmain.class=com.memeanalytics.kafka_consumer_storm.App

You will now see the words produced by the Kafka producer printed by Storm's PrinterBolt.
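For a production environment, as noted above, LocalCluster would be replaced with StormSubmitter. A hedged sketch of that variant (the worker count is an assumption, and the spout/bolt wiring is the same as in App above; the jar would be built with the maven-assembly-plugin and submitted with the storm client):

```java
package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class ProductionApp
{
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException
    {
        // Same KafkaSpout/PrinterBolt wiring as in App above
        TopologyBuilder builder=new TopologyBuilder();
        // ... setSpout("KafkaSpout", ...) and setBolt("PrinterBolt", ...) as before ...

        Config config=new Config();
        config.setNumWorkers(2); // assumption: two worker processes

        // Submits the topology to the cluster the storm client points at,
        // instead of running it in-process like LocalCluster does
        StormSubmitter.submitTopology("KafkaConsumerTopology", config, builder.createTopology());
    }
}
```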
