23.11.20238 min
Przemysław Jastrowicz

Przemysław JastrowiczEngineering Manager

Implementacja maszyny stanów Mass Transit Saga

Sprawdź, w jaki sposób rozwiązać problem w aplikacji korzystając z MassTransit Saga.

Implementacja maszyny stanów Mass Transit Saga

Dzisiejszy krajobraz systemów technologicznych jest bardzo bujny i obfituje w wiele różnych narzędzi, wzroców czy tecnologii. Do rozwiązania każdego problemu możemy podejść na wiele różnych sposobów wykorzystując przeróżne metody i narzędzia. Dziś chciałbym opisać problem który napotkaliśmy w naszej aplikacji, oraz przedstawić sposób w jaki go rozwiązaliśmy przy użyciu maszyny stanów od Mass Transita. Na wstępie chciałbym zaznaczyć, że artykuł ten będzie oparty o konkretny przykład który wystąpił w działającym produkcyjnie systemie dlatego dobieranie rozwiązań musiały być osadzone w kontekście obecnie używanych technologii, zasobów, oraz priorytetów.

Co to jest maszyna stanów?

Maszyna stanów to koncepcja programistyczna, która opisuje sposób modelowania i zarządzania cyklem życia systemu, uwzględniając różne stany, zdarzenia i przejścia między nimi. Składa się ze zdefiniowanych stanów, zdarzeń, przejść oraz akcji, gdzie stany reprezentują różne fazy lub warunki systemu, zdarzenia to akcje lub sytuacje powodujące zmianę stanu, przejścia określają, jak system przechodzi z jednego stanu do drugiego, a akcje są związane z konkretnymi zdarzeniami lub przejściami i reprezentują wykonywane zadania w odpowiedzi na te zdarzenia lub przejścia. Maszyny stanów są używane w programowaniu do modelowania zachowań systemów, umożliwiając programistom łatwe zarządzanie złożonymi procesami i reakcję na różnorodne warunki działania systemu.

Opis problemu

System o którym mówimy wyglądał nastepująco:

  • Aplikacja frontendowa A- Aplikacja ERM służąca do zarządzania zasobami firmy
  • Mikroserwis A- Jeden z mikroserwisów który ściśle współpracował z Aplikacją Frontendową A i wykonywał określony zestaw zadań
  • Baza danych A- Baza danych która służyła do przechowywania informacji skonfigurowanych w wyżej wymienionych systemach
  • Mikroserwis B- Mikroserwis który wykonywał specyfincze operacje bazując na danych skonfigurowanych w sytemach A. 
  • Baza danych B- Baza służąca do przetrzymywania kopii transformowanych danych z serwisów A. Rozwiązanie to powstało po to, aby w przyszłości móc bezboleśnie wymienić całą część A, a część B funkcjonować będzie dalej bez zarzutu. Można powiedzieć, że część B  jest niejako "source of truth" dla reszty systemów.
  • Mikroserwis C- mikroserwis którzy korzysta z danych z mikroserwisów B
  • Aplikacja Frontendowa C- Aplikacja która prezentuje dane odpowiednim użytkownikom.
  • ASB- Azure Service Bus służący do komunikacji między mikroserwisami


Problem polega na tym, że użytkownik serwisu A oczekuje, że zmiany które wprowadzi będą widoczne w systmeie C. Niestety, jeśli w trakcie synchronizacji pojawi się problem, użytkownik widzi różne dane w obu systemach więc od razu kontaktuje się z programistą. Czasami interwencja jest potrzebna, ale czasami synchronizacja dopiero jest na kolejce  i zanim programista zdąży sprawdzić jaki jest status danej encji, synchronizacaj się zakończy i problem sam się rozwiąże. Powoduje to mnóstwo problemów komunikacyjnych, frustrację developerów ale przede wszystkim podważa zaufanie użytkownika do naszej aplikacji ponieważ nigdy nie ma on pewności czy akcje które podjął mają jakiekolwiek odzwierciedlenie w innych częściach systemu.

Nasz zespół odpowiedzialny był za część A, natmoiast inne zespoły były odpowiedzialne za inne części systemu. Warto zwrócić szczególną uwagę na to, że mikroserwis B pobierał dane z mikroserwisu A poprzez zwykłe zapytanie HTTP. Jest to część systemu która wygenerowała cały problem. Nie będę zagłębiał się dlaczego to wszystko zostało w ten sposób zaaranżowane ponieważ nie zawsze mamy na to wpływ. Najczęściej dołączamy do już działającego projektu i rzadko kiedy otrzymujemy zielone światło aby przepisywać wszystko od zera. Musieliśmy stawić czoła temu problemowi trzymając się następujących założeń:

  1. Użytkownik systemu A musi mieć pewność że dane na których pracuje są aktualne, lub otrzymać informacje że synchronizacja danych zawiodła.
  2. Nie możemy ingerować w system B. Oczywiście możemy dodać tam kilka linijek kodu w porozumieniu z drugim zespołem, natomiast odpadają wszystkie duże i czasochłonne zmiany gdyż zwyczajnie nie ma na to czasu i przestrzeni w pozostałych zespołach.
  3. Poprawa nie może zająć nam kilku sprintów gdyż jest dużo innych bardziej istotnych zadań do wykonania w tym momenice.


Oto nasze rozwiązanie.

Po analizach wielu różnych rozwiązań doszliśmy do wniosku, że dopóki nie zaimplementujemy docelowego rozwiązania postawimy na maszynę stanów od Mass Transit.

Poniżej opiszę w jaki sposób zdefiniowaliśmy naszą maszynę stanów, a na sam koniec pokażę nowy schemat komunikacji.

public record OfferState : SagaStateMachineInstance, IStateEntity
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
    public string Message { get; set; }
    public DateTime TimeStamp { get; set; }
}
public interface IStateEntity
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
    public string Message { get; set; }
    public DateTime TimeStamp { get; set; }
}


W taki sposób zdefiniowaliśmy nasz stan. Interfejs SagaStateMachineInstance jest niezbędny do prawidłowego funkcjonowania naszej maszyny. Interfejs IStateEntity to nasz interfejs który dodaliśmy aby łatwiej nawigować po naszych stanach oraz w przyszłości umożliwić generyczne tworzenie stanów. 

Następnie w naszej maszynie stanów zdefiniowaliśmy trzy stany dla naszej encji:

public State SyncPending { get; set; }
public State Synced { get; set; }
public State SyncFailed { get; set; }


Jako że nasza maszyna stanów jest bardzo prosta i póki co wykorzystujemy ją tylko do przechowywania informacji o obecnym stanie synchronizacji, te trzy podstawowe stany są dla nas wystarczające. 

Następnie musimy zdefiniować eventy. Będą to eventy (tak naprawdę wiadomości) które będą miały wpływ na zmianę stanu naszej encji. Tutaj możemy defniować tyle eventów ile potrzebujemy. W naszym przypadku wyglądało to tak:

public Event<IOfferUpsertedMessage> OfferUpserted { get; set; }
public Event<IOfferSynced> OfferSynced { get; set; }
public Event<IOfferSyncFailed> OfferSyncFailed { get; set; }
public Event<IOfferArchivedMessage> OfferArchived { get; set; }
public Event<IOfferDeleted> OfferDeleted { get; set; }


Są to wszystkie wiadomości, które wymienia sytem A z systemem B.

Gdy mamy już zdefiniowane stany naszej encji oraz zdarzenia które mogą ten stan modyfikować, czas skonfigurować naszą maszynę stanów. 

Warto nadmienić, że eventy oraz stany mogą być skonfigurowane poza klasą maszyny stanów. W naszym przypadku wszystkie zostały zdefiniowane w jednej klasie, aby łatwiej było ją czytać. Do utworzenia maszyny stanów wystarczy utworzyć klasę, która odziedziczy MassTransitStateMachine<T> gdzie T to właśnie nasza klasa stanu którą zdefiniowaliśmy wczesniej

public class OfferStateMachine : MassTransitStateMachine<OfferState> {
...
}


Aby skonfigurować maszynę stanów należy do niej niejako zaimportować stany. Innymi słowy, musimy podać jakie stany może przyjmować nasza encja. Korzystając z uprzednio zdefiniowanych stanów należy wywołać metodę InstanceState gdzie podamy nasze stany.

InstanceState(x => x.CurrentState, SyncPending, Synced, SyncFailed);


Ważna jest tutaj kolejność. Nasze stany w bazie danych widniały będą jako intigery, a to miejsce w kodzie zdefiniuje nam ich kolejność. To nie oznacza, że nie będziemy mogli cofać się np. ze stanu numer 4 do stanu numer 3. Teoretycznie z perspektywy wykonywalności kodu, nie ma to znaczenia w jakiej kolejności będą one tu zdefiniowane, natomiast dobrą praktyką jest, zdefiniowanie kolejności która będzie odzwierciedlała prawidłowy przepływ.

Pomimo że teoretycznie status "Zamówienie zakończone" może być oznaczony jako 3 a status "Oczekuję na płatność" jako 6 to z perspektywy czytelnośći, lepiej uszeregować je w odpowiedniej kolejnośći.

Następnie musimy zdefiniować CorelationId. CorelationId jest to Id po którym nasza maszyna stanów identyfikuje, czy dana wiadomość dotyczy konkretnej encji. W tym przypadku wystarczy identyfikator naszej encji.

Event(() => OfferUpserted, x => x.CorrelateById(context => context.Message.EntityId));
Event(() => OfferSynced, x => x.CorrelateById(context => context.Message.EntityId));
Event(() => OfferSyncFailed, x => x.CorrelateById(context => context.Message.EntityId));
Event(() => OfferArchived, x => x.CorrelateById(context => context.Message.EntityId));
Event(() => OfferDeleted, x => x.CorrelateById(context => context.Message.EntityId));


Gdy mamy już określone jakie stany przyjmuje nasza maszyna, oraz jak identyfikujemy nasze wiadomości, możemy zabrać się za definiowanie przepływu w naszej maszynie. Jest to niejako serce całej maszyny stanów ponieważ w tym miejscu określimy pomiędzy jakimi stanami nasza encja może się przełączać, jaki będzie jej stan początkowy, który stan uważamy za "zakończony" itp. Jest tutaj wiele możliwości, i MassTransit daje nam bardzo przejrzysty i czytelny sposób na definiowanie naszych przepływów. Nie będę ich wszystkich opisywał ponieważ bardzo łatwo można o nich przeczytać w dokumentacji. Opiszę kilka podstawowych, które będą przydatne w każdym przypadku:

  • Initially() - deklarujemy tuta zdarzenia i aktywności które będą podejmowane w fazie początkowej. Innymi słowy, kiedy nasza encja nie ma jeszcze żadnego stanu musimy zdefiniować co ma się wydarzyć. Dla przykładu, gdy tworzymy nowe zamówienie w sklepie internetwoym raczej nie chcemy aby lądowało ono od razu w statusie "Zakończony"
  • Finally() - analogiczny przepływ który definiuje nam co ma się wydarzyć gdy nasza encja wejdzie w stan "ostateczny"
  • During() - deklarujemy tutaj akcje które mają wydarzyć się gdy nasza encja znajduje się w jakimś konkretnym stanie
  • DuringAny() - w tym przypadku, możemy zadeklarować, że chcemy wykonać aktywności niezależnie od stanu w którym nasza encja się obecnie znajduje.


Dalej możemy definiować nasze zachowania i akcje. Akcje to niec innego jak definicja tego co ma się wydarzyć, natomiast zachowanie opisuje kiedy ma się ono wydarzyć. Poniżej przedstawiam pełny zdefiniowany przepływ:

Initially(
    When(OfferUpserted ).Then(context => {
        context.Saga.Message = "Sync pending - not synced yet.";
        context.Saga.TimeStamp = DateTime.UtcNow;
    }).TransitionTo(SyncPending));


Przepływ ten mówi nam o tym że, początkowo nasłuchujemy tylko zdarzenia OfferUpserted a gdy to zdarzenie się pojawi, ustawiamy status naszej encji na SyncPending. Dodatkowo do naszej sagi dodajemy informację tesktową o tym, co się wydarzyło, oraz dodajemy TimeStamp  dzięki czemu w przyszłości, łatwo będziemy mogli sprawdzić kiedy cokolwiek zadziało się z naszą sagą.

Kolejny przykładowy przepływ będzie nam służył do obsługi błędów, i jest tak naprawdę kluczowym w całym tym rozwiżaniu:

DuringAny(
    When(OfferSyncFailed).Then(context => {
        context.Saga.Message = "Created on \"A\" side. Not synced to \"B\" yet." + context.Message.Message;
        context.Saga.TimeStamp = DateTime.UtcNow;
    }).TransitionTo(SyncFailed));


Jak widzimy, kiedykolwiek dostaniemy wiadomość OfferSyncFailed, zmienimy stan naszej encji na SyncFailed oraz dodamy do wiadomości treść błędu który otrzymaliśmy od serwisu B. W obecnym systemie jest to ogromny krok naprzód ponieważ w tym momencei wiemy, że dana encja nie zsynchronizowała się poprawie i możemy tę informację zaprezentować użytkownikowi systemu A.

Nowy schemat komunikacji między serwisami

Jak widzimy teraz komunikacja między systemem B i A następuje chociaż częściowo poprzez ASB. Tutaj nasuwa się pytanie, skoro można było wprowadzić taką komunikację, to dlaczego nie można było od razu poprawić całej komunikacj i rozwiązać tego problemu lepiej. Otórz tak jak wspomniałem w założeniach, nie było na to przestrzeni. Komunikacja która została dodata wymagała od zespołu B dodania dosłownie dwóch linijek kodu:

await context.Publish(new OfferSynced() { EntityId = context.Message.EntityId });


w miejscu w którym synchronizacja się zakończyła, oraz

await context.Publish(new OfferSyncedFailed() { EntityId = context.Message.EntityId, Message = e.Message });


w miejscu obsługi wyjątku.

Podsumowanie

Dzięki tym prostym krokom byliśmy w stanie w ciągu niecałęgo sprintu przygotować naszym użytkownikom rozwiązanie które roboczo nazwaliśmy "traffic lights" Teraz użytkownik w aplikacji frontendowej, przy każdej encji któa wymagała synchronizacji widzi status: zielony - oznaczający że encja na której pracuje jest "live" i synchronizacja przebiegła prawidłowo, status pomarańczowy - encja jest w trakcie synchronizacji, więc nadal zmiany nie będą widoczne po stronie serwisu C, oraz światło czerwone czyli po prostu status opisujący brak synchronizacji.

Oczywiście, cała komunikacja jest nadal napisana niezgodnie ze standardami i jeżeli tylko będzie na to czas, dołożymy starań aby całościowo poprawić nasz system, natomiast wprowadzenie w gruncie rzeczy dość niewielkiej zmiany, dało naszym pracownikom dużo lepszy ogląd na to jaki jest status encji nad którymi pracują, a nam programistom zespołu A, oszczędziło ogromu jednakowych zadań o tytule : zmiany które wprowadziłem nie są widoczne w systemie C.

<p>Loading...</p>