RabbitMQ is a popular message broker that uses AMQP protocol. Helps creating and working with queues of data messages. Queues are needed almost everywhere now: everyone needs better performance and scalability. With the help of message broker you achieve both: you can use it to transfer data asynchronously between services, so that publisher sends the data to the queue without really waiting for it to be processed, the queue is persistent (can survive service/server restarts) and it will be dequeued by consumer/subscriber of that queue. There can be 1 or more consumers, based on that there are various strategies of distributing messages between them (round-robin algorithm by default, sending next message to the next free customer in circular order). Sure you can send the SAME message to several queues, each of which will distribute them to their subscribers (using this way you can broadcast messages to large audiences; exchange is being used in RabbitMQ)
This was a brief introduction, I want to show a sample of using multi subscriber simple example in C#.
Example project on GitHub.
We assume in the samples that RabbitMQ is hosted on local machine; why not use the docker image right from the docker hub (works on Windows too):
The task is to process the messages in queue “faster”, so the main idea is that we need to subscribe to it using several consumers. Sure we can create 1 executable and just run it as many times as needed, but why not make it from single executable, which also gives us the ability to scale using some configuration for example. You can check the sample on GitHub.
The main points
We use event based message receive model (allows to get the messages as they arrive), create a share connection, but different channels to work with the queue.
We use “exchange” here, just to show the exchange mechanics in same sample, it’s not really needed for the task (check Worker2 project, it works with another queue, which is binded to the same exchange):
channel.ExchangeDeclare(exchange: “logs”, type: ExchangeType.Fanout);
After that we create multiple consumers that use that newly created channels to work with the queue.
The main part is setting the QOS parameters: in case we use the default ones — all the messages will be distributed to last connected subscriber:
channel.BasicQos(0, 1, false);
Which means in the current example: take 1 message per consumer and while he is busy — continue distributing to the next free one.
That is probably it, the other things are standard ones: we create cannels with consumers in a loop, each of them receives 1 message and “works” on it. While working — distribution continues. RabbitMQ has async receive implementation too, but I was not using it in that case.
Full sample of consumption (ignore the timer in the beginning, it’s there just for another test O_O. Each consumer does some “intensive” “work” using Thread.Sleep to emulate the processing process):
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Timers;
class Worker
{
public static void Main()
{
// Test of timer handler
System.Timers.TimeraTimer = new System.Timers.Timer();
aTimer.Elapsed += new ElapsedEventHandler((source, e)
=> Console.Write("Timer Test"));
aTimer.Interval=3000;
// Test timer
// aTimer.Enabled = true;
var factory = new ConnectionFactory()
{
HostName = "localhost", UserName="user", Password="password",
// DispatchConsumersAsync = true
};
var connection = factory.CreateConnection();
// Add multiple consumers, so that queue can be processed "in
parallel"
for (int i=1; i<10; i++)
{
var j=i;
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "logs", type:
ExchangeType.Fanout);
var queueName=channel.QueueDeclare("test1", durable: true,
autoDelete: false, exclusive: false);
// take 1 message per consumer
channel.BasicQos(0, 1, false);
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine($" [*] Waiting for messages in {j}");
var consumer = new EventingBasicConsumer(channel);
consumer.Received+= (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received in {j} -> {message} at
{DateTime.Now}");
// Thread.Sleep(dots * 1000);
// await Task.Delay(3000);
Thread.Sleep(10000);
// async works too
if (j==5)
{
// Test special case of returning item to queue: in
this case we received the message, but did not process
it because of some reason.
// QOS is 1, so our consumer is already full. We need
to return the message to the queue, so that another
consumer can work with it
Console.WriteLine($"[-] CANT PROCESS {j} consumer!
Error with -> {message}");
channel.BasicNack(deliveryTag: ea.DeliveryTag,
multiple: false, true);
}
else
{
Console.WriteLine($" [x] Done {j} -> {message} at
{DateTime.Now}");
// here channel could also be accessed as
((EventingBasicConsumer)sender).Model
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
channel.BasicConsume(queue: queueName, autoAck: false,
consumer: consumer);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
Sample of publisher that sends several messages to the queue
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName="localhost",
UserName="user",
Password="password"
};
using(var connection=factory.CreateConnection())
using(var channel=connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
channel.QueueDeclare("test1", durable: true, autoDelete: false, exclusive: false);
channel.QueueDeclare("test2", durable: true, autoDelete: false, exclusive: false);
for (int i=0; i<31; i++)
{
var message=GetMessage(new string[]{i.ToString()});
var body=Encoding.UTF8.GetBytes(message);
var properties=channel.CreateBasicProperties();
properties.Persistent=true;
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length>0) ?string.Join("", args) :"Hello World!");
}
}
Sample response from Worker
[x] Received in 9 -> 9 at 2/10/2022 22:28:20
[x] Done 4 -> 4 at 2/10/2022 22:28:29
[x] Done 2 -> 2 at 2/10/2022 22:28:29
[-] CANT PROCESS 5 consumer! Error with -> 5
[x] Done 1 -> 20 at 2/10/2022 22:28:29
[x] Done 3 -> 3 at 2/10/2022 22:28:29
[x] Received in 2 -> 10 at 2/10/2022 22:28:29
[x] Received in 3 -> 11 at 2/10/2022 22:28:29
[x] Received in 4 -> 13 at 2/10/2022 22:28:29
[x] Received in 1 -> 12 at 2/10/2022 22:28:29
[x] Received in 5 -> 5 at 2/10/2022 22:28:29
[x] Done 6 -> 6 at 2/10/2022 22:28:29
[x] Received in 6 -> 14 at 2/10/2022 22:28:29
[x] Done 7 -> 7 at 2/10/2022 22:28:29
[x] Received in 7 -> 15 at 2/10/2022 22:28:29
[x] Done 8 -> 8 at 2/10/2022 22:28:29
[x] Received in 8 -> 16 at 2/10/2022 22
Source: Medium - Dima Pursanov
The Tech Platform
Comments