Sunday, November 30, 2014

Encode and Decode JSON in Java using json-simple

Create a Maven project and add the following dependency to the project:

<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>

Create a plain old Java object (POJO), something as simple as the following:

package com.memeanalytics.json_demo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DemoEntity {
 private String name = "";
 private Date createDate=new Date();
 private boolean enabled=false;
 private double value = 0;
 private int count = 0;
 
 public String getName()
 {
  return this.name;
 }
 
 public void setName(String name)
 {
  this.name=name;
 }
 
 public String getCreateDateString()
 {
  SimpleDateFormat formatter=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  return formatter.format(this.createDate);
 }
 
 public void setCreateDateString(String dateString)
 {
  SimpleDateFormat formatter=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  try {
   this.createDate = formatter.parse(dateString);
  } catch (ParseException e) {
   e.printStackTrace();
  }
 }
 
 public boolean isEnabled()
 {
  return this.enabled;
 }
 
 public void setEnabled(boolean enabled)
 {
  this.enabled=enabled;
 }
 
 public int getCount()
 {
  return this.count;
 }
 
 public void setCount(int count)
 {
  this.count = count;
 }
 
 public double getValue() 
 {
  return this.value;
 }
 
 public void setValue(double value)
 {
  this.value=value;
 }
}

Create a utility class with two methods to encode and decode between JSON and the DemoEntity object:

package com.memeanalytics.json_demo;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class JsonEncoder {
 public static String encode(DemoEntity obj)
 {
  JSONObject json=new JSONObject();
  
  json.put("name", obj.getName());
  json.put("createDate", obj.getCreateDateString());
  json.put("value", obj.getValue());
  json.put("count", obj.getCount());
  
  return json.toJSONString();
 }
 
 public static DemoEntity decode(String jsonString) 
 {
  JSONParser parser=new JSONParser();
  
  DemoEntity obj=null;
  
  try{
   JSONObject json = (JSONObject)parser.parse(jsonString);
   
   obj=new DemoEntity();
   obj.setName((String)json.get("name"));
   obj.setCreateDateString((String)json.get("createDate"));
   obj.setValue((Double)json.get("value"));
   obj.setCount(Integer.parseInt(json.get("count").toString()));
  }catch(ParseException ex)
  {
   ex.printStackTrace();
  }
  return obj;
 }
}

Now the main class simply invokes the encode and decode methods to convert between JSON and the DemoEntity object:

package com.memeanalytics.json_demo;

public class App 
{
    public static void main( String[] args )
    {
     DemoEntity obj=new DemoEntity();
     
     obj.setName("obj1");
     obj.setValue(12.5);
     obj.setCount(10);
     
     String jsonString = JsonEncoder.encode(obj);
     System.out.println(jsonString);
     
     DemoEntity obj2  = JsonEncoder.decode(jsonString);
     
     if(obj2 != null)
     {
      System.out.println("name : " + obj2.getName());
      System.out.println("created date : " + obj2.getCreateDateString());
      System.out.println("value : "+obj2.getValue());
      System.out.println("count : "+obj2.getCount());
     }
    }
}

Two points to note:

1. For a Date object, it is safer to convert it to a String before putting it into a JSONObject during serialization, and to parse the String back into a Date during deserialization.
2. For an integer field, it is safer to convert the item returned by JSONObject.get() to a String and then parse it to an int during deserialization (JSONObject.get() usually returns a Long object instead of an Integer object).
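
To make the second point concrete, below is a minimal sketch (the class name NumberDecodeDemo and the literal JSON string are just for illustration) showing that json-simple returns a Long for the "count" value, and two safe ways to turn it into an int:

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class NumberDecodeDemo {
    public static void main(String[] args) throws ParseException {
        // json-simple parses JSON integers as Long, so a direct (Integer) cast would fail
        JSONObject json = (JSONObject) new JSONParser()
                .parse("{\"createDate\":\"2014-11-30 10:00:00\",\"count\":10}");

        // point 1: dates travel as formatted strings inside the JSON
        String createDate = (String) json.get("createDate");

        // point 2: go through a String (or a Number cast) instead of casting to Integer
        int count = Integer.parseInt(json.get("count").toString());
        int countAlt = ((Number) json.get("count")).intValue();

        System.out.println(createDate + " " + count + " " + countAlt);
    }
}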

Below is the link to the complete source code:

https://dl.dropboxusercontent.com/u/113201788/storm/json-demo.zip

Saturday, November 29, 2014

Maven: Include a non-Maven standard library in pom

Suppose that our Maven project depends on a particular library such as ftp4j-1.2.7.jar, which is not available in online Maven repositories such as mavenrepostory.com or findjar.com. We can include the library in the pom file in the following way.

First, designate a fake set of Maven coordinates for the jar, e.g.:

groupId: UNKNOWN
artifactId: ftp4j
version: 1.2.7

Next, create a folder "lib" under the current project root directory and place ftp4j-1.2.7.jar in the "lib/UNKNOWN/ftp4j/1.2.7/" folder.
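
Assuming the standard Maven repository layout (groupId/artifactId/version/artifactId-version.jar), the jar should therefore end up at:

lib/UNKNOWN/ftp4j/1.2.7/ftp4j-1.2.7.jar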



Now in the pom, declare a repository whose url is local (the url points to the "lib" folder):

<repositories>
...
<repository>
<id>pseudoRemoteRepo</id>
<releases>
<enabled>true</enabled>
<checksumPolicy>ignore</checksumPolicy>
</releases>
<url>file://${project.basedir}/lib</url>
</repository>
</repositories>

Finally we can add the ftp4j.jar as a dependency:

<dependencies>
...
<dependency>
<groupId>UNKNOWN</groupId>
<artifactId>ftp4j</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
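
As a side note, another common way to make such a jar resolvable is to install it into the local repository with the maven-install-plugin; a sketch of the command, assuming the jar sits at the path shown above:

> mvn install:install-file -Dfile=lib/UNKNOWN/ftp4j/1.2.7/ftp4j-1.2.7.jar -DgroupId=UNKNOWN -DartifactId=ftp4j -Dversion=1.2.7 -Dpackaging=jar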


Maven: Analyze Dependency

When a Maven project has many dependencies, it is sometimes difficult to analyze their usage or conflicts without appropriate tools. The maven-dependency-plugin comes with the analyze goal, which allows the user to analyze the dependencies. To run the goal, enter the following command in the terminal after navigating to the project root folder:

> mvn dependency:analyze

As the printout may be long, it is a good idea to save the analysis results in a readable format. To do this, open the pom file of the Maven project and add the following maven-dependency-plugin to the build/plugins section:

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
</execution>
</executions>
</plugin>

Now in the project root folder, run the following command:

> mvn dependency:analyze-report

The command generates the report in the "target/analyze-report" folder under the project root.

Another useful goal is analyze-duplicate, which checks for and reports duplicated dependencies:

> mvn dependency:analyze-duplicate

To list all dependencies, run the following command:

> mvn dependency:list

To list all the repositories used by the current project, run the following command:

> mvn dependency:list-repositories

To clean the local repository, run the following command:

> mvn dependency:purge-local-repository


Maven: Scope of dependency

By default, all dependencies defined in the pom file of a Maven project have a scope of compile. In other words, Maven will download them, put them on the compile classpath, and propagate them transitively to dependent projects when building. There are several other scopes which I would like to discuss here:

Scope: provided

With this scope, the dependency's classes referenced by the Maven project (e.g. via the "import" statements in the project's Java code) are available at compile time, but the dependency is expected to be provided by the runtime environment (e.g. a servlet container or the JDK), so it is not packaged with the project.

Scope: runtime

With this scope, the dependency is not required for compilation, but its jar file will be included in the runtime classpath during execution of the Maven project.

To find what is included in the runtime path of a Maven project, navigate to the project's root folder and run the following command:

> mvn dependency:build-classpath

Scope: test

With this scope, the dependency will not be included in the normal compile or runtime classpath, but will be available when compiling and running unit tests.
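
A dependency's scope is declared with the <scope> tag inside its dependency entry. A sketch using two typical examples (the artifacts shown are illustrative, not taken from any project in this post): a servlet API jar provided by the application server at runtime, and JUnit which is only needed for tests:

<dependencies>
  <dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>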

Maven: Transitive Dependency Version Conflict Resolution

To display the dependency tree of a project (which includes the dependencies of the project as well as the dependencies of those dependencies, and so on), navigate to the project root folder (which contains the pom file) and run the following command:

> mvn dependency:tree

To save the dependency tree to a file, say dependency-tree.txt, run the following command:

> mvn dependency:tree > dependency-tree.txt

This command allows the user to detect which transitive dependencies are referenced by the project. A project may have quite a number of direct dependencies (i.e. level 0 dependencies), which in turn may depend on other libraries or components. It may happen that the same transitive dependency is referenced by the project with different versions. In this case, a version conflict on the transitive dependency arises. The next section explains which version of the transitive dependency Maven will take when this happens.

Determine which version of the same dependency will be taken: Nearest First and First Found

Suppose the project has two different dependencies A and B, and A and B both depend on another dependency C, but with different versions of C. For example, if A depends on C:1.1.0 and B depends on C:1.1.2, then Maven uses the following rules to decide whether C:1.1.0 or C:1.1.2 is taken.

1. Nearest First: suppose A is a level 0 dependency of the project (i.e. the project directly depends on A) and B is a level 1 dependency of the same project (i.e. the project depends on another jar, say D, which in turn depends on B). Then C:1.1.0 will be taken, as A is "nearer" to the project.
2. First Found: if A and B are at the same dependency level, but A is found first by the project (e.g. suppose A and B are at level 0, and A is declared in the pom file before B), then C:1.1.0 will be taken.

Control which version of the same dependency is taken: exclusion and optional

Suppose that, following the above rules, Maven takes in C:1.1.0. In this case, the project may crash because its dependency B requires C:1.1.2 (a common symptom is an error such as "NoClassDefFoundError"). In such a scenario, we can ask A to exclude C:1.1.0, and Maven will then take in C:1.1.2 instead:

<dependency>
<groupId>groupId.A</groupId>
<artifactId>A</artifactId>
<version>${A.version}</version>
<exclusions>
<exclusion>
<groupId>groupId.C</groupId>
<artifactId>C</artifactId>
</exclusion>
</exclusions>
</dependency>

Another way is to specify C as optional in A's pom file (i.e. include the tag <optional>true</optional> in the dependency section for C).
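
A sketch of how C would be declared as optional inside A's own pom file:

<dependency>
<groupId>groupId.C</groupId>
<artifactId>C</artifactId>
<version>1.1.0</version>
<optional>true</optional>
</dependency>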





Thursday, November 27, 2014

Maven Project for Drools

This post summarizes my initial tryout of a Maven project for Drools.

Eclipse Plugin

Installing Eclipse plugins is optional, but it will make your job easier when using Drools in Eclipse.

Firstly, you might want to install GEF (Graphical Editing Framework) in your Eclipse or STS. To do so, select "Help-->Install New Software", and enter the following link to add:

http://download.eclipse.org/tools/gef/updates/releases/

Set GEF as the name, as shown in the figure below:

Click Next and Finish to complete the installation of GEF.

Next, install the Drools plugin in Eclipse. To do so, select "Help-->Install New Software", and enter the following link to add:

http://download.jboss.org/drools/release/5.5.0.Final/org.drools.updatesite/

Set DROOLS as the name, as shown in the figure below:


Click Next and Finish to complete the installation of DROOLS.

Configure pom.xml in Maven project

The following is the dependency setup for a Maven project to use Drools:

<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-core</artifactId>
<version>5.5.0.Final</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>5.5.0.Final</version>
</dependency>

Add the following Maven plugins for compiling and executing the project:

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals><goal>exec</goal></goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.drools_hello_world.App</mainClass>
</configuration>
</plugin>

Where to add the Drools rule files in the project

To create Drools rule (*.drl) files, first create a "src/main/resources" folder under the project root folder, then create and save the *.drl files into it. The package names in the *.drl files are up to the developer to define. Once they are in that folder, they can be retrieved directly by name via methods such as ResourceFactory.newClassPathResource(). For example, for "src/main/resources/simplerule.drl", we can write ResourceFactory.newClassPathResource("simplerule.drl").
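
As a concrete example, here is a minimal sketch of loading and firing such a rule file with the Drools 5.5 knowledge API (the class name RuleRunner is illustrative, and Purchase is the fact class used by the rule example later in this post):

package com.memeanalytics.drools_hello_world;

import org.drools.KnowledgeBase;
import org.drools.KnowledgeBaseFactory;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderFactory;
import org.drools.builder.ResourceType;
import org.drools.io.ResourceFactory;
import org.drools.runtime.StatefulKnowledgeSession;

public class RuleRunner {
    public static void main(String[] args) {
        // compile the rule file loaded from the classpath (src/main/resources/simplerule.drl)
        KnowledgeBuilder builder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        builder.add(ResourceFactory.newClassPathResource("simplerule.drl"), ResourceType.DRL);
        if (builder.hasErrors()) {
            throw new RuntimeException(builder.getErrors().toString());
        }

        // build a knowledge base, open a session, insert a fact and fire the rules
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
        kbase.addKnowledgePackages(builder.getKnowledgePackages());
        StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
        session.insert(new Purchase(20.0)); // Purchase is the fact class from the rule example below
        session.fireAllRules();
        session.dispose();
    }
}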

Tryout Maven Project Source Codes


Below is the source code for the tryout:

https://dl.dropboxusercontent.com/u/113201788/storm/drools-hello-world.zip

For further information, refer to the following link:

http://download.jboss.org/drools/release/5.5.0.Final/org.drools.updatesite/

Simple ways to understand Drools conditions
Drools conditions are written in a language called MVEL (MVFLEX Expression Language). Consider the following rule in a rule file:

rule "purchase greater than 15"
when
$p : Purchase ( total > 15)
then
System.out.println("there exists some purchase having total > 15");
end

This rule can be interpreted as "when there exists some purchase having a total > 15, then print the message". The condition specifies "there exists some purchase having a total > 15", while the consequence is a plain Java statement.
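
The rule above matches Purchase facts that have been inserted into the knowledge session. A minimal sketch of such a fact class (hypothetical, shown only to make the condition concrete):

package com.memeanalytics.drools_hello_world;

public class Purchase {
    private double total;

    public Purchase(double total) {
        this.total = total;
    }

    // the rule's condition "Purchase ( total > 15 )" reads this property
    public double getTotal() {
        return total;
    }

    public void setTotal(double total) {
        this.total = total;
    }
}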


Wednesday, November 26, 2014

Trident-ML: Indexing Sentiment Classification results with ElasticSearch

In Storm, we may have scenarios in which we would like to index results obtained from real-time processing or machine learning into a search and analytics engine. For example, we may have text streaming in from the Kafka messaging system that goes through a TwitterSentimentClassifier (which is available in Trident-ML). After that, we may wish to save the text together with the classified sentiment label as an indexed document in ElasticSearch. This post shows one way to realize such an implementation.

First create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="es-create-index"). The complete source code of the project can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/es-create-index.tar.gz

Configure pom.xml and libraries to be used

Before we proceed, I would like to discuss how to write an ElasticSearch client which is compatible with Trident-ML, as we will be using both in this project. Traditionally, an ElasticSearch Java client is implemented using the native API, such as:

import static org.elasticsearch.node.NodeBuilder.*;

Node node = nodeBuilder().clusterName("elasticsearch").node();
Client client = node.client();

// TODO HERE: put document, delete document, etc. using the client

node.close();

The above code requires the following dependency in pom.xml:

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.4.0</version>
</dependency>

However, this library depends on lucene-core [version=3.6.0], an older version that is not compatible with lucene-analyzers [version=3.6.2], which is currently one of Trident-ML's dependencies (the TwitterTokenizer in TwitterSentimentClassifier uses this library). As a result, the elasticsearch library above cannot be used if the TwitterSentimentClassifier in Trident-ML is to be used in this project.
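
To check which Lucene versions the two libraries would pull in, the dependency:tree goal described in an earlier post can be filtered to the Lucene group (the -Dincludes filter is a standard maven-dependency-plugin option):

> mvn dependency:tree -Dincludes=org.apache.lucene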

Since the above Java code and elasticsearch library cannot be used in this project, the project uses httpclient [version=4.3] from org.apache.httpcomponents in their place to communicate with ElasticSearch via the RESTful API. The httpclient library provides CloseableHttpClient and request classes such as HttpGet, HttpPut, HttpPost, and HttpDelete.

The dependencies section of the pom for this project looks like the following:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
</dependency>
<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.3</version>
</dependency>

Spout

Once the pom.xml is properly updated, we can move on to implement the code for the Storm spout used in this project. The spout, named TweetCommentSpout, reads tweets from "src/test/resources/twitter-sentiment.csv" and emits them in batches to the Trident topology. The implementation of the spout is shown below:

package com.memeanalytics.es_create_index;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;
import com.github.pmerienne.trident.ml.preprocessing.TextTokenizer;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class TweetCommentSpout implements IBatchSpout {
 
 private static final long serialVersionUID = 1L;
 private static List<List<Object>> data=new ArrayList<List<Object>>();

 private int batchIndex;
 private int batchSize=10;
 
 static{
  BufferedReader br=null;
  FileInputStream is=null;
  String filePath="src/test/resources/twitter-sentiment.csv";
  try {
   is=new FileInputStream(filePath);
   br=new BufferedReader(new InputStreamReader(is));
   String line=null;
   while((line=br.readLine())!=null)
   {
    String[] values = line.split(",");
    Integer label=Integer.parseInt(values[0]);
    String text=values[1];
//    TextTokenizer tokenizer=new EnglishTokenizer();
//    List<String> tokens = tokenizer.tokenize(text);
//    TextInstance<Integer> instance=new TextInstance<Integer>(label, tokens);
    data.add(new Values(text, label));
   }
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  }catch(IOException ex)
  {
   ex.printStackTrace();
  }
  
 }
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  int maxBatchCount = data.size() / batchSize;
  if(maxBatchCount > 0 && batchIndex < maxBatchCount)
  {
   for(int i=(batchSize * batchIndex); i < data.size() && i < (batchIndex+1) * batchSize; ++i)
   {
    collector.emit(data.get(i));
   }
   batchIndex++;
  }
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("text", "label");
 }
 
}

The tuples emitted by the spout contain two fields: "text" and "label". The label is ignored; we are going to have Trident-ML's TwitterSentimentClassifier predict the sentiment label for us instead.

Trident operation for ElasticSearch

Next we are going to implement a BaseFilter, named CreateESIndex, which is a Trident operation that creates an indexed document in ElasticSearch from each tweet text and its predicted sentiment label. The implementation of the Trident operation is shown below:

package com.memeanalytics.es_create_index;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CreateESIndex extends BaseFilter{

 private static final long serialVersionUID = 1L;
 private int esIndex=1;
 private String wsUrl="http://127.0.0.1:9200";
 private String indexName="twittersentiment"; //must be lowercase
 private String typeName="trident";
 private CloseableHttpClient client;
 private String lastIndexedDocumentIdQueryJson="{\"query\": { \"match_all\": {}}, \"size\": 1,"+
   "\"sort\": ["+
     "{"+
       "\"_timestamp\": {"+
         "\"order\": \"desc\""+
       "}"+
     "}"+
   "]"+
 "}";

 public boolean isKeep(TridentTuple tuple) {
  // TODO Auto-generated method stub
  Boolean prediction =tuple.getBooleanByField("prediction");
  String comment=tuple.getStringByField("text");
  System.out.println(comment + " >> " + prediction);
  
  if(client != null)
  {
   HttpPut method=new HttpPut(wsUrl+"/"+indexName+"/"+typeName+"/"+esIndex);
   
   Date currentTime= new Date();
   SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd");
   SimpleDateFormat format2 = new SimpleDateFormat("HH:mm:ss");
   String dateString = format1.format(currentTime)+"T"+format2.format(currentTime);
   
   CloseableHttpResponse response=null;
   try{
    String json = "{\"text\":\""+comment+"\", \"prediction\":\""+prediction+"\", \"postTime\":\""+dateString+"\"}";
    System.out.println(json);
    StringEntity params=new StringEntity(json);
    params.setContentType(new BasicHeader(HTTP.CONTENT_TYPE, "application/json"));
    
    method.setEntity(params);
    
    method.addHeader("Accept", "application/json");
    method.addHeader("Content-type", "application/json");
    
    response = client.execute(method);
    
    HttpEntity entity=response.getEntity();
    String responseText=EntityUtils.toString(entity);
    System.out.println(responseText);
   }catch(IOException ex) {
    ex.printStackTrace();
   }finally {
    method.releaseConnection();
   }
   esIndex++;
  }
  
  return true;
 }
 
 @Override
    public void prepare(Map conf, TridentOperationContext context) {
  
  client=HttpClients.custom().setRetryHandler(new MyRetryHandler()).build();
  
  CloseableHttpResponse response=null;
  HttpDelete method=new HttpDelete(wsUrl+"/"+indexName);
  try{
   response = client.execute(method);
   HttpEntity entity=response.getEntity();
   String responseBody=EntityUtils.toString(entity);
   System.out.println(responseBody);
  }catch(IOException ex)
  {
   ex.printStackTrace();
  }
    }
 
 private class MyRetryHandler implements HttpRequestRetryHandler {

  public boolean retryRequest(IOException arg0, int arg1, HttpContext arg2) {
   // TODO Auto-generated method stub
   return false;
  }
 }

    @Override
    public void cleanup() {
     try {
   client.close();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
    }

}

In the prepare() method of CreateESIndex, a RESTful DELETE call is performed to delete all indexed documents under twittersentiment/trident in ElasticSearch. This ensures that no data is left under twittersentiment/trident when the topology is run. In its isKeep() method, the tweet text and its associated predicted sentiment label are serialized to JSON and sent to ElasticSearch via an HTTP PUT call. The CloseableHttpClient object is closed in the cleanup() method.
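
After the topology has run, one way to verify that the documents were indexed (assuming ElasticSearch is listening on 127.0.0.1:9200, as in the code above) is a match_all search against twittersentiment/trident:

> curl -XGET http://127.0.0.1:9200/twittersentiment/trident/_search -d '
{"query":{"match_all":{}}}'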

Trident topology

Now that we have the necessary spout and Trident operation, we can define a simple Trident topology which streams tweets -> classifies them with TwitterSentimentClassifier -> indexes them in ElasticSearch. Below is the implementation in the main class:

package com.memeanalytics.es_create_index;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args )
    {
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("TridentWriteToESDemo", config, buildTopology());
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         
        }
        
        cluster.killTopology("TridentWriteToESDemo");
        cluster.shutdown();
    }
    
    private static StormTopology buildTopology()
    {
     TridentTopology topology=new TridentTopology();
     
     TweetCommentSpout spout=new TweetCommentSpout();
     
     topology.newStream("classifyAndIndex", spout).each(new Fields("text"), new TwitterSentimentClassifier(), new Fields("prediction")).each(new Fields("text", "prediction"), new CreateESIndex());
     
     return topology.build();
    }
}

Once it is completed, run the following command in the project root folder:

> mvn compile exec:java

Setup and run ElasticSearch on Ubuntu 14.04 LTS

Run the following command to download ElasticSearch 1.4.0:

> wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.0.deb

Setup ElasticSearch

After download, run the following command to install the debian package:

> sudo dpkg -i elasticsearch-1.4.0.deb

After installation, the configuration files for ElasticSearch are stored in the following directory:

/etc/elasticsearch/

The bin folder for ElasticSearch can be found in the following directory:

/usr/share/elasticsearch/

Now navigate to your home directory, and open .bashrc for editing:

> sudo gedit $HOME/.bashrc

In the .bashrc file, add the following lines at the end:

export ES_HOME=/usr/share/elasticsearch
export ES_CONF_HOME=/etc/elasticsearch

Save and close the .bashrc file.

Check whether the elasticsearch is running as a service on Ubuntu:

> sudo service elasticsearch status

If not, start the elasticsearch service by running the following command:

> sudo service elasticsearch start

Run the following command to test the ElasticSearch:

> curl localhost:9200

You can also open a web browser and enter "localhost:9200" to see ElasticSearch in action.

Create a document

From the terminal, execute a PUT call such as the following:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/1 -d '{
"start_time" : "2014-11-26T14:00:00",
"end_time" : "2014-11-26T17:30:00",
"subject" : "Elastic Search Tryout",
"agenda" : {
 "item 1" : "Create an index",
 "item 2" : "Delete an index",
 "item 3" : "Open an index",
 "item 4" : "Close an index" }
}'

The _index is specified as "myindex", _type is specified as "mytype", and _id is specified as "1". After the document has been created, it can be retrieved by running a GET call:

> curl http://127.0.0.1:9200/myindex/mytype/1

We can execute another PUT call as below to generate one more document under myindex/mytype:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/2 -d '{
"start_time" : "2014-11-27T14:00:00",
"end_time" : "2014-11-27T17:30:00",
"subject" : "Elastic Search Tryout 2",
"agenda" : {
 "item 1" : "Create a document",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

After the document has been created, it can be retrieved by running a GET call:

> curl http://127.0.0.1:9200/myindex/mytype/2

To create a document whose id is generated rather than meaningful, we can do the following:

We can execute another PUT call as below, supplying a generated id (11122 here), to create one more document under myindex/mytype:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-28T14:00:00",
"end_time" : "2014-11-28T17:30:00",
"subject" : "Elastic Search Tryout 2",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

With a PUT call, the _id should not be omitted, but it can be generated on the client side (e.g. using a UUID from a programming language).
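
Alternatively, ElasticSearch can generate the id itself if the document is created with a POST call to the type url instead of a PUT call to a specific id; the generated _id is returned in the response. A sketch:

> curl -XPOST http://127.0.0.1:9200/myindex/mytype -d '{
"subject" : "Document with an auto-generated id"
}'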

Create an index

Creating an index explicitly works much like creating a document: issue a PUT call, but against the index url and with a settings body. From the terminal, execute a PUT call such as the following:

> curl -XPUT http://127.0.0.1:9200/myindex/ -d '{
"settings"  {
 "item 1" : "true",
 "item 2" : "false",
 "item 3" : "1",
 "item 4" : "2" }
}'

Update a document by id

The following commands can be used to update the document with id=11122:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-21T14:00:00",
"end_time" : "2014-11-21T17:30:00",
"subject" : "Elastic Search Tryout 4",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

> curl -XPOST http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-24T14:00:00",
"end_time" : "2014-11-24T17:30:00",
"subject" : "Elastic Search Tryout 4",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

Delete a document

Deleting a document will remove the document from the ElasticSearch nodes. From the terminal, execute a DELETE call such as the following:

> curl -XDELETE http://127.0.0.1:9200/myindex/mytype/1

Delete an index

Deleting an index will remove the index from the ElasticSearch nodes. From the terminal, execute a DELETE call such as the following:

> curl -XDELETE http://127.0.0.1:9200/myindex/

Close an index

Closing an index is an alternative to deleting it: ElasticSearch puts the index into an offline mode. From the terminal, execute a POST call such as the following:

> curl -XPOST http://127.0.0.1:9200/myindex/_close

After this, if you run "curl http://127.0.0.1:9200/myindex", you will get a "not found" message.

Open an index

Opening an index will put the offline index online again. From the terminal, execute a POST call such as the following:

> curl -XPOST http://127.0.0.1:9200/myindex/_open

Execute a search

Suppose we want to retrieve the documents under an index that have a particular word (e.g. "elastic") in their content. We can run a GET call from the terminal:

> curl -XGET http://127.0.0.1:9200/myindex/_search?q=elastic

Suppose we want to search only under _type = "mytype":

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search?q=elastic

The query can be written in more detail by searching specifically in the "subject" field of the document:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match": { "subject" : "elastic" }}}'

To search all documents under myindex/mytype:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match_all":{}}}'

Sort a search

Suppose we want the JSON results returned in a certain sort order (e.g. sorted by start_time ascending):

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match_all":{}}, "sort":[{"start_time": "asc" }]}'

Filtering


Suppose we want to search for records filtered by a range of values in one of their fields (say, age), combined with a particular match query (say, "name" must be "James"), with the results sorted by a field (say, age). Furthermore, we also want the total number of records returned to be limited to 20:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{
"from": 0, "size": 20,
"query" : {
  "filtered" : {
    "query" : {
      "match" : { "name" : "James" }
    },
    "filter" : {
      "range" : {
        "age" : { "lte" : 20, "gte" : 10 }
      }
    }
  }
},
"sort" : { "age" : { "order" : "asc" } }
}'

Count by Query

To count the number of documents in myindex/mytype matching a particular query, from the terminal, run the following command:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_count -d '
{"query":{"match": { "subject" : "elastic" }}}'

Delete by Query

To delete all documents in myindex/mytype matching a particular query,  from the terminal, run the following command:

> curl -XDELETE http://127.0.0.1:9200/myindex/mytype/_query -d '
{"query":{"match": { "subject" : "elastic" }}}'







Tuesday, November 25, 2014

Trident-ML: Regression using Passive-Aggressive algorithm

This post shows a very basic example of how to use the Passive-Aggressive (PA) algorithm as a regression algorithm in Trident-ML to process data from a Storm spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-regression-pa"). The complete source codes of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-regression-pa.tar.gz

To start, we need to configure the pom.xml file of the project.

Configure pom.xml:
Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for PA regression):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_regression_pa.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar for submission to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move on to implement the BirthDataSpout, which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_regression_pa;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class BirthDataSpout implements IBatchSpout {

 private static final long serialVersionUID = 1L;

 private int batchSize=10;
 private int batchIndex=0;
 
 private static List<Instance<Double>> sample_data=new ArrayList<Instance<Double>>();
 private static List<Instance<Double>> testing_data=new ArrayList<Instance<Double>>();
 
 public static List<String> getDRPCArgsList()
 {
  List<String> drpc_args_list =new ArrayList<String>();
  for(Instance<Double> instance : testing_data)
  {
   double[] features = instance.getFeatures();
   String drpc_args="";
   for(int i=0; i < features.length; ++i)
   {
    if(i==0)
    {
     drpc_args+=features[i];
    }
    else
    {
     drpc_args+=(","+features[i]);
    }
   }
   drpc_args+=(","+instance.label);
   drpc_args_list.add(drpc_args);
  }
  
  return drpc_args_list;
 }
 
 static{
  FileInputStream is=null;
  BufferedReader br=null;
  try{
   String filePath="src/test/resources/births.csv";
   is=new FileInputStream(filePath);
   br=new BufferedReader(new InputStreamReader(is));
   
   List<Instance<Double>> temp=new ArrayList<Instance<Double>>();
   String line=null;
   while((line=br.readLine())!=null)
   {
    String[] values = line.split(";");
    double label= Double.parseDouble(values[values.length-1]);
    double[] features=new double[values.length-1];
    for(int i=0; i < values.length-1; ++i)
    {
     features[i]=Double.parseDouble(values[i]);
    }
    
    Instance<Double> instance=new Instance<Double>(label, features);
    temp.add(instance);
   }
   
   Collections.shuffle(temp);
   
   for(Instance<Double> instance : temp)
   {
    if(testing_data.size() < 10)
    {
     testing_data.add(instance);
    }
    else
    {
     sample_data.add(instance);
    }
   }
   
  }catch(FileNotFoundException ex)
  {
   ex.printStackTrace();
  }catch(IOException ex)
  {
   ex.printStackTrace();
  }finally
  {
   try {
    if(is!=null) is.close();
    if(br !=null) br.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }
 
 public BirthDataSpout()
 {
  
 }
 
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  int maxBatchCount=sample_data.size() / batchSize;
  if(maxBatchCount > 0 && batchIndex < maxBatchCount)
  {
   for(int i=batchIndex * batchSize; i < sample_data.size() && i < (batchIndex+1) * batchSize; ++i)
   {
    Instance<Double> instance = sample_data.get(i);
    collector.emit(new Values(instance));
   }
   batchIndex=(batchIndex+1) % maxBatchCount;
  }
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("instance");
 }

}

As can be seen above, the BirthDataSpout implements IBatchSpout and emits a batch of 10 tuples at a time; each tuple is a training record containing a single field ("instance"). The "instance" field holds an Instance<Double>, which contains a double array as features and a double value as the label. The training records are read from a births.csv file residing in src/test/resources.

PA Regression in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to train a regression model with the PA regression algorithm in Trident-ML, and then predicts an output value for each testing record. This is implemented in the main class shown below:

package com.memeanalytics.trident_regression_pa;

import java.util.List;

import com.github.pmerienne.trident.ml.regression.PARegressor;
import com.github.pmerienne.trident.ml.regression.RegressionQuery;
import com.github.pmerienne.trident.ml.regression.RegressionUpdater;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("RegressionDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        List<String> drpc_args_list=BirthDataSpout.getDRPCArgsList();
        for(String drpc_args : drpc_args_list)
        {
         System.out.println(drpc.execute("predict", drpc_args));
        }
     
        cluster.killTopology("RegressionDemo");
        cluster.shutdown();
        
        drpc.shutdown();
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     
     BirthDataSpout spout=new BirthDataSpout();
     
     TridentState regressionModel = topology.newStream("training", spout).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new RegressionUpdater("regression", new PARegressor()));
     
     topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(regressionModel, new Fields("instance"), new RegressionQuery("regression"), new Fields("prediction")).project(new Fields("args", "prediction"));
     
     return topology.build();
    }
}

The DRPCArgsToInstance class used in the DRPC stream above is implemented as follows:

package com.memeanalytics.trident_regression_pa;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction {

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  String drpc_args = tuple.getString(0);
  String[] args=drpc_args.split(",");
  
  Double label=Double.parseDouble(args[args.length-1]);
  double[] features=new double[args.length-1];
  
  for(int i=0; i < args.length-1; ++i)
  {
   features[i]=Double.parseDouble(args[i]);
  }
  
  Instance<Double> instance=new Instance<Double>(label, features);
  
  collector.emit(new Values(instance));
 }

}

As can be seen above, in the Trident topology the BirthDataSpout emits Instance<Double> training data that is consumed by the RegressionUpdater. The RegressionUpdater object from Trident-ML updates the underlying regression model via the PA algorithm.

The DRPC stream allows the user to pass a new testing instance to the regression model, which then returns a "prediction" field containing the predicted output for the testing instance. DRPCArgsToInstance is a BaseFunction operation which converts the arguments passed into LocalDRPC.execute() into an Instance<Double>; this is passed into the RegressionQuery, which then uses the PARegressor and the regression model to determine the predicted output value.

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

Trident-ML: Sentiment Analysis Classifier

Trident-ML comes with a pre-trained Twitter sentiment classifier; this post shows how to use this classifier to perform sentiment analysis in Storm.

This post shows a very basic example of how to use the pre-trained Twitter sentiment classifier in Trident-ML to classify the sentiment of a piece of text, returning true (positive) or false (negative).

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-sentiment-classifier"). The complete source codes of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-sentiment-classifier.tar.gz

To start, we need to configure the pom.xml file of the project.

Configure pom.xml:

Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for text classification):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_sentiment_classifier.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar for submission to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Sentiment Classification in Trident topology using Trident-ML implementation

Once the pom.xml update is completed, we can build a Trident topology which uses the TwitterSentimentClassifier in a DRPC stream to classify text sentiment. This is implemented in the main class shown below:

package com.memeanalytics.trident_sentiment_classifier;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args )
    {
        LocalDRPC drpc=new LocalDRPC();
        
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("SentimentClassifierDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(2000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        System.out.println(drpc.execute("classify", "Have a nice day!"));
        System.out.println(drpc.execute("classify", "I feel really bad!"));
        System.out.println(drpc.execute("classify", "Whatever, i don't really care"));
        System.out.println(drpc.execute("classify", "feel sleepy zzzz...."));
        
        cluster.killTopology("SentimentClassifierDemo");
        cluster.shutdown();
        drpc.shutdown();
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     
     topology.newDRPCStream("classify", drpc).each(new Fields("args"), new TwitterSentimentClassifier(), new Fields("sentiment"));
     
     return topology.build();
    }
}

The DRPC stream allows the user to pass a text string to the TwitterSentimentClassifier, which then returns a "sentiment" field containing the predicted label (true for positive; false for negative) of the testing text.

Next, copy the following two files into the "src/main/resources" folder under the project root folder:

twitter-sentiment-classifier-classifier.json:
https://github.com/pmerienne/trident-ml/blob/master/src/main/resources/twitter-sentiment-classifier-classifier.json

twitter-sentiment-classifier-extractor.json:
https://github.com/pmerienne/trident-ml/blob/master/src/main/resources/twitter-sentiment-classifier-extractor.json

The above step is important; otherwise you may get a FileNotFoundException at runtime.

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

Trident-ML: Text Classification using KLD

This post shows a very basic example of how to use the Kullback-Leibler Distance (KLD) text classification algorithm in Trident-ML to process data from a Storm spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-text-classifier-kld"). The complete source codes of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-text-classifier-kld.tar.gz

To start, we need to configure the pom.xml file of the project.

Configure pom.xml:

Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for text classification):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_text_classifier_kld.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar for submission to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move on to implement the ReuterNewsSpout, which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_text_classifier_kld;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class ReuterNewsSpout implements IBatchSpout {

 private static final long serialVersionUID = 1L;
 private List<List<Object>> trainingData=new ArrayList<List<Object>>();
 private static Map<Integer, List<Object>> testingData=new HashMap<Integer, List<Object>>();
 
 private int batchSize=10;
 private int batchIndex=0;
 
 public ReuterNewsSpout()
 {
  try{
   loadReuterNews();
  }catch(FileNotFoundException ex)
  {
   ex.printStackTrace();
  }catch(IOException ex)
  {
   ex.printStackTrace();
  }
 }
 
 public static List<List<Object>> getTestingData()
 {
  List<List<Object>> result=new ArrayList<List<Object>>();
  for(Integer topic_index : testingData.keySet())
  {
   result.add(testingData.get(topic_index));
  }
  
  return result;
 }
 
 private void loadReuterNews() throws FileNotFoundException, IOException
 {
  Map<String, Integer> topics=new HashMap<String, Integer>();
  String filePath="src/test/resources/reuters.csv";
  FileInputStream inputStream=new FileInputStream(filePath);
  BufferedReader reader= new BufferedReader(new InputStreamReader(inputStream));
  String line;
  while((line = reader.readLine())!=null)
  {
   String topic = line.split(",")[0];
   if(!topics.containsKey(topic))
   {
    topics.put(topic, topics.size());
   }
   Integer topic_index=topics.get(topic);
   
   int index = line.indexOf(" - ");
   if(index==-1) continue;
   
   String text=line.substring(index, line.length()-1);
   
   if(testingData.containsKey(topic_index))
   {
    List<Object> values=new ArrayList<Object>();
    values.add(topic_index);
    values.add(text);
    trainingData.add(values);
   }
   else 
   {
    testingData.put(topic_index, new Values(topic_index, text));
   }
  }
  reader.close();
 }
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  
  int maxBatchIndex = (trainingData.size() / batchSize);
  
  if(trainingData.size() > batchSize && batchIndex < maxBatchIndex)
  {
   for(int i=batchIndex * batchSize; i < trainingData.size() && i < (batchIndex+1) * batchSize; ++i)
   {
    collector.emit(trainingData.get(i));
   }
   
   
   batchIndex++;
   
   //System.out.println("Progress: "+batchIndex +" / "+maxBatchIndex);
  }
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("label", "text");
 }

}


As can be seen above, the ReuterNewsSpout implements IBatchSpout and emits a batch of 10 tuples at a time; each tuple is a news article containing the fields ("label", "text"). The "label" field is an integer value (representing the topic of the news article), while the "text" field is a string containing the text of the article. The training records are set up so that a correctly trained text classifier should predict the topic of a news article given its text.

KLD Text Classification in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to train a classifier with the KLD algorithm in Trident-ML and predict a class label for each testing record. This is implemented in the main class shown below:

package com.memeanalytics.trident_text_classifier_kld;

import java.util.List;

import com.github.pmerienne.trident.ml.nlp.ClassifyTextQuery;
import com.github.pmerienne.trident.ml.nlp.KLDClassifier;
import com.github.pmerienne.trident.ml.nlp.TextClassifierUpdater;
import com.github.pmerienne.trident.ml.preprocessing.TextInstanceCreator;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;


public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        LocalCluster cluster=new LocalCluster();
        
        Config config=new Config();
        
        cluster.submitTopology("KLDDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(20000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        List<List<Object>> testingData = ReuterNewsSpout.getTestingData();
        
        for(int i=0; i < testingData.size(); ++i)
        {
         List<Object> testingDataRecord=testingData.get(i);
         String drpc_args="";
         for(Object val : testingDataRecord){
          if(drpc_args.equals(""))
          {
           drpc_args+=val;
          }
          else
          {
           drpc_args+=(","+val);
          }
         }
         System.out.println(drpc.execute("predict", drpc_args));
        }
        
        cluster.killTopology("KLDDemo");
        cluster.shutdown();
        
        drpc.shutdown();
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     ReuterNewsSpout spout=new ReuterNewsSpout();
     
     TridentTopology topology=new TridentTopology();
     
     TridentState classifierModel = topology.newStream("training", spout).each(new Fields("label", "text"), new TextInstanceCreator<Integer>(), new Fields("instance")).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new TextClassifierUpdater("newsClassifier", new KLDClassifier(9)));
     
     topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(classifierModel, new Fields("instance"), new ClassifyTextQuery("newsClassifier"), new Fields("prediction"));
     return topology.build();
    }
}

The DRPCArgsToInstance operation used in the DRPC stream above is implemented as follows:

package com.memeanalytics.trident_text_classifier_kld;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;
import com.github.pmerienne.trident.ml.preprocessing.TextTokenizer;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction{

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  // TODO Auto-generated method stub
  String drpc_args=tuple.getString(0);
  String[] args=drpc_args.split(",");
  Integer label=Integer.parseInt(args[0]);
  
  String text=args[1];
  
  TextTokenizer textAnalyzer=new EnglishTokenizer();
  List<String> tokens=textAnalyzer.tokenize(text);
  
  
  TextInstance<Integer> instance=new TextInstance<Integer>(label, tokens);
  
  collector.emit(new Values(instance));
 }

}

As can be seen above, the Trident topology has a TextInstanceCreator<Integer> Trident operation which converts a raw ("label", "text") tuple into a TextInstance<Integer> object that can be consumed by TextClassifierUpdater. The TextClassifierUpdater object from Trident-ML updates the underlying classifierModel via the KLDClassifier training algorithm.

The DRPC stream allows the user to pass a new testing instance to the classifierModel, which then returns a "prediction" field containing the predicted label of the testing instance. The DRPCArgsToInstance is a BaseFunction operation that converts the arguments passed into LocalDRPC.execute() into a TextInstance<Integer> (note that you can set the label to null in DRPCArgsToInstance.execute(), since the label is what will be predicted), which is then passed to ClassifyTextQuery; ClassifyTextQuery uses the KLD classifier and the classifierModel to determine the predicted label.
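As a sketch of that null-label variant (the class name DRPCTextOnlyToInstance is hypothetical, and it assumes TextInstance<Integer> accepts a null label), the DRPC function could ignore the supplied label entirely:

package com.memeanalytics.trident_text_classifier_kld;

import java.util.List;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCTextOnlyToInstance extends BaseFunction {

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  // DRPC args still use the "label,text" format; the label token is simply ignored
  String[] args = tuple.getString(0).split(",");
  String text = args.length > 1 ? args[1] : args[0];

  List<String> tokens = new EnglishTokenizer().tokenize(text);

  // null label: ClassifyTextQuery only needs the tokens to produce a prediction
  TextInstance<Integer> instance = new TextInstance<Integer>(null, tokens);
  collector.emit(new Values(instance));
 }
}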

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

Trident-ML: Classification using Perceptron

This post shows a very basic example of how to use the perceptron classification algorithm in Trident-ML to process data from a Storm spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-classifier-perceptron"). The complete source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-classifier-perceptron.tar.gz

To start, we need to configure the pom.xml file of the project.

Configure pom.xml:
Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for perceptron classification):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_classifier_perceptron.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project as a jar for submission to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move on to implementing the NANDSpout, which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class NANDSpout implements IBatchSpout {

 private int batchSize=10;
 
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  final Random rand=new Random();
  for(int i=0; i < batchSize; ++i)
  {
   boolean x0=rand.nextBoolean();
   boolean x1=rand.nextBoolean();
   boolean label = !(x0 && x1);
   List<Object> values=new ArrayList<Object>();
   values.add(label);
   values.add(x0 ? 1.0 : 0.0);
   values.add(x1 ? 1.0 : 0.0);
   //values.add(x0 ? 1.0 + noise(rand) : 0.0 + noise(rand));
   //values.add(x1 ? 1.0 + noise(rand) : 0.0 + noise(rand));
   collector.emit(values);
  }
 }
 
 public static double noise(Random rand)
 {
  return rand.nextDouble()* 0.0001 - 0.00005;
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("label", "x0", "x1");
 }

}

As can be seen above, the NANDSpout implements IBatchSpout and emits a batch of 10 tuples at a time. Each tuple is a training record with the fields ("label", "x0", "x1"): the label is a boolean value, while x0 and x1 are double values that are either 1.0 (true) or 0.0 (false). The training records are generated so that the classification learned should behave like a NAND gate.
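For reference, the labeling rule used by the spout is just the NAND truth table over the two inputs; a tiny standalone illustration (not part of the project):

public class NandTruthTableDemo {
 public static void main(String[] args) {
  boolean[] bits = {false, true};
  for (boolean x0 : bits) {
   for (boolean x1 : bits) {
    // same labeling rule as NANDSpout: label = !(x0 && x1)
    System.out.println(x0 + " NAND " + x1 + " = " + !(x0 && x1));
   }
  }
 }
}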

Perceptron Classification in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology that trains a classifier on the training data using the perceptron algorithm in Trident-ML. This is implemented in the main class shown below:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.Random;

import com.github.pmerienne.trident.ml.classification.ClassifierUpdater;
import com.github.pmerienne.trident.ml.classification.ClassifyQuery;
import com.github.pmerienne.trident.ml.classification.PerceptronClassifier;
import com.github.pmerienne.trident.ml.preprocessing.InstanceCreator;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("PerceptronDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        for(int i=0; i < 10; ++i)
        {
         String drpc_args=createDRPCTestingSample();
         System.out.println(drpc.execute("predict", drpc_args));
         try{
          Thread.sleep(1000);
         }catch(InterruptedException ex)
         {
          ex.printStackTrace();
         }
        }
        
        cluster.killTopology("PerceptronDemo");
        cluster.shutdown();
        
        drpc.shutdown();
    }
    
    private static String createDRPCTestingSample()
    {
     String drpc_args="";
     
     final Random rand=new Random();
     
     boolean bit_x0=rand.nextBoolean();
  boolean bit_x1=rand.nextBoolean();
  boolean label = !(bit_x0 && bit_x1);
  
  double x0=bit_x0 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);
  double x1=bit_x1 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);
  
  drpc_args+=label;
  drpc_args+=(","+x0);
  drpc_args+=(","+x1);
  
  return drpc_args;
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     NANDSpout spout=new NANDSpout();
     TridentState classifierModel = topology.newStream("training", spout).shuffle().each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance")).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClassifierUpdater<Boolean>("perceptron", new PerceptronClassifier()));
     
     topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(classifierModel, new Fields("instance"), new ClassifyQuery<Boolean>("perceptron"), new Fields("predict"));
     
     return topology.build();
     
    }
}

The DRPCArgsToInstance operation used in the DRPC stream above is implemented as follows:

package com.memeanalytics.trident_classifier_perceptron;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction {

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  // TODO Auto-generated method stub
  String drpc_args=tuple.getString(0);
  String[] args=drpc_args.split(",");
  boolean label=Boolean.parseBoolean(args[0]);
  
  double[] features=new double[args.length-1];
  for(int i=1; i < args.length; ++i)
  {
   features[i-1]=Double.parseDouble(args[i]);
  }
  
  Instance<Boolean> instance=new Instance<Boolean>(label, features);
  
  collector.emit(new Values(instance));
 }

}

As can be seen above, the Trident topology has an InstanceCreator<Boolean> Trident operation which converts a raw ("label", "x0", "x1") tuple into an Instance<Boolean> object that can be consumed by ClassifierUpdater. The ClassifierUpdater object from Trident-ML updates the underlying classifierModel via the perceptron training algorithm.

The DRPC stream allows the user to pass a new testing instance to the classifierModel, which then returns a "predict" field containing the predicted label of the testing instance. The DRPCArgsToInstance is a BaseFunction operation that converts the arguments passed into LocalDRPC.execute() into an Instance<Boolean>, which is then passed to ClassifyQuery; ClassifyQuery uses the perceptron and the classifierModel to determine the predicted label.

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

Monday, November 24, 2014

Trident-ML: Clustering using K-Means

This post shows a very basic example of how to use the k-means clustering algorithm in Trident-ML to process data from a Storm spout.

Firstly create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-k-means"). The complete source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-k-means.tar.gz

To start, we need to configure the pom.xml file of the project.

Configure pom.xml:
Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for k-means clustering):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_k_means.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project as a jar for submission to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move on to implementing the RandomFeatureSpout, which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class RandomFeatureSpout implements IBatchSpout{

 private int batchSize=10;
 private int numFeatures=3;
 private int numClasses=3;
 
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  List<Instance<Integer>> data = Datasets.generateDataForMultiLabelClassification(batchSize, numFeatures, numClasses);
  
  
  for(Instance<Integer> instance : data)
  {
   List<Object> values=new ArrayList<Object>();
   values.add(instance.label);
   
   for(double feature : instance.getFeatures())
   {
    values.add(feature);
   }
   collector.emit(values);
  }
  
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("label", "x0", "x1", "x2");
 }
}

As can be seen above, the RandomFeatureSpout implements IBatchSpout and emits a batch of 10 tuples at a time. Each tuple is a training record with the fields ("label", "x0", "x1", "x2"): the label is an integer, while x0, x1 and x2 are double values. The training records are generated by Trident-ML's Datasets.generateDataForMultiLabelClassification() method.
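To get a feel for what the spout emits, a small standalone sketch (illustration only) can print a few instances produced by the same Trident-ML helper used in emitBatch():

package com.memeanalytics.trident_k_means;

import java.util.Arrays;
import java.util.List;

import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

public class DatasetPreviewDemo {
 public static void main(String[] args) {
  // same call as in RandomFeatureSpout.emitBatch(): 5 instances, 3 features, 3 classes
  List<Instance<Integer>> data = Datasets.generateDataForMultiLabelClassification(5, 3, 3);
  for (Instance<Integer> instance : data) {
   // the label plus the raw feature vector that becomes ("x0", "x1", "x2")
   System.out.println(instance.label + " -> " + Arrays.toString(instance.getFeatures()));
  }
 }
}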

K-means in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to assign a cluster to each data record using the k-means algorithm in Trident-ML. This is implemented in the main class shown below:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.github.pmerienne.trident.ml.clustering.ClusterQuery;
import com.github.pmerienne.trident.ml.clustering.ClusterUpdater;
import com.github.pmerienne.trident.ml.clustering.KMeans;
import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.preprocessing.InstanceCreator;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        Config config=new Config();
        
        LocalCluster cluster=new LocalCluster();
        
        cluster.submitTopology("KMeansDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        
        
        for(int i=0; i < 10; ++i)
        {
         String drpc_args=generateRandomTestingArgs();
         System.out.println(drpc.execute("predict", drpc_args));
         try{
          Thread.sleep(1000);
         }catch(InterruptedException ex)
         {
          ex.printStackTrace();
         }
        }
        
        cluster.killTopology("KMeansDemo");
        cluster.shutdown();
        drpc.shutdown();
    }
    
    private static String generateRandomTestingArgs()
    {
     int batchSize=10;
     int numFeatures=3;
     int numClasses=3;
     
     final Random rand=new Random();
     
     List<Instance<Integer>> data = Datasets.generateDataForMultiLabelClassification(batchSize, numFeatures, numClasses);
  
  String args="";
  Instance<Integer> instance = data.get(rand.nextInt(data.size()));
  
  
  args+=instance.label;
  
  for(double feature : instance.getFeatures())
  {
   args+=(","+feature);
  }
   
  
  return args;
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     
     RandomFeatureSpout spout=new RandomFeatureSpout();
     
     TridentState clusterModel = topology.newStream("training", spout).each(new Fields("label", "x0", "x1", "x2"), new InstanceCreator<Integer>(), new Fields("instance")).partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClusterUpdater("kmeans", new KMeans(3)));
     
     topology.newDRPCStream("predict", drpc).each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance")).stateQuery(clusterModel, new Fields("instance"), new ClusterQuery("kmeans"), new Fields("predict"));
     
     return topology.build();
    }
}

The DRPCArgsToInstance operation used in the DRPC stream above is implemented as follows:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction{

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  // TODO Auto-generated method stub
  String drpc_args = tuple.getString(0);
  String[] args = drpc_args.split(",");
  Integer label=Integer.parseInt(args[0]);
  double[] features=new double[args.length-1];
  for(int i=1; i < args.length; ++i)
  {
   double feature=Double.parseDouble(args[i]);
   features[i-1] = feature;
  }
  Instance<Integer> instance=new Instance<Integer>(label, features);
  
  collector.emit(new Values(instance));
 }

}

As can be seen above, the Trident topology has an InstanceCreator<Integer> Trident operation which converts a raw ("label", "x0", "x1", "x2") tuple into an Instance<Integer> object that can be consumed by ClusterUpdater. The ClusterUpdater object from Trident-ML updates the underlying clusterModel via the k-means algorithm.

The DRPC stream allows the user to pass a new testing instance to the clusterModel, which then returns a "predict" field containing the predicted cluster of the testing instance. The DRPCArgsToInstance is a BaseFunction operation that converts the arguments passed into LocalDRPC.execute() into an Instance<Integer>, which is then passed to ClusterQuery; ClusterQuery uses k-means and the clusterModel to determine the predicted cluster.

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

Integration of Kafka-Trident-MySQL

This post shows a very basic example of integrating Kafka, Trident (on top of Storm) and MySQL. The example uses a Kafka producer which randomly produces messages (a random list of country names) to the Kafka brokers; a TransactionalTridentKafkaSpout pulls the data from the Kafka messaging system and emits tuples (containing the field "str", which carries the country names from the Kafka producer) to a Trident operation that serializes the received messages into a MySQL database.

Some ZooKeeper and Kafka settings need to be explained before we proceed. The source code developed here assumes that the ZooKeeper nodes run at:

192.168.2.2:2181
192.168.2.4:2181

and also assumes that the Kafka brokers run at the following host:port addresses:

192.168.2.2:9092
192.168.2.4:9092

The Kafka producer can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/kakfa-producer-for-trident.tar.gz

Basically the Kafka producer emits a random list of country names as messages sent to the Kafka brokers. The tutorial on how to implement a Kafka producer can be found at:
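For reference, here is a minimal sketch of such a producer using the Kafka 0.8 producer API; the package name and the country list are made up, and the broker list and the topic name "country-topic" are assumptions chosen to match the consumer topology later in this post:

package com.memeanalytics.kafka_producer_demo;

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class CountryProducerDemo {
 public static void main(String[] args) throws InterruptedException {
  Properties props = new Properties();
  props.put("metadata.broker.list", "192.168.2.2:9092,192.168.2.4:9092");
  props.put("serializer.class", "kafka.serializer.StringEncoder");

  Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

  String[] countries = {"Canada", "France", "Japan", "Brazil", "Kenya"};
  Random rand = new Random();

  // keep sending a random country name every second
  while (true) {
   String country = countries[rand.nextInt(countries.length)];
   producer.send(new KeyedMessage<String, String>("country-topic", country));
   Thread.sleep(1000);
  }
 }
}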

Next we need to create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="kafka-trident-consumer") which will consume the Kafka messages in a Trident topology and serialize them to a MySQL database. The source code of the project can be downloaded from:

https://dl.dropboxusercontent.com/u/113201788/storm/kafka-trident-consumer.zip

Below we will explain how to prepare the pom.xml file, implement the Trident operation for MySQL serialization, and configure a Trident topology that consumes messages from the Kafka brokers.

Prepare pom.xml

In the pom.xml, first add in the clojars repository in the repositories section:

  <repositories>
  <repository>
  <id>clojars</id>
  <url>http://clojars.org/repo</url>
  </repository>
  </repositories>

Next add in the storm-kafka-0.8-plus dependency in the dependencies section (for TransactionalTridentKafkaSpout):

<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>

Next add in the storm-core dependency in the dependencies section (for storm):

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>

Next add in the mysql-connector-java dependency in the dependencies section (for mysql):

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.6</version>
</dependency>

Next add in the maven-assembly-plugin in the build/plugins section (for packaging the Maven project as jar for submitting to Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Next add in the exec-maven-plugin in the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>

<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>

<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.kafka_trident_consumer.App</mainClass>
</configuration>

</plugin>

This completes the pom.xml configuration. Next we will implement the Trident operation which serializes the TridentTuple data into the MySQL database.

Trident operation for mysql serialization

The Trident operation which serializes the TridentTuple data is a BaseFilter object from Trident that has the following implementation:

package com.memeanalytics.kafka_trident_consumer;

import java.util.Map;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class TridentUtils {
 public static class MySqlPersist extends BaseFilter{

  private static final long serialVersionUID = 1L;

  private MySqlDump mysqlSerializer=null;
  
  public boolean isKeep(TridentTuple tuple) {
   String country=tuple.getString(0);
   mysqlSerializer.store(country);
   System.out.println("Country: "+country);
   return true;
  }
  
  @Override
     public void prepare(Map conf, TridentOperationContext context) {
   mysqlSerializer=new MySqlDump("localhost","mylog", "root", "[password]");
     }

     @Override
     public void cleanup() {
      mysqlSerializer.close();
     }
  
 }
}

In the above implementation, the mysqlSerializer member variable is responsible for actually storing the data in the MySQL database. The MySQL connection is opened in the prepare() method (via the MySqlDump constructor) and closed in the cleanup() method; the serialization of each tuple happens in the isKeep() method. Below is the implementation of the MySqlDump class:

package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.SQLException;

import java.sql.PreparedStatement;

public class MySqlDump {
 private String ip;
 private String database;
 private String username;
 private String password;
 private Connection conn;
 
 public MySqlDump(String ip, String database, String username, String password)
 {
  this.ip = ip;
  this.database=database;
  this.username=username;
  this.password=password;
  conn=MySqlConnectionGenerator.open(ip, database, username, password);
 }
 
 public void store(String dataitem)
 {
  if(conn==null) return;
  
  PreparedStatement statement = null;
  try{
   statement = conn.prepareStatement("insert into mylogtable (id, dataitem) values (default, ?)");
   statement.setString(1, dataitem);
   statement.executeUpdate();
  }catch(Exception ex)
  {
   ex.printStackTrace();
  }finally
  {
   if(statement != null)
   {
    try {
     statement.close();
    } catch (SQLException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
  }
 }
 
 public void close()
 {
  if(conn==null) return;
  try {
   conn.close();
  } catch (SQLException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }
}

The MySqlConnectionGenerator helper used to open the connection is implemented as follows:

package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.DriverManager;

public class MySqlConnectionGenerator {
 public static Connection open(String ip, String database, String username, String password)
 {
  Connection conn = null;
  try{
   Class.forName("com.mysql.jdbc.Driver");
   conn=DriverManager.getConnection("jdbc:mysql://"+ip+"/"+database+"?user="+username+"&password="+password);
  }catch(Exception ex)
  {
   ex.printStackTrace();
  }
  
  return conn;
 }
}
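The store() method above assumes that the mylog database already contains a table named mylogtable with an auto-increment id column and a dataitem column. A minimal sketch of creating it through the same JDBC connection helper (the column types and credentials below are assumptions and must match the real MySQL setup):

package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.Statement;

public class CreateLogTableDemo {
 public static void main(String[] args) throws Exception {
  // credentials are placeholders
  Connection conn = MySqlConnectionGenerator.open("localhost", "mylog", "root", "[password]");
  Statement statement = conn.createStatement();
  // id auto-increments so the insert in MySqlDump.store() can pass "default" for it
  statement.executeUpdate(
    "create table if not exists mylogtable (" +
    "id int not null auto_increment primary key, " +
    "dataitem varchar(255))");
  statement.close();
  conn.close();
 }
}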

Trident topology implementation

Once this is completed, we are ready to implement the Trident topology which consumes data from Kafka and saves it to MySQL. This is implemented in the main class:

package com.memeanalytics.kafka_trident_consumer;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        ZkHosts zkHosts=new ZkHosts("192.168.2.4:2181");
        
        String topic="country-topic";
        String consumer_group_id="storm";
        
        TridentKafkaConfig kafkaConfig=new TridentKafkaConfig(zkHosts, topic, consumer_group_id);
        
        kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.forceFromStart=true;
        
        TransactionalTridentKafkaSpout spout=new TransactionalTridentKafkaSpout(kafkaConfig);
        
        TridentTopology topology=new TridentTopology();
        
        topology.newStream("spout", spout).shuffle().each(new Fields("str"), new TridentUtils.MySqlPersist());
        
        LocalCluster cluster=new LocalCluster();
        
        Config config=new Config();
        
        cluster.submitTopology("KafkaTridentMysqlDemo", config, topology.build());
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        cluster.killTopology("KafkaTridentMysqlDemo");
        cluster.shutdown();
    }
}

For demo purposes, it uses LocalCluster to submit and run the Trident topology and does not implement a DRPC stream that could be used to query results. The main component is the TransactionalTridentKafkaSpout, which takes a TridentKafkaConfig object as a constructor parameter. The TransactionalTridentKafkaSpout emits tuples which are persisted by TridentUtils.MySqlPersist, the MySQL serialization Trident operation.

Once done, we can compile and execute the Maven project by navigating to its root folder and running the following command:

> mvn compile exec:java
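
After the topology has run, one quick (hypothetical) way to verify the integration is to count the rows persisted into mylogtable, reusing the connection helper from above; the credentials are placeholders and must match the real MySQL setup:

package com.memeanalytics.kafka_trident_consumer;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class CountStoredCountriesDemo {
 public static void main(String[] args) throws Exception {
  Connection conn = MySqlConnectionGenerator.open("localhost", "mylog", "root", "[password]");
  Statement statement = conn.createStatement();
  ResultSet rs = statement.executeQuery("select count(*) from mylogtable");
  if (rs.next()) {
   // number of country-name messages serialized by TridentUtils.MySqlPersist
   System.out.println("rows stored: " + rs.getLong(1));
  }
  rs.close();
  statement.close();
  conn.close();
 }
}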