Monday, November 24, 2014

Trident-ML: Clustering using K-Means

This post shows a very basic example of how to use the k-means clustering algorithm in Trident-ML to process data from a Storm spout.

First, create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="trident-k-means"). The complete source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-k-means.tar.gz

To start, we need to configure the pom.xml file in the project.

Configure pom.xml:
First, add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next, add the Storm dependency to the dependencies section:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next, add the trident-ml dependency to the dependencies section (for k-means clustering):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next, add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_k_means.App</mainClass>
</configuration>
</plugin>

Next, add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar that can be submitted to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is complete, we can implement RandomFeatureSpout, the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class RandomFeatureSpout implements IBatchSpout {

    private int batchSize = 10;   // number of tuples emitted per batch
    private int numFeatures = 3;  // must match the x0, x1, x2 output fields
    private int numClasses = 3;   // number of distinct labels in the generated data

    public void open(Map conf, TopologyContext context) {
        // Nothing to initialize: the data is generated on the fly in emitBatch().
    }

    public void emitBatch(long batchId, TridentCollector collector) {
        // Generate one batch of labeled instances with Trident-ML's testing helper.
        List<Instance<Integer>> data = Datasets.generateDataForMultiLabelClassification(batchSize, numFeatures, numClasses);

        for (Instance<Integer> instance : data) {
            // Flatten each instance into a (label, x0, x1, x2) tuple.
            List<Object> values = new ArrayList<Object>();
            values.add(instance.label);

            for (double feature : instance.getFeatures()) {
                values.add(feature);
            }
            collector.emit(values);
        }
    }

    public void ack(long batchId) {
        // Batches are random and never replayed, so there is nothing to acknowledge.
    }

    public void close() {
        // No resources to release.
    }

    public Map getComponentConfiguration() {
        // No component-specific configuration.
        return null;
    }

    public Fields getOutputFields() {
        // Must match the order of the values emitted in emitBatch().
        return new Fields("label", "x0", "x1", "x2");
    }
}

As shown above, RandomFeatureSpout implements IBatchSpout and emits a batch of 10 tuples at a time. Each tuple is a training record with the fields ("label", "x0", "x1", "x2"): the label is an integer, while x0, x1 and x2 are double values. The training records are obtained from Trident-ML's Datasets.generateDataForMultiLabelClassification() method.
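
To make the tuple layout concrete without starting Storm, here is a minimal plain-Java sketch of the same flattening step. The class and method names (TupleLayoutSketch, toTupleValues, randomBatch) are hypothetical, and the random generator is only a stand-in for Trident-ML's dataset helper, not its actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TupleLayoutSketch {

    // Flattens one labeled record into the ("label", "x0", "x1", ...) order used by the spout.
    public static List<Object> toTupleValues(int label, double[] features) {
        List<Object> values = new ArrayList<Object>();
        values.add(label);
        for (double feature : features) {
            values.add(feature);
        }
        return values;
    }

    // Builds one batch of random records (an assumed stand-in for the Trident-ML generator).
    public static List<List<Object>> randomBatch(int batchSize, int numFeatures, int numClasses, Random rand) {
        List<List<Object>> batch = new ArrayList<List<Object>>();
        for (int i = 0; i < batchSize; i++) {
            int label = rand.nextInt(numClasses);
            double[] features = new double[numFeatures];
            for (int j = 0; j < numFeatures; j++) {
                // Center each feature on the label value so the classes form separate groups.
                features[j] = label + rand.nextGaussian() * 0.1;
            }
            batch.add(toTupleValues(label, features));
        }
        return batch;
    }

    public static void main(String[] args) {
        // Same sizes as the spout: 10 records, 3 features, 3 classes.
        for (List<Object> values : randomBatch(10, 3, 3, new Random())) {
            System.out.println(values);
        }
    }
}
```

Each printed list has four entries, one per output field declared by the spout, with the integer label first and the three double features after it.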

K-means in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology that uses the training data to assign a cluster to each data record with the k-means algorithm in Trident-ML. This is implemented in the main class shown below:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.github.pmerienne.trident.ml.clustering.ClusterQuery;
import com.github.pmerienne.trident.ml.clustering.ClusterUpdater;
import com.github.pmerienne.trident.ml.clustering.KMeans;
import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.preprocessing.InstanceCreator;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        Config config=new Config();
        
        LocalCluster cluster=new LocalCluster();
        
        cluster.submitTopology("KMeansDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        
        
        for(int i=0; i < 10; ++i)
        {
         String drpc_args=generateRandomTestingArgs();
         System.out.println(drpc.execute("predict", drpc_args));
         try{
          Thread.sleep(1000);
         }catch(InterruptedException ex)
         {
          ex.printStackTrace();
         }
        }
        
        cluster.killTopology("KMeansDemo");
        cluster.shutdown();
        drpc.shutdown();
    }
    
    private static String generateRandomTestingArgs()
    {
        int batchSize = 10;
        int numFeatures = 3;
        int numClasses = 3;

        final Random rand = new Random();

        List<Instance<Integer>> data = Datasets.generateDataForMultiLabelClassification(batchSize, numFeatures, numClasses);

        // Pick one random instance and encode it as "label,x0,x1,x2" for the DRPC call.
        Instance<Integer> instance = data.get(rand.nextInt(data.size()));

        String args = "";
        args += instance.label;

        for (double feature : instance.getFeatures())
        {
            args += ("," + feature);
        }

        return args;
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     
     RandomFeatureSpout spout=new RandomFeatureSpout();
     
     // Training stream: turn raw tuples into instances and use them to update the k-means model.
     TridentState clusterModel = topology.newStream("training", spout)
       .each(new Fields("label", "x0", "x1", "x2"), new InstanceCreator<Integer>(), new Fields("instance"))
       .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClusterUpdater("kmeans", new KMeans(3)));
     
     // DRPC stream: parse the request arguments into an instance and query the model with it.
     topology.newDRPCStream("predict", drpc)
       .each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance"))
       .stateQuery(clusterModel, new Fields("instance"), new ClusterQuery("kmeans"), new Fields("predict"));
     
     return topology.build();
    }
}

The DRPCArgsToInstance function used by the DRPC stream is implemented in a separate class:

package com.memeanalytics.trident_k_means;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction {

    private static final long serialVersionUID = 1L;

    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The DRPC argument is a single string of the form "label,x0,x1,x2".
        String drpc_args = tuple.getString(0);
        String[] args = drpc_args.split(",");

        // The first token is the integer label; the remaining tokens are the features.
        Integer label = Integer.parseInt(args[0]);
        double[] features = new double[args.length - 1];
        for (int i = 1; i < args.length; ++i) {
            features[i - 1] = Double.parseDouble(args[i]);
        }

        Instance<Integer> instance = new Instance<Integer>(label, features);
        collector.emit(new Values(instance));
    }

}

As shown above, the Trident topology uses an InstanceCreator<Integer> Trident operation, which converts each raw ("label", "x0", "x1", "x2") tuple into an Instance<Integer> object that can be consumed by ClusterUpdater. The ClusterUpdater object from Trident-ML updates the underlying clusterModel using the k-means algorithm.
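
To give an intuition for what updating a cluster model one instance at a time can look like, here is a minimal plain-Java sketch of a textbook online k-means update (nearest centroid, then a running-mean step). This is an illustration under that assumption only; it is not Trident-ML's KMeans implementation, and the class and method names are hypothetical:

```java
public class OnlineKMeansSketch {

    private final double[][] centroids;
    private final long[] counts;
    private int initialized = 0; // how many centroids have been seeded so far

    public OnlineKMeansSketch(int k, int numFeatures) {
        centroids = new double[k][numFeatures];
        counts = new long[k];
    }

    // Returns the index of the seeded centroid closest to x (squared Euclidean distance).
    public int nearest(double[] x) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < initialized; c++) {
            double dist = 0.0;
            for (int j = 0; j < x.length; j++) {
                double d = x[j] - centroids[c][j];
                dist += d * d;
            }
            if (dist < bestDist) {
                bestDist = dist;
                best = c;
            }
        }
        return best;
    }

    // Folds one point into the model and returns the cluster it was assigned to.
    public int update(double[] x) {
        if (initialized < centroids.length) {
            // The first k points seed the centroids.
            centroids[initialized] = x.clone();
            counts[initialized] = 1;
            return initialized++;
        }
        int c = nearest(x);
        counts[c]++;
        for (int j = 0; j < x.length; j++) {
            // Running mean: move the centroid toward x by 1/count of the gap.
            centroids[c][j] += (x[j] - centroids[c][j]) / counts[c];
        }
        return c;
    }

    public double[] centroid(int c) {
        return centroids[c].clone();
    }

    public static void main(String[] args) {
        OnlineKMeansSketch model = new OnlineKMeansSketch(2, 1);
        model.update(new double[]{0.0});
        model.update(new double[]{10.0});
        model.update(new double[]{2.0});
        System.out.println(model.nearest(new double[]{1.5}));
    }
}
```

The update and nearest methods play roughly the roles that ClusterUpdater and ClusterQuery play in the topology: one folds training instances into the model, the other only reads the model to assign a cluster.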

The DRPC stream allows the user to pass a new testing instance to the clusterModel, which then returns a "predict" field containing the predicted label (the cluster assigned to the testing instance). DRPCArgsToInstance is a BaseFunction operation that converts the arguments passed to LocalDRPC.execute() into an Instance<Integer>. This instance is passed to ClusterQuery, which uses the "kmeans" clusterModel to determine the prediction.
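
The DRPC argument is a plain comma-separated string, so its format can be checked outside Storm. The following sketch encodes and decodes the "label,x0,x1,x2" layout in plain Java; the class and method names (DrpcArgsSketch, format, parseLabel, parseFeatures) are hypothetical and only mirror what generateRandomTestingArgs() and DRPCArgsToInstance do:

```java
public class DrpcArgsSketch {

    // Encodes a label and its features as "label,x0,x1,...".
    public static String format(int label, double[] features) {
        StringBuilder sb = new StringBuilder();
        sb.append(label);
        for (double feature : features) {
            sb.append(',').append(feature);
        }
        return sb.toString();
    }

    // Reads the integer label from the first comma-separated token.
    public static int parseLabel(String drpcArgs) {
        return Integer.parseInt(drpcArgs.split(",")[0]);
    }

    // Reads every token after the first as a double feature.
    public static double[] parseFeatures(String drpcArgs) {
        String[] tokens = drpcArgs.split(",");
        double[] features = new double[tokens.length - 1];
        for (int i = 1; i < tokens.length; i++) {
            features[i - 1] = Double.parseDouble(tokens[i]);
        }
        return features;
    }

    public static void main(String[] args) {
        String encoded = format(2, new double[]{0.5, 1.5, -3.0});
        System.out.println(encoded);
        System.out.println(parseLabel(encoded) + " with " + parseFeatures(encoded).length + " features");
    }
}
```

Note that the label travels with the request only because Instance<Integer> carries one; the sketch makes no claim about how the cluster query itself uses it.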

Once the coding is complete, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java
