首頁 > 軟體

NServiceBus 結合 RabbitMQ 使用教學

2020-06-16 17:15:00

NServiceBus 結合 RabbitMQ 使用可以參考官方教學:

Step by Step Guide

新建4個專案:

  • A Console Application named Client
  • A Console Application named Server
  • A Console Application named Subscriber
  • A Class Library named Shared

Framework框架選擇4.6及以上,後面有用到。

Client,Server,Subscriber參照Shared。

4個專案都安裝NServiceBus包:

Install-Package NServiceBus

3個控制台專案安裝NServiceBus.RabbitMQ包:

Install-Package NServiceBus.RabbitMQ

Share程式碼:

using NServiceBus;
public class PlaceOrder:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; } 
    }
public class OrderPlaced:IEvent
    {
        public Guid OrderId { get; set; }
    }
public class PlaceShipping:ICommand
    {
        public Guid Id { get; set; }
        public string Product { get; set; }
    }

Client程式碼:

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Sample.StepByStep.Client";
            var endpointConfiguration = new EndpointConfiguration(endpointName: "Sample.StepByStep.Client");
            endpointConfiguration.SendFailedMessagesTo("error");
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
            try
            {
                await SendOrder(endpointInstance);
            }
            catch (Exception)
            {
                await endpointInstance.Stop().ConfigureAwait(false);
            }
        }

        private static async Task SendOrder(IEndpointInstance endpointInstance)
        {
            Console.WriteLine("Press enter to send a message");
            Console.WriteLine("Press any key to exit");
            while(true)
            {
                var key = Console.ReadKey();
                Console.WriteLine();
                if(key.Key!=ConsoleKey.Enter)
                {
                    return;
                }
                var id = Guid.NewGuid();
                var id2 = Guid.NewGuid();
                var placeOrder = new PlaceOrder
                {
                    Product = "New shoes",
                    Id = id
                };
                var placeShipping = new PlaceShipping
                {
                    Product = "A-->B",
                    Id = id2
                };
                await endpointInstance.Send("Samples.StepByStep.Server", placeOrder);
                await endpointInstance.Send("Samples.StepByStep.Server", placeShipping);
                Console.WriteLine($"Sent a PlaceOrder messge with id:{id:N}");
                Console.WriteLine($"Sent a PlaceShipping messge with id:{id2:N}");
            }
        }
    }
}

Server程式碼:

namespace Server
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Server";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Server");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
 
 
namespace Server
{
    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        static ILog log = LogManager.GetLogger<PlaceOrderHandler>();

        public Task Handle(PlaceOrder message, IMessageHandlerContext context)
        {
            log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
            log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

            var orderPlaced = new OrderPlaced
            {
                OrderId = message.Id
            };
            return context.Publish(orderPlaced);
        }
    }
}
 
 
namespace Server
{
    public class PlaceShippingHandler : IHandleMessages<PlaceShipping>
    {
        static ILog log = LogManager.GetLogger<PlaceShippingHandler>();

        public Task Handle(PlaceShipping message, IMessageHandlerContext context)
        {
            log.Info($"Shipping for Product:{message.Product} placed with id: {message.Id}");
            return Task.CompletedTask;
        }
    }
}

為什麼要選4.6以上,原因就在Task.CompletedTask需要4.6以上。

SubScribe程式碼:

namespace Subscriber
{
    class Program
    {
        static void Main(string[] args)
        {
            AsyncMain().GetAwaiter().GetResult();
        }
        static async Task AsyncMain()
        {
            Console.Title = "Samples.StepByStep.Subscriber";
            var endpointConfiguration = new EndpointConfiguration("Samples.StepByStep.Subscriber");
            endpointConfiguration.UseSerialization<JsonSerializer>();
            endpointConfiguration.EnableInstallers();
            var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=10.255.20.44;username=guest;password=guest");
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.SendFailedMessagesTo("error");

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);
            try
            {
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
            finally
            {
                await endpointInstance.Stop()
                    .ConfigureAwait(false);
            }
        }
    }
}
 
 
namespace Subscriber
{
    public class OrderCreatedHandler : IHandleMessages<OrderPlaced>
    {
        static ILog log = LogManager.GetLogger<OrderCreatedHandler>();

        public Task Handle(OrderPlaced message, IMessageHandlerContext context)
        {
            log.Info($"Handling: OrderPlaced for Order Id: {message.OrderId}");
            return Task.CompletedTask;
        }
    }
}

選擇多啟動專案:

啟動專案,在Client端按回車,可以看到Server端和Subscribe端的接收資訊:

同時檢視http://10.255.20.44:15672/#/queues:

CentOS 7.2 下 RabbitMQ 叢集搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7環境安裝使用專業的訊息佇列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

RabbitMQ入門教學  http://www.linuxidc.com/Linux/2015-02/113983.htm

在CentOS7上安裝RabbitMQ 詳解  http://www.linuxidc.com/Linux/2017-05/143765.htm


IT145.com E-mail:sddin#qq.com