Saturday, November 22, 2014

Integrate Kafka with Storm

Create a Maven project (for example, with groupId = "com.memeanalytics" and artifactId = "kafka-consumer-storm"), and then modify the pom.xml file to include the following repository:
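The repository block did not survive in this post; a typical snippet, assuming the storm-kafka-0.8-plus artifact is hosted on the Clojars repository, would look like:

```xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>https://clojars.org/repo</url>
  </repository>
</repositories>
```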


In the dependencies section of the pom.xml, include storm-kafka-0.8-plus (which contains KafkaSpout, a spout in the Storm cluster that acts as a consumer to Kafka) and storm-core:




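The dependency block is missing from the post as well; a sketch of what it might contain is shown below. The version numbers here are assumptions (artifacts from that era of the storm-kafka-0.8-plus project), so adjust them to match your setup:

```xml
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>
```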
In the build/plugins section of the pom.xml, include the maven-assembly-plugin [version: 2.2.1] (the Maven plugin for packaging the Storm topology project into a jar that can be submitted to the Storm cluster; instructions can be found at this link: ) and the exec-maven-plugin [version: 1.2.1] (the Maven plugin used to execute a Java program; instructions can be found at this link: ).
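The plugin configuration itself is not shown in the post; a minimal sketch, assuming the main class is passed in through a `main.class` property (which is what makes the `-Dmain.class=...` flag used later work), could look like:

```xml
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.2.1</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <configuration>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>
```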

Now we are ready to create a Storm topology which pulls data from the Kafka messaging system. The topology will be very simple: it uses a KafkaSpout which reads messages from the Kafka broker and emits a tuple containing each message to a very simple bolt that prints the content of the tuple. The simple bolt, PrinterBolt, has the following implementation:

package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
 private static final long serialVersionUID = 1L;

 public void execute(Tuple input, BasicOutputCollector collector) {
  // Print the first field of the incoming tuple
  String word = input.getString(0);
  System.out.println("Word: " + word);
 }

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  // This bolt emits nothing, so there are no output fields to declare
 }
}

Below is the implementation of the main class com.memeanalytics.kafka_consumer_storm.App, which creates a KafkaSpout that emits tuples to the PrinterBolt above (LocalCluster is used here but would be changed to StormSubmitter in a production environment):

package com.memeanalytics.kafka_consumer_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        // ZooKeeper host list for the Kafka cluster (e.g. "localhost:2181")
        ZkHosts zkHosts=new ZkHosts("");
        String topic_name="test-topic";
        String consumer_group_id="id7";
        String zookeeper_root="";
        SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
        // Decode each Kafka message as a plain string tuple
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");

        Config config=new Config();
        LocalCluster cluster=new LocalCluster();
        cluster.submitTopology("KafkaConsumerTopology", config, builder.createTopology());

        try {
            // Let the topology run for a while before shutting down
            Thread.sleep(10000);
        }catch(InterruptedException ex){
            Thread.currentThread().interrupt();
        }
        cluster.shutdown();
    }
}

As can be seen, the main work consists of creating a SpoutConfig which configures the KafkaSpout object. The SpoutConfig in this case specifies the following information:

--topic: test-topic
--consumer-group-id: id7

The complete source code can be downloaded from the link below:

Now, before executing the project, we must have the Kafka cluster running (follow the instructions from this link: ) and a Kafka producer running (follow the instructions from this link: ). Now compile and run the kafka-consumer-storm project by running the following command from the project's root directory in the terminal:

> mvn clean compile exec:java -Dmain.class=com.memeanalytics.kafka_consumer_storm.App
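As for the prerequisite Kafka processes mentioned above, they can be started roughly as follows. This is a sketch assuming a Kafka 0.8.x installation and that the commands are run from the Kafka installation directory; the broker address localhost:9092 is an assumption for a default single-node setup:

```shell
# Start ZooKeeper (if not already running)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker in a second terminal
bin/kafka-server-start.sh config/server.properties

# Start a console producer in a third terminal; each typed line
# is sent as a message to test-topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
```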

You will now see the words produced by the Kafka producer being printed by Storm's PrinterBolt.
