Monday, November 24, 2014

Write and test a simple Distributed RPC with Storm Trident

Distributed RPC (DRPC) can be used to query, in real time, results from a Trident topology running in a Storm cluster. This post shows how to use DRPC to query the accumulated country counts of the Storm Trident topology implemented in http://czcodezone.blogspot.sg/2014/11/write-and-test-trident-non.html.

The source code of the project can be downloaded from the following link:

https://dl.dropboxusercontent.com/u/113201788/storm/trident-drpc-test.tar.gz

The main difference between the source code in this post and that in http://czcodezone.blogspot.sg/2014/11/write-and-test-trident-non.html is the main class, whose source code is shown below:

package com.memeanalytics.trident_drpc_test;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.DRPCClient;

public class App 
{
    public static void main( String[] args ) throws Exception
    {
        Config config = new Config();
        config.setMaxSpoutPending(20);

        if (args.length == 0)
        {
            // Local mode: in-process DRPC server and cluster
            LocalDRPC drpc = new LocalDRPC();

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("DRPCTridentDemo", config, buildTopology(drpc));

            // Give the topology a moment to start accumulating counts
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }

            // Query the accumulated counts ten times, once per second
            for (int i = 0; i < 10; ++i)
            {
                System.out.println(drpc.execute("Count", "China,Russia,USA"));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }

            cluster.killTopology("DRPCTridentDemo");
            cluster.shutdown();
            drpc.shutdown();
        }
        else
        {
            // Remote mode: args[0] is the topology name
            config.setNumWorkers(3);
            try {
                StormSubmitter.submitTopology(args[0], config, buildTopology(null));
            } catch (AlreadyAliveException ex) {
                ex.printStackTrace();
            } catch (InvalidTopologyException ex) {
                ex.printStackTrace();
            }

            try {
                Thread.sleep(2000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }

            // The host must be one of the drpc.servers entries in storm.yaml;
            // 3772 is Storm's default drpc.port
            DRPCClient client = new DRPCClient("192.168.2.4", 3772);
            System.out.println(client.execute("Count", "China,Russia,USA"));
            client.close();
        }
    }
    
    private static StormTopology buildTopology(LocalDRPC drpc)
    {
        TridentTopology topology = new TridentTopology();
        RandomWordSpout spout = new RandomWordSpout(10);

        // Stream 1: keep a running count per country in an in-memory map state
        TridentState countryCount = topology.newStream("spout1", spout)
            .shuffle()
            .each(new Fields("Country", "Rank"), new TridentComps.CountryFilter())
            .groupBy(new Fields("Country"))
            .persistentAggregate(new MemoryMapState.Factory(), new Fields("Country"), new Count(), new Fields("count"))
            .parallelismHint(2);

        // Stream 2: DRPC function "Count" (drpc is null when submitting to a real cluster)
        topology.newDRPCStream("Count", drpc)
            .each(new Fields("args"), new TridentComps.CountrySplit(), new Fields("Country"))
            .stateQuery(countryCount, new Fields("Country"), new MapGet(), new Fields("count"))
            .each(new Fields("count"), new FilterNull());

        return topology.build();
    }
}

As shown in the source code, the Trident topology now has a second stream, created by newDRPCStream(), which serves DRPC requests. It queries the TridentState object that stores the accumulated count of each country (since the topology started) in a MemoryMapState, which is an in-memory map state. The DRPC stream is registered under the function name "Count", which is the name passed to DRPCClient.execute() or LocalDRPC.execute() together with the query arguments. These arguments arrive in the stream as the "args" field (here "China,Russia,USA"). TridentComps.CountrySplit is a Trident BaseFunction that splits the "args" tuple ["China,Russia,USA"] into three tuples:

["China,Russia,USA", "China"]
["China,Russia,USA", "Russia"]
["China,Russia,USA", "USA"]
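The post does not list the source of TridentComps.CountrySplit, so the following is only an assumed, Storm-free sketch of the splitting logic. In Trident, the value emitted into the "Country" field is appended to the input tuple, so each output tuple carries both the original "args" and one country. Inside a real BaseFunction, the loop body would call collector.emit(new Values(country)) instead of adding to a list. The class name CountrySplitSketch and the whitespace trimming are hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free sketch of what TridentComps.CountrySplit is assumed to do:
// split the DRPC "args" string on commas and produce one [args, country] tuple per country.
public class CountrySplitSketch {
    public static List<List<String>> split(String args) {
        List<List<String>> tuples = new ArrayList<>();
        for (String country : args.split(",")) {
            String trimmed = country.trim();          // assumption: tolerate "China, USA"
            if (!trimmed.isEmpty()) {                 // skip empty segments such as "China,,USA"
                // Trident appends the emitted "Country" value to the input "args" field
                tuples.add(Arrays.asList(args, trimmed));
            }
        }
        return tuples;
    }

    public static void main(String[] unused) {
        for (List<String> tuple : split("China,Russia,USA")) {
            System.out.println(tuple);
        }
    }
}
```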

The stateQuery() call then uses MapGet to look up the accumulated count for each "Country" value in the MemoryMapState, and FilterNull drops any country that has no count yet.
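The interplay between the accumulating stream and the DRPC query can be sketched without Storm. This sketch assumes a plain HashMap in place of the MemoryMapState. applyBatch() roughly mimics groupBy("Country") followed by persistentAggregate(new Count()). query() mimics the MapGet lookup followed by FilterNull. The class CountQuerySketch and its "country=count" output format are hypothetical. Real Trident also handles partitioning and transactional state updates, which are ignored here, and the real DRPC call returns the stream's output tuples as a JSON-encoded string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch: a running per-country count plus a query that skips missing keys.
public class CountQuerySketch {
    // Stand-in for the MemoryMapState: country -> count accumulated since startup
    private final Map<String, Long> state = new HashMap<>();

    // Roughly groupBy("Country") + persistentAggregate(new Count()): add one per tuple
    public void applyBatch(List<String> countries) {
        for (String country : countries) {
            state.merge(country, 1L, Long::sum);
        }
    }

    // Roughly stateQuery(..., new MapGet(), ...) + each(..., new FilterNull())
    public List<String> query(List<String> countries) {
        List<String> results = new ArrayList<>();
        for (String country : countries) {
            Long count = state.get(country);   // MapGet: null when the key is missing
            if (count != null) {               // FilterNull: discard tuples whose count is null
                results.add(country + "=" + count);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CountQuerySketch sketch = new CountQuerySketch();
        sketch.applyBatch(Arrays.asList("China", "USA", "China"));
        System.out.println(sketch.query(Arrays.asList("China", "Russia", "USA"))); // [China=2, USA=1]
        sketch.applyBatch(Arrays.asList("Russia", "China"));
        System.out.println(sketch.query(Arrays.asList("China", "Russia", "USA"))); // [China=3, Russia=1, USA=1]
    }
}
```

Repeating the same query after more batches returns larger counts. This is the behaviour the local-mode loop in App prints once per second.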

Note that for remote DRPC, the following lines need to be added to storm.yaml in the $STORM_HOME/conf folder:

drpc.servers:
  - "192.168.2.4"

where 192.168.2.4 is the DRPC server's hostname or IP address (it may be the same machine as Storm's master node). The DRPCClient used in remote mode must connect to this host. Then start the DRPC server from the terminal with the following command:

> $STORM_HOME/bin/storm drpc
