Nasza strona używa cookies. Korzystając ze strony, wyrażasz zgodę na używanie cookies, zgodnie z aktualnymi ustawieniami przeglądarki. Rozumiem

Szybkie pobieranie dużej ilości danych z AWS Lambda i Amazon SQS

David Hurley Data Scientist / Golder
Dowiedz się, jak możesz wykorzystać AWS Lambda oraz Amazon SQS do szybkiego pobierania dużej ilości danych.
Szybkie pobieranie dużej ilości danych z AWS Lambda i Amazon SQS

W tym artykule omówię, w jaki sposób wykorzystałem AWS Lambda i Amazon SQS do pobrania 194 204 plików (w formacie CSV) do Amazon S3, z prędkością 350 razy większą niż na moim komputerze stacjonarnym. Ostatnio stworzyłem aplikację webową do pobierania historycznych danych pogodowych w Kanadzie. By wszystko działało, musiałem pobrać dane z zewnętrznego serwera dla 8324 niezależnych stacji pogodowych i przechowywać je w bazie danych (Amazon S3). Wydaje się to proste, prawda? Wystarczy napisać trochę kodu, uruchomić go i z głowy... no niestety nie.

Każda stacja posiada dane pochodzące z wielu lat, a zewnętrzny serwer pozwala na ściągnięcie danych tylko z jednego roku. Krótko mówiąc, pobranie zestawu danych dla wszystkich 8324 stacji oznaczałoby pobranie 194 204 oddzielnych plików (tj. jeden plik na każdy rok danych z jednej stacji), łączenie danych z poszczególnych stacji i przekazanie danych do Amazon S3.

Problemem z pobieraniem danych nie jest rozmiar pliku (cały zestaw danych ma tylko 4,4 GB), ale prędkość pobierania i opóźnienie sieci. Na moim komputerze stacjonarnym pobranie danych z jednego roku i przekazanie ich do Amazon S3 zajmuje około 1,7 sekundy. Oznacza to, że pobieranie na moim komputerze wszystkich 194 204 plików i przesyłanie do Amazon S3 potrwa 3,9 dni, przy założeniu, że nie ma przerw w połączeniu lub pobieraniu 😲 Stać nas na więcej.


Rozwiązanie

Oczywiste jest, że pobieranie danych na komputer stacjonarny jest niepraktyczne i niepewne, szczególnie przy tak łatwym dostępie do chmury.

Ostatecznie zdecydowałem się użyć AWS Lambda do uruchomienia mojego kodu Pythona i kolejki Amazon SQS, aby wywołać zdarzenia Lambda i komunikować zadania pobierania.


Zarys

Nie zamierzam zbytnio skupiać się tutaj na AWS Lambda oraz na SQS. Istnieją już źródła, które dostatecznie wyczerpują temat. Przedstawię Wam jednak ogólny zarys tych narzędzi oraz opowiem o frameworku, którego użyłem. 

AWS Lambda może posłużyć do uruchomienia kodu poprzez zdarzenia i to bez potrzeby konfigurowania i zarządzania serwerami. Amazon SQS służy do wysyłania, magazynowania oraz przekazywania wiadomości między komponentami oprogramowania (tj. wiadomości wysyłane między dwoma funkcjami AWS Lambda).

Ja użyłem jednej funkcji Lambda do generowania wiadomości, które zawierają takie informacje jak nazwę oraz identyfikator stacji meteorologicznej (dla każdej z 8324 stacji). Każda wiadomość jest wysyłana i przechowywana w kolejce SQS. 

Kolejna funkcja AWS Lambda została zaprojektowana do uruchamiania się w odpowiedzi na wydarzenie SQS (tj. wiadomość docierającą do kolejki). Funkcja Lambda pobiera wiadomości z kolejki SQS, ściągając dane przy użyciu informacji pochodzących z wiadomości i wypychając je do Amazon S3, które je przechowa.
  

AWS Lambda i Amazon SQS współpracujące ze sobą


Dlaczego Lambda i SQS są takie przydatne

Poniżej przedstawię kilka powodów, dla których uważam, że Lambda oraz SQS są świetne. Omówię również sposób, w jaki przyśpieszają czas uruchamiania. 

  • Funkcja Lambdy może pobrać do 10 wiadomości z kolejki SQS za jednym razem
  • Funkcji Lambdy można używać współbieżnie. Na przykład, wykonanie zadania 10 razy używając „for loop” (gdzie wykonanie jednej pętli zajmuje 1 sekundę) zajęłoby 10 sekund. Współbieżne wykonywanie pozwala na klonowanie funkcji Lambdy i wykonanie 10 identycznych funkcji w tym samym czasie — zadanie zostaje wtedy ukończone w 1 sekundę. 
  • Funkcja Lambdy może alokować konkretne ilości pamięci (czy procesora). Przyśpiesza to znacznie czas wykonywania. 


Zwróć jednak uwagę na to, że należy zachować ostrożność przy zwiększaniu współbieżności oraz pamięci funkcji AWS Lambda, ponieważ może to doprowadzić do zwiększenia kosztów. 


Po co to wszystko

Możesz się zastanawiać, dlaczego w ogóle wysyłać informacje z jednej funkcji Lambdy do drugiej przez SQS. Można przecież, zamiast tego użyć jednej funkcji Lambdy do wszystkiego. Funkcje AWS Lambda mają ograniczenie maksymalnego czasu wykonania ustawione na 15 minut. Jeśli chciałbym jednak wykonać 30 zadań, z których każde potrzebuje 1 minuty, to przekroczyłabym próg 15 minut. Zamiast tego, muszę rozbić te 30 zadań na kawałki z czasem wykonania niższym niż 15 minut. 

Używając SQS, mogę uruchomić Lambdę i zdefiniować ile zadań jestem w stanie przy jej pomocy zabrać z kolejki (max 10 zadań), upewniając się przy tym, że łączny ich czas utrzymuje się poniżej limitu czasu wykonywania. 

Kod


Krok 1:

Użyj AWS Lambda do wysyłania wiadomości do kolejki SQS. W moim przypadku wysłałem wiadomość dla każdej stacji meteorologicznej, która zawiera takie informacje, jak ID stacji oraz początek i koniec okresu danych. 

###  SEND MESSAGES TO SQS MESSAGE QUEUE WITH AWS LAMBDA

import boto3
import pandas as pd

#  list of metadata for each weather station
df = pd.read_csv('WEATHER-STATION-METADATA.CSV')

#  direct to sqs queue
sqs = boto3.client('sqs',region_name='REGION-NAME')
queue_url = 'QUEUE-URL'
    
#  lambda function to send messages to sqs queue
def lambda_handler(event, context):
    
    #  one message for each weather station
    for i in range(0, len(df)):
        
        response = sqs.send_message(
            QueueUrl=queue_url,
            MessageAttributes={
                
                'weather_station_id': {
                    'DataType': 'String',
                    'StringValue': df.station_id[i],

                },
                'data_start_year': {
                    'DataType': 'String',
                    'StringValue': df.first_year_of_data[i]
                },
                'data_end_year': {
                    'DataType': 'String',
                    'StringValue': df.last_year_of_data[i]
                }
                
            },
            
            MessageBody=(
                'Download Information for Weather Station: ' \
                ' Station ID: ' + df.station_id[i]
            )
        )
                
    return {
        'Message Status': '200'
        
    }


Funkcja AWS Lambda do wysyłania wiadomości do kolejki SQS


Krok 2:

Stwórz funkcję AWS Lambda, która uruchamia się, kiedy w kolejce są dostępne wiadomości. U mnie funkcja Lambdy pobiera wiadomości SQS, używa zawartych w nich informacji do pobrania danych o stacjach meteorologicznych i wypycha je do Amazon S3.

Skonfigurowałem funkcję Lambda do czytania 10 wiadomości z kolejki po kolei. Utrzymuje to całkowity czas wykonywania poniżej limitu 15 minut. Skonfigurowałem również funkcję Lambdy ze współbieżnością do 1000 (Lambda może wywołać funkcję 1000 razy jednocześnie) i 256 MB pamięci. 

###  READ MESSAGES FROM SQS MESSAGE QUEUE AND DOWNLOAD AND STORE DATA WITH AWS LAMBDA

import pandas as pd
from io import StringIO
import boto3

#  url pointing to data on third-party server
external_url = 'https://climate.weather.gc.ca/climate_data/bulk_data_e.html?' \
                     'format=csv&stationID={}&Year={}&Month=1&Day=1&timeframe=2'

#  lambda function to offload messages from sqs queue and download and store data
def lambda_handler(event, context):
    
    #  process each message, up to 10
    for record in event['Records']:
        
            #  extract information from message
            station_id = record['messageAttributes']['weather_station_id']['stringValue']
            first_year_of_data = record['messageAttributes']['data_start_date']['stringValue']
            last_year_of_data = record['messageAttributes']['data_end_date']['stringValue']
            
            #  download data from external url
            download_years = range(num(first_year_of_data), num(last_year_of_data))
            list_of_urls = [external_url.format(station_id, year) for year in download_years]
            df = pd.concat((pd.read_csv(url) for url in list_of_urls))
            
            #  send data to S3
            csv_buffer = StringIO()
            df.to_csv(csv_buffer)
            s3_resource = boto3.resource('s3')
            obj = s3_resource.Object('BUCKET-NAME', 'FILENAME')
            obj.put(Body=csv_buffer.getvalue())

    return {
        'Status': '200'
    }

Funkcja AWS Lambda czyta wiadomości z kolejki SQS, pobiera i przechowuje dane

Ustawianie SQS jako wyzwalacz do uruchomienia Lambdy w konsoli


Wyniki

Pobranie i wypchnięcie wszystkich danych o 8 324 stacjach pogodowych do Amazon S3 zajęłoby 3,9 dnia na moim komputerze stacjonarnym. 

Używanie AWS Lambda oraz Amazon SQS umożliwiło mi ukończenie pobierania w 16 minut, czyli 350 razy szybciej niż na moim komputerze. 

Generowania wiadomości dla kolejki SQS (krok 1) zajmuje około 8 minut i kolejne 8 minut, żeby Lambda współbieżnie ściągnęła i wypchnęła dane do Amazon S3 (krok 2).

Całkowity koszt z darmową wersją AWS wyniósłby $0. Nieużywając darmowej wersji zapłacilibyśmy $4.30. 

  • Zająłbym się tym na moim komputerze bez kosztów, ale musiałbym liczyć na nieprzerwane połączenie internetowe przez 3.9 dnia. 
  • Mógłbym to zrobić dzięki instancji EC-2 i uruchomieniu cronjob, ale łatwość, z jaką da się używać AWS Lambda, była warta każdych pieniędzy.


Podsumowując, bardzo się cieszę, że odkryłem combo AWS Lambda oraz SQS. Z pewnością wykorzystam to jeszcze w przyszłości.


Oryginał tekstu w języku angielskim możesz przeczytać tutaj

Nie przegap treści Bulldogjob
Subskrybuj artykuły

Lubisz dzielić się wiedzą i chcesz zostać autorem?

Podziel się wiedzą z 160 tysiącami naszych czytelników

Dowiedz się więcej