Kafka mit Frühlingsströmen Schritt für Schritt Anleitung für Anfänger

Blog

Kafka mit Frühlingsströmen Schritt für Schritt Anleitung für Anfänger

Kafka mit Frühlingsströmen Schritt für Schritt Anleitung für Anfänger

Dieser Beitrag enthält eine Schritt-für-Schritt-Anleitung zum Aktivieren von Messaging in einem Microservice mithilfe von Kafka mit Spring Cloud Stream.

Spring Cloud Stream ist ein Framework unter dem Dachprojekt Spring Cloud, das es Entwicklern ermöglicht, ereignisgesteuerte Microservices mit Messaging-Systemen wie Kafka und . aufzubauen KaninchenMQ .

Asynchrone Messaging-Systeme sind immer ein wichtiger Bestandteil jeder modernen Unternehmenssoftwarelösung. Die Entwicklung von Microservices hat die Time-to-Market für jedes Softwareprodukt verkürzt, aber dies ist ohne die notwendigen Tools und Frameworks nicht möglich.

Spring Cloud Stream ist ein Framework, das auf Spring Integration basiert. Es lässt sich nahtlos in Spring Boot integrieren, um in kürzerer Zeit effiziente Microservices zu erstellen, um eine Verbindung mit freigegebenen Messaging-Systemen herzustellen. Spring Cloud Stream bietet mehrere Binder-Implementierungen wie Kafka, RabbitMQ und verschiedene andere. Die Details sind bekannt Hier .

Hier ist eine Schritt-für-Schritt-Anleitung zum Erstellen einer einfachen Microservice-Anwendung basierend auf Spring Boot und verwendet Spring Cloud Stream, um eine Verbindung mit einer Kafka-Instanz herzustellen.

Einstieg

Installieren Sie Kafka und erstellen Sie ein Thema. Ich verwende für diese Demonstration einen Kafka-Broker, der auf meinem lokalen Windows-Rechner läuft, aber es kann auch eine Installation auf einem Unix-Rechner sein. Schritte für die Installation von Kafka auf einem Windows-Rechner werden bereitgestellt Hier .

Erstellen Sie ein Spring Boot-Starterprojekt entweder mit STS IDE oder Spring Initialisierung . Ich stelle die pom.xml als Referenz zur Verfügung.

4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.8.RELEASE com.techwording spring-cloud-stream-kafka-example 0.0.1-SNAPSHOT spring-cloud-stream-kafka-example Demo project for Spring Cloud Stream and Kafka 1.8 Greenwich.SR3 org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-stream org.springframework.cloud spring-cloud-stream-binder-kafka org.springframework.cloud spring-cloud-stream-binder-kafka-streams org.springframework.kafka spring-kafka org.springframework.boot spring-boot-starter-test test org.springframework.cloud spring-cloud-stream-test-support test org.springframework.kafka spring-kafka-test test org.springframework.cloud spring-cloud-dependencies ${spring-cloud.version} pom import org.springframework.boot spring-boot-maven-plugin

Das Spring Cloud Stream-Projekt muss mit der Kafka-Broker-URL, dem Thema und anderen Binderkonfigurationen konfiguriert werden. Unten sehen Sie ein Beispiel für die Konfiguration der Anwendung.

spring: cloud: stream: default-binder: kafka kafka: binder: brokers: - localhost:9092 bindings: input: binder: kafka destination: test content-type: text/plain group: input-group-1 output: binder: kafka destination: test group: output-group-1 content-type: text/plain

Wir benötigen mindestens einen Producer und einen Consumer, um die Nachricht zu testen und Sende- und Empfangsvorgänge zu senden. Unten finden Sie den Beispielcode für einen Produzenten und einen Verbraucher in seiner einfachsten Form, der mit Spring Cloud Stream entwickelt wurde.

package com.techwording.scs; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @EnableBinding(Source.class) public class Producer { private Source mySource; public Producer(Source mySource) { super(); this.mySource = mySource; } public Source getMysource() { return mySource; } public void setMysource(Source mysource) { mySource = mySource; } } package com.techwording.scs; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.FormatStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.handler.annotation.Payload; @EnableBinding(Sink.class) public class Consumer { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); @StreamListener(target = Sink.INPUT) public void consume(String message) { logger.info('recieved a string message : ' + message); } @StreamListener(target = Sink.INPUT, condition = 'headers['type']=='chat'') public void handle(@Payload ChatMessage message) { final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM) .withZone(ZoneId.systemDefault()); final String time = df.format(Instant.ofEpochMilli(message.getTime())); logger.info('recieved a complex message : [{}]: {}', time, message.getContents()); } }

Wir werden auch eine Rest Controller-Klasse erstellen, die die Nachricht über HTTP akzeptiert und an den Producer übergibt. Dies dient nur dazu, das Testen bequemer zu gestalten.

package com.techwording.scs; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class Controller { private Producer producer; public Controller(Producer producer) { super(); this.producer = producer; } // get the message as a complex type via HTTP, publish it to broker using spring cloud stream @RequestMapping(value = '/sendMessage/complexType', method = RequestMethod.POST) public String publishMessageComplextType(@RequestBody ChatMessage payload) { payload.setTime(System.currentTimeMillis()); producer.getMysource() .output() .send(MessageBuilder.withPayload(payload) .setHeader('type', 'chat') .build()); return 'success'; } // get the String message via HTTP, publish it to broker using spring cloud stream @RequestMapping(value = '/sendMessage/string', method = RequestMethod.POST) public String publishMessageString(@RequestBody String payload) { // send message to channel producer.getMysource() .output() .send(MessageBuilder.withPayload(payload) .setHeader('type', 'string') .build()); return 'success'; } }

Führen Sie die folgenden Maven-Befehle aus, um dieses Projekt zu erstellen und auszuführen.

mvn clean install mvn spring-boot:run

Erreichen Sie den POST-Endpunkt|_+_| und überprüfen Sie die Protokolle der Anwendungskonsole. Hier ist eine Beispielausgabe der Anwendung, die erzeugt wird, wenn ich diesen Endpunkt mit der Nachricht hallo im Resttext erreicht habe.

/sendMessage/string

Erreichen Sie den POST-Endpunkt |_+_| und überprüfen Sie die Protokolle der Anwendungskonsole.

2019-10-01 14:37:22.764 INFO 377456 --- [container-0-C-1] com.techwording.scs.Consumer : received a string message : {'contents':'hello','time':1569920841187}

Die Anmerkung |_+_| nimmt eine oder mehrere Schnittstellen als Parameter. In diesem Beispiel haben wir Sink- und Source-Schnittstellen verwendet, die Eingabe- bzw. Ausgabekanäle deklarieren. Dazu können Sie auch eigene Schnittstellen definieren.

|_+_| Annotation ist eine bequeme Methode von Spring Cloud Stream für inhaltsbasiertes Routing. Es funktioniert nach einem Pub-Sub-Modell, und jeder |_+_|erhält seine eigene Kopie der Nachricht.

Ich habe in diesem Projekt zwei Stream-Listener verwendet – einen zum Konsumieren von einfachen String-Nachrichten und einen anderen für Nachrichten mit einem komplexen Typ, ChatMessage. Der Producer sendet Nachrichten, die mit einem Header-Typ mit einem logischen Wert angehängt sind, und der Consumer kann Bedingungen anwenden, um Nachrichten mit |_+_| zu filtern.

Das komplette Projekt findet ihr Hier .

prüfe ob numpy installiert ist

#kafka #spring #microservice