Monday, November 24, 2014

Integration of Kafka-Storm-MySQL

This post will discuss how to create a minimal Storm topology that integrates Kafka, Storm, and MySQL. The scenario is: a Kafka producer pushes some dummy data to the Kafka brokers, and a KafkaSpout (with consumer group id = "id7" and zookeeper = "192.168.2.4:2181") in the Storm cluster consumes the data from the Kafka brokers. The KafkaSpout then emits the Kafka messages as tuples to a BaseBasicBolt, which persists the data to the MySQL server.

The post assumes you already have a ZooKeeper cluster set up at the following hostname:port pairs:

192.168.2.2:2181
192.168.2.4:2181

The post also assumes that the Kafka brokers run at the following hostname:port pairs:

192.168.2.2:9092
192.168.2.4:9092

Details of how to set up the ZooKeeper and Kafka clusters can be found at the following links:

http://czcodezone.blogspot.sg/2014/11/setup-zookeeper-in-cluster.html
http://czcodezone.blogspot.sg/2014/11/setup-kafka-in-cluster.html

The Kafka producer source code can be downloaded from the link: https://dl.dropboxusercontent.com/u/113201788/kafka-producer.zip. The details of how to implement a simple Kafka producer in Java can be found at this link: http://czcodezone.blogspot.sg/2014/11/write-and-test-simple-kafka-producer.html.
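For quick reference, below is a minimal sketch of such a producer against the brokers listed above, using the Kafka 0.8 producer API (the package name, the class name SimpleProducer, and the dummy message contents are illustrative, not the exact code from the linked post):

package com.memeanalytics.kafka_producer;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // List of brokers the producer can bootstrap from.
        props.put("metadata.broker.list", "192.168.2.2:9092,192.168.2.4:9092");
        // Encode message payloads as plain strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the leader broker to acknowledge each message.
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Push some dummy data to the topic that the KafkaSpout will consume.
        for (int i = 0; i < 100; ++i) {
            producer.send(new KeyedMessage<String, String>("test-topic", "dummy data " + i));
        }

        producer.close();
    }
}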

To create the Storm topology which consumes messages from Kafka and persists them to a MySQL database, create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="storm-kafka-mysql"). The complete source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/storm-kafka-mysql.tar.gz

Maven setup: pom.xml

We will start by explaining the setup in the pom.xml file. First, we add the Clojars repository (which hosts the storm-kafka-0.8-plus artifact used below) to the repositories section:

  <repositories>
    <repository>
      <id>clojars</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>

Next we add the MySQL JDBC connector dependency to the dependencies section:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.6</version>
</dependency>

Next we add the storm-kafka-0.8-plus dependency to the dependencies section (this provides the KafkaSpout, which consumes Kafka messages in Storm):

<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>

Next we add the storm-core dependency to the dependencies section. Note the provided scope: the Storm cluster supplies storm-core at runtime, so it should not be bundled into the topology jar:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we add the exec-maven-plugin to the build/plugins section (to compile and run the Java project):

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <executable>java</executable>
    <classpathScope>compile</classpathScope>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>

Next we add the maven-assembly-plugin to the build/plugins section (to package the Java project as a jar for submission to the Storm cluster):

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.2.1</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>

MySQL Connection and Management Classes

Next we will create a class that manages the MySQL connection, the source code of which is shown below:

package com.memeanalytics.storm_kafka_mysql;

import java.sql.Connection;
import java.sql.DriverManager;

public class MySqlConnection {
    private String ip;
    private String database;
    private String username;
    private String password;
    private Connection conn;

    public MySqlConnection(String ip, String database, String username, String password)
    {
        this.ip=ip;
        this.database=database;
        this.username=username;
        this.password=password;
    }

    public Connection getConnection()
    {
        return conn;
    }

    // Loads the MySQL JDBC driver and opens a connection to the database.
    public boolean open()
    {
        boolean successful=true;
        try{
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://"+ip+"/"+database+"?"+"user="+username+"&password="+password);
        }catch(Exception ex)
        {
            successful=false;
            ex.printStackTrace();
        }
        return successful;
    }

    // Closes the connection if it has been opened.
    public boolean close()
    {
        if(conn==null)
        {
            return false;
        }

        boolean successful=true;
        try{
            conn.close();
        }catch(Exception ex)
        {
            successful=false;
            ex.printStackTrace();
        }

        return successful;
    }
}

As illustrated above, the MySqlConnection class manages the opening and closing of the JDBC connection.

Next we will create a class which saves the tuples emitted by the KafkaSpout into the MySQL server.

package com.memeanalytics.storm_kafka_mysql;

import java.sql.PreparedStatement;

import backtype.storm.tuple.Tuple;

public class MySqlDump {
    private MySqlConnection conn;

    public MySqlDump(String ip, String database, String username, String password)
    {
        conn = new MySqlConnection(ip, database, username, password);
        conn.open();
    }

    // Inserts the first field of the tuple as a new row in mylogtable.
    public void persist(Tuple tuple)
    {
        PreparedStatement statement=null;
        try{
            statement = conn.getConnection().prepareStatement("insert into mylogtable (id, dataitem) values (default, ?)");
            statement.setString(1, tuple.getString(0));

            statement.executeUpdate();
        }catch(Exception ex)
        {
            ex.printStackTrace();
        }finally
        {
            if(statement != null)
            {
                try{
                    statement.close();
                }catch(Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        }
    }

    public void close()
    {
        conn.close();
    }
}

The class above opens the MySQL connection in its constructor and saves a tuple via its persist() method. It also has a close() method which can be invoked to close the MySQL connection.

Before we proceed further, we need to create the necessary database and datatable in MySQL so that the data can be inserted. For demo purposes, we create a very basic datatable named "mylogtable" in a database named "mylog". To do this, access the MySQL server by running the command:

> mysql -u [username] -p

Once logged into MySQL, run the following statements to create mylogtable:

> create database mylog;
> use mylog;
> create table mylogtable( id INT NOT NULL AUTO_INCREMENT, dataitem VARCHAR(255) NOT NULL, PRIMARY KEY(id));
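
You can verify that the datatable was created correctly (and, once the topology is running, that rows are being inserted) with, for example:

> describe mylogtable;
> select * from mylogtable;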

Storm Bolt to Persist Data

Once this is done, we are ready to create a Storm bolt which routes each tuple received from the KafkaSpout to MySqlDump, which saves it to the database:

package com.memeanalytics.storm_kafka_mysql;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class MySqlDumpBolt extends BaseBasicBolt{

    private static final long serialVersionUID = 1L;
    private MySqlDump mySqlDump;

    @Override
    public void prepare(Map stormConf, TopologyContext context)
    {
        // Open the MySQL connection when the bolt is deployed to a worker.
        // Replace "[password]" with the password of the root user.
        mySqlDump=new MySqlDump("localhost", "mylog", "root", "[password]");
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        mySqlDump.persist(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits no tuples, so there are no output fields to declare.
    }

    @Override
    public void cleanup() {
        mySqlDump.close();
    }
}

As can be seen above, the bolt opens the MySQL database connection in its prepare() method and closes it in its cleanup() method. It persists each received tuple to the MySQL datatable in its execute() method.

Submit Topology in main()

Now we can implement the main() method to submit a topology. For simplicity, we use only a local cluster (a sketch of submitting to a real cluster follows the code):

package com.memeanalytics.storm_kafka_mysql;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        // The KafkaSpout tracks consumer offsets in this ZooKeeper instance.
        ZkHosts zkHosts=new ZkHosts("192.168.2.4:2181");
        
        String topic="test-topic";
        String consumer_group_id="id7";
        
        // SpoutConfig(hosts, topic, zkRoot, consumerGroupId)
        SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, topic, "", consumer_group_id);
        
        // Consume the topic from the beginning and decode each Kafka message as a plain string.
        kafkaConfig.forceFromStart=true;
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
        
        KafkaSpout kafkaSpout=new KafkaSpout(kafkaConfig);
        
        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", kafkaSpout);
        builder.setBolt("MySqlBolt", new MySqlDumpBolt()).globalGrouping("KafkaSpout");
        
        LocalCluster cluster=new LocalCluster();
        
        Config config=new Config();
        
        cluster.submitTopology("MySqlDemoTopology", config, builder.createTopology());
        
        // Let the topology run for 10 seconds before shutting it down.
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        cluster.killTopology("MySqlDemoTopology");
        cluster.shutdown();
    }
}
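
As a sketch only (ClusterApp below is a hypothetical class name, not part of the downloadable project): to submit the same topology to a real Storm cluster, StormSubmitter would be used in place of LocalCluster, along these lines:

package com.memeanalytics.storm_kafka_mysql;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class ClusterApp
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        ZkHosts zkHosts=new ZkHosts("192.168.2.4:2181");
        SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, "test-topic", "", "id7");
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder=new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig));
        builder.setBolt("MySqlBolt", new MySqlDumpBolt()).globalGrouping("KafkaSpout");

        // StormSubmitter deploys the packaged jar to the cluster instead of
        // running an in-process LocalCluster.
        StormSubmitter.submitTopology("MySqlDemoTopology", new Config(), builder.createTopology());
    }
}

In this case the topology runs until it is explicitly killed, and the jar submitted must be the jar-with-dependencies produced by the maven-assembly-plugin configured earlier.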

Execute Maven project

Now navigate to the project root folder and run the following command:

> mvn exec:java -Dmain.class=com.memeanalytics.storm_kafka_mysql.App

Note that if you hard-code ${main.class} in the pom.xml to com.memeanalytics.storm_kafka_mysql.App, you do not need to pass the -Dmain.class argument in the above command.
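
To run on a real Storm cluster instead (e.g. with the hypothetical ClusterApp sketched above), package the project and hand the assembled jar to the storm command; the jar file name below assumes the project version is 1.0-SNAPSHOT:

> mvn package
> storm jar target/storm-kafka-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar com.memeanalytics.storm_kafka_mysql.ClusterApp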
