CloudEvents
Standardization is a general need in all fields after some technique is widely used. In the last few years the trend for scalable, event-driven systems grew massively. Everybody and everything is communicating in events.
And all systems face the same question at some point – what should our events look like. If you were smart enough you asked yourself this question rather earlier than later. If not, at some point you will face the truth that you will have to refactor a lot to achieve consistency in your events throughout your distributed system. This is where you would have wished to know CloudEvents already.
CloudEvents brings a specification to describe events in a common way. The common language increases consistency, accessibility and portability in distributed systems. Major programming languages like Java, Go, JavaScript, Ruby, Rust, Python have SDKs and APIs to implement CloudEvents in a simple way. At its core it will bring us a blueprint or language to define a set of metadata to describe the event.
Example CloudEvent:
{
"specversion" : "1.0",
"type" : "com.github.pull_request.opened",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "text/xml",
"data" : "<much wow="xml"/>"
}
Specification
It all comes down to a handfull of attributes:
Required attributes:
Optional attributes:
In addition to the specification of CloudEvents itself, there are extensions giving extra flexibility for other meta fields to enrich your event. For example, OpenTracing header fields can be added by the Distributed Tracing extension.
Implementation
Let’s try to get our hands dirty and test the CloudEvent specification to fire some events. We are going to use the Quarkus framework with the Smallrye reactive messaging extension.
As it just so happens, the Smallrye reactive messaging extension supports CloudEvents out of the box!
We create two new Quarkus projects:
Creation of Quarkus projects
mvn io.quarkus.platform:quarkus-maven-plugin:2.10.3.Final:create
-DprojectGroupId=org.acme
-DprojectArtifactId=data-producer
-Dextensions="resteasy-reactive"
mvn io.quarkus.platform:quarkus-maven-plugin:2.10.3.Final:create
-DprojectGroupId=org.acme
-DprojectArtifactId=data-consumer
-Dextensions="resteasy-reactive"
Remove the test classes and add the following extensions to your projects‘ pom.xml:
Dependencies in pom.xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>6.1.1</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
...
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
For demonstration purposes we use an Avro-schema, which is the most common approach to manage schemas with Kafka.
The Producer
Create an Avro schema src/main/avro/SensorMeasurement.avsc with the following content:
{
"namespace": "org.acme",
"type": "record",
"name": "SensorMeasurement",
"fields": [
{
"name": "data",
"type": "double"
}
]
}
This represents a simple Java POJO to hold some information about measurements we want to emit.
We create a service org.acme.KafkaProducer which allows us to emit SensorMeasurements to a defined Channel measurements which we will connect to a Kafka Topic. This could look like the following:
package org.acme;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class KafkaProducer {
@Channel("measurements")
@Inject
Emitter<SensorMeasurement> sensorMeasurementEmitter;
public void emitEvent(SensorMeasurement sensorMeasurement) {
sensorMeasurementEmitter.send(Message.of(sensorMeasurement));
}
}
Then we alter the pre-generated REST resource to emit an event every time we receive a POST request to the endpoint /measurements:
package org.acme;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.util.Random;
@Path("/measurements")
public class MeasurementsResource {
private final KafkaProducer kafkaProducer;
public MeasurementsResource(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@POST
public Response emitMeasurement() {
SensorMeasurement measurement = SensorMeasurement.newBuilder().setData(new Random().nextDouble()).build();
kafkaProducer.emitEvent(measurement);
return Response.ok().build();
}
}
Of course we need some configuration in the application.properties to emit the events to our Kafka broker:
mp.messaging.outgoing.measurements.connector=smallrye-kafka
mp.messaging.outgoing.measurements.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
mp.messaging.outgoing.measurements.topic=measurements
mp.messaging.outgoing.measurements.cloud-events-source=event-producer
mp.messaging.outgoing.measurements.cloud-events-type=measurement-emitted
mp.messaging.outgoing.measurements.cloud-events-subject=subject-123
And this is where the magic happens: Configuring the channel “measurements” with additional properties cloud-events-XXX will simply enrich the message envelope with the properties defined. If you don’t like the config approach and would rather enrich the message programmatically, I got you covered!
public void emitEvent(SensorMeasurement sensorMeasurement) {
OutgoingCloudEventMetadata<Object> metadata = OutgoingCloudEventMetadata.builder()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("event-producer"))
.withType("measurement-emitted")
.withSubject("subject-123")
.build();
sensorMeasurementEmitter.send(Message.of(sensorMeasurement).addMetadata(metadata));
}
And that’s all you need to create events to our little system!
The consumer
The data-consumer’s side of the system looks quite similar. Create an Avro schema src/main/avro/SensorMeasurement.avsc with the following content:
{
"namespace": "org.acme",
"type": "record",
"name": "SensorMeasurement",
"fields": [
{
"name": "data",
"type": "double"
}
]
}
Instead of producing messages, we will simply define a listener on a channel connected to the same Kafka topic and print the events to the command line!
Create the EventListener org.acme.EventListener:
package org.acme;
import io.smallrye.reactive.messaging.ce.IncomingCloudEventMetadata;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class EventListener {
private final Logger logger = Logger.getLogger(EventListener.class);
@Incoming("measurements")
public CompletionStage<Void> consume(Message<SensorMeasurement> message) {
IncomingCloudEventMetadata cloudEventMetadata = message.getMetadata(IncomingCloudEventMetadata.class).orElseThrow(() -> new IllegalArgumentException("Expected a CloudEvent!"));
logger.infof("Received Cloud Events (spec-version: %s): id: '%s', source: '%s', type: '%s', subject: '%s', payload-message: '%s' ",
cloudEventMetadata.getSpecVersion(),
cloudEventMetadata.getId(),
cloudEventMetadata.getSource(),
cloudEventMetadata.getType(),
cloudEventMetadata.getSubject().orElse("no subject"),
message.getPayload());
return message.ack();
}
}
Add the following properties to the application.properties file:
mp.messaging.incoming.measurements.connector=smallrye-kafka
mp.messaging.incoming.measurements.topic=measurements
quarkus.http.port=8081
And that’s all you need to have your application up and running! If you have Docker installed, starting the application will also start a mock Kafka broker and connect your applications automatically. If you don’t have Docker installed you will need to configure the connection to a Kafka broker yourself by adding the following property to the applications:
kafka.bootstrap.servers=your-kafka-broker:9092
kafka.bootstrap.servers=your-kafka-broker:9092
Test your events
Start both of your applications in your favorite IDE or shell:
./mvnw compile quarkus:dev
Fire some requests against your producer and test your CloudEvents getting emitted and consumed!
Received Cloud Events (spec-version: 1.0): id: '98d85610-6d8d-4943-b8ea-641c4940e148', source: [...]