top of page
Writer's pictureThe Tech Platform

Notification Queue : RabbitMQ in .NET Core

lets start the RabbitMQ server running in docker container. If you don’t have already, it will pull by default and command it:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

Now we will create the Publisher/Sender and here is the complete code:

using RabbitMQ.Client;
using System;
using System.Text;
namespace RabbitMQPublisher
{
    class Sender
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() 
            { 
                HostName = "localhost" 
            };
            using (var connection = factory.CreateConnection())
            {
                Console.WriteLine("Enter command 'exit' to stop sending 
                                                           meesages");
                string message = string.Empty;
                string receiver = string.Empty;
                do
                {
                    SendMessage(connection, message, receiver);
                    Console.Write("Enter your message:");
                    message = Console.ReadLine();
                    Console.Write("Enter the reciever:");
                    receiver = Console.ReadLine();
                } 
                while (!message.Equals("exit",     
                                   StringComparison.OrdinalIgnoreCase));
            }
        }
        private static void SendMessage(IConnection connection, string 
        message, string receiver)
        {
            if (string.IsNullOrEmpty(message))
                return;
            using (var channel = connection.CreateModel())
            {
                //Create the exchange type direct which will broadcast 
                messages to a specific queue driven by routingKey.
                channel.ExchangeDeclare(exchange: "direct_msg", type: 
                    "direct");
                channel.QueueDeclare(queue: "Notification",durable: false, 
                    exclusive: false, autoDelete: false, arguments: null);
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                channel.BasicPublish(exchange: "direct_msg",
                                     routingKey: receiver,
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine($" Message Sent to '{receiver}'");
            }
        }
    }
}

In above code, receiver name is an input and based on that code creates a routingKey as “{receiver_name}” input and bind it to the queue “Notification” with exchange type as “direct”. Rest all is same as per my previous article.


Next, we will create Receivers/Subscribers. Here is the complete code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQSubscriber
{
    class Receiver
    {
        public static void Main()
        {
            var queueName = "Notification";
            Console.Write("Enter UserName:");
            string userName = Console.ReadLine();
            var factory = new ConnectionFactory() 
            {
                 HostName = "localhost" 
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "direct_msg", type: 
                                                               "direct");
                    channel.QueueDeclare(queue: queueName, durable: false, 
                    exclusive: false, autoDelete: false, arguments: null);
                    
                     channel.QueueBind(queue: queueName, exchange: 
                     "direct_msg", routingKey: userName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"Message Received: {message}");
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                                                    multiple: false);
                    };
                //making autoAck: false as we are doint it manually above
                    channel.BasicConsume(queue: queueName, autoAck: false, 
                                                      consumer: consumer);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();}
            }
        }
    }
}


In above Receiver’s code, routing key is set to the user input {user_name} and bind it to the queue named “Notification” and exchange type is “direct”. That’s all, now if you run the both the application as One instance of Sender and multiple instances of Receiver then the output would be as below:




Source: Medium - Binod Mahto


The Tech Platform

0 comments

Σχόλια


bottom of page