top of page

Using RabbitMQ with multiple consumers from one .NET executable

Writer's picture: The Tech PlatformThe Tech Platform


RabbitMQ is a popular message broker that uses AMQP protocol. Helps creating and working with queues of data messages. Queues are needed almost everywhere now: everyone needs better performance and scalability. With the help of message broker you achieve both: you can use it to transfer data asynchronously between services, so that publisher sends the data to the queue without really waiting for it to be processed, the queue is persistent (can survive service/server restarts) and it will be dequeued by consumer/subscriber of that queue. There can be 1 or more consumers, based on that there are various strategies of distributing messages between them (round-robin algorithm by default, sending next message to the next free customer in circular order). Sure you can send the SAME message to several queues, each of which will distribute them to their subscribers (using this way you can broadcast messages to large audiences; exchange is being used in RabbitMQ)


This was a brief introduction, I want to show a sample of using multi subscriber simple example in C#.



We assume in the samples that RabbitMQ is hosted on local machine; why not use the docker image right from the docker hub (works on Windows too):


The task is to process the messages in queue “faster”, so the main idea is that we need to subscribe to it using several consumers. Sure we can create 1 executable and just run it as many times as needed, but why not make it from single executable, which also gives us the ability to scale using some configuration for example. You can check the sample on GitHub.


The main points

  • We use event based message receive model (allows to get the messages as they arrive), create a share connection, but different channels to work with the queue.

  • We use “exchange” here, just to show the exchange mechanics in same sample, it’s not really needed for the task (check Worker2 project, it works with another queue, which is binded to the same exchange):

channel.ExchangeDeclare(exchange: “logs”, type: ExchangeType.Fanout);
  • After that we create multiple consumers that use that newly created channels to work with the queue.

The main part is setting the QOS parameters: in case we use the default ones — all the messages will be distributed to last connected subscriber:

channel.BasicQos(0, 1, false);

Which means in the current example: take 1 message per consumer and while he is busy — continue distributing to the next free one.


That is probably it, the other things are standard ones: we create cannels with consumers in a loop, each of them receives 1 message and “works” on it. While working — distribution continues. RabbitMQ has async receive implementation too, but I was not using it in that case.


Full sample of consumption (ignore the timer in the beginning, it’s there just for another test O_O. Each consumer does some “intensive” “work” using Thread.Sleep to emulate the processing process):

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Timers;

class Worker
{
    public static void Main()    
    {
        // Test of timer handler
        System.Timers.TimeraTimer = new System.Timers.Timer();
        aTimer.Elapsed += new ElapsedEventHandler((source, e) 
                                => Console.Write("Timer Test"));
        aTimer.Interval=3000;
        // Test timer
        // aTimer.Enabled = true;
        
        var factory = new ConnectionFactory()        
        {
            HostName = "localhost", UserName="user", Password="password",
            // DispatchConsumersAsync = true        
        };
        var connection = factory.CreateConnection();
        
        // Add multiple consumers, so that queue can be processed "in 
        parallel"
        for (int i=1; i<10; i++)        
        {
            var j=i;
            var channel = connection.CreateModel();
            
            channel.ExchangeDeclare(exchange: "logs", type: 
                                    ExchangeType.Fanout);
            var queueName=channel.QueueDeclare("test1", durable: true, 
                            autoDelete: false, exclusive: false); 
            
            // take 1 message per consumer
            channel.BasicQos(0, 1, false);
            
            channel.QueueBind(queue: queueName,
                    exchange: "logs",
                    routingKey: "");
            Console.WriteLine($" [*] Waiting for messages in {j}");
            
            var consumer = new EventingBasicConsumer(channel);
            
            consumer.Received+= (model, ea) =>            
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received in {j} -> {message} at 
                                                    {DateTime.Now}");
                
                // Thread.Sleep(dots * 1000);
                
                // await Task.Delay(3000);
                Thread.Sleep(10000); 
                // async works too
                
                if (j==5)                
                {
                    // Test special case of returning item to queue: in 
                    this case we received the message, but did not process 
                    it because of some reason.
                    // QOS is 1, so our consumer is already full. We need 
                    to return the message to the queue, so that another 
                    consumer can work with it
                    Console.WriteLine($"[-] CANT PROCESS {j} consumer! 
                                        Error with -> {message}"); 
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, 
                                            multiple: false, true);                
                }
                else                
                {
                    Console.WriteLine($" [x] Done {j} -> {message} at 
                                        {DateTime.Now}");
                    
                    // here channel could also be accessed as 
                    ((EventingBasicConsumer)sender).Model
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                                                multiple: false);                
                }            
            };
            channel.BasicConsume(queue: queueName, autoAck: false, 
                                consumer: consumer);        
        }
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();    
    }
}

Sample of publisher that sends several messages to the queue

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)    
    {
        var factory = new ConnectionFactory() 
        { 
                HostName="localhost",
                UserName="user", 
                Password="password"
        };
        using(var connection=factory.CreateConnection())
        using(var channel=connection.CreateModel())        
        {
            channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
            channel.QueueDeclare("test1", durable: true, autoDelete: false, exclusive: false);
            channel.QueueDeclare("test2", durable: true, autoDelete: false, exclusive: false);
            
            for (int i=0; i<31; i++)            
            {
                var message=GetMessage(new string[]{i.ToString()});
                var body=Encoding.UTF8.GetBytes(message);
                var properties=channel.CreateBasicProperties();
                properties.Persistent=true;
                channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body);
                Console.WriteLine(" [x] Sent {0}", message);            
            }        
        }
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();    
    }
    
    private static string GetMessage(string[] args)    
    {
        return ((args.Length>0) ?string.Join("", args) :"Hello World!");    
    }
}

Sample response from Worker

 [x] Received in 9 -> 9 at 2/10/2022 22:28:20
 [x] Done 4 -> 4 at 2/10/2022 22:28:29
 [x] Done 2 -> 2 at 2/10/2022 22:28:29
[-] CANT PROCESS 5 consumer! Error with -> 5
 [x] Done 1 -> 20 at 2/10/2022 22:28:29
 [x] Done 3 -> 3 at 2/10/2022 22:28:29
 [x] Received in 2 -> 10 at 2/10/2022 22:28:29
 [x] Received in 3 -> 11 at 2/10/2022 22:28:29
 [x] Received in 4 -> 13 at 2/10/2022 22:28:29
 [x] Received in 1 -> 12 at 2/10/2022 22:28:29
 [x] Received in 5 -> 5 at 2/10/2022 22:28:29
 [x] Done 6 -> 6 at 2/10/2022 22:28:29
 [x] Received in 6 -> 14 at 2/10/2022 22:28:29
 [x] Done 7 -> 7 at 2/10/2022 22:28:29
 [x] Received in 7 -> 15 at 2/10/2022 22:28:29
 [x] Done 8 -> 8 at 2/10/2022 22:28:29
 [x] Received in 8 -> 16 at 2/10/2022 22


Source: Medium - Dima Pursanov


The Tech Platform

0 comments

Recent Posts

See All

Comments


bottom of page