This post assumes that you already have a ZooKeeper cluster set up at the following hostname:port pairs:
192.168.2.2:2181
192.168.2.4:2181
The post also assumes that the Kafka brokers run at the following hostname:port pairs:
192.168.2.2:9092
192.168.2.4:9092
Details on how to set up a ZooKeeper and Kafka cluster can be found at the following links:
http://czcodezone.blogspot.sg/2014/11/setup-zookeeper-in-cluster.html
http://czcodezone.blogspot.sg/2014/11/setup-kafka-in-cluster.html
The Kafka producer source code can be downloaded from https://dl.dropboxusercontent.com/u/113201788/kafka-producer.zip. Details on how to implement a simple Kafka producer in Java can be found at http://czcodezone.blogspot.sg/2014/11/write-and-test-simple-kafka-producer.html.
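For quick reference, a minimal Kafka 0.8-style producer might look like the sketch below. This is an illustration only, not the exact code from the linked post; the broker list and the topic name "test-topic" are assumptions based on the setup described in this post.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list from the assumed setup above
        props.put("metadata.broker.list", "192.168.2.2:9092,192.168.2.4:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // "test-topic" is the topic the Storm topology below will consume from
        producer.send(new KeyedMessage<String, String>("test-topic", "hello from producer"));
        producer.close();
    }
}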
To create the Storm topology that consumes messages from Kafka and persists them to a MySQL database, create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="storm-kafka-mysql"). The complete source code of the project can be downloaded from the following link:
https://dl.dropboxusercontent.com/u/113201788/storm/storm-kafka-mysql.tar.gz
Maven setup: pom.xml
We will start by explaining the setup in the pom.xml file. First, we need to add the Clojars repository to the repositories section:

<repositories>
  <repository>
    <id>clojars</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
Next we add the mysql dependency into the dependencies section:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.6</version>
</dependency>
Next we add the storm-kafka-0.8-plus dependency into the dependencies section (for KafkaSpout, which consumes Kafka messages in Storm):
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>
Next we add the storm-core dependency into the dependencies section (for Storm):
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>
Next we add the exec-maven-plugin into the build/plugins section (for compiling and executing the Java project):
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <executable>java</executable>
    <classpathScope>compile</classpathScope>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>
Next we add the maven-assembly-plugin into the build/plugins section (for packaging the Java project as a jar to submit to the Storm cluster):
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>2.2.1</version>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
MySQL Connection and Management Classes
Next we will create a class that manages the MySQL connection; its source code is shown below:

package com.memeanalytics.storm_kafka_mysql;

import java.sql.Connection;
import java.sql.DriverManager;

public class MySqlConnection {
    private String ip;
    private String database;
    private String username;
    private String password;
    private Connection conn;

    public MySqlConnection(String ip, String database, String username, String password) {
        this.ip = ip;
        this.database = database;
        this.username = username;
        this.password = password;
    }

    public Connection getConnection() {
        return conn;
    }

    public boolean open() {
        boolean successful = true;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://" + ip + "/" + database + "?" + "user=" + username + "&password=" + password);
        } catch (Exception ex) {
            successful = false;
            ex.printStackTrace();
        }
        return successful;
    }

    public boolean close() {
        if (conn == null) {
            return false;
        }
        boolean successful = true;
        try {
            conn.close();
        } catch (Exception ex) {
            successful = false;
            ex.printStackTrace();
        }
        return successful;
    }
}
As illustrated above, the MySqlConnection class manages opening and closing the database connection.
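As a quick usage sketch (the password below is a placeholder; the host, database, and username match the values used later in this post):

MySqlConnection conn = new MySqlConnection("localhost", "mylog", "root", "secret"); // "secret" is a placeholder password
if (conn.open()) {
    // conn.getConnection() now returns a live java.sql.Connection for creating statements
    conn.close();
}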
Next we will create a class that saves the tuples emitted by the KafkaSpout into the MySQL server.
package com.memeanalytics.storm_kafka_mysql;

import java.sql.PreparedStatement;

import backtype.storm.tuple.Tuple;

public class MySqlDump {
    private MySqlConnection conn;

    public MySqlDump(String ip, String database, String username, String password) {
        conn = new MySqlConnection(ip, database, username, password);
        conn.open();
    }

    public void persist(Tuple tuple) {
        PreparedStatement statement = null;
        try {
            statement = conn.getConnection().prepareStatement("insert into mylogtable (id, dataitem) values (default, ?)");
            statement.setString(1, tuple.getString(0));
            statement.executeUpdate();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    public void close() {
        conn.close();
    }
}
The above class opens the MySQL connection in its constructor and saves a tuple via its persist() method; it also has a close() method which can be invoked to close the MySQL connection.
Before we proceed further, we need to create the necessary database and table in MySQL so that data can be inserted into them. For demo purposes, we create a very basic table named "mylogtable" in a database named "mylog". To do this, access the MySQL server by running the command:
> mysql -u [username] -p
Once logged into MySQL, run the following queries to create mylogtable:
> create database mylog;
> use mylog;
> create table mylogtable( id INT NOT NULL AUTO_INCREMENT, dataitem VARCHAR(255) NOT NULL, PRIMARY KEY(id));
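To sanity-check the setup, a quick query using the MySqlConnection class above can confirm the table exists. This is a sketch only; the password is a placeholder:

MySqlConnection check = new MySqlConnection("localhost", "mylog", "root", "secret"); // "secret" is a placeholder password
if (check.open()) {
    try {
        java.sql.ResultSet rs = check.getConnection().createStatement().executeQuery("select count(*) from mylogtable");
        if (rs.next()) {
            System.out.println("mylogtable rows: " + rs.getInt(1)); // expect 0 on a freshly created table
        }
        rs.close();
    } catch (Exception ex) {
        ex.printStackTrace();
    }
    check.close();
}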
Storm Bolt to persist data
Once this is done, we are ready to create a Storm bolt which routes the tuples received from the KafkaSpout to the MySqlDump class, which saves them to the database:

package com.memeanalytics.storm_kafka_mysql;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class MySqlDumpBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private MySqlDump mySqlDump;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        mySqlDump = new MySqlDump("localhost", "mylog", "root", "[password]");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        mySqlDump.persist(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt is a sink and does not emit any tuples
    }

    @Override
    public void cleanup() {
        mySqlDump.close();
    }
}
As can be seen above, the bolt opens the MySQL database connection in its prepare() method and closes it in its cleanup() method. It persists each received tuple into the MySQL table in its execute() method.
Submit Topology in main()
Now we can implement the main() method to submit a topology. For simplicity, we use only a local cluster:

package com.memeanalytics.storm_kafka_mysql;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class App {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        ZkHosts zkHosts = new ZkHosts("192.168.2.4:2181");
        String topic = "test-topic";
        String consumer_group_id = "id7";
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic, "", consumer_group_id);
        kafkaConfig.forceFromStart = true;
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", kafkaSpout);
        builder.setBolt("MySqlBolt", new MySqlDumpBolt()).globalGrouping("KafkaSpout");

        LocalCluster cluster = new LocalCluster();
        Config config = new Config();
        cluster.submitTopology("MySqlDemoTopology", config, builder.createTopology());

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        cluster.killTopology("MySqlDemoTopology");
        cluster.shutdown();
    }
}
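If you later want to run the topology on a real Storm cluster rather than a LocalCluster, the submission step would typically change to something like the sketch below. This uses the standard Storm API rather than code from this post; you would first package the project with the maven-assembly-plugin configured earlier:

// replace the LocalCluster block in main() with a cluster submission:
backtype.storm.StormSubmitter.submitTopology("MySqlDemoTopology", config, builder.createTopology());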
Execute the Maven project
Now navigate to the project root folder and run the following command:

> mvn exec:java -Dmain.class=com.memeanalytics.storm_kafka_mysql.App
Note that if you set the ${main.class} property in the pom.xml to com.memeanalytics.storm_kafka_mysql.App, you do not need to pass the -Dmain.class argument in the above command.