We have gone through the core technical details of Apache Storm, and now it is time to code some simple scenarios.
Mobile calls and their durations will be given as input to Apache Storm. Storm will process the calls, group the calls between the same caller and receiver, and count the total number of calls for each pair.
A spout is a component used for data generation. Basically, a spout implements the IRichSpout interface. The “IRichSpout” interface has the following important methods: open, nextTuple, close, declareOutputFields, ack, and fail.
The signature of the open method is as follows –
The signature of the nextTuple method is as follows –
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. Therefore, nextTuple first checks whether processing has finished; if so, it sleeps for at least one millisecond before returning, to reduce the load on the processor.
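The "do not spin when idle" rule can be illustrated with a minimal plain-Java sketch. It does not use any Storm classes: `PollingSpoutSketch` is a hypothetical stand-in, an in-memory queue plays the role of the data source, and the `emitted` list plays the role of the spout's output collector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a spout (plain Java, no Storm classes).
// The "emitted" list plays the role of the spout's output collector.
class PollingSpoutSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    final List<String> emitted = new ArrayList<>();

    // Adds a record that a later nextTuple() call may emit.
    void offer(String record) {
        pending.add(record);
    }

    // Called repeatedly by the (simulated) executor loop that also calls ack() and fail().
    void nextTuple() {
        String next = pending.poll();
        if (next == null) {
            // No work to do: sleep briefly so the loop is not busy-spinning the CPU.
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        emitted.add(next); // stand-in for emitting a tuple
    }
}
```

Returning promptly in both branches is the key design point: the same thread must come back to the loop to deliver acks and failures.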
The signature of the close method is as follows –
The signature of the declareOutputFields method is as follows –
Declarer − It is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
The signature of the ack method is as follows –
Storm calls this method to acknowledge that a specific tuple emitted by the spout has been fully processed.
The signature of the fail method is as follows –
Storm calls this method when a specific tuple has not been fully processed, for example because it failed or timed out. The spout can then re-emit the tuple so that it is processed again.
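How a spout might pair ack and fail with message IDs can be sketched in plain Java. This is a hypothetical `ReliableSpoutSketch`, not Storm code: it assumes each emitted record is tracked under a fresh message ID, forgotten on ack, and queued again on fail so that a later nextTuple re-emits it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical reliable-spout sketch (plain Java, no Storm classes).
// Each emitted record stays "pending" under its message ID until ack() or fail() is called.
class ReliableSpoutSketch {
    private final Queue<String> toSend = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;
    final List<String> emitted = new ArrayList<>();

    void offer(String record) {
        toSend.add(record);
    }

    // Emits one record, if any, and returns the message ID it was tracked under (null if idle).
    Long nextTuple() {
        String record = toSend.poll();
        if (record == null) {
            return null;
        }
        long msgId = nextId++;
        pending.put(msgId, record);
        emitted.add(record); // stand-in for emitting the tuple with msgId
        return msgId;
    }

    // Fully processed: the record no longer needs to be remembered.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Failed or timed out: put the record back so it is emitted again.
    void fail(Object msgId) {
        String record = pending.remove(msgId);
        if (record != null) {
            toSend.add(record);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Replaying on fail is the spout's own responsibility in this sketch; other policies, such as dropping after a retry limit, are equally possible.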
In our scenario, we need to collect the call log details. The call log information contains the caller number, the receiver number, and the call duration.
Since we don’t have real-time call log information, we will generate fake call logs using the Random class. The complete program code is given below.
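The idea of generating fake call logs with Random can be sketched in plain Java. This is not the spout program referred to above. `FakeCallLogGenerator` is hypothetical, and the four phone numbers and the 1 to 60 duration range are illustrative assumptions. A fixed seed makes the output reproducible.

```java
import java.util.Random;

// Hypothetical generator of fake call-log records (plain Java, no Storm classes).
// The phone numbers and the 1..60 duration range are illustrative assumptions.
class FakeCallLogGenerator {
    static final String[] NUMBERS = {"1234123401", "1234123402", "1234123403", "1234123404"};

    // One call-log record: caller number, receiver number, and call duration.
    static final class CallLog {
        final String caller;
        final String receiver;
        final int duration;

        CallLog(String caller, String receiver, int duration) {
            this.caller = caller;
            this.receiver = receiver;
            this.duration = duration;
        }
    }

    private final Random random;

    FakeCallLogGenerator(long seed) {
        this.random = new Random(seed); // fixed seed gives the same sequence on every run
    }

    CallLog next() {
        String caller = NUMBERS[random.nextInt(NUMBERS.length)];
        String receiver = NUMBERS[random.nextInt(NUMBERS.length)];
        // Draw again until the receiver differs from the caller.
        while (receiver.equals(caller)) {
            receiver = NUMBERS[random.nextInt(NUMBERS.length)];
        }
        int duration = 1 + random.nextInt(60); // 1..60 inclusive
        return new CallLog(caller, receiver, duration);
    }
}
```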
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.
The IRichBolt interface has the following methods −
The signature of the prepare method is as follows –
The signature of the execute method is as follows –
Here tuple is the input tuple to be processed.
The execute method processes a single tuple at a time. The tuple data can be accessed with the getValue method of the Tuple class. The input tuple does not have to be processed immediately: multiple tuples can be processed and emitted as a single output tuple. The processed tuple can be emitted using the OutputCollector class.
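The point that several inputs may produce a single output can be shown with a small plain-Java sketch. `BatchingSumSketch` is hypothetical and uses no Storm classes. Its execute handles one input value per call, and it emits one total only after every `batchSize` inputs. The `emitted` list stands in for the output collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bolt-like sketch (plain Java, no Storm classes): execute() is called once
// per input, but only every batchSize inputs produce one combined output.
class BatchingSumSketch {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>(); // stand-in for the output collector

    BatchingSumSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Processes a single input value, the way execute processes a single tuple.
    void execute(int duration) {
        buffer.add(duration);
        if (buffer.size() == batchSize) {
            int total = 0;
            for (int d : buffer) {
                total += d;
            }
            emitted.add(total); // one output for batchSize inputs
            buffer.clear();
        }
    }
}
```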
The signature of the cleanup method is as follows –
The signature of the declareOutputFields method is as follows –
Here the parameter declarer is used to declare output stream ids, output fields, etc.
This method is used to specify the output schema of the tuple.
The call log creator bolt (CallLogCreatorBolt) receives the call log tuple, which has the caller number, the receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number – Receiver number", and it is emitted, together with the call duration, in a new field named "call". The complete code is given below.
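The transformation this bolt performs can be sketched in plain Java, separate from the complete code referred to above. `CallLogCreatorSketch` is hypothetical and uses no Storm classes. The " - " separator is an assumption based on the "Caller number – Receiver number" format, and the returned array stands in for the emitted ("call", duration) values.

```java
// Hypothetical plain-Java version of the call log creator logic (no Storm classes).
// Input: caller, receiver, duration. Output: the "call" value plus the duration.
class CallLogCreatorSketch {
    // Combines caller and receiver into one "call" value; the separator is assumed.
    static String toCall(String caller, String receiver) {
        return caller + " - " + receiver;
    }

    // Stand-in for emitting the two output fields: call and duration.
    static Object[] process(String caller, String receiver, int duration) {
        return new Object[] {toCall(caller, receiver), duration};
    }
}
```

Because the caller comes first, "A - B" and "B - A" are different call values and are counted separately downstream.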
The call log counter bolt (CallLogCounterBolt) receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple and, for every new “call” value, creates a new entry in the dictionary with the value 1. For an entry that already exists, it just increments the value. In simple terms, this bolt saves each call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we could also save them to a data source. The complete program code is as follows −
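The counting logic described above can be sketched in plain Java, separate from the complete program referred to above. `CallLogCounterSketch` is hypothetical and uses no Storm classes. `prepare` creates the map, and `execute` uses `Map.merge` to insert 1 for a new call or add 1 to an existing count. Printing the counts in `cleanup` is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java version of the call log counter logic (no Storm classes).
class CallLogCounterSketch {
    private Map<String, Integer> counterMap;

    // Mirrors prepare: create the dictionary once, before any tuples arrive.
    void prepare() {
        counterMap = new HashMap<>();
    }

    // Mirrors execute: a new "call" value starts at 1, an existing one is incremented.
    // The duration is received but not used for counting.
    void execute(String call, int duration) {
        counterMap.merge(call, 1, Integer::sum);
    }

    // Assumed here: print each call and its count when the component shuts down.
    void cleanup() {
        for (Map.Entry<String, Integer> e : counterMap.entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }

    int countOf(String call) {
        return counterMap.getOrDefault(call, 0);
    }
}
```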
A Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple methods for creating complex topologies: setSpout to set a spout, setBolt to set a bolt, and finally createTopology to create the topology. Use the following code snippet to create a topology –
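The shuffleGrouping and fieldsGrouping methods set the stream grouping between a spout or bolt and the next bolt. Shuffle grouping distributes tuples randomly across the bolt's tasks, while fields grouping sends all tuples with the same values in the specified fields to the same task.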
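The difference between the two groupings can be illustrated with a simplified plain-Java sketch. This is not Storm's implementation. `GroupingSketch` is hypothetical, and hashing the field value modulo the number of tasks is one assumed way to make "same value, same task" hold.

```java
import java.util.Random;

// Simplified illustration of two grouping strategies (plain Java, not Storm's implementation).
// Each method picks which of numTasks downstream tasks receives a tuple.
class GroupingSketch {
    // Fields grouping: the target depends only on the field value,
    // so every tuple with the same "call" value reaches the same task.
    static int fieldsGrouping(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // Shuffle grouping: the target is chosen at random, spreading load evenly on average.
    static int shuffleGrouping(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }
}
```

This is why a counting bolt with several tasks needs fields grouping on the counted field. With shuffle grouping, tuples for the same call could land on different tasks, and each task would hold only a partial count.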
For development purposes, we can create a local cluster using a "LocalCluster" object and then submit the topology using the "submitTopology" method of the "LocalCluster" class. One of the arguments of "submitTopology" is an instance of the "Config" class, which is used to set configuration options before the topology is submitted. These options are merged with the cluster configuration at run time and passed to every task (spout and bolt) through its open or prepare method. Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to process it and then shut down the cluster using the "shutdown" method of "LocalCluster". The complete program code is as follows −
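The merging step can be pictured with a simplified plain-Java sketch, separate from the complete program referred to above. Plain maps stand in for the cluster and topology configurations (Storm's Config class is itself a map of string keys to values). `ConfigMergeSketch` is hypothetical, "topology.debug" and "topology.workers" are used only as example keys, and "topology settings override cluster defaults" is a simplification of the real merge.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration (not Storm code) of merging a topology's settings
// over cluster-wide defaults before handing the result to each task.
class ConfigMergeSketch {
    static Map<String, Object> merge(Map<String, Object> clusterConf, Map<String, Object> topologyConf) {
        Map<String, Object> merged = new HashMap<>(clusterConf); // start from the cluster defaults
        merged.putAll(topologyConf);                             // topology settings take precedence
        return merged;                                           // inputs are left unchanged
    }
}
```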
The complete application consists of four Java source files. They are −
The application can be built using the following command –
The application can be run using the following command –
Once the application is started, it outputs complete details about the cluster startup process, the spout and bolt processing, and finally the cluster shutdown process. In "CallLogCounterBolt", we print each call and its count. This information is displayed on the console as follows –
Storm topologies are defined as Thrift structures, which makes it easy to create and submit topologies from any language. Storm supports Ruby, Python, and many other languages. Let’s take a look at the Python binding.
Python is a general-purpose, interpreted, interactive, object-oriented, high-level programming language. Storm supports implementing topology components in Python, and its Python support covers emitting, anchoring, acking, and logging operations.
As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes using JSON messages over stdin/stdout. First, take a sample bolt, WordCount, that supports the Python binding.
Here the class WordCount implements the IRichBolt interface and runs the Python implementation specified by the argument "splitword.py" passed to the superclass constructor. Now create a Python implementation named "splitword.py".
This is a sample Python implementation that counts the words in a given sentence. Similarly, you can bind with other supported languages as well.
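To picture the JSON-over-stdin/stdout communication between Storm and a multilang sub-process, here is a plain-Java sketch of the messages a word-splitting sub-process might write. It rests on an assumption to check against the Storm multilang protocol documentation: each message is one JSON object followed by a line containing only "end", and an emit is a JSON object with "command": "emit" and a "tuple" array. `MultilangSketch` is hypothetical, omits optional fields such as anchors, and escapes only backslashes and double quotes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of multilang message framing (not Storm's own code).
// Assumption: each message is one JSON object followed by a line containing only "end".
class MultilangSketch {
    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds an "emit" command for a one-field tuple; optional fields such as anchors are omitted.
    static String emitCommand(String value) {
        return "{\"command\":\"emit\",\"tuple\":[" + jsonString(value) + "]}";
    }

    // Appends the "end" terminator line that closes each message.
    static String frame(String json) {
        return json + "\nend\n";
    }

    // What a word-splitting sub-process might write to stdout for one input sentence:
    // one framed emit message per whitespace-separated word.
    static List<String> emitWords(String sentence) {
        List<String> messages = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                messages.add(frame(emitCommand(word)));
            }
        }
        return messages;
    }
}
```

A real sub-process would use a proper JSON library for escaping; string building is used here only to keep the example self-contained.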
All rights reserved © 2018 Wisdom IT Services India Pvt. Ltd