Nasza strona używa cookies. Dowiedz się więcej o celu ich używania i zmianie ustawień w przeglądarce. Korzystając ze strony, wyrażasz zgodę na używanie cookies, zgodnie z aktualnymi ustawieniami przeglądarki. Rozumiem

RabbitMQ - Bezbolesna integracja z .NET Core

Kamil Kiełbasa .NET Developer / 7N
Dowiedz się, jak zintegrować message brokera RabbitMQ z aplikacją .NET Core MVC.
RabbitMQ - Bezbolesna integracja z .NET Core

Ostatnio w moje łapki wpadła bardzo ciekawa książka - RabbitMQ in Depth. Moją nową miłością okazało się tworzenie aplikacji rozproszonych przy wykorzystaniu DDD, TDD i jeszcze kilku innych skrótów, których nie wymienię. 

Mają one ważną cechę – muszą się ze sobą komunikować. Nie ma znaczenia, czy robią to synchronicznie za pomocą requestów HTTP, czy też asynchronicznie, za pomocą kolejki wiadomości. Są jak zespół - bez komunikacji praca jest bez sensu. W dzisiejszym artykule chciałbym Wam przedstawić, jak w bezbolesny sposób zintegrować message brokera RabbitMQ z aplikacją .NET Core MVC. Wykorzystamy do tego bibliotekę Raw Rabbit. Poznałem ją dzięki Piotrowi Gankiewiczowi podczas prowadzonych przez niego devWarsztatów.


Konfiguracja Raw Rabbita

No dobra, przejdźmy do mięska. Nie będę Wam opisywał, jak uruchomić kolejkę RabbitMQ, ponieważ zrobiłem to w poprzednim artykule. Zaczniemy od instalacji paczkek nugeta.

<PackageReference Include="RawRabbit" Version="1.10.4" />
<PackageReference Include="RawRabbit.vNext" Version="1.10.4" />


Raw Rabbit do swojego działania potrzebuje dwóch paczek. Na chwilę obecną polecam Wam instalację numerów 1.10.4, ponieważ wersja 2+ jest jeszcze oznaczona jako release candidate. Stabilność projektu - ważna sprawa!

Przejdźmy do pliku appsettings.json, w którym wklejamy konfigurację biblioteki.

{
  "rabbitmq": {
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "Port": 5672,
    "Hostnames": [ "localhost" ],
    "RequestTimeout": "00:00:10",
    "PublishConfirmTimeout": "00:00:01",
    "RecoveryInterval": "00:00:10",
    "PersistentDeliveryMode": true,
    "AutoCloseConnection": true,
    "AutomaticRecovery": true,
    "TopologyRecovery": true,
    "Exchange": {
      "Durable": true,
      "AutoDelete": true,
      "Type": "Topic"
    },
    "Queue": {
      "AutoDelete": true,
      "Durable": true,
      "Exclusive": true
    }
  }
}

No i mamy ścianę informacji. Co z tego możemy wyciągnąć? Na przykład konfiguracje kolejek, czy centrali Exchange, w które - na szczęście - dla Was i dla mnie, nie ma teraz sensu się zagłębiać. Obecnie najważniejszymi kluczami są:
  • Username i Password, wiadomo chyba co robią.
  • Port, czyli na którym porcie nasłuchuje nasza instancja RabbitMQ. Standardowy port to 5672.


Tworzenie klienta

Stwórzmy teraz klienta, czyli obiektu, za pomocą którego będziemy komunikować się z RabbitMQ. Osobiście uwielbiam zamykać je w mechanizmie extension methods. Umożliwia to łatwe i przyjemne wyniesienie kodu do zewnętrznej biblioteki i używanie w większej ilości projektów.

public static class ServiceCollectionExtensions
{
    public static void AddRabbitMq(this IServiceCollection services, IConfigurationSection section)
    {
        // RabbitMQ Configuration
        var options = new RawRabbitConfiguration();
        section.Bind(options);
        
        var client = BusClientFactory.CreateDefault(options);
        services.AddSingleton<IBusClient>(_ => client);
    }
}


Sama implementacja nie wymaga wiele omawiania. Metoda rozszerzająca (tak to się tłumaczy?) otrzymuje sekcję konfiguracyjną z pliku appsettings.json. Następnie wiąże ją z konfiguracją Raw Rabbita, by na końcu stworzyć defaultowy obiekt BusClient.

Wisienką na torcie jest wpis w pliku Startup.cs.

public void ConfigureServices(IServiceCollection services)
{          
    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
    
    // RabbitMQ Configuration
    services.AddRabbitMq(Configuration.GetSection("rabbitmq"));
}


Pierwsza zdefiniowana wiadomość

Kiedy klient został już odpowiednio skonfigurowany, możemy przejść do zdefiniowania pierwszej wiadomości w systemie. Stwórzmy interfejs znacznikowy, którym będziemy naznaczać nasze wiadomości.

public interface IMessage { }


Oczywiście interfejs będzie pusty w środku. Teraz definiujemy POCO (Plain Old C# Object). Nie chcąc komplikować tego tutorialu, będzie to bardzo prosta klasa, która zawiera tylko jedno pole Message.

public class SendMessage : IMessage
{
    public string Message { get; }
    public SendForgetPasswordEmailCommand(string message)
    {
        Message = message;
    }
}


Ciekawym szczegółem implementacji jest fakt, że obiekt klasy SendMessage jest immutable. Oznacza to, że nie można zmienić jego stanu. Takie tam fanaberię można robić!


Pierwszy handler wiadomości

Skoro sama wiadomość posiada swój interfejs, to dlaczego nie miałby go posiadać handler? Możemy stworzyć taki dość uproszczony rodzaj handlera na podstawie poniższego interfejsu.

public interface IHandler<in T> where T : IMessage
{
    Task HandleAsync(T message, CancellationToken token);
}


Będzie on definiował przymus implementowania metody HandleAsync, która w swoich argumentach przyjmuje wiadomość, jak i CancellationToken. W tym artykule pominiemy implementację wykorzystania tego mechanizmu. Warto jednak wiedzieć o takiej możliwość.

Przejdźmy do przykładowej implementacji handlera, niewiele dłuższej od samego, zdefiniowanego interfejsu.

public class SendMessageHandler : IHandler<SendMessage>
{   
    public async Task HandleAsync(SendMessage @event, CancellationToken token)
    {          
        Console.WriteLine($"Receive: {@event.Message}");
        return Task.CompletedTask;
    }
}


W nim po prostu wypisujemy na konsolę to, co otrzymaliśmy w wiadomości.

Następnie wystarczy, że dodamy nasz handler do kontenera DI. W tym celu użyjemy standardowego kontenera .NET Core MVC.

services.AddTransient<IHandler<SomeMessage>>, SendMessageHandler>();


Zapisujemy się na przychodzące wiadomości

Jako że RabbitMQ powstał z myślą o aplikacjach rozproszonych. Poprawną praktyką jest tworzenie kodu nadającego się do użycia w kilku aplikacjach naraz. Za pomocą mechanizmu Extension Methods możemy zrobić rozszerzenie do interfejsu IApplicationBuilder, które najpierw pobierze nam instancje obiektu BusClient, a następnie wywoła na nim metodę SubscribeAsync. Dodatkowo, jeżeli nasze rozszerzenie zwraca typ IApplicationBuilder, otrzymujemy przydatne FluetAPI. Wykorzystując app.ApplicationServices.GetService, dynamicznie pobieramy instancję handlera. Cała implementacja wygląda następująco:

public static class ApplicationBuilderExtensions
{
    public static IApplicationBuilder AddHandler<T>(this IApplicationBuilder app, IBusClient client) 
        where T : IMessage
    {
        if (!(app.ApplicationServices.GetService(typeof(IHandler<T>)) is IHandler<T> handler))
            throw new NullReferenceException();
        
        client
            .SubscribeAsync<T>(async (msg, context) =>
            {
                await handler.HandleAsync(msg, CancellationToken.None);
            });
        return app;
    }
    public static IApplicationBuilder AddHandler<T>(this IApplicationBuilder app)
        where T : IMessage
    {
        if (!(app.ApplicationServices.GetService(typeof(IBusClient)) is IBusClient busClient))
            throw new NullReferenceException();
        
        return AddHandler<T>(app, busClient);
    }
}


Łatwo zauważyć, że w parametrze SubscribeAsync jest wbite na sztywno wyrażenie lambda. Dla zwiększenia elastyczności implementacji logiki wystarczy przekazać wyrażenie lambda w argumentach metody. Na potrzeby tego artykułu oszczędzałem ilość znaków, stąd sztywna Lambda.

Nie pozostało nam nic innego, jak tylko wykorzystać ładnie stworzone API do zapisywania się na nadchodzące wiadomości.

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    // ...
    app.UseMvc();
    app.AddHandler<SomeMessage>()
        .AddHandler<SomeAnotherMessage>()
        .AddHandler<MayberOneMoreMessage>();
}


Wysłanie wiadomości z poziomu kontrolera

Na zakończenie przejdźmy jeszcze do implementacji wysyłania wiadomości z poziomu kontrolera. Nie ma tutaj większej magii. Pobieramy obiekt BusClient z naszego kontenera DI i w jego akcji możemy wywołać metodę PublishAsync.

public class MessageController : Controller
{
    private readonly IBusClient _client;
    public ProfilesController(IBusClient client)
    {
        _client = client;
    }
    [HttpGet]
    [Route("Create")]
    public async Task<IActionResult> Create()
    {
        await _client.PublishAsync(new SomeMessage("Test Message"));
        
        return Accepted();
    }
}


Robimy to w tej samej aplikacji, jednak polecam wygenerowanie nowej. Pozwoli nam to poczuć, jak wygląda komunikacja pomiędzy dwoma aplikacjami. Oczywiście musicie mieć na uwadze, że tworząc nową aplikację, musi ona zawierać konfigurację połączenia z RabbitMQ.


Podsumowanie

Jeżeli jakimś cudem dobrnęliście na koniec tego artykułu, należą Wam się szczere gratulacje. W sumie ostatnio przyzwyczaiłem Was drodzy czytelnicy do krótszych form, acz niecałą wiedzę można przekazać w krótki, prosty sposób. Stworzyliśmy w naszym projekcie uproszczoną w obsłudze implementację wykorzystania biblioteki Raw Rabbit. Podobną można znaleźć we wspólnym projekcie edukacyjnym Piotra Gankiewicza i Dariusza Pawlukiewicza pod szyldem DevMentors. Główną różnicą mojej wersji jest wykorzystanie standardowego konteneru DI oraz przyjęte nazewnictwo.

Mam nadzieje, że udało mi się przybliżyć sposób wykorzystania brokera wiadomości RabbitMQ w sposób wystarczająco klarowny, abyście mogli zacząć stosować go w swoim przyszłych projektach.

Dzięki za poświęcony czas na czytanie tego artykułu.

Zobacz więcej na Bulldogjob

Masz coś do powiedzenia?

Podziel się tym z 100 tysiącami naszych czytelników

Dowiedz się więcej
Rocket dog