Monday, July 18, 2016

Working with WSO2 RabbitMQ Inbound

I am writing this blog by considering you already have knowledge about inbound and RabbitMQ. If not please refer my previous blogs. It has brief idea about those.

In WSO2 RabbitMQ inbound, we use inbound as RabbitMQ consumer. It is event based inbound which polls only once to establish a connection with the remote server and then consumes events.  In here AMQP messaging protocol is used. AMQP is a wire-level messaging protocol that describes the format of the data that is sent across the network. If a system or application can read and write AMQP, it can exchange messages with any other system or application that understands AMQP regardless of the implementation language.

Steps to enable RabbitMQ Inbound 
  • Start the WSO2 Server by going <ESB_HOME>/bin by specifying this command sh ./wso2server.sh
  • Create  two simple sequences.  In my case I have created simple sequence names as TestIn and amqpErrorSeq with following configuration
    <?xml version="1.0" encoding="UTF-8"?>
    <sequence name="TestIn" onError="amqpErrorSeq" xmlns="http://ws.apache.org/ns/synapse">
        <log level="full"/>
        <drop/>
    </sequence>



    <?xml version="1.0" encoding="UTF-8"?>
    <sequence name="amqpErrorSeq" xmlns="http://ws.apache.org/ns/synapse">
        <log level="full"/>
        <drop/>
    </sequence>
  •  Go to inbound endpoints in Service Bus menu.
  • Click on Add Inbound endpoint. you will see the page like below.

  • Enter the name of the endpoint and select type as rabbitmq. Click on Next.
  • In the next page specify 

    • Sequence as TestIn.
    • Error Sequence as amqpErrorSeq. 
    • Suspend as false. We need to put as false to make inbound in enable mode. if we put true, the inbound is disabled.
    • Sequential as true.
    • Coordination as true.
    • rabbitmq.connection.factory as AMQPConnectionFactory
    • rabbitmq.server.host.name as localhost
    • rabbitmq.server.port as 5672. Here management console of rabbitmq is 15672. But here we have to mention as 5672.
    • rabbitmq.server.user.name as guest (In my case i am using guest account).
    • rabbitmq.server.password as guest (In my case i am using guest account).
    • rabbitmq.queue.name : specify a queue name.
  • rabbitmq.exchange.name : specify exchange name.
  • If you want to change anything in advanced options, click it and change it.
Thats all for the RabbitMq inbound. That means we have created consumer. For consuming events, we need to provide events. so i have created a sample java producer. That was given below. You can see the output of the message in ESB started terminal.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "queue";
    private final static String contentEncoding = "utf-8";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare("exchange", "direct", true);
        channel.queueBind(QUEUE_NAME, "exchange", "route");
        String message ="<m:placeOrder xmlns:m=\"http://services.samples\">" +
                "<m:order>" +
                "<m:price>100</m:price>" +
                "<m:quantity>20</m:quantity>" +
                "<m:symbol>RMQ</m:symbol>" +
                "</m:order>" +
                "</m:placeOrder>";
        // Populate the AMQP message properties
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
        builder.contentType("application/xml");
        builder.contentEncoding(contentEncoding);

        // Publish the message to exchange
        channel.basicPublish("exchange", QUEUE_NAME, builder.build(), message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}

The out is given below.



No comments:

Post a Comment