Tuesday, November 25, 2014

Trident-ML: Regression using Passive-Aggressive algorithm

This post shows a very basic example of how to use the Passive-Aggressive (PA) algorithm as a regression algorithm in Trident-ML to process data from a Storm spout.

First, create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="trident-regression-pa"). The complete source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-regression-pa.tar.gz

To start, we need to configure the pom.xml file in the project.

Configure pom.xml:
First, we need to add the clojars repository to the repositories section:

<!-- Clojars hosts the Storm artifacts. HTTPS is used because Maven 3.8.1+ blocks plain-HTTP repositories by default. -->
<repositories>
<repository>
<id>clojars</id>
<url>https://clojars.org/repo</url>
</repository>
</repositories>

Next we need to add the storm dependency to the dependencies section (for storm):

<!-- Storm core. Scope "provided": available when compiling, but not bundled into the assembled jar. -->
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

Next, we need to add the trident-ml dependency to the dependencies section (for PA regression):

<!-- Trident-ML: supplies the Instance, PARegressor, RegressionUpdater and RegressionQuery classes used by this project. -->
<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>

Next, we need to add the exec-maven-plugin to the build/plugins section (for executing the Maven project):

<!-- exec-maven-plugin: lets Maven launch the demo's main class with the project dependencies on the classpath. -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<executable>java</executable>
<classpathScope>compile</classpathScope>
<!-- Fully-qualified name of the class whose main() is run. -->
<mainClass>com.memeanalytics.trident_regression_pa.App</mainClass>
</configuration>
</plugin>

Next, we need to add the maven-assembly-plugin to the build/plugins section (for packaging the Maven project into a jar that can be submitted to a Storm cluster):

<!-- maven-assembly-plugin: builds a single "jar-with-dependencies" archive during the package phase. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Left empty on purpose. NOTE(review): presumably the main class is named on the command line when the jar is submitted to Storm; confirm. -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

Implement Spout for training data 

Once the pom.xml update is completed, we can move to implement the BirthDataSpout which is the Storm spout that emits batches of training data to the Trident topology:

package com.memeanalytics.trident_regression_pa;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.github.pmerienne.trident.ml.core.Instance;
import com.github.pmerienne.trident.ml.testing.data.Datasets;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

/**
 * Trident batch spout that feeds training records from {@code births.csv} into a topology.
 *
 * <p>The CSV file is read once, when this class is loaded. Each line holds
 * semicolon-separated numbers: every value except the last is a feature and the last
 * value is the label. The records are shuffled; the first {@link #TESTING_SET_SIZE} of
 * them are held out as testing data (see {@link #getDRPCArgsList()}) and the rest are
 * emitted, batch by batch, as training data in a single output field named
 * {@code "instance"}.
 *
 * <p>The file path is relative ({@code src/test/resources/births.csv}), so the data is
 * only found when the JVM's working directory is the project root.
 */
public class BirthDataSpout implements IBatchSpout {

    private static final long serialVersionUID = 1L;

    /** Number of shuffled records held out for testing instead of training. */
    private static final int TESTING_SET_SIZE = 10;

    /** Number of training records emitted per batch. */
    private int batchSize = 10;

    /** Index of the next batch to emit; wraps around so training data is replayed forever. */
    private int batchIndex = 0;

    private static List<Instance<Double>> sample_data = new ArrayList<Instance<Double>>();
    private static List<Instance<Double>> testing_data = new ArrayList<Instance<Double>>();

    /**
     * Renders every held-out testing record as a DRPC argument string.
     *
     * @return one string per testing record, formatted as the comma-separated feature
     *         values followed by the label (for example {@code "f0,f1,...,label"})
     */
    public static List<String> getDRPCArgsList()
    {
        List<String> drpcArgsList = new ArrayList<String>();
        for (Instance<Double> instance : testing_data)
        {
            StringBuilder drpcArgs = new StringBuilder();
            for (double feature : instance.getFeatures())
            {
                // Every feature is followed by a comma because the label always comes last.
                drpcArgs.append(feature).append(',');
            }
            drpcArgs.append(instance.label);
            drpcArgsList.add(drpcArgs.toString());
        }
        return drpcArgsList;
    }

    static {
        loadData("src/test/resources/births.csv");
    }

    /**
     * Reads the CSV file, shuffles its records and splits them into the testing and
     * training lists. I/O failures are reported on stderr and leave the lists with
     * whatever was loaded so far (normally nothing).
     */
    private static void loadData(String filePath)
    {
        FileInputStream is = null;
        BufferedReader br = null;
        try {
            is = new FileInputStream(filePath);
            br = new BufferedReader(new InputStreamReader(is));

            List<Instance<Double>> records = new ArrayList<Instance<Double>>();
            String line;
            while ((line = br.readLine()) != null)
            {
                // Skip blank lines (e.g. a trailing newline at the end of the file);
                // parsing them would throw and abort class initialization.
                if (line.trim().length() == 0)
                {
                    continue;
                }
                records.add(parseInstance(line));
            }

            // Shuffle so that the held-out testing records differ from run to run.
            Collections.shuffle(records);

            for (Instance<Double> instance : records)
            {
                if (testing_data.size() < TESTING_SET_SIZE)
                {
                    testing_data.add(instance);
                }
                else
                {
                    sample_data.add(instance);
                }
            }
        } catch (FileNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                if (br != null)
                {
                    // Closing the reader also closes the wrapped file stream.
                    br.close();
                }
                else if (is != null)
                {
                    is.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Parses one semicolon-separated line: all values but the last are features, the last is the label. */
    private static Instance<Double> parseInstance(String line)
    {
        String[] values = line.split(";");
        int featureCount = values.length - 1;
        double label = Double.parseDouble(values[featureCount]);
        double[] features = new double[featureCount];
        for (int i = 0; i < featureCount; ++i)
        {
            features[i] = Double.parseDouble(values[i]);
        }
        return new Instance<Double>(label, features);
    }

    public BirthDataSpout()
    {
    }

    /** No per-task setup is needed; the data is loaded statically. */
    public void open(Map conf, TopologyContext context) {
    }

    /**
     * Emits the next batch of {@code batchSize} training instances, one tuple per instance.
     *
     * <p>Only complete batches are emitted: a trailing partial batch is never sent, and
     * nothing is emitted when fewer than {@code batchSize} training records exist. After
     * the last complete batch the spout starts again from the first one.
     *
     * <p>{@code batchId} is not consulted, so calling this again with an id seen before
     * emits the next batch rather than repeating the earlier one.
     */
    public void emitBatch(long batchId, TridentCollector collector) {
        int maxBatchCount = sample_data.size() / batchSize;
        if (maxBatchCount <= 0 || batchIndex >= maxBatchCount)
        {
            return;
        }

        int start = batchIndex * batchSize;
        int end = Math.min(start + batchSize, sample_data.size());
        for (int i = start; i < end; ++i)
        {
            collector.emit(new Values(sample_data.get(i)));
        }
        batchIndex = (batchIndex + 1) % maxBatchCount;
    }

    /** Nothing to do: emitted batches are not tracked. */
    public void ack(long batchId) {
    }

    /** Nothing to release. */
    public void close() {
    }

    /** Returns {@code null}: this spout declares no component-specific configuration. */
    public Map getComponentConfiguration() {
        return null;
    }

    /** Each emitted tuple has a single field, {@code "instance"}. */
    public Fields getOutputFields() {
        return new Fields("instance");
    }

}

As can be seen above, BirthDataSpout implements IBatchSpout and emits batches of 10 tuples at a time. Each tuple is a training record with a single field, "instance", which holds an Instance<Double> consisting of a double array of features and a double value as the label. The records are read from the births.csv file residing in src/test/resources; after shuffling, 10 of them are held out as testing data (used later for the DRPC queries) and the rest are used for training.

PA Regression in Trident topology using Trident-ML implementation

Once we have the training data spout, we can build a Trident topology that uses the training data to fit a regression model with the PA algorithm in Trident-ML, and then predicts an output value for each testing record. This is implemented in the main class and the helper function shown below:

package com.memeanalytics.trident_regression_pa;

import java.util.List;

import com.github.pmerienne.trident.ml.regression.PARegressor;
import com.github.pmerienne.trident.ml.regression.RegressionQuery;
import com.github.pmerienne.trident.ml.regression.RegressionUpdater;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

/**
 * Demo entry point: trains a Passive-Aggressive regressor inside a local Storm cluster
 * and then queries it through a local DRPC server.
 */
public class App 
{
    /** Name under which the topology is submitted to, and later killed on, the local cluster. */
    private static final String TOPOLOGY_NAME = "RegressionDemo";

    /** How long to let the topology consume training batches before querying it. */
    private static final long TRAINING_WAIT_MILLIS = 10000L;

    /**
     * Submits the topology, waits a fixed time for training, prints the DRPC result for
     * every held-out testing record, then shuts everything down.
     *
     * <p>The cluster and the DRPC server are shut down even when submission or a query
     * fails, so a failure does not leave the local cluster running.
     */
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        boolean interrupted = false;

        try
        {
            cluster.submitTopology(TOPOLOGY_NAME, new Config(), buildTopology(drpc));

            try
            {
                Thread.sleep(TRAINING_WAIT_MILLIS);
            }
            catch (InterruptedException ex)
            {
                // Remember the interrupt and restore it after shutdown, so the queries
                // and the cleanup below still run as before.
                interrupted = true;
                ex.printStackTrace();
            }

            for (String drpcArgs : BirthDataSpout.getDRPCArgsList())
            {
                System.out.println(drpc.execute("predict", drpcArgs));
            }

            cluster.killTopology(TOPOLOGY_NAME);
        }
        finally
        {
            try
            {
                cluster.shutdown();
            }
            finally
            {
                drpc.shutdown();
                if (interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Builds the topology with two streams sharing one regression state:
     * a "training" stream that updates the model named "regression" from the spout's
     * "instance" field, and a "predict" DRPC stream that converts the request arguments
     * to an instance, queries the model and returns the "args" and "prediction" fields.
     */
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
        TridentTopology topology = new TridentTopology();

        TridentState regressionModel = topology
            .newStream("training", new BirthDataSpout())
            .partitionPersist(
                new MemoryMapState.Factory(),
                new Fields("instance"),
                new RegressionUpdater("regression", new PARegressor()));

        topology
            .newDRPCStream("predict", drpc)
            .each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance"))
            .stateQuery(
                regressionModel,
                new Fields("instance"),
                new RegressionQuery("regression"),
                new Fields("prediction"))
            .project(new Fields("args", "prediction"));

        return topology.build();
    }
}
package com.memeanalytics.trident_regression_pa;

import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.Instance;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

/**
 * Trident function that converts a comma-separated DRPC argument string into an
 * {@code Instance<Double>}: the last token is the label, every token before it is a feature.
 */
public class DRPCArgsToInstance extends BaseFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Parses the string in field 0 of {@code tuple} and emits a single value holding
     * the resulting instance.
     */
    public void execute(TridentTuple tuple, TridentCollector collector) {
        final String[] tokens = tuple.getString(0).split(",");
        final int labelIndex = tokens.length - 1;

        // The label is parsed first, before the feature array is allocated.
        final Double label = Double.valueOf(tokens[labelIndex]);

        final double[] features = new double[labelIndex];
        int position = 0;
        while (position < labelIndex) {
            features[position] = Double.parseDouble(tokens[position]);
            position++;
        }

        collector.emit(new Values(new Instance<Double>(label, features)));
    }

}

As can be seen above, the Trident topology uses the BirthDataSpout to emit Instance<Double> training data, which is consumed by the RegressionUpdater. The RegressionUpdater object from Trident-ML updates the underlying regressionModel using the PA algorithm.

The DRPC stream (registered under the function name "predict") allows the user to pass a new testing instance to the regressionModel, which then returns a "prediction" field containing the predicted output for that instance. DRPCArgsToInstance is a BaseFunction operation that converts the arguments passed to LocalDRPC.execute() into an Instance<Double>; this instance is passed to the RegressionQuery, which uses the PARegressor and the regressionModel to determine the predicted output value.

Once the coding is completed, we can run the project by navigating to the project root folder and running the following command:

> mvn compile exec:java

No comments:

Post a Comment