Kafka Word Cloud

Im Rahmen einer Intensivwoche für Know-how-Ausbau hat unser Tech-Consulting Team für Apache Kafka einen Use-Case für eine dynamische Word-Cloud umgesetzt. Die Word-Cloud wird über Kafka und mit Live-Daten aus dem internen Chat-System gefüttert. Dargestellt werden technische Begriffe aus ausgewählten Chats.

Unser Team hatte pro Person ungefähr sechs Tage für den Aufbau der internen Kafka-Plattform, das Planen und Umsetzen eines Use-Cases und das Schreiben dieses Blogs zur Verfügung. Das Hauptziel der Intensivwoche war der Know-how Auf- und Ausbau für bestehende und neue Members des Kafka-Teams.

Das Kafka-Team bestehend aus Adrian Bader, Thomas Selber, Benjamin Bühlmann, Ramon Spahr und Stefan Girod (v.l.) nach Abschluss der Intensivwoche.

Kafka-Cluster Setup

Für den Aufbau des Clusters, haben wir uns für Strimzi entschieden. Es verspricht den Aufwand für den Cluster-Aufbau – insbesondere auf OpenShift – wesentlich zu reduzieren. Strimzi übernimmt beispielsweise das Ausstellen und Verwalten von Zertifikaten für den Cluster. Wir haben die bis dato aktuellste Version 0.11.1 von Strimzi basierend auf Apache Kafka 2.0.1 eingesetzt.

Strimzi liegt das Operator-Framework zugrunde und es verwendet Custom-Ressource-Definitions für OpenShift, anhand welcher der zu deployende Cluster beschrieben werden kann. Das sieht in OpenShift nach dem Deployment wie folgt aus:

Die drei sichtbaren Hauptkomponenten von Strimzi sind der Cluster Operator, der Topic Operator, und der User Operator. Dazu natürlich der Kafka- und Zookeeper-Cluster (basierend auf Statefulsets).

Interessant sind die tls-sidecars, mit welchen Strimzi TLS ermöglicht.

Herausforderungen

Mit Strimzi ist der Cluster sehr schnell deployed und lauffähig. Allerdings hatten wir auf unserem internen OpenShift 3.9 einige Probleme im Betrieb:

  • Beim Entfernen des Clusters wurden nicht alle Komponenten gelöscht.
  • Das Cluster hängte sich des Öfteren auf; den Logs zu Folge wegen Storage-Issues (es wurde Glusterfs verwendet).

Wir wichen demzufolge auf unser neues OpenShift 3.11 aus. Die obigen Issues wurden dadurch eliminiert.

Weiter sind die Operators noch zu wenig flexibel: Es ist die Aufgabe der Operators, die verschiedenen OpenShift-Komponenten zu erzeugen und zu konfigurieren. Allerdings werden sie dadurch automatisch überschrieben. Das wurde uns zum Verhängnis, weil wir bei den Routes via Labels den Scope (intern/extern) konfigurieren sollten. Der Operator überschreibt diese Labels. Es sollte also möglich sein, in der Operator-Konfiguration „Overrides“ für Labels definieren zu können.

Ein weiteres Problem war die Verwendung von Kafka Connect via Strimzi. Wir konnten zwar ein Image mit unserem eigenen RocketChatConnectorSource-Plugin deployen und das Plugin wurde erkannt. Allerdings liess sich der Connector nicht konfigurieren/aktivieren: Ein Post auf /connectors mit dem Konfigurationspayload gemäss Dokumentation von Confluent lieferte den Fehler, dass der Connector nicht vorhanden sei. Wir sind danach auf das Original-Image von Confluent für Kafka Connect umgestiegen.

Kafka Cluster-Management und Monitoring

Für das Cluster-Management und Monitoring haben wir uns auf neue Pfade gewagt und das Cruise-Control von Linkedin ausprobiert.

Cruise Control ist eine Anwendung, die die Last auf die Kakfa-Brokers ausgewogen halten soll und unterstützt beim Management des Clusters. Cruise Control bietet eine REST-Schnittstelle, die verschiedene Informationen über den Cluster zur Verfügung stellt und uns erlaubt, das Balancing des Clusters zu steuern.

Cruise Control UI ist eine graphische Web-Oberfläche, die uns die REST-Schnittstelle von Cruise Control in einer benutzerfreundlichen Art darstellt.

Stand der Kafka Cluster.
Aktueller Ressourcenverbrauch per Host und per Broker.
Stand des Cruise Control Monitorings vom Kafka Cluster.
Massnahmen, die getroffen wurden, um die Performance des Clusters zu steigern.
Einfache Kafka Cluster Administrationsseite.

Metrics

Die Metrics werden von eine Erweiterungs-Library, die in jedem Broker läuft, gelesen und auf ein Topic geschrieben. Cruise Control liest dieses Topic aus und bildet anhand dieser Information ein Modell, um den aktuellen Stand zu analysieren und Verbesserungen an Broker- und Topic-Topologie auszuführen.

Cruise Control auf Openshift

Leider gibt es keine Docker-Images für Cruise Control und Cruise Control UI, so dass wir sie selber bauen mussten.

Herausforderungen

Wir mussten ein eigenes Kafka Image erstellen, um die cruise-control-metrics-reporter-A.B.C.jar Library von Cruise Control als Dependency hinzuzufügen und den CruiseControlMetricsReporter als Metrics Collector benutzen zu können.

Use Case

Für unseren Use-Case nutzen wir Kafka auf vielseitige Weise:

  • mit der Connect API als Integrations-Werkzeug
  • mit der Streams API als Stream-Processing-Engine
  • und natürlich durch die Topics als Message-Broker und Datenhaltung

Die folgende Abbildung zeigt den Aufbau des Use-Cases und seinen Komponenten:

Chat

Puzzle nutzt Rocket.Chat als internes Chat-System. Für die Verbindung zum Rocket.Chat haben wir einen spezifischen Benutzer eingerichtet und diesen auf den gewünschten Channels subskribiert.

Kafka Connector

Die Kafka Connect API erlaubt die Implementation von Connectors, welche Datenströme aus Quellsystemen zu Kafka-Topics (Source-Connectors) oder von Kafka-Topics zu Zielsystemen (Sink-Connectors) integrieren.

 

Confluent bietet eine übersichtliche Liste mit verfügbaren Connectoren auf ihrem Confluent Hub.

Die Vorteile der Connect API sind nebst den bereits verfügbaren Connectors ein standardisiertes API für eigene Implementierungen. Zudem berücksichtigt Kafka Connect bereits die Skalierung. 

RocketChatSourceConnector

Für unseren Use-Case konnten wir nicht auf einen bestehenden Connector zurückgreifen. Wir haben deshalb unseren eigenen Connector RocketChatSourceConnector gebaut. Wie das gehen kann, beschreibt Confluent in einem separaten Guide.

Das Herzstück unseres Connectors ist die Klasse RocketChatSourceTask.java, welche von org.apache.kafka.connect.source.SourceTask ableitet. Diese Klasse nutzt den open-Source rocketchat-modern-client und bindet Rocket.Chat per WebSocket an. Der folgende Code-Ausschnitt zeigt das wesentliche Grundgerüst dieser Klasse:

public class RocketChatSourceTask extends SourceTask {
    ...
    private BlockingQueue queue = null;
    private RocketChatClient client;
    ...
    
    @Override
    public void start(Map props) {
        ...setup RocketChatClient...
        ConnectableObservable msgStream = ObservableReplay.createFrom(subscription.thenApply(room -> client.streamRoomMessages(room.rid))
                .join());
        streamDispose = msgStream.connect();
        msgStream.forEach(msg -> queue.put(conversion.apply(msg)));
    }
    @Override
    public List poll() throws InterruptedException {
        List records = new LinkedList<>();
        while (running.get()) {
            // Poll for new records but only for a max amount of time!
            SourceRecord record = queue.poll(1L, TimeUnit.SECONDS);
            if (record == null) {
                // the queue was empty, so continue looping ...
                log.debug("Empty queue, looping...");
                continue;
            }
            records.add(record);
            queue.drainTo(records);
            log.debug("Returning " + records.size() + " records.");
            return records;
        }
        return records;
    }
    @Override
    public void stop() {
        ...
        running.set(false);
        client.close();
        this.queue.clear();
    }
    ...
}

Die Connect API erwartet die Implementierung einer poll() Methode. Da wir aber durchgängig nach dem Push-Prinzip arbeiten wollten, nutzten wir den Umweg über eine interne Queue-Datenstruktur, welche ständig durch den Websocket befüllt wird. Die Implementierung der poll() Methode beschränkt sich auf das regelmässige abgreifen der neusten Records aus dieser Queue.

Der Connector selbst muss danach als uber-jar in Kafka Connect integriert und via Rest (oder alternativ über ein UI wie z.B. das Landoop-Connct-UI erstellt werden.

Via REST-Api kann ein Connector wie folgt konfiguriert werden:

cat << EOF > RocketChatSourceConnector.json
{
  "name": "RocketChatSourceConnector",
  "config": {
    "connector.class": "ch.puzzle.kafka.connectors.rocketchat.source.RocketChatSourceConnector",
    "tasks.max": "1",
    "rc.endpoint.url": "wss:///websocket",
    "rc.endpoint.user": "",
    "rc.endpoint.password": "",
    "rc.channels": "",
    "kafka.topic": ""
  }
}
EOF
curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @RocketChatSourceConnector.json /connectors

Stream Processing

Für die Prozessierung der Daten wird die höherwertige Stream Processing API von Kafka verwendet. Kafka Stream Processing erfolgt vollständig auf der Clientseite und kann durch eine Library direkt in eine Applikation integriert werden.

Unser Stream Processor basiert auf Spring-Boot und wurde als eigenständige Applikation implementiert. Deployed wird diese als Docker Image auf OpenShift 3.11. Die Applikation subskribiert sich auf das Input-Topic, welches durch den Source-Connector mit den rohen Messages aus Rocket.Chat befüllt wird. Die Messages werden mittels Avro serialisiert. Die Avro Schemas werden in einer zentralen Schema-Registry versioniert abgelegt.

Die Stream Processing Logik ist auf ein paar wenigen Zeilen umgesetzt. In einem ersten Schritt wird der Inhalt jeder Message transformiert und unter demselben Key in ein neues Topic geschrieben. Dabei wird aus der ursprünglichen Avro-serialisierten Message nur der relevante Payload (in unserem Fall die eingegebene Chat Message) ausgelesen und auf eine primitive Art ein Sanitizing durchgeführt. Die transformierten Daten werden in ein eigenes Topic zur Weiterverarbeitung geschrieben.

 
StreamsBuilder builder = new StreamsBuilder();
// build Avro deserializer
final Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueDeserializer = new SpecificAvroSerde();
valueDeserializer.configure(serdeConfig, false);
// pipe messages from raw input topic to intermediate topic with value = plain message string
builder.stream(sourceTopic, Consumed.with(Serdes.String(), valueDeserializer))
.map((KeyValueMapper>)
(key, value) -> new KeyValue(key, value.getMessage().replaceAll("[^\\w\\sÖÄÜöäüàèé:\\/\\.]", "")))
.to(plainMessageTopic, Produced.with(Serdes.String(), Serdes.String()));

In einem zweiten Stream werden die Chat Nachrichten in einzelne Begriffe gesplittet, zu kurze Begriffe gefiltert und nach Begriffen gruppiert. Die gruppierten Begriffe werden gezählt und in ein compacted Topic persistiert. Im Output-Topic wird als Key der Begriff und als Value ein Counter verwendet, welcher aussagt wie oft der Begriff bereits vorgekommen ist.

Da in diesem Fall nur der letzte Stand und nicht die gesamte History von Events interessiert, wurde ein compacted Topic verwendet. Dabei werden bereits existierende Messages durch neue Messages, die denselben Key aufweisen, überschrieben (dies geschieht asynchron durch den Compacting-Job auf den Brokern).

 
// split value into words, group and count them and pipe to output topic
builder
.stream(plainMessageTopic, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> asList(value.toLowerCase().split(" ")))
.mapValues(value -> value.trim())
.filter((key, value) -> value.length() >= minWordLength)
.groupBy((key, value) -> value)
.count()
.toStream()
.to(wordcountTopic, Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();
KafkaStreams kafkaStreams = new KafkaStreams(topology, getConfiguration());
kafkaStreams.start();

WebSocket Gateway

Damit anschliessend das Word-Cloud-Frontend der im Stream-Processing generierte Output (Map mit Words und deren Anzahl) zur Anzeige bekommt, brauchen wir einen Kafka-Consumer, welcher das Output-Topic des Streamprocessors liest und an die verbundenen WebSockets pushed.

Auch dazu haben wir eine Spring-Boot Applikation erstellt. Wichtig ist, dass für jeden verbundenen WebSocket ein eigener Kafka-Listener mit einer eigenen group.id erzeugt wird. Ausserdem soll dieser das Kafka-Topic *from beginning* konsumieren. Das bewirkt, dass ein neu verbundener WebSocket initial das komplette Word-Count-Map erhält. Der verwendete BatchMessageListener liefert die Records bereits als Liste; somit können sie auch gebatched über den WebSocket geschickt werden.

Das Herzstück ist der WebSocketHandler, welcher in WebSocketConfigurer registriert wird:

 

@Configuration
@EnableWebSocket
public class WebSocketConfiguration implements WebSocketConfigurer {
    @Autowired
    SocketHandler socketHandler;

    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(socketHandler, "/consumer").setAllowedOrigins("*");
    }
}
 
@Component
public class SocketHandler extends TextWebSocketHandler {
    @Value("${proxy.kafka-topic}")
    String topic;
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    String bootstrapServer;
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        Map consumerConfig = ImmutableMap.of(
                BOOTSTRAP_SERVERS_CONFIG, bootstrapServer,
                AUTO_OFFSET_RESET_CONFIG, "earliest",
                GROUP_ID_CONFIG, UUID.randomUUID().toString());
        DefaultKafkaConsumerFactory kafkaConsumerFactory =
                new DefaultKafkaConsumerFactory<>(
                        consumerConfig,
                        new StringDeserializer(),
                        new LongDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener((BatchMessageListener) records -> {
            ...
            if (session.isOpen()) {
                Map recordMap = new HashMap<>();
                records.forEach(record -> recordMap.put(record.key(), record.value()));
                List wordCounts = recordMap.entrySet().stream()
                        .map(entry -> WordCount.builder().word(entry.getKey()).count(entry.getValue()).build())
                        .collect(Collectors.toList());
                session.sendMessage(new TextMessage(objectMapper.writeValueAsString(wordCounts)));
            }
            ...
        });
        ConcurrentMessageListenerContainer container =
                new ConcurrentMessageListenerContainer<>(
                        kafkaConsumerFactory,
                        containerProperties);
        container.start();
    }
    ...
}

Word-Cloud Application

Als Frontend dient eine einfache Javascript-Applikation, welche sich mit einem Websocket verbindet und schliesslich die Wordcloud darstellt.
Damit der Browser an einer hohen Datenflut nicht kollabiert, werden die Nachrichten in einer clientseitigen Queue zwischengespeichert und mit Hilfe der Methode window.requestAnimationFrame() in einer D3-Wordcloud dargestellt. Mit Hilfe von RequestAnimationFrame wird dem Browser mitgeteilt, dass eine Animation vorgenommen werden soll. So wird die Konsumierung ab Websocket vom Rendering der Wordcloud abgekoppelt.

Fazit

Trotz mittlerweile grosser Erfahrung im Team hatten wir auch für den Aufbau unseres internen Kafka-Cluster mit unvorhergesehenen Problemen zu kämpfen, die uns etwas Zeit gekostet haben.
Beim Aufsetzen des Kafka-Clusters mittels Strimzi hatten wir einige Zeit in die Problem-Analyse von Strimzi investiert. Die Probleme konnten schliesslich erst durch einen Wechsel von OpenShift 3.9 auf 3.11 gelöst werden. Da Strimzi selbst quasi noch in seinen Kinderschuhen steckt (Version 0.xx), war bei den Issues nicht sofort klar, ob es sich um Bugs von Strimzi oder um Probleme unserer Umgebung handelt.

Zum lokalen Testen und Entwickeln, insbesondere in Zusammenarbeit mit den Connect- und Stream-APIs hat es sich sehr bewährt, lokal ein komplettes Kafka-Ökosystem bereitzustellen. Wir haben uns die Arbeit einfach gemacht und das Ökosystem kafka-stack-docker-compose von Stephane Maarek verwendet. Es hat zudem den Vorteil, dass die Landoop-UI für Topics und Connectors auch enthalten sind, was vieles einfacher macht.

Die Tage reichten gerade so aus, um die geplanten Aufgaben erfolgreich abzuschliessen. Unter dem Strich mussten wir über das ganze Team mit deutlich weniger zur Verfügung stehenden Tagen auskommen, da einige von uns doch wiederkehrend durch das Alltagsgeschäft absorbiert wurden. Wir hätten gerne auch noch das Lastverhalten analysiert und die Skalierbarkeit verifiziert sowie das „exaclty once processing“ in einer Form mitberücksichtigt. Dazu reichte die Zeit aber leider nicht mehr aus.

Kommentare sind geschlossen.