Introduction to WSO2 CEP
WSO2 Complex Event Processor (CEP) is a lightweight, easy-to-use, open source Complex Event Processing server (CEP). CEP is used to real time processing of the data. We can give data through the receiver (There are lot of ways to give data to CEP such as HTTP format, Kafka, WSO2 event..), process the stream by writing siddhi queries in execution plan and publish it in different format.
Introduction to Siddhi
Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. We have to add streams for input and output to write execution plans and siddhi queries. the queries is applicable to that stream.
Creating stream for Input
- Start the CEP server using sh ./wso2server.sh command in terminal
- Go to https://localhost:9443/carbon
- In the Management Console, under Manage tag you could see Streams. Click on that.
- Then you click Add Event Stream. Then you will see following window.
- In that Specify the name, version, description and Attributes. The data should be given in the order of attributes.
- Like that define your output stream also.
- Click on Execution plans under the Streaming Analytics in Manage and click on Add New Execution Plan.
- You could see below window.
- In that you can import stream (that means your input stream related to receiver) and export stream (that means your output stream related to publisher)
- After that you could write your siddhi queries in that by referring streams and you can put result in output stream.
Window extension is type of siddhi extension. Window Extension allows events to be collected and expired without altering the event format based on the given input parameters like the Window operator You can get the archetype of the window extension in here. After cloning this git repo and follow setup.txt file to get your sample project. Specify the class name as your wish when you are creating project from archetype. Then the main class and test case class will be created with wanted classes. The structure of the project is given below.
Here we will see the purposes of methods in SampleWindow.java class.
- init : This method will be called before other methods. When we writing queries we write parameter value for method.
For e.g window.unique:length(ip,3). This init method will take that parameter values from the query.
- process : The main processing method that will be called upon event arrival. The logic of the window extension will be written here. We have to write what kind of processes we have to apply for arriving event.
- find : To find events from the processor event pool, that the matches the matching Event based on finder logic.
- constructFinder : To construct a finder having the capability of finding events at the processor that corresponds to the incoming matching Event and the given matching expression logic.
- start : This will be called only once and this can be used to acquire required resources for the processing element.
- stop : This will be called only once and this can be used to release the acquired resources for processing.
- currentState : Used to collect the serialize state of the processing element, that need to be persisted for the reconstructing the element to the same state on a different point of time.
- restoreState : Used to restore serialized state of the processing element, for reconstructing the element to the same state as if was on a previous point of time.
In Sample.siddhiext you have to give a name and the main class which is having the logic like this. SampleWindow=org.wso2.extension.siddhi.window.sample.SampleWindow
Then you have to build the Project using "mvn clean install" in terminal pointing this project's pom.xml. Then you will get jar file in target folder. Take that jar file to <CEP_HOME>/repository/components/dropins. After starting the CEP, you can use query like this
There is already I created a siddhi extension called UniqueLength. UniqueLength means according to the given attribute it will give unique events within given length. You can found the source code from here. After cloning, you can build and put it in jar file in <CEP_HOME>/repository/components/dropins and use this query to use this extension functionality.
/* Enter a unique ExecutionPlan */
/* Enter a unique description for ExecutionPlan */
/* define streams/tables and write queries here ... */
define stream UniqueIN (timeStamp long, a string, ip string);
define stream UniqueOUT (a string, ip string);
select a, ip
insert expired events into UniqueOUT;
Here my siddhiext file name is unique.siddhiext and value in that file is length=org.wso2.extension.siddhi.window.uniquelength.UniqueLengthWindowProcessor
Hope you have got at least basic idea of Window extension. Have a nice day. :)