RabbitMQ – message broker


RabbitMQ is a message broker that was originally built to implement the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform (OTP) framework for clustering and rapid recovery from system crashes. Simply put, RabbitMQ is software in which we can define queues; applications connect to those queues and send or receive messages through them.

A message can contain any kind of information. For example, it may describe a process or task that should be run by another application (possibly on another server), or it may be a plain text message. The broker stores messages until an application connects and retrieves them from the queue. That application then processes each message according to its needs.

What does RabbitMQ allow us to do?

Messaging makes it easier to connect and scale applications. Applications can be connected to each other as components of a larger application, or they can connect to users' devices and receive data from them. Messaging can mean delivering data, sending push notifications, using publish/subscribe systems, or using message queues. RabbitMQ offers many features that let you trade performance for reliability, including persistence, delivery acknowledgments, publisher confirms and high availability. RabbitMQ clients exist for almost every programming language, and several messaging protocols are supported.

Installation

To follow the examples and try out RabbitMQ's features, you first need to install the appropriate packages. On Debian or Ubuntu (14.04 to 18.04), it is enough to download and install the package from the following address:
https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server_3.7.7-1_all.deb
If you want to install all the necessary packages manually or use the Windows operating system, more detailed instructions can be found at the following address:
https://www.rabbitmq.com/download.html
The RabbitMQ server starts automatically after the package is installed, and again every time the computer boots. You can check the status of the server with the command:

sudo rabbitmqctl status

Note: You need sudo privileges to install and use the RabbitMQ server.

How does it actually work?

We can think of the RabbitMQ server as a post office. When we want to send a letter, we take it to the nearest branch and drop it into the mailbox, and we can be almost certain that it will be delivered to its recipient. In this analogy, RabbitMQ is the mailbox, the post office and the postman.

The main difference between the post office and RabbitMQ is that RabbitMQ does not deal with envelopes and paper. Instead, it accepts, stores and forwards binary data, i.e. messages. Like messaging in general, RabbitMQ uses its own jargon. The most important concepts are:

  • Producing: means sending. A program that sends messages is a producer.
  • Queue: the name of a mailbox that lives inside RabbitMQ. It is essentially a large message buffer, and messages are kept in the order in which they arrive. A queue is limited only by the memory and disk space of the host.
  • Consuming: means receiving. The program waiting to receive the messages is the consumer.

Note: the producer, consumer and broker do not have to reside on the same host; in fact, in most applications they do not.
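The three roles can be illustrated without a broker. The sketch below is a minimal in-process analogy that assumes everything runs in a single JVM. A java.util.concurrent.BlockingQueue plays the part of the queue, one thread acts as the producer and another as the consumer. The class name ProducerConsumerAnalogy and the END marker are hypothetical and exist only for this illustration. Unlike RabbitMQ, nothing here crosses a network or survives a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process analogy only: a BlockingQueue stands in for a RabbitMQ queue.
// There is no broker, no network and no persistence here.
class ProducerConsumerAnalogy {

    // Hypothetical end-of-stream marker used by this sketch (not a RabbitMQ concept).
    static final String END = "__END__";

    static List<String> run(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // the "mailbox"
        List<String> received = new ArrayList<>();

        // Consumer: blocks until a message is available, then takes messages in FIFO order.
        Thread consumer = new Thread(() -> {
            try {
                String m;
                while (!(m = queue.take()).equals(END)) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Producer: sends every message, then the end marker.
        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    queue.put(m);
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start(); // starting the consumer first is fine: it simply waits
        producer.start();
        producer.join();
        consumer.join();  // join() makes the consumer's writes to 'received' visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("task-1", "task-2", "task-3"))); // [task-1, task-2, task-3]
    }
}
```

The consumer does not care whether the producer started first, because it simply blocks on take(). RabbitMQ gives the same decoupling, except that it works across processes and hosts.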

Messaging

Producers do not send messages directly to a queue. Instead, they publish them to an exchange, which is responsible for routing messages to the different queues. The exchange takes each message from the producer and decides where to route it based on bindings and routing keys. A binding is a link between an exchange and a queue.

Message flow

  • The producer publishes a message to an exchange. The type of the exchange is defined when the exchange is created.
  • The exchange receives the message and becomes responsible for routing it, based on the message attributes (such as the routing key) and the exchange type.
  • Bindings from the exchange to the queues must exist. In the example shown, there are two different queues, each with its own binding.
  • The message stays in the queue until a consumer picks it up.
  • The consumer processes the message.
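The flow above can be modelled loosely in memory. The sketch below assumes a direct exchange and mirrors the two-queue example. ToyDirectExchange and all of its methods are hypothetical names, not the RabbitMQ client API. A message whose routing key matches no binding is simply dropped, which is also what RabbitMQ does by default with unroutable messages.

```java
import java.util.*;

// Toy in-memory model of exchange -> binding -> queue routing for a direct exchange.
// Class and method names are hypothetical; this is not the RabbitMQ client API.
class ToyDirectExchange {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue (FIFO)
    private final Map<String, Deque<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>()); // idempotent, like queueDeclare
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Producer side: publish to the exchange, never directly to a queue.
    void publish(String routingKey, String body) {
        for (String q : bindings.getOrDefault(routingKey, List.of())) {
            queues.get(q).addLast(body); // one copy per matching queue
        }
        // No matching binding: the message is dropped (unroutable).
    }

    // Consumer side: take the oldest message, or null if the queue is empty.
    String consume(String queue) {
        return queues.get(queue).pollFirst();
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        ToyDirectExchange ex = new ToyDirectExchange();
        ex.declareQueue("errors");
        ex.declareQueue("all");
        ex.bind("errors", "error");
        ex.bind("all", "error");
        ex.bind("all", "info");

        ex.publish("error", "disk full");
        ex.publish("info", "user logged in");
        ex.publish("debug", "ignored"); // no binding for "debug"

        System.out.println(ex.depth("errors") + " " + ex.depth("all")); // 1 2
        System.out.println(ex.consume("all"));                          // disk full
    }
}
```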

Types of exchange

There are four types of exchange:

Direct: The message is delivered to the queues whose binding key exactly matches the routing key of the message.

Fanout: The message is delivered to all queues bound to the exchange; the routing key is ignored.

Topic: The message is delivered to a queue if its routing key matches the pattern given in the binding between the queue and the exchange. Routing keys are dot-separated words; in a binding pattern, * matches exactly one word and # matches zero or more words.

Headers: Routing uses the attributes contained in the message headers instead of the routing key.
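To make the topic rules concrete, here is a simplified sketch of the matching logic. It assumes RabbitMQ's documented conventions: dot-separated words, * for exactly one word, and # for zero or more words. TopicMatcher is a hypothetical name, and the broker does not necessarily implement matching this way internally.

```java
import java.util.Arrays;
import java.util.List;

// Simplified illustration of topic-exchange matching rules (not RabbitMQ's own code):
// keys are dot-separated words, "*" matches exactly one word, "#" matches zero or more words.
class TopicMatcher {

    static boolean matches(String bindingPattern, String routingKey) {
        List<String> pattern = Arrays.asList(bindingPattern.split("\\."));
        List<String> key = Arrays.asList(routingKey.split("\\."));
        return match(pattern, 0, key, 0);
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();          // pattern used up: match only if the key is too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // "#" may swallow zero words (move past it) or one more word (stay on it).
            return match(p, i + 1, k, j) || (j < k.size() && match(p, i, k, j + 1));
        }
        if (j == k.size()) {
            return false;                  // pattern still has words, key does not
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit")); // false
        System.out.println(matches("lazy.#", "lazy"));                         // true
        System.out.println(matches("lazy.#", "lazy.brown.fox"));               // true
        System.out.println(matches("*.*.rabbit", "quick.rabbit"));             // false
    }
}
```

With these rules, a binding of "#" receives every message, which makes a topic exchange behave like a fanout. A binding without * or # behaves like a direct exchange.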

Hello World example

I will write two programs in Java: a producer that sends a single message, and a consumer that receives messages and prints them to the terminal.

The diagram above shows the producer, marked "P", and the consumer, marked "C". The red rectangle in the middle is the queue, a message buffer. To write these programs, we need the Java client for RabbitMQ. The required libraries are available at the following links:
  • client library
  • SLF4J API
  • SLF4J Simple

We import the classes we need in the usual way:

import com.rabbitmq.client.*;

Send a message

The Send class is our producer. It connects to the RabbitMQ server, sends a single message, closes the connection and exits.

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv)
        throws java.io.IOException, java.util.concurrent.TimeoutException {
        …
    }
}

I create a connection to the server as follows:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

The Connection class abstracts the socket connection to the server and takes care of protocol version negotiation, authentication and other details that happen in the background. A connection represents a TCP connection between the application and the RabbitMQ broker. Here we connect to a broker on the local machine (localhost). To connect to a broker on another machine, we would simply pass that machine's host name or IP address to setHost. Next, we create a channel, which provides most of the API for communicating with the broker. A channel is a virtual connection inside a connection. To send a message, we must declare the queue we want to send it to, and then publish the message:

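// queueDeclare(queue, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// "" is the default exchange, which routes by queue name
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");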

Declaring a queue is idempotent: the queue is created only if it does not already exist. After sending the message, we close the channel and then the connection to the broker:

channel.close();
connection.close();

Receiving the Message

The Recv class represents the consumer. It listens on a queue and reads the messages that arrive there. As in the Send class, we open a connection and a channel, and declare the queue from which we want to consume messages.

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv)
        throws java.io.IOException,
               java.lang.InterruptedException,
               java.util.concurrent.TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    }
}

We declare the queue here as well because the consumer may be started before the producer, and we want to be sure the queue exists before we try to consume from it. Next, we tell the server to deliver the messages from the queue to us. Since the server pushes messages asynchronously, we provide a callback in the form of a DefaultConsumer object, whose handleDelivery method is called for every delivered message.

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body)
        throws IOException { // requires import java.io.IOException
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer); // true = autoAck

Starting

Both classes are now complete. To compile them, the RabbitMQ Java client must be on the classpath. In Eclipse, right-click the project → Build Path → Configure Build Path, open the Libraries tab, and add the .jar files listed above, which make up the RabbitMQ Java client. Then run both programs with Run As → Java Application. Alternatively, you can compile and run the programs from the terminal with the following commands:

  • compiling classes

javac -cp amqp-client-4.0.2.jar Send.java Recv.java

  • running the programs (Recv and Send in separate terminals; on Windows, use ; instead of : as the classpath separator)

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send

Listing queues

Using the rabbitmqctl tool, we can see which queues RabbitMQ has created and how many messages each one contains:

Ubuntu:

sudo rabbitmqctl list_queues

Windows:

rabbitmqctl.bat list_queues

Command output after the Send program has run:

Timeout: 60.0 seconds …
Listing queues for vhost / …
hello 1

Command output after the RabbitMQ broker has delivered the message to the consumer:

Timeout: 60.0 seconds …
Listing queues for vhost / …
hello 0

Round-robin dispatching

If there is only one consumer and several messages are in the queue, all messages are delivered to that consumer. With more consumers, however, the RabbitMQ broker dispatches messages in round-robin fashion: with n consumers, each consumer receives every nth message from the queue. This spreads the messages evenly, so on average each consumer receives the same number of messages.
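The assignment order can be shown with a toy simulation. It assumes messages are numbered from 0 and that all consumers are connected before dispatch starts. RoundRobinDemo is a hypothetical name. The sketch models only who gets which message, not timing or acknowledgments.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of round-robin dispatch: message m goes to consumer m % n.
// It models only the assignment order, not timing or acknowledgments.
class RoundRobinDemo {

    static List<List<Integer>> dispatch(int messages, int consumers) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        for (int m = 0; m < messages; m++) {
            received.get(m % consumers).add(m);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> r = dispatch(7, 3);
        for (int c = 0; c < r.size(); c++) {
            System.out.println("consumer " + c + " got " + r.get(c));
        }
        // consumer 0 got [0, 3, 6]
        // consumer 1 got [1, 4]
        // consumer 2 got [2, 5]
    }
}
```

The counts differ by at most one, which is the sense in which the distribution is "even". The actual processing load can still be uneven, as the section on fair dispatch shows.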

Message acknowledgment

So far, RabbitMQ has deleted each message from the queue as soon as it delivered it to a consumer listening on that queue (automatic acknowledgment). If a consumer dies while processing a message, that message is lost. We want to avoid this and deliver the message to another available consumer instead. For this reason, a consumer can send an acknowledgment telling the broker that a message has been received and processed and can be deleted from the queue. If a consumer dies without sending an acknowledgment (its channel or connection is closed, or the TCP connection is lost), RabbitMQ understands that the message was not fully processed and requeues it for delivery to another consumer.

channel.basicQos(1); // accept only one unacked message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message); // the actual (possibly slow) processing of the task
        } finally {
            System.out.println(" [x] Done");
            // acknowledge on the same channel the message was received on
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
boolean autoAck = false; // switch to manual acknowledgments
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

The code above is similar to the Recv class from the Hello World example, except that after processing each message we send an acknowledgment to the server with channel.basicAck. We also have to turn off automatic acknowledgment by setting the autoAck flag to false. Note that the acknowledgment must be sent on the same channel on which the message was received; otherwise a channel-level protocol exception is raised. Messages waiting for acknowledgment can be monitored with the command:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

The first number in the output is the number of messages ready to be delivered. The second is the number of messages that have been delivered but not yet acknowledged.

Timeout: 60.0 seconds …
Listing queues for vhost / …
hello 0 1

This output tells us that the consumer has received the message but has not yet confirmed that processing is complete. The next output shows that the message is back in the queue, waiting to be delivered to the next consumer. This means the previous consumer stopped working before it finished processing (and acknowledging) the message.

Timeout: 60.0 seconds …
Listing queues for vhost / …
hello 1 0
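The two rabbitmqctl outputs above can be reproduced with a toy model of manual acknowledgments. The sketch assumes a single consumer and tracks only the two counters (messages_ready and messages_unacknowledged). AckModel and its methods are hypothetical names, not the client API, and the position at which a requeued message re-enters the queue is not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Toy model of manual acknowledgments (autoAck = false) for a single consumer.
// It mimics the messages_ready / messages_unacknowledged counters from rabbitmqctl.
// All names are hypothetical; this is not the RabbitMQ client API.
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new TreeMap<>(); // deliveryTag -> body
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Deliver the next ready message; the broker keeps it as "unacknowledged".
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) throw new IllegalStateException("queue is empty");
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // Consumer finished processing: the broker may now delete the message.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // Channel/connection closed without an ack: everything it held is requeued.
    void consumerDied() {
        ready.addAll(unacked.values()); // requeue position is simplified here
        unacked.clear();
    }

    String counters() {
        return ready.size() + " " + unacked.size();
    }

    public static void main(String[] args) {
        AckModel hello = new AckModel();
        hello.publish("Hello World!");
        hello.deliver();
        System.out.println("hello " + hello.counters()); // hello 0 1
        hello.consumerDied();
        System.out.println("hello " + hello.counters()); // hello 1 0
    }
}
```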

Message durability

Acknowledgments ensure that a message is not lost if a consumer stops working before it has finished processing it. Messages can still be lost, however, if the RabbitMQ server itself stops. When RabbitMQ quits, whether because of an error, a power failure or an intentional shutdown, it forgets all queues and the messages in them unless we tell it otherwise. First, we need to make sure that RabbitMQ never loses our queue. To do this, we must declare the queue as durable:

channel.queueDeclare("hello", true, false, false, null); // second argument: durable = true

In the Hello World example, we have already declared the queue "hello", and it has existed on the broker since we ran the examples, so in our case this command would result in an error. This is because RabbitMQ does not allow an existing queue to be redeclared with different parameters. We will therefore declare a queue with a different name (both programs in our example must declare the same queue):

channel.queueDeclare("task_queue", true, false, false, null); // second argument: durable = true

We can now be sure that our queue will survive a restart of RabbitMQ. The second step is to make the messages themselves persistent. For each message sent to the broker, we set its delivery_mode property to 2. In the Java client, this is done by passing MessageProperties.PERSISTENT_TEXT_PLAIN as the properties argument of basicPublish. Even so, persistence is not fully guaranteed, because there is always a short window between the moment RabbitMQ accepts a message and the moment it writes the message to disk. In addition, RabbitMQ does not write each message to disk individually. In most cases, messages are cached and then written to disk together.

Fair dispatch

As mentioned earlier, round-robin dispatch sends every nth message to the nth consumer. Consider two consumers, where every odd message is heavy and every even message is light. The first consumer will be constantly busy, while the second will have almost nothing to do. This happens because RabbitMQ dispatches messages as they enter the queue without looking at how many unacknowledged messages each consumer has. It blindly follows its round-robin order.

To fix this, we can call channel.basicQos(1), which sets prefetchCount to 1. This tells the RabbitMQ server not to give a consumer a new message while that consumer is still processing (has not yet acknowledged) the previous one, regardless of whose turn it is. Instead, the message goes to the next consumer that is free. This setting only takes effect with manual acknowledgments (autoAck set to false).
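The difference can be estimated with a toy timing simulation. The sketch assumes hypothetical processing costs (10 time units for a heavy message, 1 for a light one) and ignores network and acknowledgment latency. Under prefetch = 1, each message is handed to whichever consumer finishes its current message first. FairDispatchDemo is a hypothetical name.

```java
import java.util.Arrays;

// Toy timing simulation with hypothetical costs: round-robin dispatch versus
// prefetch = 1 dispatch, where each message goes to the first consumer that is free.
class FairDispatchDemo {

    // Round-robin: message i goes to consumer i % n regardless of load.
    static int roundRobinMakespan(int[] costs, int consumers) {
        int[] busyUntil = new int[consumers];
        for (int i = 0; i < costs.length; i++) {
            busyUntil[i % consumers] += costs[i];
        }
        return Arrays.stream(busyUntil).max().getAsInt();
    }

    // prefetch = 1: a message waits for the consumer that finishes its previous one first.
    static int fairMakespan(int[] costs, int consumers) {
        int[] busyUntil = new int[consumers];
        for (int cost : costs) {
            int free = 0;
            for (int c = 1; c < consumers; c++) {
                if (busyUntil[c] < busyUntil[free]) free = c;
            }
            busyUntil[free] += cost;
        }
        return Arrays.stream(busyUntil).max().getAsInt();
    }

    public static void main(String[] args) {
        // 1st, 3rd, 5th, 7th messages are heavy; the others are light.
        int[] costs = {10, 1, 10, 1, 10, 1, 10, 1};
        System.out.println("round-robin: " + roundRobinMakespan(costs, 2)); // 40
        System.out.println("fair:        " + fairMakespan(costs, 2));       // 22
    }
}
```

With round-robin, one consumer ends up with all four heavy messages (40 units), while fair dispatch interleaves them and finishes all work in 22 units.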

Publish / Subscribe

We have already described the complete messaging model in RabbitMQ: the producer never sends a message directly to a queue, but publishes it to an exchange. The exchange routes the message to queues based on the routing key (or on the header attributes or a routing pattern, depending on the exchange type). Now we will see how exchanges and bindings are created, and how a message is routed to every queue that receives logs. First, we create a fanout exchange.

channel.exchangeDeclare("logs", "fanout");

When publishing a message, we give the name of the exchange and leave the routing key empty:

channel.basicPublish("logs", "", null, message.getBytes());

We also need to create a binding between the exchange and a queue:

channel.queueBind(queueName, "logs", "");

The third argument is the routing key; a fanout exchange ignores its value. The exchanges and bindings on the server can be listed with:

sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings

In this example, the consumers create the queues and bind them to the exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// no arguments: a non-durable, exclusive, auto-delete queue with a server-generated name
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

Each consumer creates a separate queue, and the exchange delivers every incoming message to each of these queues. This way, every consumer receives its own copy of the same message, which is the essence of the publish/subscribe pattern.
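The copying behaviour can be illustrated in memory. This toy fanout exchange assumes that each subscriber owns one private queue, and it ignores the routing key just as a real fanout exchange does. ToyFanoutExchange and its methods are hypothetical names, not the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy fanout exchange: every bound queue gets its own copy of each message,
// and the routing key is ignored. Names are hypothetical, not the client API.
class ToyFanoutExchange {
    private final List<Deque<String>> boundQueues = new ArrayList<>();

    // Each subscriber gets a private queue bound to the exchange
    // (the in-memory counterpart of queueDeclare() + queueBind()).
    Deque<String> subscribe() {
        Deque<String> q = new ArrayDeque<>();
        boundQueues.add(q);
        return q;
    }

    void publish(String routingKey, String body) {
        for (Deque<String> q : boundQueues) {
            q.addLast(body); // routingKey is deliberately unused
        }
    }

    public static void main(String[] args) {
        ToyFanoutExchange logs = new ToyFanoutExchange();
        Deque<String> toDisk = logs.subscribe();
        Deque<String> toScreen = logs.subscribe();
        logs.publish("", "server started");
        System.out.println(toDisk + " " + toScreen); // [server started] [server started]
    }
}
```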

Calling a remote procedure

The Remote Procedure Call (RPC) pattern is widely used in programming: a method located on a remote computer is called, and the caller waits for the result. The following example illustrates how an RPC service works through a RabbitMQ server. The client has a call method that sends an RPC request to the server and blocks until the response arrives.

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);

The client sends a request message to the server, and the server replies with a response message. To receive the response, the client includes the name of a callback queue with the request:

callbackQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
                             .Builder()
                             .replyTo(callbackQueueName)
                             .build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// … then code to read a response message from callbackQueueName …

Creating a new callback queue for every RPC request, as above, is inefficient. A better approach is to create a single callback queue per client. This raises a new problem: when a response arrives in that queue, it is not clear which request it belongs to. Fortunately, AMQP lets us attach properties to a message. Here we use correlationId, setting it to a unique value for each request. When a response arrives, we match it to its request by that id. A response with an unknown correlationId does not belong to any of our requests and can be safely discarded, which also protects us from accepting duplicate responses.

The client sends an RPC request as a message with two properties: replyTo, set to an exclusive queue created just for receiving responses, and correlationId, set to a unique value for each request. The request is sent to the rpc_queue queue, where the server waits for new requests. When a request arrives, the server processes it and sends a message with the result back to the client, using the queue named in replyTo. The client listens on its replyTo queue. When a message arrives, the client checks the correlationId property, and if it matches the value generated for the request, the client passes the response on.

We will show this example using two classes. In the RPCServer class, we establish a connection and a channel and declare the queue. If we want to run more than one server process, we should set prefetchCount with channel.basicQos() so that the load is spread evenly across the servers. We use basicConsume to subscribe to the queue, passing a DefaultConsumer object that performs the calculation and sends the response back.

The client code (the RPCClient class) is slightly more involved. First, we establish a connection and a channel. The call() method sends the RPC request. It generates the correlationId and keeps it so that the matching response can be recognized later. It then creates an exclusive reply queue and subscribes to it. Finally, it publishes the request message with the replyTo and correlationId properties set. Since delivery happens on a separate thread, the main thread has to be suspended until the response arrives. One solution is a blocking queue: we create an ArrayBlockingQueue with a capacity of 1, because we only ever wait for one response. The handleDelivery method checks whether the correlationId of each incoming response matches the request. If it matches, the response is put into the blocking queue, from which the waiting main thread takes it. The main thread then displays the response to the user. The design shown in the previous example is not the only possible implementation of an RPC service, but it has several advantages:
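The correlationId mechanism can be tried without a broker. In this in-process sketch, BlockingQueues stand in for rpc_queue and for the client's single reply queue. A daemon thread plays the role of handleDelivery and routes each reply to the waiting call by its correlationId. RpcSketch, Message and Client are hypothetical names, and the sketch is not a substitute for the RabbitMQ-based classes described above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-process sketch of the RPC pattern; all names are hypothetical, not the client API.
class RpcSketch {

    // Stand-in for a message body plus its correlationId and replyTo properties.
    static final class Message {
        final String body;
        final String correlationId;
        final BlockingQueue<Message> replyTo;

        Message(String body, String correlationId, BlockingQueue<Message> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Server: takes requests from rpc_queue and replies to replyTo, echoing the correlationId.
    static void startServer(BlockingQueue<Message> rpcQueue) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = rpcQueue.take();
                    String result = Integer.toString(fib(Integer.parseInt(request.body)));
                    request.replyTo.put(new Message(result, request.correlationId, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop serving
            }
        });
        server.setDaemon(true);
        server.start();
    }

    static final class Client {
        private final BlockingQueue<Message> rpcQueue;
        // One reply queue for the whole client.
        private final BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        // correlationId -> one-slot queue that the waiting caller blocks on
        private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

        Client(BlockingQueue<Message> rpcQueue) {
            this.rpcQueue = rpcQueue;
            // Plays the role of handleDelivery: runs on its own thread.
            Thread delivery = new Thread(() -> {
                try {
                    while (true) {
                        Message reply = replyQueue.take();
                        BlockingQueue<String> slot = pending.remove(reply.correlationId);
                        if (slot != null) {
                            slot.offer(reply.body);
                        }
                        // slot == null: unknown or duplicate correlationId, reply discarded
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            delivery.setDaemon(true);
            delivery.start();
        }

        String call(String n) throws InterruptedException {
            String corrId = UUID.randomUUID().toString();
            BlockingQueue<String> response = new ArrayBlockingQueue<>(1); // one answer expected
            pending.put(corrId, response); // register before sending so the reply is not missed
            rpcQueue.put(new Message(n, corrId, replyQueue));
            return response.take(); // the calling thread blocks here until the reply arrives
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Message> rpcQueue = new LinkedBlockingQueue<>(); // stands in for rpc_queue
        startServer(rpcQueue);
        Client fibonacciRpc = new Client(rpcQueue);
        System.out.println("fib(4) is " + fibonacciRpc.call("4")); // fib(4) is 3
    }
}
```

Registering the correlationId before sending the request avoids a race in which the reply arrives before the caller is ready to recognize it.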

  • if the RPC server is too slow, we can scale up simply by running another server;
  • on the client side, RPC requires sending and receiving only one message, so the RPC client needs only one network round trip per RPC request.

Parallel programming using RabbitMQ message queues

We can extend the previous remote procedure call example so that the computation requested by an RPCClient instance is distributed across all running instances of the RPCServer class. In this example, we use the Monte Carlo method to estimate π. We generate n points (x, y), drawing x and y uniformly at random from [0, 1). Since the quarter of the unit circle covers π/4 of the unit square, the fraction of points that satisfy x² + y² ≤ 1 approaches π/4. In the RPCServer class, we define a new monteCarlo method in which the variable in counts the points that satisfy this condition.

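// n_samples (points per request) is a static field of RPCServer, defined elsewhere
private static int monteCarlo() {
    int in = 0;                    // points inside the quarter circle
    double x, y;
    Random random = new Random();  // java.util.Random

    for (int i = 0; i < n_samples; i++) {
        x = random.nextDouble();
        y = random.nextDouble();

        if (x * x + y * y <= 1) in++;
    }

    return in;
}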

We also make changes to RPCClient. Instead of one message in the call method, we will send n messages and wait until all are received, and then we will print the result in the main thread. The call method now looks like this:

public double call() throws IOException, InterruptedException {
    // connection, requestQueueName and n_scale are fields of RPCClient
    Channel channel = connection.createChannel();
    List<String> corrIds = new ArrayList<>();

    String replyQueueName = channel.queueDeclare().getQueue();
    String message;

    // send n_scale requests, each with its own correlationId
    for (int i = 0; i < n_scale; i++) {
        corrIds.add(UUID.randomUUID().toString());
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrIds.get(i))
                .replyTo(replyQueueName)
                .build();

        message = Integer.toString(i);
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    }

    // room for every response, so offer() in the callback never drops one
    final BlockingQueue<String> response = new ArrayBlockingQueue<>(n_scale);

    String ctag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            // keep only responses to our own requests
            if (corrIds.contains(properties.getCorrelationId())) {
                response.offer(new String(body, "UTF-8"));
            }
        }
    });

    String in;
    long in_global = 0;

    // wait for all n_scale partial results and add them up
    for (int i = 0; i < n_scale; i++) {
        in = response.take();
        in_global += Integer.parseInt(in);
    }

    // 10000.0 is the number of samples per request; it must equal n_samples in RPCServer
    double result = 4.0 * in_global / (10000.0 * n_scale);

    channel.basicCancel(ctag);
    try {
        channel.close();
    } catch (TimeoutException e) { // java.util.concurrent.TimeoutException
        e.printStackTrace();
    }

    return result;
}

Because the RPCServer class calls channel.basicQos(1), a server receives a new request only after it has finished the previous one, so the requests are spread evenly across the servers. As a result, the random points are generated in parallel on as many servers as are running at any given time. The response to each request is the number of points that satisfy the condition above. π is then estimated by the formula:


$$\pi \approx \frac{4\,N_{\text{inner}}}{N_{\text{total}}}$$

where N_inner is the number of hits (points inside the quarter circle) and N_total is the total number of generated points. Computing π in parallel with the Monte Carlo method over RabbitMQ message queues is one of the simpler examples of parallel programming, because the parallel processes do not need to communicate with each other. If they did, the RPCServer class would be considerably more complex.
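The same split can be tried locally before involving RabbitMQ. In this sketch, a thread pool replaces the broker and the server processes: each task plays the role of one RPCServer request, and the summed hit count mirrors in_global in RPCClient.call(). ParallelPi, the seeding scheme and the chosen sizes are assumptions made for the illustration, not part of the original programs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SplittableRandom;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Thread-based sketch of the Monte Carlo split; a thread pool stands in for RabbitMQ.
class ParallelPi {

    // Counts points in the unit square that fall inside the quarter circle x^2 + y^2 <= 1.
    static long hits(long samples, long seed) {
        SplittableRandom random = new SplittableRandom(seed); // independent stream per task
        long in = 0;
        for (long i = 0; i < samples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) in++;
        }
        return in;
    }

    // nScale tasks of samplesPerTask points each, run on 'workers' threads.
    static double estimate(int nScale, long samplesPerTask, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (int i = 0; i < nScale; i++) {
                final long seed = i;
                Callable<Long> task = () -> hits(samplesPerTask, seed);
                results.add(pool.submit(task));
            }
            long inGlobal = 0;
            for (Future<Long> f : results) {
                inGlobal += f.get(); // blocks until that task has finished
            }
            // pi ≈ 4 * N_inner / N_total
            return 4.0 * inGlobal / ((double) samplesPerTask * nScale);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("PI ≈ " + estimate(100, 10_000, 4));
    }
}
```

With one million points in total, the statistical error is on the order of 0.002, so the printed value should agree with π to about two decimal places.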

Performance Comparison

There are many software solutions and libraries for communication between processes, but they are not all equally effective for every problem. The choice depends on many factors: the type of communication, whether we send small or large messages, and how important delivery guarantees or latency are. Let's return to the Monte Carlo estimation of π. Using the RabbitMQ client, we split the computation across multiple servers without much trouble, which significantly reduced the time needed to obtain the result. With one client and one active server performing 1,000,000,000 random samples, the result arrives in about 77 seconds.

PI = 3.141596724
Elapsed time is 77.0
PI25 – PIour = -4.070410206669095E-6

If we add a second server, the execution time is roughly halved, because two servers now process 500,000,000 samples each in parallel.

PI = 3.141580376
Elapsed time is 38.0
PI25 – PIour = 1.2277589793274757E-5

With four servers, however, the execution time is not four times shorter (26.0 s rather than about 19 s), because some time is lost sending, receiving and scheduling messages.

PI = 3.141490404
Elapsed time is 26.0
PI25 – PIour = 1.0224958979332399E-4
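The scaling can be quantified with the usual speedup S_p = T_1 / T_p and efficiency E_p = S_p / p. The sketch below simply applies these definitions to the elapsed times reported above for the RabbitMQ runs (77.0 s, 38.0 s and 26.0 s). Speedup is a hypothetical class name.

```java
import java.util.Locale;

// Speedup S_p = T_1 / T_p and efficiency E_p = S_p / p, applied to the reported times.
class Speedup {

    static double speedup(double t1, double tp) {
        return t1 / tp;
    }

    static double efficiency(double t1, double tp, int p) {
        return speedup(t1, tp) / p;
    }

    public static void main(String[] args) {
        double t1 = 77.0;                 // one server
        int[] servers = {2, 4};
        double[] times = {38.0, 26.0};    // elapsed times reported above
        for (int i = 0; i < servers.length; i++) {
            System.out.printf(Locale.ROOT, "%d servers: speedup %.2f, efficiency %.2f%n",
                    servers[i], speedup(t1, times[i]), efficiency(t1, times[i], servers[i]));
        }
    }
}
```

For four servers, this gives a speedup of about 2.96 and an efficiency of about 0.74, which puts a number on the overhead of sending, receiving and scheduling messages. Two servers give a speedup of about 2.0. The timings are rounded to whole seconds, so the second decimal should not be over-interpreted.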

Now let's look at the performance of a program written with MPI (Message Passing Interface), a portable message-passing standard for parallel computing architectures. In our case, the MPI program runs n processes that do the same job as RPCServer. There is no separate client program; instead, process 0 (the root process) returns the value of π. With a single MPI process performing 100000000 samples, the runtime is 69 seconds, about 8 seconds shorter than the RabbitMQ solution with a single active server.

PI = 3.1416036119999999343121999
Elapsed time is 69.307003
PI25 – PIour = -0.0000109584102068183142364

With two processes, the computation time is again slightly shorter than with the RabbitMQ implementation (35.2 s versus 38.0 s).

PI = 3.1415385960000001830394467
Elapsed time is 35.220628
PI25 – PIour = 0.0000540575897929329585168

With four active MPI processes, the computation takes slightly longer than with the RabbitMQ solution (27.6 s versus 26.0 s). This may be caused by the synchronization of the MPI processes and the cost of messaging among several processes.

PI = 3.1415493319999998611535830
Elapsed time is 27.595450
PI25 – PIour = 0.0000433215897932548443805

It is important to note that MPI is primarily designed for clusters with a fast and reliable network. In that setting, it is a much better choice for applications that perform complex scientific computations in parallel. RabbitMQ is the better fit for communication between different remote applications, or between applications that work on a producer/consumer basis. It is also preferable when we need greater fault tolerance and resilience to network instability, as is common in distributed systems.

Conclusion

RabbitMQ is applicable in many situations. It is most commonly used to let web services respond to requests quickly, instead of performing expensive operations while users wait for the result. It is also used to distribute messages to multiple consumers and to balance the load among them. Specific use cases include:

  • The application must work with any combination of existing protocols such as AMQP 0-9-1, STOMP, MQTT, AMQP 1.0.
  • Finer-grained control over consistency guarantees on a per-message basis is needed.
  • The application needs different types of messaging (point to point, request / reply, publish / subscribe).
  • Complex consumer routing, integrating multiple services / applications with nontrivial routing logic.

 
