Nasza strona używa cookies. Dowiedz się więcej o celu ich używania i zmianie ustawień w przeglądarce. Korzystając ze strony, wyrażasz zgodę na używanie cookies, zgodnie z aktualnymi ustawieniami przeglądarki. Rozumiem

Java NIO. Jak zbudować prosty, nieblokujący serwer?

Maciej Marczak Full-Stack Java Developer, Bloger @ Kodujze.pl
Krótko i zwięźle o pakiecie java.nio.
Java NIO. Jak zbudować prosty, nieblokujący serwer?

Ostatnio zacząłem uważniej przyglądać się reaktywnemu programowaniu i frameworkowi WebFlux, który ukazał się wraz z releasem Spring 5.0. Przy tej okazji spędziłem sporo czasu na czytaniu o nieblokujących serwerach HTTP, będących swoistego rodzaju fundamentem, na którym zbudowany jest cały ten reaktywny stos. Fundamentem, który dla wielu programistów jest… czarną skrzynką.

Być może to tylko moje odczucie, ale odnoszę wrażenie, że coraz częściej uczymy się API frameworków i bibliotek, a nie zasad ich działania. Z tego powodu postanowiłem napisać krótki post o pakiecie java.nio, w oparciu o który zbudowane są takie projekty jak Netty czy Undertow (używane we wspomnianym WebFluxie).

Czym jest nieblokujący serwer?

W „tradycyjnym” podejściu serwer nasłuchuje w pętli ruch na zadanym porcie, a gdy tylko pojawia się nowe żądanie, oddelegowuje pracę związaną z jego obsługą do wcześniej utworzonej puli wątków. Model ten ma pewne wady. Po pierwsze, w każdym momencie liczba jednocześnie obsługiwanych klientów może być co najwyżej równa rozmiarowi puli. Po drugie, jeżeli chociaż jeden z tych klientów korzysta ze słabego łącza to wątek, który został mu przypisany, przez większość czasu jest bezczynny w swoim oczekiwaniu na kolejne bity zapytania. W tym czasie mógłby zająć się innym klientem, więc jest to poważne marnowanie zasobów.

Nieblokujący serwer to serwer, który stara się zaadresować te problemy. W nieblokującym serwerze jeden wątek może obsługiwać wiele zapytań naraz. Jest to możliwe dzięki zastosowaniu nieblokującego IO, implementowanego w Javie przez klasy z pakietu java.nio.

java.nio

Wbrew  powszechnemu przekonaniu NIO nie jest akronimem od Non-blocking IO, lecz New IO (java.nio jest nowszą wersją paczki java.io, a nie tylko jej rozszerzeniem). Pakiet ten jest częścią biblioteki standardowej Javy od wersji 1.4 i dostarcza narzędzi służących do przeprowadzania operacji wejścia/wyjścia zarówno w sposób blokujący, jak i nieblokujący. Został zbudowany w oparciu o trzy główne abstrakcje:

  • bufory (buffers)
  • kanały (channels)
  • selektory (selectors)



Bufor w kontekście niskopoziomowych mechanizmów


W kontekście niskopoziomowych mechanizmów bufor to blok pamięci, w którym tymczasowo umieszcza się dane odbierane lub wysyłane do zewnętrznego urządzenia. Jest swego rodzaju pośrednikiem, który umożliwia komunikację urządzeń przetwarzających informacje z różnymi prędkościami.

Znaczenie buforów można wyjaśnić poprzez analogię cieknącego wiadra (leaky bucket analogy). Wyobraźmy sobie, że mamy wiaderko z dziurą na dnie. Możemy do niego wlewać wodę z dowolną prędkością, lecz prędkość jej uciekania zdeterminowana jest przez rozmiar dziury. W analogi tej wiaderko to bufor, prędkość wlewania wody do niego to prędkość, z jaką proces chce wysyłać dane, a rozmiar dziury to ograniczenia interfejsu sieciowego.

Bufor w kontekście Javy


W kontekście Javy bufor to nic innego jak klasa, która opakowuje swój niskopoziomowy odpowiednik w stan i metody służące do jego łatwiejszej manipulacji. Prócz zawartości, Java śledzi również takie własności bufora jak jego:

  • pozycję (position) – indeks następnego elementu, który ma zostać odczytany/zapisany
  • limit (limit) – indeks pierwszego elementu, który ma nie być odczytany/zapisany
  • ładowność (capacity) – rozmiar bufora


Aby lepiej zrozumieć te charakterystyki, a także zobaczyć w akcji działanie pewnych metod służących do manipulacji buforem, spójrzmy na przykład.

// Stwórz nowy CharBuffer o rozmiarze 80 bajtów.
CharBuffer buffer = CharBuffer.allocate(80);
// Pozycja: 0, Limit: 80, Ładowność: 80
 
 
// Umieść ciąg znaków "Hello World!" w buforze.
for (char c : "Hello World!".toCharArray()) {
    buffer.put(c);
}
// Pozycja: 12, Limit: 80, Ładowność: 80
 
 
// Zaktualizuj pozycję i limit w taki sposób, aby wskazywały odpowiednio
// początek i koniec umieszczonych danych.
buffer.flip();
// Pozycja: 0, Limit: 12, Ładowność: 80
 
 
// Odczytaj zawartość bufora.
StringBuilder sb = new StringBuilder();
for (int i = buffer.position(); i < buffer.limit(); i++) {
    sb.append(buffer.get());
}
// Pozycja: 12, Limit: 12, Ładowność: 80
// sb.toString() -> "Hello World!"



Kanał


Kanał reprezentuje połączenie z obiektem zdolnym do przeprowadzania operacji wejścia/wyjścia
. Korzysta z buforów, z których czyta dane przeznaczone do wysłania, i do których zapisuje otrzymane informacje. Tak jak Bufor jest opakowaniem natywnego bufora, tak kanał jest warstwą abstrakcji, pod którą bezpośrednio kryje się deskryptor pliku.

// Utwórz kanał w oparciu o gniazdo sieciowe i nawiąż połączenie ze stroną www.google.com.
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("www.google.com", 80));



Selektor 


Niektóre kanały (m.in. te związane z komunikacją sieciową) rozszerzają klasę abstrakcyjną SelectableChannel. Kanały reprezentowane przez instancje tej klasy mają taką własność, że można je ustawić w nieblokujący tryb pracy i multipleksować. Narzędziem, które służy do multipleksacji, jest selektor.

Zasada działania selektora jest bardzo prosta. Po utworzeniu rejestrujemy w nim wszystkie kanały, które chcemy nasłuchiwać. W wyniku tej operacji każdemu kanałowi przypisywany jest selectionKey – obiekt jednoznacznie identyfikujący go i zawierający informacje o jego stanie (np. gotowość do przyjęcia danych). Następnie cyklicznie odpytujemy selektor o listę kluczy, których kanały są gotowe do przeprowadzenia operacji wejścia/wyjścia.

// Zakładamy, że w tym momencie mamy już utworzone dwa kanały
// typu SocketChannel: channel_1, channel_2.
 
 
// Ustawiamy tryby pracy kanałów jako nieblokujący.
channel_1.configureBlocking(false);
channel_2.configureBlocking(false);
 
 
// Tworzymy selektor.
Selector selector = Selector.open();
 
 
// Rejestrujemy kanały w selektorze. OP_READ oznacza, że chcemy być informowani
// o gotowości do przeprowadzenia operacji READ.
channel_1.register(selector, SelectionKey.OP_READ);
channel_2.register(selector, SelectionKey.OP_READ);
 
 
for (;;) {
    // W pętli prosimy o sprawdzenie dostępności kanałów.
    selector.select();
 
 
    // I pobieramy klucze tych kanałów, które są gotowe do przeprowadzenia
    // operacji READ (zgodnie z konfiguracją powyżej).
    Set<SelectionKey> keys = selector.selectedKeys();
 
 
    // Przetwarzamy zbiór, pamiętając, by usunąć z niego
    // wszystkie klucze. Selektor tego nie robi, więc jest to nasza
    // odpowiedzialność.
}


Przykład 

W ramach podsumowania złożymy w całość wszystkie opisane powyżej elementy i stworzymy prosty, nieblokujący, jednowątkowy serwer. Serwer ten przetransformuje każdy otrzymany tekst na wersję UPPERCASE i odeśle go z powrotem do klienta.

class SimpleServer {
 
    // Z każdym klientem powiążemy kolejkę oczekujących na wysłanie buforów.
    private final Map<SocketChannel, Queue<ByteBuffer>> pendingData = new HashMap<>();
 
    private ServerSocketChannel ssc;
    private Selector selector;
 
    public void start() throws IOException {
        selector = Selector.open();
 
        ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8090));
        ssc.configureBlocking(false);
 
        // Chcemy być informowani o gotowości do akceptacji nowego połączenia.
        ssc.register(selector, SelectionKey.OP_ACCEPT);
 
        while (true) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
 
            for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
 
                SelectionKey sk = it.next();
                // Należy pamiętać o usuwaniu przetworzonych kluczy!
                it.remove();
 
                if (sk.isValid() && sk.isAcceptable()) {
                    // Nowy klient czeka na akceptację.
                    handleAccept();
                } else if (sk.isValid() && sk.isReadable()) {
                    // Możemy wykonać nieblokującą operację READ na kliencie.
                    handleRead(sk);
                } else if (sk.isValid() && sk.isWritable()) {
                    // Możemy wykonać nieblokującą operację WRITE na kliencie.
                    handleWrite(sk);
                }
            }
        }
    }
 
    private void handleAccept() throws IOException {
        SocketChannel sc = ssc.accept();
 
        if (sc != null) {
            sc.configureBlocking(false);
            // Chcemy być informowani o możliwości wykonania nieblokującej
            // operacji READ na kliencie.
            sc.register(selector, SelectionKey.OP_READ);
 
            pendingData.put(sc, new LinkedList<ByteBuffer>());
        }
    }
 
    private void handleRead(SelectionKey sk) throws IOException {
        // Pobieramy kanał powiązany z zadanym kluczem.
        SocketChannel sc = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(120);
 
        // Czytamy z kanału sc do bufora bb. Zmienna read zawiera
        // liczbę przeczytanych znaków.
        int read = sc.read(bb);
        if (read == -1) {
            // -1 -> EOF. Usuwamy klienta z mapy i zamykamy połączenie.
            pendingData.remove(sc);
            sc.close();
        } else if (read > 0) {
            // Aktualizujemy pozycję i limit w buforze, a także podmieniamy jego
            // zawartość na wersję UPPERCASE.
            bb.flip();
            for (int i = 0; i < bb.limit(); i++) {
                bb.put(i, (byte) Character.toUpperCase((char) bb.get(i)));
            }
            // Przetworzony bufor zostaje dodany do kolejki klienta.
            pendingData.get(sc).add(bb);
            // Po odczytaniu danych chcemy wysłać przetworzone wejście z powrotem
            // do klienta, więc od teraz interesuje nas, kiedy można wykonać na nim
            // nieblokującą operację WRITE.
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
 
    private void handleWrite(SelectionKey sk) throws IOException {
        SocketChannel sc = (SocketChannel) sk.channel();
        Queue<ByteBuffer> queue = pendingData.get(sc);
 
        while (!queue.isEmpty()) {
            ByteBuffer bb = queue.peek();
            // Piszemy do kanału sc z bufora bb. Zmienna write zawiera 
            // liczbę wysłanych znaków.
            int write = sc.write(bb);
            if (write == -1) {
                pendingData.remove(sc);
                sc.close();
                return;
            } else if (bb.hasRemaining()) {
                // Nie udało się wysłać całej zawartości bufora. Oznacza to, że w trakcie
                // wykonywania operacji write przestało być możliwe nieblokujące
                // wysyłanie danych. Opuszczamy metodę - reszta bufora zostanie wysłana
                // przy następnej okazji.
                return;
            }
 
            // Wysłaliśmy cały bufor. Usuwamy go z kolejki.
            queue.remove();
        }
 
        // Wysłaliśmy odpowiedź. Wracamy do nasłuchiwania zapytań od klienta.
        sk.interestOps(SelectionKey.OP_READ);
    }
}
Zobacz więcej na Bulldogjob

Masz coś do powiedzenia?

Podziel się tym z 80 tysiącami naszych czytelników

Dowiedz się więcej
Rocket dog