Docker compose로 kafka 설치하기
Docker Compose를 사용하여 application.yml 파일을 통해 Kafka 컨테이너를 생성합니다.
version: '3.8'
services:
zookeeper:
image: zookeeper:3.9.1
platform: linux/amd64
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: wurstmeister/kafka:latest
platform: linux/amd64
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-ui:
image: provectuslabs/kafka-ui:latest
platform: linux/amd64
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_READONLY: "false"
위의 Docker Compose 파일(docker-compose.yml)은 zookeeper, kafka, kafka-ui 세 개의 서비스를 설정합니다. 이 파일을 사용하여 Kafka 컨테이너를 비롯한 관련 서비스들을 한꺼번에 생성할 수 있습니다.
- Zookeeper 서비스
- zookeeper:3.9.1 이미지를 사용하여 Zookeeper 컨테이너를 생성합니다.
- 포트 2181을 호스트와 매핑합니다.
- 환경 변수로 ZOOKEEPER_CLIENT_PORT와 ZOOKEEPER_TICK_TIME을 설정합니다.
- Kafka 서비스
- wurstmeister/kafka:latest 이미지를 사용하여 Kafka 컨테이너를 생성합니다.
- 포트 9092를 호스트와 매핑합니다.
- Kafka의 설정을 위해 다양한 환경 변수를 설정합니다:
- KAFKA_ADVERTISED_LISTENERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, KAFKA_INTER_BROKER_LISTENER_NAME, KAFKA_ZOOKEEPER_CONNECT.
- /var/run/docker.sock을 볼륨으로 마운트 하여 Docker와 상호작용합니다.
- Kafka-UI 서비스
- provectuslabs/kafka-ui:latest 이미지를 사용하여 Kafka 관리 UI를 제공하는 컨테이너를 생성합니다.
- 포트 8080을 호스트와 매핑합니다.
- Kafka 클러스터와의 연결을 위한 환경 변수를 설정합니다.
localhost:8080에 접속하면 kafka UI 에 접속할 수 있습니다.
Producer Application
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
application.properties
spring.application.name=producer
server.port=8090
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
ProducerApplicationKafkaConfig
ProducerApplicationKafkaConfig클래스는 Kafka 프로듀서 설정을 위한 Spring 설정 클래스입니다.
Spring Kafka를 사용하여 Kafka 메시지를 생성하고 전송하는 기능을 제공합니다. 클래스는 두 가지 주요 빈(Beans)을 정의하고 있습니다.
- @Configuration
- 이 클래스가 Spring 설정 클래스로 사용된다는 것을 명시합니다. Spring이 이 클래스를 읽어 애플리케이션 컨텍스트에 빈을 등록합니다.
- ProducerFactory<String, String> producerFactory()
- Kafka 프로듀서를 생성하기 위한 팩토리 빈을 정의합니다.
- configProps 맵을 통해 Kafka 프로듀서의 설정을 지정합니다.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG: Kafka 서버의 주소를 지정합니다 (localhost:9092).
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG: 메시지 키를 직렬화할 클래스를 지정합니다 (StringSerializer).
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG: 메시지 값을 직렬화할 클래스를 지정합니다 (StringSerializer).
- DefaultKafkaProducerFactory는 이 설정을 기반으로 Kafka 프로듀서를 생성합니다.
- KafkaTemplate<String, String> kafkaTemplate():
- 메시지를 Kafka 토픽으로 전송하는 데 사용되는 KafkaTemplate 빈을 정의합니다.
- 이 KafkaTemplate은 앞서 정의한 producerFactory()를 통해 생성된 ProducerFactory를 사용합니다.
KafkaTemplate❓
Spring Kafka 애플리케이션에서 Kafka 메시지를 효율적으로 전송하기 위한 핵심 클래스입니다.
메시지 전송과 관련된 여러 작업을 추상화하여 개발자가 쉽게 사용할 수 있도록 도와주며, 동기/비동기 전송, 키-값 메시지 전송, 트랜잭션 등 다양한 기능을 제공합니다.
이를 통해 Kafka와의 상호작용을 단순화하고, 개발 생산성을 높일 수 있습니다.
ProducerController
ProducerController 클래스는 Spring Boot 애플리케이션에서 Kafka로 메시지를 전송하기 위한 REST API의 컨트롤러 클래스입니다.
topic, key, message라는 세 가지 파라미터를 받아서, ProducerService의 sendMessage 메서드를 호출하여 해당 메시지를 지정된 Kafka 토픽으로 전송합니다.
메서드의 리턴 값은 단순한 String으로, "Message sent to Kafka topic"이라는 응답 메시지를 클라이언트에 반환합니다.
ProducerService
ProducerService 클래스는 Spring Kafka를 사용하여 특정 Kafka 토픽에 메시지를 전송하는 서비스입니다.
이 클래스는 주어진 토픽과 키, 메시지를 받아서 Kafka에 전송하는 역할을 합니다. 이 서비스는 ProducerController와 함께 동작하며, 컨트롤러에서 요청을 받으면 이 서비스가 실제로 Kafka에 메시지를 전송합니다.
- @Service
- 이 어노테이션은 이 클래스가 Spring의 서비스 컴포넌트임을 나타냅니다. Spring의 컴포넌트 스캔 메커니즘에 의해 이 클래스는 자동으로 Spring 컨테이너에 빈으로 등록됩니다.
- @RequiredArgsConstructor
- Lombok에서 제공하는 어노테이션으로, final로 선언된 필드를 인자로 받는 생성자를 자동으로 생성합니다.
- 본 글에서는 경우 KafkaTemplate<String, String> 타입의 kafkaTemplate 필드가 생성자를 통해 주입됩니다.
- private final KafkaTemplate<String, String> kafkaTemplate
- KafkaTemplate은 Kafka 메시지를 전송하는 데 사용되는 템플릿 클래스입니다. 이 서비스 클래스는 KafkaTemplate을 통해 Kafka에 메시지를 전송합니다.
- sendMessage 메서드
- 이 메서드는 실제로 메시지를 Kafka로 전송하는 역할을 합니다.
- 매개변수
- topic: 메시지를 보낼 Kafka 토픽의 이름.
- key: 메시지의 키. 메시지 키는 Kafka가 메시지를 특정 파티션에 할당하는 데 사용됩니다.
- message: 전송할 메시지 내용.
- 기능
- 메서드는 0부터 9까지 총 10번의 메시지를 Kafka로 전송합니다.
- 각 메시지는 전송될 때 message 뒤에 인덱스 값을 붙여서 전송됩니다 (message + " " + i).
- 예를 들어, message가 "Hello"라면, "Hello 0", "Hello 1", ... "Hello 9"의 메시지가 순차적으로 전송됩니다.
Consumer Application
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
application.properties
spring.application.name=consumer
server.port=8091
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
ConsumerApplicationKafkaConfig
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {
// Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
// ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
// 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
Map<String, Object> configProps = new HashMap<>();
// Kafka 브로커의 주소를 설정합니다.
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 메시지 키의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 메시지 값의 디시리얼라이저 클래스를 설정합니다.
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
return new DefaultKafkaConsumerFactory<>(configProps);
}
// Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
// ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
// 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory를 생성합니다.
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
factory.setConsumerFactory(consumerFactory());
// 설정된 리스너 컨테이너 팩토리를 반환합니다.
return factory;
}
}
ConsumerService
ConsumerService 클래스는 Kafka 메시지를 소비(consume)하는 역할을 합니다. 각 메서드는 특정 Kafka 토픽에서 메시지를 수신하고 처리하기 위한 리스너로 설정되어 있으며, Spring Kafka를 사용하여 구현되었습니다.
@KafkaListener
- 이 어노테이션은 해당 메서드를 Kafka 리스너로 지정하여, 특정 Kafka 토픽으로부터 메시지를 수신하는 역할을 합니다. groupId와 topics 속성을 통해 어느 그룹으로 메시지를 소비할지와 어떤 토픽을 들을지를 설정합니다.
- 컨슈머 그룹
- 동일한 groupId를 사용하는 리스너들은 하나의 컨슈머 그룹으로 묶이며,
같은 메시지를 공유하지 않고 분배받습니다.
- 동일한 groupId를 사용하는 리스너들은 하나의 컨슈머 그룹으로 묶이며,
- 토픽
- 각 리스너는 지정된 Kafka 토픽에서 메시지를 수신하며, 동일한 토픽을 서로 다른 컨슈머 그룹이 소비할 경우, 각 그룹은 동일한 메시지를 받을 수 있습니다.
Kafka 동작 확인
목표 🏁
- 동일한 Group ID의 리스너들은 메시지를 분배받습니다.
- 서로 다른 Consumer Group은 동일한 토픽의 메시지를 각각 수신합니다.
Topics 섹션
Consumer 컨테이너가 토픽(Topic 1~5)을 리슨하고 있으며, 입력한 정보에 따라 토픽이 생성된 것을 확인할 수 있습니다.
Topics 섹션
입력한 정보(group_a~d)에 따라 컨슈머들이 생성된 것을 확인할 수 있습니다.
http://localhost:8090/send?topic=test-topic&key=key-1&message=hi로 요청을 보내면, test-topic이 생성된 것을 확인할 수 있습니다.
Overview
Overview에서 파티션 0에 10개의 메시지가 들어왔으며, 다음 메시지의 오프셋이 10임을 확인할 수 있습니다. 동일한 키로 인해 메시지가 같은 파티션에 저장되었습니다.
Messages
메시지를 확인해보면, for문을 통해 생성된 'hi' 메시지들이 생성된 것을 확인할 수 있습니다.
동일한 Group ID & 동일한 Topic
이제 컨슈머 서비스에서 토픽 리스너를 지정한 topic1에 메시지를 요청해보겠습니다.
현재 topics는 동일한 Group ID("topic1")로 설정되어 있습니다.
요청 URL: http://localhost:8090/send?topic=topic1&key=key-1&message=hi
Apache Kafka UI에서 확인해보면, topic1에 발행된 10개의 메시지가 정상적으로 들어간 것을 확인할 수 있습니다.
topic1의 Consumer 섹션에서 지정한 group A와 group B가 존재하는 것을 확인할 수 있습니다. 이 두 그룹이 메시지를 받아서 처리를 완료했음을 의미합니다.
Consumer의 로그를 확인해보면, Group A와 Group B가 시간차가 있을 수 있지만 동일한 메시지를 수신하고 있는 것을 확인할 수 있습니다.
이를 통해, 동일한 토픽에 대해 서로 다른 그룹이 각자의 메시지를 독립적으로 수신한다는 것을 확인할 수 있습니다
동일한 Group ID & 서로 다른 Topic
이제 컨슈머 서비스에서 토픽 리스너를 지정한 topic2에 메시지를 요청해보겠습니다.
현재 topics는 동일한 Group ID("topic2")와 서로 다른 Topic ID로 설정되어 있습니다.
요청 URL: http://localhost:8090/send?topic=topic1&key=key-1&message=hi
Apache Kafka UI에서 확인해보면, topic2에 발행된 10개의 메시지가 정상적으로 들어간 것을 확인할 수 있습니다.
Consumers 섹션과 Consumer 로그를 보면, 같은 그룹에 속하더라도 메시지는 특정 토픽을 대상으로 발행되기 때문에, 위와 같이 Group C에 있는 Consumer 리스너만 동작하는 것을 확인할 수 있습니다.
- 동일한 Group ID의 리스너들은 메시지를 분배받아 처리하며, 서로 다른 Consumer Group은 동일한 토픽의 메시지를 독립적으로 수신합니다.
- 토픽을 대상으로 발행된 메시지는 지정된 Consumer Group에 의해 처리되며, 동일한 토픽을 사용하는 다른 그룹들도 독립적으로 메시지를 수신할 수 있습니다.
'Kafka' 카테고리의 다른 글
[Kafka] Apache Kafka란 무엇일까❓ (0) | 2024.08.16 |
---|