Cześć. Ostatnio w ramach zajęć na uczelni miałem okazję popracować nieco z kafką. Był to mój pierwszy kontakt z tą platformą i chciałbym opisać swoje doświadczenia – jak to wyglądało, z jakimi problemami się spotkałem oraz podać parę ciekawych linków do nauki, przez które sam przeszedłem i śmiało mogę polecić innym – bo są po prostu dobre :).
Oto zadanie jakie dostałem w ramach zajęć:
Zaimplementuj prototyp systemu monitorującego zasoby komputerów w klastrze korzystając z Apache Kafka. W minimalnej formie system powinien monitorować zużycie procesora i zajętość RAM. System powinien składać się z następujących procesów:
- procesy monitorujące (agenci) powinny działać na obserwowanych maszynach w klastrze (węzłach).
- proces główny powinien na bieżąco analizować dane nadsyłane przez procesy monitorujące.
Agenci przesyłają periodycznie (np. co 2-5 sekund) wiadomości o stanie monitorowanego systemu.Proces główny monitoruje i analizuje dane:
wylicza średnie zużycie zasobów dla całego klastra oraz dla poszczególnych węzłów;
reaguje na nagłe zmiany obciążenia klastra i poszczególnych węzłów (np. logując wiadomości z priorytetem WARNING kiedy średnie obciążenie CPU lub RAM klastra przekroczy określony próg).
Nie udało mi się zrealizować całego zadania – i nie będę ukrywać po części było to spowodowane brakiem chęci 😪 a częściowo też brakiem czasu.
ALE zacznijmy od początku.
Czym jest apache kafka, możesz dowiedzieć się z wielu źródeł, ja rzucę tutaj tylko ogólna definicję, że jest to platforma do przetwarzania danych w czasie rzeczywistym. Czyli możemy mieć wiele źródeł, które produkują wiadomości (1) -> coś co przetwarza te wiadomości (2) -> oraz następne coś konsumujące przetworzone wiadomości (3). Poszczególne cosie mają swoje bardziej szczegółowe nazwy i definicje:
(1) producer – nadawca wiadomości / informacji
(2) streaming / processing – logika przetwarzające dane – nie zawsze musi występować!
(3) consumer – odbiorca przetworzonej wiadomości.
Jako przykład weźmy sobie taki youtube. Producentem możemy być my – użytkownicy. Klikamy w jeden filmik, później w kolejny i tak dalej. Informacje o tym idą do logiki przetwarzającej te dane. Przecież każdy filmik ma odpowiednie kategorie i tagi dzięki którym mogą być nam (i są) proponowane kolejne filmy o podobnej treści. Odbiorcą przetworzonych danych może być kolejna aplikacja zapisująca w bazie nasze preferencje. To był jeden prosty przykład żeby Ci to zobrazować – tu możesz znaleźć więcej takich przykładów.
Po tym krótkim wstępie możemy przejść do implementacji zadania. Do zbierania informacji o systemie użyłem framework’u oshi. Klasa SystemInformation jest odpowiedzialna za zbieranie informacji o systmie i to ona używa wspomnianego framework’u. Cały kod aplikacji możesz znaleźć na githubie – tutaj skupie się tylko na najważniejszych aspektach w ramach kafki.
(1) SystemInfoProducer – czyli nasz producent.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ACKS_CONFIG, "all"); props.put(LINGER_MS_CONFIG, 1); props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); SystemInformation systemInformation = new SystemInformation(); try (Producer<String, String> producer = new KafkaProducer<>(props)) { for (int i = 0; i < 100000; i++) { producer.send(new ProducerRecord<>( "cpu", systemInformation.getSerialNumber(), String.valueOf(systemInformation.getCpuUsage())) ); producer.send(new ProducerRecord<>( "ram", systemInformation.getSerialNumber(), String.valueOf(systemInformation.getMemoryUsage())) ); Thread.sleep(1500); } } } |
W lini 3-7 ustawiamy propertiesy wymagane przez kafke, o każdym z nich bez problemu znajdziesz informacje w sieci. To co nas tak naprawdę interesuje to linie 12-21. Tutaj wysyłamy naszą wiadomość. W lini 10 tworzymy producenta a następnie w pętli co 1.5 sekundy wysyłamy informację o naszym systemie.
1 2 3 4 5 | producer.send(new ProducerRecord<>( "cpu", systemInformation.getSerialNumber(), String.valueOf(systemInformation.getCpuUsage())) ); |
Do wysłania wiadomości tworzymy nowy obiekt ProducerRecord. Konstruktor jaki używamy przyjmuje następujące parametry:
1 2 3 | public ProducerRecord(String topic, K key, V value) { this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); } |
, gdzie topic = nazwa, po której konsument będzie identyfikował nadsyłane dane. Zobaczysz za chwilę 🙂
(2) SystemInfoConsumer – umieściłem tu logikę przetwarzającą dane i konsumenta w jednym. Wydaje mi się, że nie jest to dobre rozwiązanie i lepiej byłoby umieścic tu tylko kawałek logiki przetwarzającej dane i puścić je w ramach kolejnego producenta ale z braku laku lepsze takie rozwiązanie niż żadne.
Tak jak poprzednio. Na początku ustawiamy podstawowe propetisy:
1 2 3 4 5 6 7 | Properties props = new Properties(); props.put(APPLICATION_ID_CONFIG, "system-info-application"); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); |
Następnie przygotowujemy się do przetworzenia danych od naszych producentów tworząc dwa strumienie danych – 1 dla „ram” oraz 1 dla „cpu” (pamiętasz o nazwie topic’u wspomnianej wyżej? To właśnie tej nazwy używamy w tym miejscu)
1 2 3 | StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> cpuInfo = builder.stream("cpu"); KStream<String, String> ramInfo = builder.stream("ram"); |
Teraz obliczymy średnie zużycie zasobów na poszczególnych węzłach.
Oto cały kod, który wykonuje całą logikę
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | private static void monitorDataForEachNode(KStream<String, String> cpuInfo, KStream<String, String> ramInfo) { KTable<String, Double> sum = getSumForEachNode(cpuInfo); KTable<String, Integer> count = getCountForEachNode(cpuInfo); KTable<String, Double> cpuJoin = count .join(sum, (integer, aDouble) -> aDouble / integer); KTable<String, Double> ramSum = getSumForEachNode(ramInfo); KTable<String, Integer> ramCount = getCountForEachNode(ramInfo); KTable<String, Double> ramJoin = ramCount .join(ramSum, (integer, aDouble) -> aDouble / integer); cpuJoin.toStream() .peek((s, aDouble) -> System.out.println("Srednie zuzycie cpu na wezle " + s + " = " + aDouble)); ramJoin.toStream() .peek((s, aDouble) -> System.out.println("Srednie zuzycie ram na wezle " + s + " = " + aDouble)); } private static KTable<String, Integer> getCountForEachNode(KStream<String, String> cpuInfo) { return cpuInfo .groupByKey() .aggregate( () -> 0, (o, o2, integer) -> ++integer, Materialized.with(Serdes.String(), Serdes.Integer())); } private static KTable<String, Double> getSumForEachNode(KStream<String, String> cpuInfo) { return cpuInfo .mapValues(Double::parseDouble) .groupByKey() .reduce(Double::sum, Materialized.with(Serdes.String(), Serdes.Double())); } |
W bardzo dużym skrócie:
- bierzemy sumę wszystkich wartości jakie do nas przyszły (dla poszczególnego topicu oraz węzła) – linia 2 oraz 8
- zliczamy ilość danych jakie przyszły – linie 3 i 9
- dzielimy sumę przez ilość – linie 4 i 10
- wypisujemy wynik – linie 13-16
Zaglądnijmy na szybko do naszej metody sumującej dane getSumForEachNode(…). Jeżeli jesteś zaznajomiony z Javą w wersji 8 to nie będziesz mieć kłopotów ze zrozumieniem tego kawałka kodu. Zwróce tu tylko uwagę na metodę reduce która sumuje nadsyłane wartości oraz pozwala na serializację danych na odpowiedni typ (w tym przypadku naszym kluczem jest nadal string, ale wartość zmienia się na typ Double). Gdybyśmy nadal chcieli używać wartości String to moglibyśmy zostawić metodę reduce z wywołaniem samej metody sum, a serializacją na odpowiedni typ zajęłyby się wcześniej ustawione propertisy.
1 2 3 4 | props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); .reduce(Double::sum); |
Nieco ciekawsza jest metoda zliczająca getCountForEachNode(…)
1 2 3 4 5 6 | return cpuInfo .groupByKey() .aggregate( () -> 0, (o, o2, integer) -> ++integer, Materialized.with(Serdes.String(), Serdes.Integer())); |
W linii 3 agregujemy nasz wynik. Agregator przyjmuje następujące parametry:
, w linii 4 ustawiamy wartość początkową na 0 a linię niżej zwiększamy tę wartość za każdym razem gdy do naszego topicu wpada nowa wartość z tym kluczem (zauważ, że w linii 2 grupujemy elementy po kluczu). Ostatnia linia zajmujuje się odpowiednią serializacją klucza i wartości (patrz akapit wyżej).
I to tyle. Nie napisaliśmy nawet 100 linii kodu a otrzymujemy fajnie działające rozwiązanie. W moim przypadku jest to kompletnie zbędne i zapewne gdyby nie zajęcia to bym nigdy tego nie napisał. Mimo wszystko dzięki takim zadaniom można rozszerzyć swoje horyzonty i umiejętności, więc jeżeli masz ochotę dokończyć to zadanie / poprawić obecne to śmiało! :).
Jeszcze jedna bardzo istotna rzecz!!
Materiały do nauki – nie mogę Cię przecież zostawić bez źródeł które uważam za wartościowe (a ktoś poświęcił swój czas żeby je stworzyć)
- Oficjalna strona kafki – jeżeli jesteś świeży to koniecznie intro + quickstart. Jak już z tym zaczniesz to zobaczysz, że to jest serio fajne
- Kanał na YT tego Pana – wejdź w jego playlisty i szukaj kafka. Dla początkujących polecam szczególnie playliste Apache Kafka for beginners
Na sam początek powinno wystarczyć. Spróbuj zrobić nawet najprostszą wersję producenta-konsumera. Jak to zrobisz to gwarantuję Ci że będziesz mieć satysfakcje a kto wie czy w przyszłości się to nie przyda.. Powodzenia!