Showing posts with label ElasticSearch. Show all posts

Monday, February 1, 2016

Elasticsearch Version Upgrade using Rolling Restart

This upgrade method updates one node at a time and restarts it before moving on to the next. The same method can also be used for restarting an elasticsearch cluster in a safe and efficient way.

Master-eligible nodes


In an elasticsearch production cluster, start with one master-eligible node: stop the elasticsearch service, perform the upgrade, and then restart the service. Repeat this until all master-eligible nodes have been upgraded. Since dedicated master-eligible nodes (those that are not also data nodes) do not hold shards or replicas, this step puts no data at risk.

Client nodes


Next, stop, upgrade, and restart the client nodes one at a time, just as with the master-eligible nodes.

Data nodes


Next, before upgrading any data node, dynamically add a setting that temporarily turns off shard allocation, via a RESTful API call to the cluster (otherwise, when a data node is restarted, the cluster starts reallocating that node's shards to the remaining nodes). The setting to be temporarily changed is cluster.routing.allocation.enable, which can be done by issuing the following call to the elasticsearch cluster:

curl -X PUT -H "Content-Type: application/json" http://elastic-cluster:9200/_cluster/settings/ -d '
{ "transient": { "cluster.routing.allocation.enable": "none" } }
'
(Note that "transient" means the setting is not permanent: it does not survive a full cluster restart.)

Now stop, upgrade, and restart a data node. Once the node has rejoined the cluster, re-enable allocation by setting cluster.routing.allocation.enable back to "all" with the curl call below:

curl -X PUT -H "Content-Type: application/json" http://elastic-cluster:9200/_cluster/settings/ -d '
{ "transient": { "cluster.routing.allocation.enable": "all" } }
'
Once this is done, the shards that lived on that data node are allocated again. Wait for the cluster to finish recovering before moving on.

Next, proceed to the second data node and repeat the process above until all data nodes are upgraded.
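The two settings calls above can also be issued from Java. The following is only a sketch: the class name AllocationToggle and its method names are made up for this example, the base URL is whatever your cluster uses, and the helper deliberately accepts only the two values used in this post ("none" and "all").

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

final class AllocationToggle {

    /** Builds the transient cluster-settings body; this sketch only allows "none" or "all". */
    static String allocationBody(String mode) {
        if (!mode.equals("none") && !mode.equals("all")) {
            throw new IllegalArgumentException("unsupported mode: " + mode);
        }
        return "{ \"transient\": { \"cluster.routing.allocation.enable\": \"" + mode + "\" } }";
    }

    /** PUTs the body to baseUrl/_cluster/settings and returns the HTTP status code. */
    static int setAllocation(String baseUrl, String mode) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/_cluster/settings").openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true); // required to send a request body
        try (OutputStream out = conn.getOutputStream()) {
            out.write(allocationBody(mode).getBytes(StandardCharsets.UTF_8));
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

A rolling restart would call setAllocation("http://elastic-cluster:9200", "none") before stopping a data node and setAllocation("http://elastic-cluster:9200", "all") after the node has rejoined.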



Sunday, January 31, 2016

Quorum and minimum_master_nodes setting for elasticsearch configuration

Theory behind elasticsearch recovery

Elasticsearch recovery relies on a master election that must satisfy the minimum_master_nodes criterion. That is, a master node is only elected if at least N master-eligible nodes (nodes with node.master=true in their elasticsearch.yml file) take part in the election. N is specified by discovery.zen.minimum_master_nodes in the elasticsearch.yml file.

For the quorum scheme, N should be set to

discovery.zen.minimum_master_nodes = (number of master-eligible nodes) / 2 + 1

where the division is an integer division (rounded down).

The recovery works like this. When the current master node dies, a new master node will only be elected if there are N master-eligible nodes to join it, where N = (number of master-eligible nodes) / 2 + 1.

Once the new master node is elected, if the originally dead master node later comes back, it will have fewer than N master-eligible nodes to join it and will therefore have to step down. This scheme thus ensures that there are never two masters at the same time (the so-called split-brain scenario, which is undesirable because every master has the authority to update the cluster state on the data nodes and client nodes).

Minimum number of master eligible nodes required for an elasticsearch cluster

The minimum number of master-eligible nodes should be 3, and discovery.zen.minimum_master_nodes should then be set to 3 / 2 + 1 = 2 (using integer division).

Reason: Suppose we only have two master-eligible nodes. If the master node dies, only one master-eligible node is left, and one of two things will happen depending on the value of discovery.zen.minimum_master_nodes:

Case 1: if discovery.zen.minimum_master_nodes is set to a value greater than 1, no new master node will be elected, and the cluster will not operate.


Case 2: if discovery.zen.minimum_master_nodes is set to 1, a new master node will be elected. However, when the originally dead master is brought back, it will not step down, since one master-eligible node is now enough to satisfy the setting, leading to the split-brain problem.

Therefore the recommended minimum setup is to increase the number of master-eligible nodes to 3 and set discovery.zen.minimum_master_nodes=2.
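The quorum arithmetic above can be checked with a few lines of Java. This is only an illustrative sketch: the class and method names (QuorumSketch, minimumMasterNodes, tolerableFailures) are made up for this example and are not part of elasticsearch.

```java
final class QuorumSketch {

    /** Quorum for the given number of master-eligible nodes; integer division rounds down. */
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }

    /** Number of master-eligible nodes that may fail while a quorum can still be formed. */
    static int tolerableFailures(int masterEligibleNodes) {
        return masterEligibleNodes - minimumMasterNodes(masterEligibleNodes);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " master-eligible node(s): minimum_master_nodes="
                    + minimumMasterNodes(n) + ", tolerable failures=" + tolerableFailures(n));
        }
    }
}
```

With two master-eligible nodes the quorum is 2 and no failure can be tolerated, which is why three is the recommended minimum: it gives a quorum of 2 and tolerates one failure.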

Saturday, November 7, 2015

Use HttpURLConnection to send a "GET" command to Elastic Search with a json body (emulating the "-d" option in curl's GET)

Recently I was working on a Java equivalent, using HttpURLConnection, of the following curl query, which sends a "GET" command to elastic search with a json body specified in the "-d" option of the curl command, something like the one below:

curl -XGET "http://127.0.0.1:9200/messages/_search?pretty" -d '
{
  "size" : 10,
  "query" : {
    "bool" : {
      "must" : [ {
        "match" : {
          "id" : {
            "query" : "[some id]",
            "type" : "boolean"
          }
        }
      },  {
        "nested" : {
          "query" : {
            "bool" : {
              "must" : {
                "match" : {
                  "agent" : {
                    "query" : "[some agent name]",
                    "type" : "boolean"
                  }
                }
              }
            }
          },
          "path" : "agents"
        }
      } ]
    }
  }
}'

The command requires a json body to be sent to elastic search in the "GET" restful call. After some trial and error, I got this to work; the Java method is shown below.

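import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

 public static String httpGet(String urlToRead, String data) {
        StringBuilder result = new StringBuilder();
        try {
            URL url = new URL(urlToRead);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("Content-Type", "application/json");
            // setDoOutput(true) is what allows a request body to be attached.
            // Caveat: the standard JDK HttpURLConnection switches a GET to POST as soon as
            // the output stream is opened; the _search endpoint accepts both methods.
            conn.setDoOutput(true);
            try (Writer writer = new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8)) {
                writer.write(data);
            }

            // read the response body line by line
            try (BufferedReader rd = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = rd.readLine()) != null) {
                    result.append(line);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result.toString();
}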
To call the above method and reproduce the curl query to ES shown earlier, use the following:

 
int size = 10;
String ipAddress = "127.0.0.1";
String url = "http://"+ipAddress+":9200/messages/_search?pretty";
String data = " { \"size\" : "+size+", \"query\" : { \"bool\" : { \"must\" : [ { \"match\" : { \"id\" : { \"query\" : \"[some id]\", \"type\" : \"boolean\" } } }, { \"nested\" : { \"query\" : { \"bool\" : { \"must\" : { \"match\" : { \"agent\" : { \"query\" : \"[some agent name]\", \"type\" : \"boolean\" } } } } }, \"path\" : \"agents\" } } ] } } } ";
String response = httpGet(url, data);

Thursday, January 1, 2015

ElasticSearch: query with AND and OR criteria

We frequently need to construct elasticsearch queries with AND and OR criteria, similar to the following SQL statements:

Statement 1:
SELECT * FROM tblOrder WHERE orderDate='2015-01-01 14:00:00' AND customerID=29439;

Statement 2:
SELECT * FROM tblOrder WHERE orderDate='2015-01-01 14:00:00' OR customerID=29439;

Statement 3:
SELECT * FROM tblOrder WHERE orderDate <= '2015-01-01 14:00:00' AND customerID=29439;

Statement 4:
SELECT * FROM tblOrder WHERE (orderDate='2015-01-01 14:00:00' AND customerID=29439) OR customerID = 20991;

Statement 5:
SELECT * FROM tblOrder WHERE orderDate='2015-01-01 14:00:00' AND (customerID=29439 OR customerID = 20991);

In ElasticSearch, we use "must" and "should" in place of AND and OR and "bool" in place of WHERE.

Suppose our ElasticSearch instance (at 179.168.0.1:9200) holds indexed documents with the following structure under myOrder/myOrder:

{
"orderID" : xxxxx,
"customerID"  : xxxxx,
"orderDate" : "yyyyMMdd'T'HHmmss",
"itemLines" : [
  {
   "itemLineID" : xxxx,
   "productID" : yyyyy,
   "quantity" : xxxxx
  },
  ...
]
}

Below is the translation of the above SQL statements into equivalent ElasticSearch queries. The examples use curl, and each query returns at most 100 records starting from offset 0:

Statement 1:
curl -XGET http://179.168.0.1:9200/myOrder/myOrder/_search -d '
{
"from" : 0,
"size" : 100,
"query": {
   "bool" : {
       "must" : [
          { "match" : {"orderDate" : "20150101T140000" } },
          { "match" : {"customerID" : 29439 } }
       ]
   }
}
}'

Statement 2:
curl -XGET http://179.168.0.1:9200/myOrder/myOrder/_search -d '
{
"from" : 0,
"size" : 100,
"query": {
   "bool" : {
       "should" : [
          { "match" : {"orderDate" : "20150101T140000" } },
          { "match" : {"customerID" : 29439 } }
       ]
   }
}
}'

Statement 3:
curl -XGET http://179.168.0.1:9200/myOrder/myOrder/_search -d '
{
"from" : 0,
"size" : 100,
"query": {
   "bool" : {
       "must" : [
          { "range" : {"orderDate" : { "lte" : "20150101T140000" } } },
          { "match" : {"customerID" : 29439 } }
       ]
   }
}
}'

Statement 4:
curl -XGET http://179.168.0.1:9200/myOrder/myOrder/_search -d '
{
"from" : 0,
"size" : 100,
"query": {
   "bool" : {
       "should" : [
          { "bool" : {
              "must" : [
                 { "match" : {"orderDate" : "20150101T140000" } },
                 { "match" : {"customerID" : 29439 } }
              ]
          } },
          { "match" : { "customerID" : 20991 } }
       ]
   }
}
}'

Statement 5:
curl -XGET http://179.168.0.1:9200/myOrder/myOrder/_search -d '
{
"from" : 0,
"size" : 100,
"query": {
   "bool" : {
       "must" : [
          { "match" : {"orderDate" : "20150101T140000" } },
          { "bool" : {
              "should" : [
                   { "match" : { "customerID" : 20991 } },
                   { "match" : {"customerID" : 29439 } }
              ]
          } }
       ]
   }
}
}'
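When such queries are assembled in Java, the AND/OR nesting can be kept readable with a few small string helpers. The sketch below is only an illustration under stated assumptions: BoolQuerySketch and its methods are hypothetical names, not an elasticsearch or JEST API, and the helpers assume field names and values that need no JSON escaping.

```java
final class BoolQuerySketch {

    /** Builds a match clause; numbers are written bare, everything else as a JSON string. */
    static String match(String field, Object value) {
        String v = (value instanceof Number) ? value.toString() : "\"" + value + "\"";
        return "{\"match\":{\"" + field + "\":" + v + "}}";
    }

    /** AND: every clause must match. */
    static String must(String... clauses) {
        return bool("must", clauses);
    }

    /** OR: in a bool query that holds only should clauses, at least one has to match. */
    static String should(String... clauses) {
        return bool("should", clauses);
    }

    private static String bool(String occur, String... clauses) {
        return "{\"bool\":{\"" + occur + "\":[" + String.join(",", clauses) + "]}}";
    }

    public static void main(String[] args) {
        // Statement 5: orderDate = ... AND (customerID = 29439 OR customerID = 20991)
        String query = must(
                match("orderDate", "20150101T140000"),
                should(match("customerID", 29439), match("customerID", 20991)));
        System.out.println("{\"from\":0,\"size\":100,\"query\":" + query + "}");
    }
}
```

Each array element is a complete JSON object of the form { "match" : ... } or { "bool" : ... }, which is what elasticsearch expects inside "must" and "should".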

Saturday, December 13, 2014

ElasticSearch: Filtered Query using JEST

While it is possible to query ElasticSearch using httpclient or an es node client, JEST is often more convenient. This post explains the basics of using JEST for a filtered query against ElasticSearch. To start, create a Maven project and add the following dependencies to the pom file:

<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>0.1.3</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>

Suppose elastic search stores indexed documents with the following example structure:

{ "name" : "xxx",
  "age" : 11,
"address" : "xxxxx"}

Let's suppose the elasticsearch is running at 192.168.2.2:9200 and the indexed documents are stored under http://192.168.2.2:9200/myindex/mytype.

Let's create a simple java class representing this index document:

public class SearchResultTuple {
 public String address;
 public String name;
 public int age;
}


We want to retrieve 20 of the indexed documents from elasticsearch using the following query:

{"from": 0, "size" : 20,
"sort" : {
   "age" : { "order" : "asc" }
},
"query" : {
  "filtered" : {
     "query" : {
        "match" : { "name" : "James" }
     },

    "filter" : {
       "range" : { "age" : { "gte" : 10, "lte" : 20 } }
     }
   }
  }
}


The java implementation to execute the above filtered query using JEST on ElasticSearch is shown below:

import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.SearchResult.Hit;

import java.util.List;

import org.json.simple.JSONObject;

public class App 
{
    public static void main( String[] args ) 
    {
        JestClient client = openClient();
        
        JSONObject json=new JSONObject();
     
        json.put("from", 0);
        json.put("size", 20);
     
        JSONObject sortJson=new JSONObject();
        json.put("sort", sortJson);
     
        JSONObject sortDateJson=new JSONObject();
        sortJson.put("age", sortDateJson);
        sortDateJson.put("order", "asc");
     
        JSONObject queryJson=new JSONObject();
        json.put("query", queryJson);
        
        JSONObject filteredJson=new JSONObject();
        queryJson.put("filtered", filteredJson);
 

        JSONObject queryMatchJson=new JSONObject();
        filteredJson.put("query", queryMatchJson);
 
        JSONObject matchJson=new JSONObject();
        queryMatchJson.put("match", matchJson);
 
        matchJson.put("name", "James");
 
        JSONObject filterJson=new JSONObject();
        filteredJson.put("filter", filterJson);
  
        JSONObject rangeJson=new JSONObject();
        filterJson.put("range", rangeJson);

        JSONObject dateJson = new JSONObject();

        rangeJson.put("age", dateJson);

        dateJson.put("gte", 10);
        dateJson.put("lte", 20);
     
        String jsonString = json.toJSONString();
     
        Search search = (Search) new Search.Builder(jsonString)
        .addIndex("myindex")
        .addType("mytype")
        .build();

        try {
           SearchResult result = client.execute(search);
           //System.out.println(result.getJsonString());
           List<Hit<SearchResultTuple, Void>> hits = result.getHits(SearchResultTuple.class);
           //System.out.println(hits.size());
          for(Hit<SearchResultTuple, Void> hit : hits)
          {
            SearchResultTuple hitTuple = hit.source;
            int age = hitTuple.age;
            String name = hitTuple.name;
            String address =hitTuple.address;
          }
        } catch (Exception e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
     
       
        client.shutdownClient();
    }
    
    private static JestClient openClient()
    {
     HttpClientConfig clientConfig = new HttpClientConfig.Builder("http://192.168.2.2:9200")
          .multiThreaded(true).build();
     JestClientFactory factory = new JestClientFactory();
  
     factory.setHttpClientConfig(clientConfig);
     JestClient jestClient = factory.getObject();
  
     return jestClient;
    }
}




Wednesday, November 26, 2014

Trident-ML: Indexing Sentiment Classification results with ElasticSearch

In Storm, we may have scenarios in which we would like to index results obtained from real-time processing or machine learning into a search and analytics engine. For example, we may have some text streaming in from a Kafka messaging system which goes through a TwitterSentimentClassifier (which is available in Trident-ML). After that, we may wish to save the text together with the classified sentiment label as an indexed document in ElasticSearch. This post shows one way to realize such an implementation.

First create a Maven project (e.g. with groupId="com.memeanalytics" and artifactId="es-create-index"). The complete source code of the project can be downloaded from the link below:

https://dl.dropboxusercontent.com/u/113201788/storm/es-create-index.tar.gz

Configure pom.xml and libraries to be used

Before we proceed, I would like to discuss how to write an elasticsearch client that is compatible with Trident-ML, as we will be using both in this project. Traditionally an elasticsearch java client can be implemented using the native API, such as this:

import static org.elasticsearch.node.NodeBuilder.*;

import org.elasticsearch.client.Client;
import org.elasticsearch.node.Node;

Node node=nodeBuilder().clusterName("elasticsearch").node();
Client client=node.client();

//TODO HERE: put document, delete document, etc using the client

node.close();

The above code requires the following dependency in pom.xml:

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.4.0</version>
</dependency>

However, this library depends on lucene-core [version=3.6.0], an older version that is not compatible with lucene-analyzers [version=3.6.2], which is currently one of Trident-ML's dependencies (the TwitterTokenizer in TwitterSentimentClassifier uses it). As a result, the elasticsearch library above cannot be used if the TwitterSentimentClassifier in Trident-ML is to be used in this project.

Since the above java code and elasticsearch library cannot be used in this project, the project instead uses httpclient [version=4.3] from org.apache.httpcomponents to communicate with elasticsearch via its RESTful api. The httpclient library provides CloseableHttpClient and request classes such as HttpGet, HttpPut, and HttpDelete.

The dependencies section of the pom for this project looks like the following:

<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
</dependency>
<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.3</version>
</dependency>

Spout

Once the pom.xml is properly updated, we can move on to implementing the Storm spout used in this project. The spout, named TweetCommentSpout, reads tweets from "src/test/resources/twitter-sentiment.csv" and emits them in batches to the Trident topology. The implementation of the spout is shown below:

package com.memeanalytics.es_create_index;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.github.pmerienne.trident.ml.core.TextInstance;
import com.github.pmerienne.trident.ml.preprocessing.EnglishTokenizer;
import com.github.pmerienne.trident.ml.preprocessing.TextTokenizer;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class TweetCommentSpout implements IBatchSpout {
 
 private static final long serialVersionUID = 1L;
 private static List<List<Object>> data=new ArrayList<List<Object>>();

 private int batchIndex;
 private int batchSize=10;
 
 static{
  BufferedReader br=null;
  FileInputStream is=null;
  String filePath="src/test/resources/twitter-sentiment.csv";
  try {
   is=new FileInputStream(filePath);
   br=new BufferedReader(new InputStreamReader(is));
   String line=null;
   while((line=br.readLine())!=null)
   {
    String[] values = line.split(",");
    Integer label=Integer.parseInt(values[0]);
    String text=values[1];
//    TextTokenizer tokenizer=new EnglishTokenizer();
//    List<String> tokens = tokenizer.tokenize(text);
//    TextInstance<Integer> instance=new TextInstance<Integer>(label, tokens);
    data.add(new Values(text, label));
   }
  } catch (FileNotFoundException e) {
   e.printStackTrace();
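  }catch(IOException ex)
  {
   ex.printStackTrace();
  }finally {
   // close the reader (and the underlying file stream) once loading is done
   try {
    if(br!=null) br.close();
   } catch(IOException ex) {
    ex.printStackTrace();
   }
  }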
  
 }
 public void open(Map conf, TopologyContext context) {
  // TODO Auto-generated method stub
  
 }

 public void emitBatch(long batchId, TridentCollector collector) {
  // TODO Auto-generated method stub
  int maxBatchCount = data.size() / batchSize;
  if(maxBatchCount > 0 && batchIndex < maxBatchCount)
  {
   for(int i=(batchSize * batchIndex); i < data.size() && i < (batchIndex+1) * batchSize; ++i)
   {
    collector.emit(data.get(i));
   }
   batchIndex++;
  }
 }

 public void ack(long batchId) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

 public Map getComponentConfiguration() {
  // TODO Auto-generated method stub
  return null;
 }

 public Fields getOutputFields() {
  // TODO Auto-generated method stub
  return new Fields("text", "label");
 }
 
}

The tuples emitted by the spout contain two fields: "text" and "label". The label is ignored; we are going to have Trident-ML's TwitterSentimentClassifier predict the sentiment label for us instead.
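The batching arithmetic in emitBatch() is easy to check in isolation. The sketch below mirrors that logic in a standalone helper; BatchRangeSketch and batchRange are names made up for this example and are not part of Storm or Trident.

```java
final class BatchRangeSketch {

    /**
     * Returns {from, to} (to exclusive) for the rows emitted in the given batch,
     * using the same integer arithmetic as emitBatch(). from == to means nothing is emitted.
     */
    static int[] batchRange(int totalRows, int batchSize, int batchIndex) {
        int fullBatches = totalRows / batchSize; // integer division: a partial batch is not counted
        if (batchIndex >= fullBatches) {
            return new int[] {0, 0};
        }
        int from = batchSize * batchIndex;
        int to = Math.min(totalRows, from + batchSize);
        return new int[] {from, to};
    }

    public static void main(String[] args) {
        // 25 rows with a batch size of 10
        for (int batch = 0; batch < 3; batch++) {
            int[] range = batchRange(25, 10, batch);
            System.out.println("batch " + batch + ": rows [" + range[0] + ", " + range[1] + ")");
        }
    }
}
```

One consequence of the integer division is that a trailing partial batch is never emitted: with 25 rows and a batch size of 10, only rows 0 to 19 leave the spout. That is harmless for a demo, but worth knowing if the row count matters.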

Trident operation for ElasticSearch

Next we are going to implement a BaseFilter, named CreateESIndex, which is a Trident operation that creates an indexed document in ElasticSearch from each tweet text and its predicted sentiment label. The implementation of the Trident operation is shown below:

package com.memeanalytics.es_create_index;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CreateESIndex extends BaseFilter{

 private static final long serialVersionUID = 1L;
 private int esIndex=1;
 private String wsUrl="http://127.0.0.1:9200";
 private String indexName="twittersentiment"; //must be lowercase
 private String typeName="trident";
 private CloseableHttpClient client;
 private String lastIndexedDocumentIdQueryJson="{\"query\": { \"match_all\": {}}, \"size\": 1,"+
   "\"sort\": ["+
     "{"+
       "\"_timestamp\": {"+
         "\"order\": \"desc\""+
       "}"+
     "}"+
   "]"+
 "}";

 public boolean isKeep(TridentTuple tuple) {
  // TODO Auto-generated method stub
  Boolean prediction =tuple.getBooleanByField("prediction");
  String comment=tuple.getStringByField("text");
  System.out.println(comment + " >> " + prediction);
  
  if(client != null)
  {
   HttpPut method=new HttpPut(wsUrl+"/"+indexName+"/"+typeName+"/"+esIndex);
   
   Date currentTime= new Date();
   SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd");
   SimpleDateFormat format2 = new SimpleDateFormat("HH:mm:ss");
   String dateString = format1.format(currentTime)+"T"+format2.format(currentTime);
   
   CloseableHttpResponse response=null;
   try{
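    // escape backslashes and double quotes so the tweet text cannot break the JSON string
    String escapedComment = comment.replace("\\", "\\\\").replace("\"", "\\\"");
    String json = "{\"text\":\""+escapedComment+"\", \"prediction\":\""+prediction+"\", \"postTime\":\""+dateString+"\"}";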
    System.out.println(json);
    StringEntity params=new StringEntity(json);
    params.setContentType(new BasicHeader(HTTP.CONTENT_TYPE, "application/json"));
    
    method.setEntity(params);
    
    method.addHeader("Accept", "application/json");
    method.addHeader("Content-type", "application/json");
    
    response = client.execute(method);
    
    HttpEntity entity=response.getEntity();
    String responseText=EntityUtils.toString(entity);
    System.out.println(responseText);
   }catch(IOException ex) {
    ex.printStackTrace();
   }finally {
    method.releaseConnection();
   }
   esIndex++;
  }
  
  return true;
 }
 
 @Override
    public void prepare(Map conf, TridentOperationContext context) {
  
  client=HttpClients.custom().setRetryHandler(new MyRetryHandler()).build();
  
  CloseableHttpResponse response=null;
  HttpDelete method=new HttpDelete(wsUrl+"/"+indexName);
  try{
   response = client.execute(method);
   HttpEntity entity=response.getEntity();
   String responseBody=EntityUtils.toString(entity);
   System.out.println(responseBody);
  }catch(IOException ex)
  {
   ex.printStackTrace();
  }
    }
 
 private class MyRetryHandler implements HttpRequestRetryHandler {

  public boolean retryRequest(IOException arg0, int arg1, HttpContext arg2) {
   // TODO Auto-generated method stub
   return false;
  }
 }

    @Override
    public void cleanup() {
     try {
   client.close();
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
    }

}

In the prepare() method of CreateESIndex, a RESTful DELETE call is issued against the twittersentiment index, which removes the index together with all documents previously indexed under twittersentiment/trident. This ensures that no stale data is present when the filter runs. In its isKeep() method, the tweet text and its predicted sentiment label are serialized to json and sent to elasticsearch via an http PUT call. The CloseableHttpClient object is closed in the cleanup() method.

Trident topology

Now that we have the necessary spout and Trident operation, we can define a simple Trident topology that streams tweets -> classifies them with TwitterSentimentClassifier -> indexes them in ElasticSearch. Below is the implementation in the main class:

package com.memeanalytics.es_create_index;

import com.github.pmerienne.trident.ml.nlp.TwitterSentimentClassifier;

import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;

public class App 
{
    public static void main( String[] args )
    {
        LocalCluster cluster=new LocalCluster();
        Config config=new Config();
        
        cluster.submitTopology("TridentWriteToESDemo", config, buildTopology());
        
        try{
         Thread.sleep(10000);
        }catch(InterruptedException ex)
        {
         
        }
        
        cluster.killTopology("TridentWriteToESDemo");
        cluster.shutdown();
    }
    
    private static StormTopology buildTopology()
    {
     TridentTopology topology=new TridentTopology();
     
     TweetCommentSpout spout=new TweetCommentSpout();
     
     topology.newStream("classifyAndIndex", spout).each(new Fields("text"), new TwitterSentimentClassifier(), new Fields("prediction")).each(new Fields("text", "prediction"), new CreateESIndex());
     
     return topology.build();
    }
}

Once it is completed, run the following command in the project root folder:

> mvn compile exec:java

Setup and run ElasticSearch on Ubuntu 14.04 LTS

Run the following command to download the ElasticSearch 1.4.0:

> wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.4.0.deb

Setup ElasticSearch

After download, run the following command to install the debian package:

> sudo dpkg -i elasticsearch-1.4.0.deb

After installation, the configuration files for ElasticSearch are stored in the following directory:

/etc/elasticsearch/

The bin folder for ElasticSearch can be found in the following directory:

/usr/share/elasticsearch/

Now open the .bashrc file in your home directory for editing (sudo is not needed for a file you own):

> gedit $HOME/.bashrc

In the .bashrc file, add the following lines at the end:

export ES_HOME=/usr/share/elasticsearch
export ES_CONF_HOME=/etc/elasticsearch

Save and close the .bashrc file.

Check whether the elasticsearch is running as a service on Ubuntu:

> sudo service elasticsearch status

If not, start the elasticsearch service by running the following command:

> sudo service elasticsearch start

Run the following command to test the ElasticSearch:

> curl localhost:9200

You can also open a web browser and enter "localhost:9200" to see ElasticSearch in action.

Create a document

From the terminal, execute a PUT call such as the following:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/1 -d '{
"start_time" : "2014-11-26T14:00:00",
"end_time" : "2014-11-26T17:30:00",
"subject" : "Elastic Search Tryout",
"agenda" : {
 "item 1" : "Create an index",
 "item 2" : "Delete an index",
 "item 3" : "Open an index",
 "item 4" : "Close an index" }
}'

The _index is specified as "myindex", _type is specified as "mytype", and _id is specified as "1". After the document has been created, it can be retrieved by running a GET call:

> curl http://127.0.0.1:9200/myindex/mytype/1

We can execute another PUT call as below to generate one more document under myindex/mytype:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/2 -d '{
"start_time" : "2014-11-27T14:00:00",
"end_time" : "2014-11-27T17:30:00",
"subject" : "Elastic Search Tryout 2",
"agenda" : {
 "item 1" : "Create a document",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

After the document has been created, it can be retrieved by running a GET call:

> curl http://127.0.0.1:9200/myindex/mytype/2

To create a document whose id is generated rather than hand-picked, the id still has to be supplied in the URL when using PUT. For example, we can execute another PUT call as below to create one more document under myindex/mytype:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-28T14:00:00",
"end_time" : "2014-11-28T17:30:00",
"subject" : "Elastic Search Tryout 2",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

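With PUT the _id cannot be omitted, but it can be generated on the client side, for example as a UUID from a programming language. Alternatively, a POST to myindex/mytype (without an id) lets ElasticSearch generate the _id itself.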
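Generating such an id on the client side takes one call to java.util.UUID. The sketch below only builds the target URL for the PUT call; DocumentIdSketch and newDocumentUrl are hypothetical names used for illustration.

```java
import java.util.UUID;

final class DocumentIdSketch {

    /** Returns baseUrl/index/type/<random UUID>, suitable as the target of a PUT call. */
    static String newDocumentUrl(String baseUrl, String index, String type) {
        String id = UUID.randomUUID().toString(); // 36 characters: hex groups separated by dashes
        return baseUrl + "/" + index + "/" + type + "/" + id;
    }

    public static void main(String[] args) {
        System.out.println(newDocumentUrl("http://127.0.0.1:9200", "myindex", "mytype"));
    }
}
```

The resulting URL can be passed to curl -XPUT, or to any HTTP client, in place of the hand-written id used above.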

Create an index

Creating an index is similar to creating a document under an index, except that the PUT call targets the index itself and the body carries its settings. From the terminal, execute a PUT call such as the following:

> curl -XPUT http://127.0.0.1:9200/myindex/ -d '{
"settings" : {
 "item 1" : "true",
 "item 2" : "false",
 "item 3" : "1",
 "item 4" : "2" }
}'

Update a document by id

Either of the following commands can be used to update the document with id=11122:

> curl -XPUT http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-21T14:00:00",
"end_time" : "2014-11-21T17:30:00",
"subject" : "Elastic Search Tryout 4",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

> curl -XPOST http://127.0.0.1:9200/myindex/mytype/11122 -d '{
"start_time" : "2014-11-24T14:00:00",
"end_time" : "2014-11-24T17:30:00",
"subject" : "Elastic Search Tryout 4",
"agenda" : {
 "item 1" : "Create a document with auto-generated  index",
 "item 2" : "Delete a document",
 "item 3" : "Execute a search",
 "item 4" : "Delete by search" }
}'

Delete a document

Deleting a document will remove the document from the ElasticSearch nodes. From the terminal, execute a DELETE call such as the following:

> curl -XDELETE http://127.0.0.1:9200/myindex/mytype/1

Delete an index

Deleting an index will remove the index from the ElasticSearch nodes. From the terminal, execute a DELETE call such as the following:

> curl -XDELETE http://127.0.0.1:9200/myindex/

Close an index

Closing an index is an alternative to deleting it: ElasticSearch puts the index into an offline mode instead of removing it. From the terminal, execute a POST call such as the following:

> curl -XPOST http://127.0.0.1:9200/myindex/_close

After this, if you run "curl http://127.0.0.1:9200/myindex", you will get a "not found" message.

Open an index

Opening an index will put the offline index online again. From the terminal, execute a POST call such as the following:

> curl -XPOST http://127.0.0.1:9200/myindex/_open

Execute a search

Suppose we want to retrieve a list of documents under an index having a particular word (e.g. "elastic") in their content, we can run a GET call from the terminal:

> curl -XGET http://127.0.0.1:9200/myindex/_search?q=elastic

Suppose we want to restrict the search to a _type = "mytype":

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search?q=elastic

The query can be written in more detail by restricting the search to the "subject" field of the document:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match": { "subject" : "elastic" }}}'

To search all documents under myindex/mytype:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match_all":{}}}'

Sort a search

Suppose we want the json results returned in certain sorting order (e.g. sort by start_time asc):

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{"query":{"match_all":{}}, "sort":[{"start_time": "asc" }]}'

Filtering


Suppose we want to search records filtered by a range of values of one of their fields (say, age) that also satisfy a particular match query (say, "name" must be "James"), and we want the results returned sorted by a field (say, age). Furthermore, we want the number of records returned to be limited to 20:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_search -d '
{
"from": 0, "size": 20,
"query" : {
  "filtered" : {
    "query" : {
      "match" : { "name" : "James" }
     },
     "filter": {
       "range" : {
          "age" : { "lte" : 20, "gte" : 10 }
        }
     }
  }
},
"sort" : { "age" : {"order" : "asc" }}
}'

Count by Query

To count the number of documents in myindex/mytype matching a particular query, from the terminal, run the following command:

> curl -XGET http://127.0.0.1:9200/myindex/mytype/_count -d '
{"query":{"match": { "subject" : "elastic" }}}'

Delete by Query

To delete all documents in myindex/mytype matching a particular query,  from the terminal, run the following command:

> curl -XDELETE http://127.0.0.1:9200/myindex/mytype/_query -d '
{"query":{"match": { "subject" : "elastic" }}}'