Wednesday, November 26, 2014

Trident-ML: Indexing Sentiment Classification results with ElasticSearch

In Storm, we may have scenarios in which we would like to index results obtained from real-time processing or machine learning into a search and analytics engine. For example, we may have text streaming in from the Kafka messaging system that goes through a TwitterSentimentClassifier (available in Trident-ML), after which we wish to save the text together with the classified sentiment label as an indexed document in ElasticSearch. This post shows one way to realize such an implementation.

First, create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="es-create-index"). The complete source code of the project can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/es-create-index.tar.gz

Configure pom.xml and libraries to be used

Before we proceed, I would like to discuss how to write an elasticsearch client that is compatible with Trident-ML, as we will be using both in this project. Traditionally, an elasticsearch Java client can be implemented using the native client API, such as:

import static org.elasticsearch.node.NodeBuilder.*;

import org.elasticsearch.client.Client;
import org.elasticsearch.node.Node;

// start an embedded node that joins the "elasticsearch" cluster and obtain a client from it
Node node = nodeBuilder().clusterName("elasticsearch").node();
Client client = node.client();

// TODO HERE: put document, delete document, etc. using the client

node.close();

The above code requires the following dependency in pom.xml:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>1.4.0</version>
</dependency>

However, this library depends on an older version of lucene-core [version=3.6.0] that is not compatible with lucene-analyzers [version=3.6.2], which is currently one of Trident-ML's dependencies (the TwitterTokenizer used by TwitterSentimentClassifier relies on it). As a result, the elasticsearch library above cannot be used if the TwitterSentimentClassifier in Trident-ML is to be used in this project.

Since the above Java code and elasticsearch library cannot be used in this project, the project uses httpclient [version=4.3] from org.apache.httpcomponents in their place to communicate with elasticsearch via its RESTful API. The httpclient library provides CloseableHttpClient together with request classes such as HttpGet, HttpPut, HttpPost and HttpDelete.
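As a quick illustration of this approach, below is a minimal, self-contained sketch of indexing a single document over the REST API with httpclient 4.3. The index name "myindex", type "mytype", document id 1 and the document body are placeholders for illustration only; the actual values used in this project appear in the CreateESIndex operation later in this post.

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class EsRestSketch {
    public static void main(String[] args) throws Exception {
        CloseableHttpClient client = HttpClients.createDefault();
        try {
            // PUT a document with id 1 into index "myindex", type "mytype"
            HttpPut put = new HttpPut("http://127.0.0.1:9200/myindex/mytype/1");
            put.setEntity(new StringEntity("{\"text\":\"hello\"}", ContentType.APPLICATION_JSON));
            CloseableHttpResponse response = client.execute(put);
            try {
                // the response body echoes the index, type, id and version of the document
                System.out.println(EntityUtils.toString(response.getEntity()));
            } finally {
                response.close();
            }
        } finally {
            client.close();
        }
    }
}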

The dependencies section of the pom for this project looks like the following:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
</dependency>
<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.3</version>
</dependency>

Spout

Once pom.xml is properly updated, we can move on to implementing the Storm spout used in this project. The spout, named TweetCommentSpout, reads tweets from "src/test/resources/twitter-sentiment.csv" and emits them in batches to the Trident topology. The implementation of the spout is shown below:

package com.memeanalytics.es_create_index;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class TweetCommentSpout implements IBatchSpout {

    private static final long serialVersionUID = 1L;
    private static List<List<Object>> data = new ArrayList<List<Object>>();

    private int batchIndex;
    private int batchSize = 10;

    // load the labeled tweets once; each CSV line is "label,text"
    static {
        BufferedReader br = null;
        String filePath = "src/test/resources/twitter-sentiment.csv";
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] values = line.split(",");
                Integer label = Integer.parseInt(values[0]);
                String text = values[1];
                data.add(new Values(text, label));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    public void open(Map conf, TopologyContext context) {
    }

    public void emitBatch(long batchId, TridentCollector collector) {
        // emit the next batch of batchSize tuples until the data is exhausted
        int maxBatchCount = data.size() / batchSize;
        if (maxBatchCount > 0 && batchIndex < maxBatchCount) {
            for (int i = batchSize * batchIndex; i < data.size() && i < (batchIndex + 1) * batchSize; ++i) {
                collector.emit(data.get(i));
            }
            batchIndex++;
        }
    }

    public void ack(long batchId) {
    }

    public void close() {
    }

    public Map getComponentConfiguration() {
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("text", "label");
    }

}

The tuples emitted by the spout contain two fields: "text" and "label". The label is ignored here; we are going to have Trident-ML's TwitterSentimentClassifier predict the sentiment label for us instead.
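Based on the parsing logic in the static block above, each line of twitter-sentiment.csv is expected to hold a numeric sentiment label followed by the tweet text, separated by a comma. The two lines below are made-up examples that only illustrate the expected layout (they are not taken from the actual file):

1,hypothetical positive tweet text
0,hypothetical negative tweet text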

Trident operation for ElasticSearch

Next, we are going to implement a BaseFilter, named CreateESIndex, which is a Trident operation that creates an indexed document in ElasticSearch from each tweet text and its predicted sentiment label. The implementation of the Trident operation is shown below:

package com.memeanalytics.es_create_index;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CreateESIndex extends BaseFilter {

    private static final long serialVersionUID = 1L;
    private int esIndex = 1;
    private String wsUrl = "http://127.0.0.1:9200";
    private String indexName = "twittersentiment"; // must be lowercase
    private String typeName = "trident";
    private CloseableHttpClient client;
    // query that retrieves the most recently indexed document (not used below)
    private String lastIndexedDocumentIdQueryJson = "{\"query\": { \"match_all\": {}}, \"size\": 1," +
        "\"sort\": [" +
            "{" +
                "\"_timestamp\": {" +
                    "\"order\": \"desc\"" +
                "}" +
            "}" +
        "]" +
    "}";

    public boolean isKeep(TridentTuple tuple) {
        Boolean prediction = tuple.getBooleanByField("prediction");
        String comment = tuple.getStringByField("text");
        System.out.println(comment + " >> " + prediction);

        if (client != null) {
            // index the tweet and its predicted label as document id esIndex
            HttpPut method = new HttpPut(wsUrl + "/" + indexName + "/" + typeName + "/" + esIndex);

            Date currentTime = new Date();
            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd");
            SimpleDateFormat format2 = new SimpleDateFormat("HH:mm:ss");
            String dateString = format1.format(currentTime) + "T" + format2.format(currentTime);

            CloseableHttpResponse response = null;
            try {
                String json = "{\"text\":\"" + comment + "\", \"prediction\":\"" + prediction + "\", \"postTime\":\"" + dateString + "\"}";
                System.out.println(json);
                StringEntity params = new StringEntity(json);
                params.setContentType(new BasicHeader(HTTP.CONTENT_TYPE, "application/json"));

                method.setEntity(params);

                method.addHeader("Accept", "application/json");
                method.addHeader("Content-type", "application/json");

                response = client.execute(method);

                HttpEntity entity = response.getEntity();
                String responseText = EntityUtils.toString(entity);
                System.out.println(responseText);
            } catch (IOException ex) {
                ex.printStackTrace();
            } finally {
                if (response != null) {
                    try {
                        response.close();
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
                method.releaseConnection();
            }
            esIndex++;
        }

        return true;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // build a client that never retries failed requests
        client = HttpClients.custom().setRetryHandler(new MyRetryHandler()).build();

        // delete any previously indexed documents under the twittersentiment index
        HttpDelete method = new HttpDelete(wsUrl + "/" + indexName);
        CloseableHttpResponse response = null;
        try {
            response = client.execute(method);
            HttpEntity entity = response.getEntity();
            String responseBody = EntityUtils.toString(entity);
            System.out.println(responseBody);
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            if (response != null) {
                try {
                    response.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    private class MyRetryHandler implements HttpRequestRetryHandler {
        public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
            // never retry a failed request
            return false;
        }
    }

    @Override
    public void cleanup() {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

In the prepare() method of CreateESIndex, a RESTful DELETE call is performed to delete the twittersentiment index, and with it any previously indexed documents. This ensures that no stale data remains under twittersentiment/trident when the topology is run. In the isKeep() method, the tweet text and its predicted sentiment label are serialized to JSON and sent to elasticsearch via an HTTP PUT call. The CloseableHttpClient object is closed in the cleanup() method.
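To make the flow concrete, the first tuple processed would result in a request of roughly the following shape (the tweet text, predicted label and timestamp shown here are placeholders, and the document id in the URL increments with each tuple):

PUT http://127.0.0.1:9200/twittersentiment/trident/1
{"text":"<tweet text>", "prediction":"true", "postTime":"2014-11-26T12:00:00"}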

Trident topology

Now that we have the necessary spout and Trident operation, we can define a simple Trident topology in which tweets are streamed -> classified by TwitterSentimentClassifier -> indexed in ElasticSearch. Below is the implementation in the main class:

package com.memeanalytics.es_create_index;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args )
    {
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("TridentWriteToESDemo", config, buildTopology());
        
        try {
            // let the topology run for 10 seconds before shutting down
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        
        cluster.killTopology("TridentWriteToESDemo");
        cluster.shutdown();
    }
    
    private static StormTopology buildTopology()
    {
     TridentTopology topology=new TridentTopology();
     
     TweetCommentSpout spout=new TweetCommentSpout();
     
     topology.newStream("classifyAndIndex", spout)
             .each(new Fields("text"), new TwitterSentimentClassifier(), new Fields("prediction"))
             .each(new Fields("text", "prediction"), new CreateESIndex());
     
     return topology.build();
    }
}

Once everything is in place, run the following command in the project root folder:

> mvn compile exec:java
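
Once the topology has finished, one way to verify the results is to query ElasticSearch's _search endpoint for the twittersentiment/trident documents. Below is a minimal sketch using the same httpclient dependency; it simply prints the raw JSON search response and assumes ElasticSearch is still running at 127.0.0.1:9200.

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class VerifyIndexedDocs {
    public static void main(String[] args) throws Exception {
        CloseableHttpClient client = HttpClients.createDefault();
        try {
            // fetch up to 10 documents from twittersentiment/trident (match_all by default)
            HttpGet get = new HttpGet("http://127.0.0.1:9200/twittersentiment/trident/_search?size=10");
            CloseableHttpResponse response = client.execute(get);
            try {
                System.out.println(EntityUtils.toString(response.getEntity()));
            } finally {
                response.close();
            }
        } finally {
            client.close();
        }
    }
}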
