Monday, November 24, 2014

Integration of Kafka-Trident-MySQL

This post shows a basic example of integrating Kafka, Trident (on top of Storm) and MySQL. The example uses a Kafka producer that randomly produces messages (a random list of country names) to the Kafka brokers; a TransactionalTridentKafkaSpout pulls the data from the Kafka messaging system and emits tuples (containing the field "str", which holds the country names sent by the producer) to a Trident operation that serializes the received messages into a MySQL database.

Some ZooKeeper and Kafka settings need to be explained before we proceed. The source code developed here assumes that ZooKeeper runs on the following nodes:

192.168.2.2:2181
192.168.2.4:2181

and also assumes that the Kafka brokers run at the following hostname:port addresses:

192.168.2.2:9092
192.168.2.4:9092

The Kafka producer can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/kakfa-producer-for-trident.tar.gz

Basically, the Kafka producer emits a random list of country names as messages to the Kafka brokers. The tutorial on how to implement a Kafka producer can be found at:

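Purely as a rough illustration (this is a minimal sketch using the Kafka 0.8 producer API, not the downloadable producer itself; the class name and the country list are made up, while the broker list and the topic name country-topic match the rest of this post), such a producer could look like this:

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class CountryProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.2.2:9092,192.168.2.4:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Illustrative list of country names; the downloadable producer may use a different one.
        String[] countries = {"Germany", "France", "Japan", "Brazil", "Canada"};
        Random rand = new Random();

        // Send one random country name per second to the "country-topic" topic.
        for (int i = 0; i < 100; i++) {
            String country = countries[rand.nextInt(countries.length)];
            producer.send(new KeyedMessage<String, String>("country-topic", country));
            Thread.sleep(1000);
        }

        producer.close();
    }
}
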
Next we need to create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="kafka-trident-consumer") which consumes the Kafka messages in a Trident topology and serializes them to the MySQL database. The source code of the project can be downloaded from:

https://dl.dropboxusercontent.com/u/113201788/storm/kafka-trident-consumer.zip

Below we will explain how to prepare the pom.xml file, implement the Trident operation for MySQL serialization, and configure the Trident topology that consumes messages from the Kafka brokers.

Prepare pom.xml

In the pom.xml, first add the clojars repository to the repositories section:

<repositories>
  <repository>
    <id>clojars</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>

Next add the storm-kafka-0.8-plus dependency to the dependencies section (for the TransactionalTridentKafkaSpout):

<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>

Next add the storm-core dependency to the dependencies section (for Storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>

Next add the mysql-connector-java dependency to the dependencies section (for the MySQL JDBC driver):

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.6</version>
</dependency>

Next add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project as a jar that can be submitted to a Storm cluster):

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.2.1</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Next add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <executable>java</executable>
    <classpathScope>compile</classpathScope>
    <mainClass>com.memeanalytics.kafka_trident_consumer.App</mainClass>
  </configuration>
</plugin>

This completes the pom.xml configuration. Next we will implement the Trident operation that serializes the TridentTuple data into the MySQL database.

Trident operation for MySQL serialization

The Trident operation that serializes the TridentTuple data is a Trident BaseFilter with the following implementation:

package com.memeanalytics.kafka_trident_consumer;

import java.util.Map;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class TridentUtils {
    public static class MySqlPersist extends BaseFilter {

        private static final long serialVersionUID = 1L;

        private MySqlDump mysqlSerializer = null;

        // Called for every tuple emitted by the spout; stores the country
        // name in MySQL and keeps the tuple in the stream.
        @Override
        public boolean isKeep(TridentTuple tuple) {
            String country = tuple.getString(0);
            mysqlSerializer.store(country);
            System.out.println("Country: " + country);
            return true;
        }

        // Opens the MySQL connection when the filter is prepared.
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            mysqlSerializer = new MySqlDump("localhost", "mylog", "root", "[password]");
        }

        // Closes the MySQL connection when the topology is shut down.
        @Override
        public void cleanup() {
            mysqlSerializer.close();
        }
    }
}

In the above implementation, the mysqlSerializer member variable is responsible for actually storing the data in the MySQL database. The MySQL connection is opened (in the MySqlDump constructor) from the prepare() method and closed in the cleanup() method, while the data serialization itself happens in the isKeep() method. Below is the implementation of the MySqlDump class:

package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySqlDump {
    private String ip;
    private String database;
    private String username;
    private String password;
    private Connection conn;

    public MySqlDump(String ip, String database, String username, String password) {
        this.ip = ip;
        this.database = database;
        this.username = username;
        this.password = password;
        conn = MySqlConnectionGenerator.open(ip, database, username, password);
    }

    // Inserts a single data item into the mylogtable table.
    public void store(String dataitem) {
        if (conn == null) return;

        PreparedStatement statement = null;
        try {
            statement = conn.prepareStatement("insert into mylogtable (id, dataitem) values (default, ?)");
            statement.setString(1, dataitem);
            statement.executeUpdate();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void close() {
        if (conn == null) return;
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
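
The MySqlDump class delegates opening the JDBC connection to a small helper class, MySqlConnectionGenerator:
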
package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.DriverManager;

public class MySqlConnectionGenerator {
    public static Connection open(String ip, String database, String username, String password) {
        Connection conn = null;
        try {
            // Load the MySQL JDBC driver and open the connection.
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://" + ip + "/" + database + "?user=" + username + "&password=" + password);
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        return conn;
    }
}
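
Note that store() assumes a MySQL database named mylog containing a table called mylogtable with an auto-increment id column and a string dataitem column. The exact schema is not shown here, but a table created along the lines of create table mylogtable (id int auto_increment primary key, dataitem varchar(255)) would satisfy the insert statement used above.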

Trident topology implementation

Once this is completed, we are ready to implement the Trident topology that consumes data from Kafka and saves it to MySQL. This is implemented in the main class:

package com.memeanalytics.kafka_trident_consumer;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        // ZooKeeper node used by the Kafka spout to discover the brokers
        ZkHosts zkHosts=new ZkHosts("192.168.2.4:2181");
        
        String topic="country-topic";
        String consumer_group_id="storm";
        
        TridentKafkaConfig kafkaConfig=new TridentKafkaConfig(zkHosts, topic, consumer_group_id);
        
        // decode the raw Kafka messages as strings and read the topic from the beginning
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.forceFromStart=true;
        
        TransactionalTridentKafkaSpout spout=new TransactionalTridentKafkaSpout(kafkaConfig);
        
        TridentTopology topology=new TridentTopology();
        
        // wire the Kafka spout to the MySQL persistence filter
        topology.newStream("spout", spout).shuffle().each(new Fields("str"), new TridentUtils.MySqlPersist());
        
        LocalCluster cluster=new LocalCluster();
        
        Config config=new Config();
        
        cluster.submitTopology("KafkaTridentMysqlDemo", config, topology.build());
        
        // let the topology run for a short while before shutting down (demo only)
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        cluster.killTopology("KafkaTridentMysqlDemo");
        cluster.shutdown();
    }
}

For demo purposes, it uses a LocalCluster to submit and run the Trident topology, and it does not implement a DRPCStream that could be used to query results. The key component is the TransactionalTridentKafkaSpout, which takes a TridentKafkaConfig object as its constructor parameter. The TransactionalTridentKafkaSpout emits tuples that are serialized by TridentUtils.MySqlPersist, the MySQL serialization Trident operation implemented earlier.
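
To run the topology on a real Storm cluster instead of the in-process LocalCluster, a rough sketch (an assumption, not covered in this post) would be to replace the LocalCluster calls in main() with backtype.storm.StormSubmitter and deploy the jar-with-dependencies produced by the maven-assembly-plugin using the storm jar command:

// Hypothetical alternative to LocalCluster (not part of the original example):
// submit the same topology to a running Storm cluster.
Config config = new Config();
config.setNumWorkers(2); // arbitrary worker count, for illustration only

StormSubmitter.submitTopology("KafkaTridentMysqlDemo", config, topology.build());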

Once done, we can compile and execute the Maven project by navigating to its root folder and running the following command:

> mvn compile exec:java
