Tuesday, November 25, 2014

Trident-ML: Classification using Perceptron

This post shows a very basic example of how to use the perceptron classification algorithm in Trident-ML to process data from a Storm spout.

First, create a Maven project (e.g. with groupId="com.memeanalytics" artifactId="trident-classifier-perceptron"). The complete source code of the project can be downloaded from the link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-classifier-perceptron.tar.gz

To start, we need to configure the pom.xml file in the project.

Configure pom.xml:
Firstly we need to add the clojars repository to the repositories section:

<repositories>
<repository>
<id>clojars</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the Storm dependency to the dependencies section:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next we need to add the trident-ml dependency to the dependencies section (for perceptron classification):

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<mainClass>com.memeanalytics.trident_classifier_perceptron.App</mainClass>
</configuration>
</plugin>

Next we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar that can be submitted to a Storm cluster):

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can implement NANDSpout, the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class NANDSpout implements IBatchSpout {

 private int batchSize=10;
 
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // Emit batchSize random NAND training records as (label, x0, x1) tuples
  final Random rand=new Random();
  for(int i=0; i < batchSize; ++i)
  {
   boolean x0=rand.nextBoolean();
   boolean x1=rand.nextBoolean();
   boolean label = !(x0 && x1);
   List<Object> values=new ArrayList<Object>();
   values.add(label);
   values.add(x0 ? 1.0 : 0.0);
   values.add(x1 ? 1.0 : 0.0);
   //values.add(x0 ? 1.0 + noise(rand) : 0.0 + noise(rand));
   //values.add(x1 ? 1.0 + noise(rand) : 0.0 + noise(rand));
   collector.emit(values);
  }
 }
 
 public static double noise(Random rand)
 {
  return rand.nextDouble()* 0.0001 - 0.00005;
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("label", "x0", "x1");
 }

}

As can be seen above, NANDSpout implements IBatchSpout and emits a batch of 10 tuples at a time. Each tuple is a training record containing the fields ("label", "x0", "x1"). The label is a boolean value, while x0 and x1 are double values that are either 1 (true) or 0 (false). The training records are generated so that the label is the NAND of x0 and x1, which is the function the classifier should learn to predict.
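To make the record layout concrete, here is a minimal plain-Java sketch (no Storm dependency) that builds the four possible training records in the same field order the spout uses. The class name NandRecordsSketch and the helper record() are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class NandRecordsSketch {

    // Builds one record in the spout's field order: (label, x0, x1).
    // Hypothetical helper, not part of the project above.
    public static List<Object> record(boolean x0, boolean x1) {
        List<Object> values = new ArrayList<Object>();
        values.add(!(x0 && x1));      // label: NAND of the two inputs
        values.add(x0 ? 1.0 : 0.0);   // x0 encoded as a double
        values.add(x1 ? 1.0 : 0.0);   // x1 encoded as a double
        return values;
    }

    public static void main(String[] args) {
        boolean[] bits = {false, true};
        for (boolean x0 : bits) {
            for (boolean x1 : bits) {
                System.out.println(record(x0, x1));
            }
        }
        // Prints:
        // [true, 0.0, 0.0]
        // [true, 0.0, 1.0]
        // [true, 1.0, 0.0]
        // [false, 1.0, 1.0]
    }
}
```

Only the record with both inputs set to 1 carries the label false, which is exactly the NAND truth table.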

Perceptron Classification in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology which uses the training data to train the perceptron classifier from Trident-ML and then uses the trained model to predict a class label for new data records. This is implemented in the main class shown below:

package com.memeanalytics.trident_classifier_perceptron;

import java.util.Random;

import com.github.pmerienne.trident.ml.classification.ClassifierUpdater;
import com.github.pmerienne.trident.ml.classification.ClassifyQuery;
import com.github.pmerienne.trident.ml.classification.PerceptronClassifier;
import com.github.pmerienne.trident.ml.preprocessing.InstanceCreator;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

/**
 * Runs the perceptron demo on a local Storm cluster and
 * queries the trained model through DRPC.
 */
public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc=new LocalDRPC();
        
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("PerceptronDemo", config, buildTopology(drpc));
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         ex.printStackTrace();
        }
        
        for(int i=0; i < 10; ++i)
        {
         String drpc_args=createDRPCTestingSample();
         System.out.println(drpc.execute("predict", drpc_args));
         try{
          Thread.sleep(1000);
         }catch(InterruptedException ex)
         {
          ex.printStackTrace();
         }
        }
        
        cluster.killTopology("PerceptronDemo");
        cluster.shutdown();
        
        drpc.shutdown();
    }
    
    private static String createDRPCTestingSample()
    {
     String drpc_args="";
     
     final Random rand=new Random();
     
     boolean bit_x0=rand.nextBoolean();
     boolean bit_x1=rand.nextBoolean();
     boolean label = !(bit_x0 && bit_x1);
     
     double x0=bit_x0 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);
     double x1=bit_x1 ? 1.0 + NANDSpout.noise(rand) : 0.0 + NANDSpout.noise(rand);
     
     drpc_args+=label;
     drpc_args+=(","+x0);
     drpc_args+=(","+x1);
     
     return drpc_args;
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
     TridentTopology topology=new TridentTopology();
     NANDSpout spout=new NANDSpout();
     // Training stream: turn each raw tuple into an Instance and update the perceptron model
     TridentState classifierModel = topology.newStream("training", spout)
       .shuffle()
       .each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance"))
       .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClassifierUpdater<Boolean>("perceptron", new PerceptronClassifier()));
     
     // DRPC stream: parse the request arguments and query the model for a prediction
     topology.newDRPCStream("predict", drpc)
       .each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance"))
       .stateQuery(classifierModel, new Fields("instance"), new ClassifyQuery<Boolean>("perceptron"), new Fields("predict"));
     
     return topology.build();
     
    }
}

The DRPCArgsToInstance class used by the DRPC stream is implemented in a separate file:

package com.memeanalytics.trident_classifier_perceptron;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DRPCArgsToInstance extends BaseFunction {

 private static final long serialVersionUID = 1L;

 public void execute(TridentTuple tuple, TridentCollector collector) {
  // Parse the DRPC argument string "label,x0,x1" into an Instance<Boolean>
  String drpc_args=tuple.getString(0);
  String[] args=drpc_args.split(",");
  boolean label=Boolean.parseBoolean(args[0]);
  
  double[] features=new double[args.length-1];
  for(int i=1; i < args.length; ++i)
  {
   features[i-1]=Double.parseDouble(args[i]);
  }
  
  Instance<Boolean> instance=new Instance<Boolean>(label, features);
  
  collector.emit(new Values(instance));
 }

}

As can be seen above, the Trident topology has an InstanceCreator<Boolean> Trident operation which converts each raw ("label", "x0", "x1") tuple into an Instance<Boolean> object that can be consumed by ClassifierUpdater. The ClassifierUpdater object from Trident-ML updates the underlying classifierModel using the perceptron training algorithm.
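For readers unfamiliar with the perceptron update rule, the following is a minimal plain-Java sketch of how such a model can learn NAND from the same kind of records. It is not Trident-ML's PerceptronClassifier: the class name PerceptronNandSketch, the learning rate of 1.0, the zero threshold and the bias stored as weights[0] are all assumptions made for this illustration.

```java
import java.util.Random;

public class PerceptronNandSketch {

    // weights[0] is the bias; weights[1] and weights[2] multiply x0 and x1.
    // All weights start at zero (an assumption of this sketch).
    private final double[] weights = new double[3];

    // Assumed learning rate; 1.0 keeps the arithmetic exact for 0/1 inputs.
    private static final double LEARNING_RATE = 1.0;

    // Predicts true when the weighted sum is strictly positive.
    public boolean predict(double x0, double x1) {
        return weights[0] + weights[1] * x0 + weights[2] * x1 > 0.0;
    }

    // Classic perceptron rule: weights move only when the prediction is wrong.
    public void update(boolean label, double x0, double x1) {
        int error = (label ? 1 : 0) - (predict(x0, x1) ? 1 : 0);
        weights[0] += LEARNING_RATE * error;
        weights[1] += LEARNING_RATE * error * x0;
        weights[2] += LEARNING_RATE * error * x1;
    }

    // Trains on random NAND records, generated the same way the spout does.
    public static PerceptronNandSketch train(int records, long seed) {
        Random rand = new Random(seed);
        PerceptronNandSketch model = new PerceptronNandSketch();
        for (int i = 0; i < records; i++) {
            boolean x0 = rand.nextBoolean();
            boolean x1 = rand.nextBoolean();
            model.update(!(x0 && x1), x0 ? 1.0 : 0.0, x1 ? 1.0 : 0.0);
        }
        return model;
    }

    public static void main(String[] args) {
        PerceptronNandSketch model = train(1000, 42L);
        double[] bits = {0.0, 1.0};
        for (double x0 : bits) {
            for (double x1 : bits) {
                System.out.println(x0 + " NAND " + x1 + " -> " + model.predict(x0, x1));
            }
        }
    }
}
```

Because NAND is linearly separable, the perceptron rule stops making mistakes after a bounded number of updates, so a stream of random records like the one emitted by the spout is enough for the model to settle on a correct decision boundary.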

The DRPC stream allows the user to pass a new testing instance to the classifierModel, which then returns a "predict" field containing the predicted label of the testing instance. DRPCArgsToInstance is a BaseFunction operation which converts the arguments passed to LocalDRPC.execute() into an Instance<Boolean>. That instance is passed to ClassifyQuery, which uses the perceptron stored in classifierModel to determine the predicted label.
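The argument format used for the DRPC call is a comma-separated string of the form "label,x0,x1". The sketch below shows the same parsing steps in plain Java so they can be tried without Storm. The class DrpcArgsSketch and its ParsedSample holder are hypothetical stand-ins for DRPCArgsToInstance and Trident-ML's Instance<Boolean>, and the trim() calls are an addition that the original function does not make.

```java
import java.util.Arrays;

public class DrpcArgsSketch {

    // Hypothetical holder standing in for Trident-ML's Instance<Boolean>.
    public static final class ParsedSample {
        public final boolean label;
        public final double[] features;

        ParsedSample(boolean label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    // Splits "label,x0,x1,..." into a boolean label and a feature vector.
    public static ParsedSample parse(String drpcArgs) {
        String[] parts = drpcArgs.split(",");
        // The first token is the label; any text other than "true" parses as false.
        boolean label = Boolean.parseBoolean(parts[0].trim());
        // Every remaining token is one numeric feature.
        double[] features = new double[parts.length - 1];
        for (int i = 1; i < parts.length; i++) {
            features[i - 1] = Double.parseDouble(parts[i].trim());
        }
        return new ParsedSample(label, features);
    }

    public static void main(String[] args) {
        ParsedSample sample = parse("false,1.0,0.5");
        System.out.println(sample.label + " " + Arrays.toString(sample.features));
        // Prints: false [1.0, 0.5]
    }
}
```

A malformed feature token makes Double.parseDouble throw a NumberFormatException, so a production version would likely validate the request before building the instance.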

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java
