Nasza strona używa cookies. Korzystając ze strony, wyrażasz zgodę na używanie cookies, zgodnie z aktualnymi ustawieniami przeglądarki. Rozumiem

IoT z .NET i Azure. Jak zacząć?

Sprawdź instrukcję krok po kroku, jak użyć IoT Hub, by stworzyć rozwiązanie IoT, oparte o Azure i zaimplementowane w .NET Framework.

IoT to wciąż dziki zachód IT. Sam sposób komunikacji autonomicznych, zdalnie pracujących urządzeń, to temat, do którego każdy projektant musi podejść sam, uwzględniając wymogi tworzonego systemu. Moje doświadczenie z IoT miało swój początek podczas pracy nad projektem AirVein. Zespół wówczas zdecydował się skorzystać z gotowej, prostej, a mimo to stabilnej i wydajnej usługi.


Jak się za to zabrać? 

Zacznę od wyboru biblioteki do integracji z usługami Azure oraz przedstawienia przykładowej implementacja komponentu odbierającego wiadomości w .NET Framework. Wykorzystywane narzędzia to darmowe konto Azure, Visual Studio z Cloud Explorer.

IoT Hub

IoT Hub jest usługą pośredniczącą w komunikacji dowolnego typu urządzeń. Komunikacja realizowana jest przy pomocy protokołów AMQP, MQTT oraz HTTPS. IoT Hub jest rozszerzeniem Azure Event Hubs – oprócz komunikacji urządzenie-chmura (Device2Cloud), udostępnia też komunikację chmura-urządzenie (Cloud2Device). Nie umożliwia jednak broadcastu do urządzeń – odbiorca zawsze musi być sprecyzowany.

Warto wspomnieć o szczególnym rodzaju komunikacji C2D, Direct Methods. W tym przypadku nie jest wysyłana wiadomość z danymi, a uruchamiana jest zdefiniowana na urządzeniu metoda. Ma to na celu symulowanie żądania synchronicznego i cechuje się dużą szybkością odpowiedzi.

W ramach IoT Hub istnieje usługa IoT Edge. Jest ona warstwą/urządzeniem pośrednim pomiędzy docelowym urządzeniem IoT a IoT Hub.

IoT Hub wspiera routing – wiadomości przychodzące mogą zostać odfiltrowane pod względem różnych kryteriów, a następnie przekazane do innych usług, jak np. Azure Service Bus. Sposób wysyłki D2C jest ściśle związany z możliwościami urządzenia, dlatego nie będę o tym pisał szczegółowo. Więcej o wysyłce D2C można znaleźć np. na MSDN.


Konfiguracja

W celu przetestowania Azure, wystarczy wejść na stronę i zarejestrować się. Po rejestracji użytkownicy mają możliwość bezpłatnego testowania. 

Tworzenie IoT Hub

Logujemy się do portalu Azure. W lewym górnym rogu klikamy w menu i wybieramy Create a resource. Wyszukujemy IoT Hub, klikamy Create. Zostaliśmy przekierowani do kreatora usługi. W zakładce Basic wybieramy region – północna Europa oraz podajemy nazwę dla naszego IoT Huba.

Należy także wyspecyfikować resource group – grupę zasobów i usług, które są ze sobą w jakiś sposób związane, np. komunikują się między sobą. Pamiętajmy, że początkowo żadna resource group nie istnieje, musimy ją utworzyć klikając Create new i podając nazwę.

Kolejna zakładka to Networking, w której określa się możliwości łączenia z usługą. Tutaj pozostawiamy domyślny Public.

Zakładka Size and Scale  dotyczy w głównej mierze wydajności. Ustawiamy Scale tier and units na F1: Free tier. Nie chcemy wypalić naszych darmowych kredytów zbyt szybko.

Number of units określa możliwości automatycznego horyzontalnego skalowania – uruchamiania dodatkowych instancji IoT Hub w przypadku większego obciążenia.

W Advanced settings znajduje się parametr Device-to-cloud partitions. Parametr ten odpowiada za ilość równoczesnych słuchaczy czytających wiadomości z IoT Hub. Każda partycja może mieć tylko jednego słuchacza per consumer group (o nich dalej). Kolejność komunikatów jest zachowana tylko w ramach jednej partycji. Więcej o partycjach dowiesz się np. tutaj.

Zakładka Tags pozwala zdefiniować tagi dla danego serwisu. Pomijamy i tworzymy IoT Hub.

Tworzenie urządzenia

IoT Hub został utworzony, więc możemy przejść do panelu zarządzania usługi. Aby urządzenie mogło wysyłać wiadomości do IoT Hub, musi zostać zdefiniowane IoT devices. Przechodzimy do IoT devices i klikamy New.

Podajemy ID urządzenia oraz ustawiamy Authentication type na Symmetric key. Zatwierdzamy urządzenie.

Zdefiniowane urządzenie jest gotowe do wykorzystania.

Wysyłka przez urządzenie

W celu przetestowania wysyłki D2C można by napisać symulator urządzenia, jednak dużo bardziej poręczne będzie użycie Cloud Explorer dostępnego w Visual Studio. Jeżeli brak go w Visual Studio, można go doinstalować z Visual Studio Marketplace.

W celu skorzystania z Cloud Explorera, trzeba zalogować się w Visual Studio na konto Microsoft. Wcześniej utworzyliśmy IoT Hub i nasze urządzenie. Przycisk konta znajduje się w prawym górnym rogu, tuż obok przycisku minimalizacji okna.

W celu przetestowania wysyłki wykonujemy następujące czynności:

  • Otwieramy Cloud Explorer
  • Rozwijamy subskrypcję powiązaną z naszym kontem
  • Odnajdujemy nasz IoT Hub
  • Klikamy prawym przyciskiem myszy i uruchamiamy monitoring wbudowanego endpointu


Monitor jest wbudowanym w Visual Studio narzędziem, pozwalającym w prosty sposób podejrzeć  czy nadesłane wiadomości trafiają do IoT Hub. W oknie Output obserwujemy komunikaty o utworzeniu nasłuchiwacza.

W celu  wysłania wiadomości rozwijamy IoT Hub, klikamy prawym przyciskiem na urządzeniu i wybieramy Send D2C message.

W oknie Output pojawiają się informacje zarówno o wysyłce, jak i o odbiorze.

Poza samym „ciałem” wiadomości, widzimy także System Properties i Application Properties. System Properties występuje zawsze i dla każdej wiadomości, nie można go rozszerzyć ani zawęzić. Application Properties to właściwości, które można dodać przy nadawaniu wiadomości (niestety, nie dostępne z poziomu Cloud Explorer). Jest to słownik string-string. Zarówno System Properties jak i Application Properties, mogą być użyte do routingu wiadomości.

Zwracam uwagę, że ciało wiadomości przesyłane jest w formie binarnej. Monitor Cloud Explorera bardzo wygodnie wyręczył nas przy konwersji. Jeżeli jednak będziemy wysyłać coś innego niż tekst, to sami musimy zabdać o odpowiednią deserializację.


Konektor IoT Hub

IoT Hub to rozszerzona wersja Event Hub. Z tego powodu do połączenia z nim używa się tych samych bibliotek. Microsoft udostępnia „EventProcessor” dla EventHub, automatyzujący m.in. połączenie, zarządzanie partycjami, obsługę równoległych słuchaczy czy śledzenie postępu odczytu (checkpointing).

Wybór biblioteki może być problematyczny. W sieci można znaleźć wiele artykułów, stron dokumentacji, tutoriali, każdy korzystający z innej biblioteki. Wynika to stąd, że początkowo, Event Hub był częścią usługi Azure Service Bus, po czym został wyodrębniony jako osobna usługa. Przez lata, zarówno Azure jak i oba huby ewoluowały. Obecnie istnieją trzy oficjalne biblioteki to:


Pierwsza to właśnie relikt z czasów, gdy Azure Event Hub był częścią Azure Service Bus. Wymaga .NET Framework 4.6.2 lub nowszego. Dwie pozostałe napisane są w .NET Standard. Mimo to, Microsoft.Azure.EventHubs.Processor uznana jest za przestarzałą i zalecana jest migracja do nowej biblioteki.

Biblioteki i parametry

Pierwszym krokiem jest utworzenie pustej solucji na przykładowe implementacje.

Następnym krokiem jest utworzenie dwóch projektów: aplikacji konsolowej oraz biblioteki klas.

Nadałem nazwy ConsoleApp i IoTHubListener. W IoTHubListener najlepiej od razu zmienić Class1 na coś lepiej opisującego, np. SimpleListener.

Kolejny krok to dodanie Azure.Messaging.EventHubs.Processor oraz do projektu IoTHubListener.

W tym celu otwieramy NuGet Package Manager w Tools, i Manage for Solution

Wyszukujemy Azure.Messaging.EventHubs.Processor i instalujemy dla IoTHubListener.

Mamy teraz wszystkie potrzebne biblioteki.

Wcześniej wspomniałem o automatyzacji jaką daje EventProcessorClient. Wymaga ona jednak dodatkowej usług – Azure Storage Account. W niej EventProcessorClient przechowuje swoje metadane. Tworzymy nowy zasób z poziomu portalu Azure – znajdujemy Storage Account, wybieramy Resource group, nadajemy nazwę. Pozostałe ustawienia pozostawiamy bez zmian – nie mają znaczenia w tym przykładzie.

Po utworzeniu Storage Account, tworzymy kontener Blob. To w kontenerze typu Blob przechowywane są metadane EventProcessorClient. W Storage Account odnajdujemy Blob service i dodajemy kontener.

EventProcessorClient do uruchomienia wymaga parametrów:

  • BlobContainerClient – klient do komunikacji z Storage Account. Biblioteka Azure.Storage.Blobs została automatycznie zainstalowana jako zależność dla EventProcessorClient.
  • string consumerGroup – grupa konsumentów. Wszystkie instancje z tej samej grupy współdzielą pozycję odczytu. Dla domyślnego endpointu Eventów IoT Hub jest to string „$Default”
  • string iotHubConnectionString – connection string do IoT Hub, dostępny w portalu Azure.


Jednak, aby nie było zbyt łatwo, jest pewien haczyk. Używamy biblioteki dedykowanej dla Event Hub. Z tego powodu trzeba dobrać dedykowany connection string. Znajduje się on w Built-in endpoints wybranego IoT Huba pod nazwą Event Hub-compatible endpoint.

BlobContainerClient wymaga connection string do Storage Account oraz nazwy kontenera Blob. Connection string znajdziemy w Access keys danego Storage Account.

Wszystkie parametry umieszczamy w App.config aplikacji konsolowej.

Mamy wszystko, co potrzebne, aby napisać konektor.

Implementacja

Zacznijmy od klasy SimpleListener. Celem jest przyjmowanie zdefiniowanych wcześniej parametrów, inicjowanie EventProcessorClient, rozpoczynanie i zatrzymywanie nasłuchiwanych eventów oraz umożliwienie dostępów do wysłanej zawartości tekstu.

Deklarujemy prywatne pole procesor oraz tworzymy konstruktor, który przyjmuje wymagane parametry, a następnie inicjuje EventProcessorClient:

public class SimpleListener
{
	private readonly EventProcessorClient processor; 
	public SimpleListener(
    		string iotHubConnectionString,
    		string consumerGroup,
    		string accountStorageConnectionString,
    		string blobContainerName)
	{
    	BlobContainerClient blobContainerClient = new BlobContainerClient(
        	accountStorageConnectionString,
        	blobContainerName);
    	processor = new EventProcessorClient(blobContainerClient,
        	consumerGroup,
        	iotHubConnectionString);
	}
}


EventProcessorClient do poprawnej pracy wymaga implementacji metod obsługujących nadejście eventu oraz wystąpienie błędu. Chwilowe placeholdery:

public SimpleListener(
string iotHubConnectionString,
	string consumerGroup,
	string accountStorageConnectionString,
	string blobContainerName)
{
	BlobContainerClient blobContainerClient = new BlobContainerClient(
    		accountStorageConnectionString,
    		blobContainerName);
	processor = new EventProcessorClient(
    		blobContainerClient,
    		consumerGroup,
    		iotHubConnectionString);
	processor.ProcessEventAsync += ProcessEventHandler;
	processor.ProcessErrorAsync += ProcessErrorHandler;
}
 
private async Task ProcessEventHandler(ProcessEventArgs ev)
{
}
private async Task ProcessErrorHandler(ProcessErrorEventArgs ev)
{
}


Umożliwiamy dostarczenie logiki procesującej otrzymane informacje tekstowe spoza klasy SimpleListener. Do tego wykorzystany zostanie delegat o parametrze wejściowym typu string. Ciało eventu musi zostać przekonwertowane do string przed przekazaniem do delegatu.

public class SimpleListener
{
	private readonly EventProcessorClient processor;
 
	public delegate void OnEvent(string data);
 
	public OnEvent ProcessEventData { get; set; }
	public OnEvent ProcessEventError { get; set; }
 
	public SimpleListener(
    	string iotHubConnectionString,
    	string consumerGroup,
    	string accountStorageConnectionString,
    	string blobContainerName)
	{
    	BlobContainerClient blobContainerClient = new BlobContainerClient(
       		 	accountStorageConnectionString,
        		blobContainerName);
    		processor = new EventProcessorClient(
        		blobContainerClient,
        	consumerGroup,
       		 	iotHubConnectionString);
    		processor.ProcessEventAsync += ProcessEventHandler;
    		processor.ProcessErrorAsync += ProcessErrorHandler;
	}
 
	private string BodyToString(ReadOnlyMemory<byte> body)
	{
    		return Encoding.UTF8.GetString(body.ToArray());
	}
	private async Task ProcessEventHandler(ProcessEventArgs ev)
	{
        	ProcessEventData(BodyToString(ev.Data.Body));
	}
	private async Task ProcessErrorHandler(ProcessErrorEventArgs ev)
	{
    		ProcessEventError(ev.Operation);
	}
}


Publiczne property ProcessEventData i ProcessEventData pozwalają na dodanie dowolnej ilości zewnętrznych handlerów. Pozostaje dodać metody rozpoczynania i kończenia nasłuchiwania. Kompletna implementacja klasy SimpleListener:

public class SimpleListener
{
	private readonly EventProcessorClient processor;
 
	public delegate void OnEvent(string data);
 
	public OnEvent ProcessEventData { get; set; }
	public OnEvent ProcessEventError { get; set; }
 
	public SimpleListener(
    	string iotHubConnectionString,
    	string consumerGroup,
    	string accountStorageConnectionString,
    	string blobContainerName)
	{
    		BlobContainerClient blobContainerClient = new BlobContainerClient(
        		accountStorageConnectionString,
        	blobContainerName);
    		processor = new EventProcessorClient(
        	blobContainerClient,
       		 	consumerGroup,
        		iotHubConnectionString);
    	processor.ProcessEventAsync += ProcessEventHandler;
    		processor.ProcessErrorAsync += ProcessErrorHandler;
	}
 
	public void Start()
	{
    		processor.StartProcessingAsync();
	}
	public void Stop()
	{
    		processor.StopProcessingAsync();
	}
 
	private string BodyToString(ReadOnlyMemory<byte> body)
	{
    		return Encoding.UTF8.GetString(body.ToArray());
	}
	private async Task ProcessEventHandler(ProcessEventArgs ev)
	{
        	ProcessEventData(BodyToString(ev.Data.Body));
	}
	private async Task ProcessErrorHandler(ProcessErrorEventArgs ev)
	{
    		ProcessEventError(ev.Operation);
	}
}


Ostatni krok to zainicjowanie SimpleListener w metodzie Main aplikacji konsolowej i uruchomienie nasłuchiwania. W tym celu wydobywamy parametry z AppSettings przy użyciu ConfigureManager.

class Program
{
	static void Main()
	{
    		string iotHubConnectionString = ConfigurationManager
            		.AppSettings["iotHubConnectionString"];
    		string consumerGroup = ConfigurationManager
            		.AppSettings["consumerGroup"];
    		string accountStorageConnectionString = ConfigurationManager
            		.AppSettings["accountStorageConnectionString"];
    		string blobContainerName = ConfigurationManager
            		.AppSettings["blobContainerName"];
 
    		SimpleListener listener = new SimpleListener(
        		iotHubConnectionString,
        		consumerGroup,
        		accountStorageConnectionString,
        		blobContainerName);
 
    		listener.ProcessEventData +=
            		(string data) => Console.WriteLine($"msg: {data}");
    		listener.ProcessEventError +=
            		(string data) => Console.WriteLine($"error: {data}");
 
    		Console.WriteLine("Starting listener...");
    		listener.Start();
    		Console.ReadKey();
    		listener.Stop();
	}
}


Czas uruchomić program. Nawiązywanie połączenia z IoT Hub może zająć kilka sekund. EventProcessorClient odczyta wcześniej wysłaną wiadomość „hello”.

Wyślijmy kolejną przy pomocy Cloud Explorera:

Kolejna wiadomość została odebrana. Sukces!

Śledzenie postępu odczytu

Co się stanie, jeżeli uruchomię program ponownie? Sprawdźmy…

Niestety, ponownie zostały odczytane wszystkie wiadomości. Dlaczego tak się stało? 

Nie zaznaczyłem punktu postępu (checkpoint). To, co EventProcessorClient faktycznie automatyzuje w tym zakresie, to wykorzystanie rozproszonego cache – nasz kontener blob. Aby zaznaczać checkpoint, trzeba wywołać metodę UpdateCheckpointAsync, znajdującą się w strukturze ProcessEventArgs. Dodamy ją do ProcessEventHandler w SimpleListener:

private async Task ProcessEventHandler(ProcessEventArgs ev)
{
ProcessEventData(BodyToString(ev.Data.Body));
await ev.UpdateCheckpointAsync();
}


Aby przetestować zmiany, trzeba:

  • uruchomić program
  • odczytać wiadomości
  • zamknąć program
  • wysłać nową wiadomość
  • uruchomić program ponownie.


 W efekcie:

Postęp odczytu został zachowany.

Bonus - Reaktywny konektor

Przy obsłudze zdarzeń polecam Reactive Extensions, jako pewnego rodzaju udogodnienie. Jest to .NET-owy odpowiednik znanej frontendowcom biblioteki RxJS. Wykorzystuje ona wzorzec obserwatora do procesowania zdarzeń, jednak posiada wielkie możliwości.

Utworzymy przykładowy RXListener. W tym celu instalujemy bibliotekę System.Reactive oraz System.Reactive.Linq dla obydwu projektów.

W projekcie biblioteki, kopiujemy SimpleListener, zmieniamy nazwę pliku i nazwę klasy na RXListener. Pozbywamy się delegatów. W ich miejsce dodajemy prywatne Subject i publiczne Observable.

public class RXListener
{
	private readonly EventProcessorClient processor;
 
	private readonly ISubject<string> MessagesSubject = new Subject<string>();
	private readonly ISubject<string> ErrorsSubject = new Subject<string>();
	public IObservable<string> Messages => MessagesSubject.AsObservable();
	public IObservable<string> Errors => ErrorsSubject.AsObservable();
 
	public RXListener(
string iotHubConnectionString,
string consumerGroup,
string accountStorageConnectionString,
string blobContainerName)
	{
    		BlobContainerClient blobContainerClient = new BlobContainerClient(
accountStorageConnectionString,
blobContainerName);
    		processor = new EventProcessorClient(
blobContainerClient,
consumerGroup,
iotHubConnectionString);
    		processor.ProcessEventAsync += ProcessEventHandler;
  	 	processor.ProcessErrorAsync += ProcessErrorHandler;
	}


Czym jest observable? To obiekt, który informuje obserwatorów, swoich subskrybentów o wyemitowanej nowej wartości. Każdy obserwator dostaje wiadomość i może prowadzić dalsze transformacje wiadomości przy użyciu specjalnych operatorów.

Czym jest subject? To specjalny typ observable, który oprócz nasłuchiwania pozwala emitować wartości.

Metoda AsObservable pozwala ograniczyć funkcjonalność subject do observable. Dlatego pola typu subject są prywatne. Tylko odczyt z IoT Hub powoduje emisję wartości, nic z zewnątrz. Publiczne mają ograniczone wersje, z ogólnym dostępem.

W handlerach zastępujemy wywołanie delegatu emisją wartości do subject.

public class RXListener
{
	private readonly EventProcessorClient processor;
 
	private readonly ISubject<string> MessagesSubject = new Subject<string>();
	private readonly ISubject<string> ErrorsSubject = new Subject<string>();
	public IObservable<string> Messages => MessagesSubject.AsObservable();
	public IObservable<string> Errors => ErrorsSubject.AsObservable();
 
	public RXListener(
    		string iotHubConnectionString,
    		string consumerGroup,
    		string accountStorageConnectionString,
    		string blobContainerName)
	{
    		BlobContainerClient blobContainerClient = new BlobContainerClient(
        		accountStorageConnectionString,
        		blobContainerName);
    		processor = new EventProcessorClient(
        		blobContainerClient,
        		consumerGroup,
        		iotHubConnectionString);
    		processor.ProcessEventAsync += ProcessEventHandler;
    		processor.ProcessErrorAsync += ProcessErrorHandler;
	}
 
	public void Start()
	{
    		processor.StartProcessingAsync();
	}
	public void Stop()
	{
    		processor.StopProcessingAsync();
	}
 
	private string BodyToString(ReadOnlyMemory<byte> body)
	{
    		return Encoding.UTF8.GetString(body.ToArray());
	}
	private async Task ProcessEventHandler(ProcessEventArgs ev)
	{
       		MessagesSubject.OnNext(BodyToString(ev.Data.Body));
    		await ev.UpdateCheckpointAsync();
	}
	private async Task ProcessErrorHandler(ProcessErrorEventArgs ev)
	{
    		ErrorsSubject.OnNext(ev.Operation);
	}
}


Metodę Main modyfikujemy następująco:

class Program
{
	static void Main()
	{
    		string iotHubConnectionString = ConfigurationManager
        		.AppSettings["iotHubConnectionString"];
    	string consumerGroup = ConfigurationManager
       		 	.AppSettings["consumerGroup"];
    	string accountStorageConnectionString = ConfigurationManager
       		 	.AppSettings["accountStorageConnectionString"];
    	string blobContainerName = ConfigurationManager
       		 	.AppSettings["blobContainerName"];
 
    		//SimpleListener listener = new SimpleListener(
    		//	iotHubConnectionString,
    		//	consumerGroup,
    		//	accountStorageConnectionString,
    		//	blobContainerName);
    		//listener.ProcessEventData +=
    		//	(string data) => Console.WriteLine($"msg: {data}");
    		//listener.ProcessEventError +=
    		//	(string data) => Console.WriteLine($"error: {data}");
 
        	RXListener listener = new RXListener(
        		iotHubConnectionString,
        		consumerGroup,
        		accountStorageConnectionString,
        		blobContainerName);
    		listener.Messages
        		.Select(x => $"msg: {x}") //rxjs map
        		.Do(x => Console.WriteLine(x)) //rxjs tap
        		.Subscribe();
    		listener.Errors
        		.Subscribe(x =>
        		{
            			Console.WriteLine($"error: {x}");
        		});
 
    	Console.WriteLine("Starting listener...");
    	listener.Start();
    	Console.ReadKey();
    	listener.Stop();
	}
}


Stosowane są dwa podejścia. Obsługa emisji za pomocą łańcucha operatorów (Messages) i obsługa za pomocą wyrażenia przekazanego w Subscribe (Error). Oba realizują dokładnie to samo: sformatuj i wyświetl wiadomość. Select modyfikuje dane z każdej emisji, modyfikacja przekazywana jest do kolejnego operatora lub Subscribe. Metoda Do wykonuje podaną akcję dla każdej emisji, ale nie modyfikuje danych.

Istotne jest wywołanie metody Subscribe, dopiero to rozpoczyna obserwację observable. Wszystko wcześniej to tylko modyfikatory. Przypomina to budowę IQueryable bez zawołania ToList na końcu.

W tym prostym przykładzie użycie Reactive Extensios powoduje, że kod jest obszerniejszy, jednak w bardziej złożonych przypadkach potrafi zwiększyć przejrzystość i reużywalność, np. poprzez połączenie kilku źródeł w jedno observable.


Podsumowanie

Mam nadzieję, że przybliżyłem Wam sposób wykorzystania IoT Hub do odbioru komunikatów. Przedstawiłem pełen proces, od konfiguracji środowiska i elementy platformy Azure, po implementację prostych konektorów do usług chmurowych. Wykorzystując IoT Hub możecie tworzyć bezpieczne, kompleksowe rozwiązania nawiązujące komunikację między wieloma urządzeń IoT. Warto zgłębić temat. :)

Zobacz kogo teraz szukają