Das Kamel als Bähnler

Das Flottenmanagement System Extended (FMSx) dient der Verwaltung von IT-Komponenten der Komfortsysteme einer Fahrzeugflotte (Fahrgastinformation, Videoüberwachung, Fahrgastzählung). Dazu werden Meldungen von zentralen Anwendungen über eine Luftschnittstelle an die Fahrzeuge übermittelt und vice-versa.

Und voilà – da kommt auch schon das Kamel ins Spiel: denn wesentliche Teile der Meldungen werden mittels dieses Apache Camel Frameworks verarbeitet und weitergeleitet.

Vorab ein Mitschnitt der Definition von Camel aus Wikipedia

  • Apache Camel ist eine freie, regelbasierte Routing- und Konvertierungsengine. Mit Apache Camel kann man Routing- und Konvertierungsregeln deklarativ in Java oder Scala basierend auf einer domänenspezifischen Sprache, oder mittels Spring basierter XML-Konfiguration definieren.
  • Apache Camel basiert auf Enterprise Integration Patterns – Entwurfsmuster welche für den Entwurf von Enterprise Application Integration und Message Oriented Middleware basierten Systemen geschaffen wurden. Apache Camels Bean Binding unterstützt dabei Plain Old Java Objects und JavaBeans. Dadurch lässt es sich einfach in Dependency Injection Frameworks wie Spring oder Google Guice integrieren.
  • Apache Camel verwendet Uniform Resource Identifiers und kann somit direkt mit unterschiedlichen Transport- und Messageprotokollen wie beispielsweise HTTP, JMS oder AMQP umgehen. Es kann so beispielsweise mit JBI, SCA, Apache ActiveMQ, RabbitMQ, Apache MINA oder Apache CXF zusammenarbeiten. Somit kann basierend auf der Apache Camel-Programmierschnittstelle gearbeitet werden, obwohl die darüber angesprochenen Komponenten technologisch unterschiedliche Schnittstellen verwenden.

Integration in Java

Camel verwendet sogenannte “exchange messages” für den Transport von Meldungen. Diese exchange messages werden zwischen verschiedenen “Endpoints” übermittelt.
Anhand von “Routen” wird der Kommunikationsfluss abgebildet.

Wir verwenden für diese Routen Definition die Java DSL Notation, welche direkt in der Klasse integriert werden kann. Dazu muss lediglich die abstrakte Klasse org.apache.camel.builder.RouteBuilder erweitert werden. Innerhalb der zu implementierenden Methode public void configure() kann nun die Route definiert werden.

Im folgenden Code-Beispiel ist eine Route definiert, welche sich über einen Jms Endpoint auf eine Queue abonniert. Dies geschieht über die Notation from("jms:..."). Dabei können x-beliebige Verbindungsparameter angegeben werden, welche für den Zugriff auf den Endpunkt benötigt werden.
Wird auf der Queue eine Meldung publiziert, so wird diese über den Endpoint empfangen und es wird eine “exchange message” in dieser Route transportiert bis zum Endpoint to("direct:a"). Mittels dieser “direct:” Notation können Routen unterteilt werden. Die “exchange message” wird “Direkt” zum entsprechenden Endpoint weitergeleitet.

Innerhalb von Routen können Meldungen verarbeitet werden. Eine Möglichkeit besteht mittels Process Definitionen .process(exchange -> {}. Dies ist eine Java Implementierung. Innerhalb der Process Methode hat man vollen Zugriff auf die gesamte Exchange Message.

An dieser Stelle stellt Camel eine Palette von Mechanismen zur Verfügung, welche die Verarbeitung der Meldungen nach den vorhin erwähnten Enterprise Integration Patterns ermöglichen.

Ebenfalls kann ein Error Handling in Routen integriert werden. Dies auf Stufe von einzelnen Routen wie z.B. .onException(CustomException.class) welche alle Fehler des Typs CustomException.class abfängt – oder aber auf globaler Ebene, gültig für alle (nicht in Sub-Routen behandelten) Fehler, welche beim Durchlaufen eines Exchanges durch die Routen auftreten können.

@Component
public class FmsxExampleRouteBuilder extends RouteBuilder {
    
    @Override
    public void configure() {

        onException(Exception.class)
                .handled(true)
                .log(ERROR, "Fängt alle nicht behandelten Fehler ab")
                .end()
        ;

        from("jms:...")
                .log("received message from jms endpoint")
                .to("direct:a")
        ;
                
        from("direct:a")
                .onException(CustomException.class)
                .handled(true)
                .log(ERROR, "Fängt alle Fehler innerhalb dieser Route ab")
                .end()
                
                .process(exchange -> {
                    Object messageBodyFromExchangeMessage = exchange.getIn().getBody();
                    // do some stuff
                })
                .log("processed message")
        ;
    }
}

Camel Integration in Applikationen

Damit Camel innerhalb einer Applikation verwendet werden kann, muss ein sogenannter “Camel-Context” aufgebaut und gestartet werden. Dies kann programmatisch oder durch bestehende Integrationen von Frameworks gemacht werden. In FMSx wird Camel mittels Spring-Boot integriert. Dadurch können die vom Framework zur Verfügung gestellten Boardmittel verwendet werden (Dependency-Injection, Events, …)

Nach dieser kurzen Einführung in Camel und seiner Syntax möchte ich nun zum eigentlichen Thema übergehen.

Der Bähnler in FMSx

FMSx besteht aus mehreren Komponenten. Die für die Meldungsverarbeitung zuständige Komponente – der Bähnler – ist der FMSx-Hub.

In diesem ist die gesamte Definition des Routing beherbergt und wird durch Hochfahren des Camel-Context gestartet.

Dabei wird ein “RabbitMq Message-Broker” zum Versenden und Empfangen von Meldungen über die RabbitMq Endpoint Komponente angebunden.

Vom FMSx Cockpit zum Fahrzeug

Diese Route empfängt Meldungen von zentralen Anwendungen via Message Broker. Die Weiterleitung der Meldung in gewünschtem Format erfolgt ebenfalls via Message Broker an das entsprechende Fahrzeug: 

from("rabbitmq:fmsx?exchangeType=topic&queue=cockpit.out&routingKey=cockpit.out.#")
        .unmarshal().json(Jackson, FmsxMessage.class)
        .process(new KonvertiereFmsxModellNachLieferantModellProcessor())
        .marshal().json(Jackson, LieferantModell.class)
        .process(new BerechneUndSetzeRoutingKeyFuerZielfahrzeugQueueProcessor())

        .to("rabbitmq:amq.topic?exchangeType=topic")
;
  • Der Endpoint stellt eine Verbindung zum Broker auf den Exchange “fmsx” auf und abonniert sich auf die Queue “cockpit.out” mit dem Routing Key “cockpit.out.#”. Das heisst, dass alle Meldungen, welche auf die Queue “cockpit.out” mit einem Routing Key publiziert werden, und auf welche das Abonnier Muster passt, vom FMSx-Hub empfangen und verarbeitet werden.
  • Json Meldungen werden in ein internes Format umgewandelt.
  • Die Klasse “KonvertiereFmsxModellNachLieferantModellProcessor” wird instantiiert. Diese implementiert das Interface org.apache.camel.Processor und wandelt die Meldungen in das entsprechende Meldungsformat des Lieferanten der Fahrzeugschnittstelle.
  • Das Lieferanten spezifische Meldungsmodell wird im Message Body als Json Format konvertiert.
  • Der Routing Key muss so gesetzt werden, dass die Meldung an das richtige Fahrzeug adressiert wird.
  • Versenden der Nachricht an den RabbitMq Broker an den Exchange: “amq.topic”.

Vom Fahrzeug zur zentralen Anwendung (FMSx-Cockpit)

Eine Nachricht vom Fahrzeug soll empfangen werden und an die adressierte zentrale Anwendung weitergeleitet werden. Dazu verlangt die Fahrzeugschnittstelle, dass jede empfangene Nachricht explizit in einer vom Lieferanten definierten Meldung quittiert werden muss.
Diese Anforderung wird durch Anwendung der Enrich-Aggregate-Strategie in dieser Route umgesetzt.

from("rabbitmq:amq.topic?exchangeType=topic&queue=fmsxhub.fza_fitsx.request&routingKey=*.v1.fmsx_hub.fza_fitsx.*.control.*.#")
        .unmarshal().json(Jackson, LieferantModell.class)
        .process(exchange -> {
            Message exchangeMessage = exchange.getIn();
            // lese Pojo aus Message Body
            LieferantModell lieferantModell = exchangeMessage.getBody(LieferantModell.class);
            // Lese Fahrzeug UIC aus Modell und schreibe diese in den Header
            exchangeMessage.setHeader("fmsx.RESOURCE_UIC", extractFahrzeugUic(lieferantModell));
        })

        .enrich("direct:sendeMessageAnFmsxCockpitRoute", (origin, enriched) -> origin)

        .process(new ErstelleAntwortAnFahrzeugProcessor())
        .marshal().json(Jackson, LieferantModell.class)
        .process(berechneUndSetzeRoutingKeyFuerZielfahrzeugQueue)

        .to("rabbitmq:amq.topic?exchangeType=topic")
  • Jede Meldung, welche auf dem Exchange “amq.topic” auf die Queue “fmsxhub.fza_fitsx.request” gesendet wird und das angegebene RoutingKey Pattern erfüllt, wird von dieser Route empfangen und verarbeitet.
  • Die Verarbeitung der konvertierten Meldung wird hier exemplarisch direkt über process definiert. Dies als Darstellung, wie innerhalb einer Processor Klasse ein Camel-Exchange gelesen und angepasst werden kann. An dieser Stelle sollen Header gesetzt werden.
  • Der Exchange wird mittels .enrich("direct:sendeMessageAnFmsxCockpitRoute", (a, b) -> a) an die nachfolgend definierte Route “direct:sendeMessageAnFmsxCockpitRoute” weitergeleitet. Wenn die Verarbeitung auf dieser Route beendet wurde, so sind im Aggregator (origin, enriched) -> origin) beide Exchanges, der origin, welcher der ursprünglichen Meldung entspricht sowie die “angereicherte” Meldung. An dieser Stelle macht eine Fehlerbehandlung Sinn, denn anhand dieser kann eine positive respektive negative Antwortmeldung an das Fahrzeug gesendet werden. Zur Vereinfachung an dieser Stelle leiten wir einfach die Original-Nachricht in unserer aktuellen Route weiter.
  • Folgend wird eine Antwortmeldung im lieferantenspezifischen Format erstellt und in Json transformiert.
  • Der Routing Key muss so gesetzt werden, dass die Meldung an das richtige Fahrzeug adressiert wird. Da dieser Processor in mehreren Routen verwendet wird, kann diese mittels instant Variable definiert und so verwendet werden.
  • Diese Meldung wird an das entsprechende Exchange Topic an den Brocker versendet.

Route-Sende-Meldung an zentrale Anwendung (FMSx-Cockpit)

Diese Route ist innerhalb des Camel Context eindeutig mittels “direct:sendeMessageAnFmsxCockpitRoute” aufrufbar. Dies geschieht in unserem Beispiel über den Aufruf im Enrich zum Senden der Nachricht an eine zentrale Anwendung.

from("direct:sendeMessageAnFmsxCockpitRoute")
        .process(new ConvertLieferantToFmsxModelProcessor())
        .process(new DefiniereRoutingKeyFuerFmsxExchangeProcessor())
        .marshal().json(Jackson, FmsxMessage.class)
        
        .to("rabbitmq:fmsx?exchangeType=topic")
  • Wie schon in der Gegenrichtung muss die lieferantenspezifische Meldung in ein FMSx internes Meldungsformat übersetzt werden.
  • Aus der Meldung muss der Empfänger der Nachricht eruiert werden und damit entsprechende der Routing Key gesetzt werden.
  • Die transformierte Meldung wird an das entsprechende Exchange Topic auf dem Brocker versendet

Anhand diesen exemplarischen Routen Definitionen soll aufgezeigt werden, wie das Camel Framework im FMSx Projekt für die Kommunikation mit der Fahrzeugflotte eingesetzt wird. Weiter thematisiert dieser Beitrag auch die Anwendung der wichtigsten Merkmale der Wikipedia Einführung.

Kommentare sind geschlossen.