Big Data-Storm Streaming Framework (5)—DRPC

DRPC

Concept

The idea behind Distributed RPC (DRPC) is to use Storm to parallelize the computation of really intense functions on the fly. The Storm topology takes a stream of function arguments as input and emits an output stream containing the result of each function call.

DRPC is not so much a feature of Storm as a pattern built from Storm’s primitives: streams, spouts, bolts, and topologies. DRPC could have been packaged as a library separate from Storm, but it is useful enough that it is bundled with Storm.

High-level overview

Distributed RPC is coordinated by a “DRPC server” (an implementation ships with Storm). The DRPC server coordinates receiving an RPC request, sending the request to the Storm topology, receiving the result from the Storm topology, and sending the result back to the waiting client. From the client’s perspective, a distributed RPC call looks just like a regular RPC call. For example, here’s how a client would compute the result of the “reach” function with the argument “http://twitter.com”:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

Distributed RPC workflow:

The client sends the DRPC server the name of the function to execute and the arguments to that function. The topology implementing that function uses a DRPCSpout to receive a stream of function calls from the DRPC server. Each function call is tagged with a unique id by the DRPC server. The topology then computes the result, and at the end of the topology a bolt called ReturnResults connects to the DRPC server and gives it the result for that function call id. The DRPC server then uses the id to find the client waiting for that result, unblocks the client, and sends it the result.
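The id matching described above can be modelled in plain Java without Storm. The following is a minimal sketch under stated assumptions: DrpcServerSketch, Request, returnResult, and startWithExclamationWorker are hypothetical names invented for illustration, a background thread stands in for the topology, and the 5-second timeout is arbitrary. It is not how Storm's DRPC server is implemented.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-process model of the DRPC request/response cycle.
// It does not use Storm and is not how Storm's DRPC server is written.
class DrpcServerSketch {

    // One pending function call as a topology would see it: [id, args].
    static final class Request {
        final String id;
        final String args;

        Request(String id, String args) {
            this.id = id;
            this.args = args;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<String, BlockingQueue<Request>> callsByFunction = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<String>> waitingClients = new ConcurrentHashMap<>();

    // Client side: tag the call with a fresh id, queue it, and block until the result arrives.
    String execute(String function, String args) {
        String id = Long.toString(nextId.incrementAndGet());
        CompletableFuture<String> pending = new CompletableFuture<>();
        waitingClients.put(id, pending);
        queueFor(function).add(new Request(id, args));
        try {
            return pending.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for request " + id, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no result for request " + id, e);
        } finally {
            waitingClients.remove(id);
        }
    }

    // Spout side: take the next queued call for a function, blocking until one exists.
    Request fetchRequest(String function) throws InterruptedException {
        return queueFor(function).take();
    }

    // ReturnResults side: the id selects which waiting client gets unblocked.
    void returnResult(String id, String result) {
        CompletableFuture<String> pending = waitingClients.get(id);
        if (pending != null) {
            pending.complete(result);
        }
    }

    private BlockingQueue<Request> queueFor(String function) {
        return callsByFunction.computeIfAbsent(function, f -> new LinkedBlockingQueue<>());
    }

    // Starts a daemon thread that plays the role of an "exclamation" topology.
    static DrpcServerSketch startWithExclamationWorker() {
        DrpcServerSketch server = new DrpcServerSketch();
        Thread topology = new Thread(() -> {
            try {
                while (true) {
                    Request call = server.fetchRequest("exclamation");
                    server.returnResult(call.id, call.args + "!");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        topology.setDaemon(true);
        topology.start();
        return server;
    }

    public static void main(String[] args) {
        DrpcServerSketch server = startWithExclamationWorker();
        System.out.println(server.execute("exclamation", "hello")); // prints "hello!"
    }
}
```

The design point the sketch tries to capture is that the client never talks to the topology directly: the server owns both the queue of calls and the table of waiting clients, and the id is the only link between them.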

LinearDRPCTopologyBuilder

Storm comes with a topology builder called LinearDRPCTopologyBuilder that automates almost all of the steps involved in DRPC. These include:

1. Setting up the spout

2. Returning the results to the DRPC server

3. Providing bolts with functionality for doing finite aggregations over groups of tuples

Let’s look at a simple example. Here is the implementation of a DRPC topology that returns its input argument with a “!” appended:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}

public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

As you can see, there is very little code. When you create a LinearDRPCTopologyBuilder, you tell it the name of the DRPC function for the topology. A single DRPC server can coordinate many functions, and the function name distinguishes them from one another. The first bolt you declare takes 2-tuples as input, where the first field is the request id and the second field is the arguments for that request. LinearDRPCTopologyBuilder expects the last bolt to emit an output stream of 2-tuples of the form [id, result]. Finally, all intermediate tuples must contain the request id as their first field.

In this example, ExclaimBolt simply appends a “!” to the second field of the tuple. LinearDRPCTopologyBuilder handles the rest of the coordination: connecting to the DRPC server and sending the results back.
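The tuple contract described above (first bolt receives [id, args], every intermediate tuple keeps the request id in field 0, last bolt emits [id, result]) can be illustrated without Storm. This is only a sketch: LinearContractSketch and run are hypothetical names, and plain Java lists stand in for Storm tuples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of the tuple contract; plain Java lists stand in for Storm tuples.
class LinearContractSketch {

    // Runs one request through a linear chain of steps.
    // Every tuple has the shape [id, value], with the request id in field 0.
    static List<Object> run(Object requestId, String args, List<UnaryOperator<String>> steps) {
        List<Object> tuple = Arrays.asList(requestId, args);        // input to the first step: [id, args]
        for (UnaryOperator<String> step : steps) {
            String value = (String) tuple.get(1);
            tuple = Arrays.asList(tuple.get(0), step.apply(value)); // id is carried through unchanged
        }
        return tuple;                                                // output of the last step: [id, result]
    }

    public static void main(String[] args) {
        UnaryOperator<String> exclaim = s -> s + "!";
        UnaryOperator<String> shout = s -> s.toUpperCase();
        System.out.println(run(42L, "hello", Arrays.asList(exclaim, shout))); // prints [42, HELLO!]
    }
}
```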

DRPC in local mode

DRPC can be run in local mode. Here is how to run the above example in local mode:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

First you create a LocalDRPC object. This object simulates a DRPC server in process, just as LocalCluster simulates a Storm cluster in process. Then you create the LocalCluster to run the topology in local mode. LinearDRPCTopologyBuilder has separate methods for creating local topologies and remote topologies. In local mode the LocalDRPC object does not bind to any ports, so the topology needs to be given the object to communicate with. This is why createLocalTopology takes the LocalDRPC object as input.

After starting the topology, you can perform DRPC calls using the execute method on LocalDRPC.

DRPC in remote mode

Using DRPC on a real cluster is also simple. There are three steps:

1. Start the DRPC server

2. Configure the location of the DRPC server

3. Submit the DRPC topology to the Storm cluster

Starting the DRPC server can be done using a storm script, just like starting Nimbus or the UI:

bin/storm drpc

Next, you need to configure the Storm cluster to know the location of the DRPC server. This is how DRPCSpout knows where to read function calls from. This can be done through the storm.yaml file or topology configuration. Configuring this via storm.yaml looks like this:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

Finally, use StormSubmitter to start the DRPC topology just like any other topology. To run the above example in remote mode, do the following:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology is used to create topologies suitable for a Storm cluster.

Slightly more complex example

The exclamation DRPC example was a toy example for illustrating the concepts of DRPC. Let’s look at a more complex example that really needs the parallelism a Storm cluster provides for computing the DRPC function. The example we’ll look at is computing the reach of a URL on Twitter.

The reach of a URL is the number of unique people exposed to that URL on Twitter. To compute reach, you need to:

1. Get all the people who tweeted the URL

2. Get all the followers of all those people

3. Unique the set of followers

4. Count the unique set of followers
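The four steps above can be sketched on a single machine in plain Java. This is a minimal illustration, not the storm-starter code: ReachSketch, the TWEETERS and FOLLOWERS maps, and all the sample names and the URL are made-up stand-ins for the real tweeter and follower databases.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-machine reach computation over in-memory sample data.
class ReachSketch {
    // Made-up stand-ins for the tweeter and follower databases.
    static final Map<String, List<String>> TWEETERS = new HashMap<>();
    static final Map<String, List<String>> FOLLOWERS = new HashMap<>();

    static {
        TWEETERS.put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim"));
        FOLLOWERS.put("sally", Arrays.asList("bob", "tim", "alice", "adam"));
        FOLLOWERS.put("bob", Arrays.asList("sally", "nathan", "jim", "alice"));
        FOLLOWERS.put("tim", Arrays.asList("alex"));
    }

    static int reach(String url) {
        Set<String> uniqueFollowers = new HashSet<>();
        // Step 1: everyone who tweeted the URL.
        for (String tweeter : TWEETERS.getOrDefault(url, Collections.<String>emptyList())) {
            // Steps 2 and 3: collect their followers; the set drops duplicates.
            uniqueFollowers.addAll(FOLLOWERS.getOrDefault(tweeter, Collections.<String>emptyList()));
        }
        // Step 4: count the unique followers.
        return uniqueFollowers.size();
    }

    public static void main(String[] args) {
        System.out.println(reach("foo.com/blog/1")); // prints 8
    }
}
```

In the sample data "alice" follows both "sally" and "bob", so she appears twice among the follower records but is counted once.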

A single reach computation can involve thousands of database calls and tens of millions of follower records. It is a really, really intense computation. As you are about to see, implementing this function on top of Storm is very simple. On a single machine, reach can take minutes to compute; on a Storm cluster, you can compute reach for even the hardest URLs in a couple of seconds.

A sample reach topology is defined in storm-starter. Here’s how you define the reach topology:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));

The topology executes as four steps:

1. GetTweeters gets the users who tweeted the URL. It transforms an input stream of [id, url] into an output stream of [id, tweeter]. Each url tuple maps to many tweeter tuples.

2. GetFollowers gets the followers of the tweeters. It transforms an input stream of [id, tweeter] into an output stream of [id, follower]. Across all the tasks, there may be duplicate follower tuples when someone follows multiple people who tweeted the same URL.

3. PartialUniquer groups the followers stream by the follower id, so the same follower always goes to the same task. Each task of PartialUniquer therefore receives a set of followers that is disjoint from every other task’s. Once PartialUniquer has received all the follower tuples directed at it for a request id, it emits the unique count of its subset of followers.

4. Finally, CountAggregator receives the partial counts from each of the PartialUniquer tasks and sums them up to complete the reach computation.
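Steps 3 and 4 rely on one property: if equal followers always land on the same task, the per-task unique counts can simply be added. The sketch below models that in plain Java under stated assumptions: PartitionedReachSketch and taskFor are hypothetical names, and a hash modulo the task count is used as a stand-in for a fields grouping on the follower field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of "partition by follower, count per partition, then sum".
class PartitionedReachSketch {

    // Stand-in for a fields grouping: equal followers always map to the same task index.
    static int taskFor(String follower, int numTasks) {
        return Math.floorMod(follower.hashCode(), numTasks);
    }

    static int reach(List<String> followerTuples, int numTasks) {
        // One set per simulated PartialUniquer task.
        List<Set<String>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashSet<>());
        }
        // Route each follower tuple to its task; duplicates collapse inside that task's set.
        for (String follower : followerTuples) {
            perTask.get(taskFor(follower, numTasks)).add(follower);
        }
        // Simulated CountAggregator: the sets are disjoint, so partial counts add up exactly.
        int total = 0;
        for (Set<String> followers : perTask) {
            total += followers.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList("alice", "bob", "alice", "tim", "bob", "alex");
        System.out.println(reach(tuples, 3)); // prints 4
    }
}
```

The result does not depend on the number of tasks, which is what lets the uniquing step be scaled out freely.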

PartialUniquer code:

public class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _followers.add(tuple.getString(1));
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "partial-count"));
    }
}

PartialUniquer implements IBatchBolt by extending BaseBatchBolt. The batch processing bolt provides a first-class API for processing batches of tuples as concrete units. A new batch bolt instance is created for each request ID, and Storm will take care of cleaning up the instance when appropriate.

When PartialUniquer receives a follower tuple in the execute method, it adds the follower to the internal HashSet that belongs to that request id.

A batch bolt provides the finishBatch method, which is called after all the tuples for this batch directed at this task have been processed. In that callback, PartialUniquer emits a single tuple containing the unique count for its subset of follower ids.

Under the hood, CoordinatedBolt is used to detect when a given bolt has received all of the tuples for any given request id. CoordinatedBolt makes use of direct streams to manage this coordination.
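One way to picture this kind of coordination is a per-request counter on the receiving side: every upstream task reports how many tuples it sent to this particular task, and the batch is complete once all reports are in and that many tuples have arrived. The sketch below is a simplified model under that assumption, not CoordinatedBolt's actual code; CoordinationSketch, Tracker, and its methods are hypothetical names.

```java
// Hypothetical, simplified model of batch-completion detection for one request id.
class CoordinationSketch {

    // State that one downstream task keeps for one request id.
    static final class Tracker {
        private final int upstreamTasks; // how many upstream tasks must report
        private int reports;             // upstream tasks that have reported so far
        private int expected;            // sum of the tuple counts they reported
        private int received;            // data tuples actually seen so far

        Tracker(int upstreamTasks) {
            this.upstreamTasks = upstreamTasks;
        }

        // A data tuple for this request arrived.
        void onTuple() {
            received++;
        }

        // An upstream task announced how many tuples it sent to this task.
        void onReport(int sentToThisTask) {
            reports++;
            expected += sentToThisTask;
        }

        // Complete only when every upstream task has reported and no tuple is still in flight.
        boolean isComplete() {
            return reports == upstreamTasks && received == expected;
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(2);
        tracker.onTuple();
        tracker.onReport(1);
        System.out.println(tracker.isComplete()); // prints false: one upstream task has not reported
        tracker.onReport(0);
        System.out.println(tracker.isComplete()); // prints true
    }
}
```

In this model the reports must reach one specific downstream task, which is the kind of targeted delivery a direct stream provides.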

The rest of the topology should be self-explanatory. As you can see, every single step of this computation is done in parallel, and defining the DRPC topology is very simple.

Nonlinear DRPC topology

LinearDRPCTopologyBuilder only handles “linear” DRPC topologies, where the computation is expressed as a sequence of steps (like reach). It is not hard to imagine functions that require a more complicated topology, with branching and merging of bolts. For now, to do this you need to use CoordinatedBolt directly. Be sure to discuss your use case for non-linear DRPC topologies on the mailing list, so that more general abstractions for DRPC topologies can be built.

LinearDRPCTopologyBuilder workflow:

DRPCSpout emits [args, return-info]. return-info is the host name and port number of the DRPC server, and the id generated by the DRPC server.

The builder constructs a topology consisting of:

  1. DRPCSpout
  2. PrepareRequest (generates a request id and creates one stream for the return info and one stream for the arguments)
  3. CoordinatedBolt wrappers around the bolts you added
  4. JoinResult (joins the result with the return info)
  5. ReturnResults (connects to the DRPC server and returns the result)
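The join step in the list above pairs two things that travel separately: the computed result and the return info that says where to send it, both keyed by request id. A minimal sketch of such a join in plain Java follows. JoinResultSketch and its methods are hypothetical names, strings stand in for the real payloads, and the sample return-info value is made up; it only illustrates the idea, not Storm's JoinResult class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of joining [id, result] with [id, return-info] by request id.
class JoinResultSketch {
    private final Map<Object, String> resultsById = new HashMap<>();
    private final Map<Object, String> returnInfoById = new HashMap<>();

    // A tuple [id, result] arrived from the last bolt of the chain.
    Optional<List<String>> onResult(Object id, String result) {
        String returnInfo = returnInfoById.remove(id);
        if (returnInfo == null) {
            resultsById.put(id, result); // return info not seen yet; keep the result
            return Optional.empty();
        }
        return Optional.of(Arrays.asList(result, returnInfo));
    }

    // A tuple [id, return-info] arrived from the request-preparation step.
    Optional<List<String>> onReturnInfo(Object id, String returnInfo) {
        String result = resultsById.remove(id);
        if (result == null) {
            returnInfoById.put(id, returnInfo); // result not seen yet; keep the return info
            return Optional.empty();
        }
        return Optional.of(Arrays.asList(result, returnInfo));
    }

    public static void main(String[] args) {
        JoinResultSketch join = new JoinResultSketch();
        System.out.println(join.onReturnInfo(1L, "drpc-host:3772#req-1").isPresent()); // prints false
        System.out.println(join.onResult(1L, "hello!").get()); // prints [hello!, drpc-host:3772#req-1]
    }
}
```

The join emits a pair only when both sides for the same id have arrived, in either order, and forgets the id afterwards so state does not accumulate per request.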

LinearDRPCTopologyBuilder is a good example of building high-level abstractions on top of Storm primitives.

Advanced

KeyedFairBolt for interleaving the processing of multiple requests at the same time

How to use CoordinatedBolt directly

DRPC (Distributed RPC): distributed remote procedure call

DRPC implements distributed RPC functions through a DRPC server.

The DRPC Server is responsible for receiving RPC requests, sending the requests to the Topology running in Storm, waiting to receive the processing results sent by the Topology, and returning the results to the client that sent the request.

(In fact, from the client’s perspective, a DRPC call is no different from an ordinary RPC call.)

DRPC design purpose:

To make full use of Storm’s computing power for highly parallel, compute-intensive real-time computation.

(Storm takes a stream of function arguments as input, the topology processes them, and the results are returned through DRPC.)

The client obtains the processing result by sending the name of the function to be executed and the parameters of the function to the DRPC server. The topology that implements this function uses a DRPCSpout to receive a stream of function calls from the DRPC server. The DRPC server tags each function call with a unique id. The topology will then execute the function to calculate the results, and use a bolt named ReturnResults at the end of the topology to connect to the DRPC server and return the results of the function call based on the id of the function call.

Define DRPC topology

Method 1:

Via LinearDRPCTopologyBuilder (this class is deprecated)

This approach automatically sets up the spout, returns the results to the DRPC server, and so on. We only need to implement the bolts of the topology.

Method 2:

Create DRPC topology directly through the common topology construction method TopologyBuilder

You need to manually set the starting DRPCSpout and the ending ReturnResults

Running mode:

1. Local mode

2. Remote mode (cluster mode)

Modify the configuration file conf/storm.yaml

drpc.servers:
  - "node1"

Start DRPC Server

bin/storm drpc &

Submit topology via StormSubmitter.submitTopology

Case:

Reach of a URL on Twitter (how many people have been exposed to a tweet containing it)