Saturday, November 22, 2014

Write and test a simple Kafka producer

First, we need a running ZooKeeper ensemble and Kafka cluster (the setup instructions are linked in the testing section at the end of this post).


Now create a Maven project in Eclipse or STS (e.g. groupId=com.memeanalytics artifactId=kafka-producer), and change the pom.xml to include the following dependencies and plugins:

<dependencies>
  ...
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.0</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
      <exclusion>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jdmk</groupId>
        <artifactId>jmxtools</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jmx</groupId>
        <artifactId>jmxri</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>

      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>

      <configuration>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <executable>java</executable>
        <classpathScope>compile</classpathScope>
        <mainClass>com.memeanalytics.kafka_producer.App</mainClass>
      </configuration>
    </plugin>

  </plugins>
</build>

The dependencies section pulls in the Kafka client library, and the build section configures the Maven exec plugin so the producer can be launched with mvn exec:java.

The implementation of a Kafka producer in Java is simple and straightforward. We first create a ProducerConfig object with the following settings:

metadata.broker.list: 192.168.2.4:9092
serializer.class: kafka.serializer.StringEncoder
request.required.acks: 1

For the broker list, we only need to specify one broker; the rest of the cluster is discovered automatically. Since we are producing string data to the Kafka brokers, StringEncoder is used as the data serializer. We also specify that we want an acknowledgement for each request sent.
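
As a minimal sketch (using the broker address above), the configuration can be built from a java.util.Properties object and passed to kafka.producer.ProducerConfig:

import java.util.Properties;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "192.168.2.4:9092");           // at least one broker; the rest are discovered
props.put("serializer.class", "kafka.serializer.StringEncoder"); // messages are plain strings
props.put("request.required.acks", "1");                         // leader acknowledges each request

ProducerConfig config = new ProducerConfig(props);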

Next, create a Producer object configured by the ProducerConfig object above:

Producer<String, String> kafkaProducer = new Producer<String, String>(config);

The actual sending of data is performed by the following lines:

KeyedMessage<String, String> data = new KeyedMessage<String, String>("test-topic", "MyDataTextMessage");
kafkaProducer.send(data);

The "test-topic" is the name of the topic and the "MyDataTextMessage" is the actual data sent to kafka brokers. After all the data has been sent, the kafkaProducer needs to be closed:

kafkaProducer.close();
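
Putting the pieces together, a minimal sketch of the App class wired up in the exec plugin configuration (com.memeanalytics.kafka_producer.App) could look like the following; this is an illustration using the Java producer API (kafka.javaapi.producer.Producer), not necessarily the exact code in the downloadable project:

package com.memeanalytics.kafka_producer;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class App {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.2.4:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> kafkaProducer =
                new Producer<String, String>(new ProducerConfig(props));

        try {
            // send a few messages to the "test-topic" topic
            for (int i = 0; i < 10; i++) {
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("test-topic", "MyDataTextMessage-" + i);
                kafkaProducer.send(data);
            }
        } finally {
            kafkaProducer.close(); // always release the producer's resources
        }
    }
}

The try/finally block simply makes sure the producer is closed even if a send fails.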

You can download the complete Java project from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/kafka-producer.zip

To run, navigate to the root folder of the Maven project and run the following command in a terminal:

> mvn compile exec:java

To test, make sure that the Kafka cluster is set up and running (follow the instructions given in this link: http://czcodezone.blogspot.sg/2014/11/setup-kafka-in-cluster.html). Then open another terminal and run a Kafka console consumer using the following command:

> $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.2.2:2181 --topic test-topic --from-beginning
