카프카 스트림즈(Kafka Streams)란?
카프카 스트림즈란 연속적인 이벤트 스트림이 들어올때마다 그때그때 처리하고 분석하여 의미있는 정보를 추출하고 실시간으로 작업을 처리하는 자바 라이브러리 기능이다.
이전에는 카프카 컨슈머 및 프로듀서 조합으로 필터링하여 다시 토픽에 넘겨주는 작업을 해주었지만 카프카 스트림즈는 매우 간단한 스트림즈 DSL 이란 기능으로 강력한 필터링 기능을 제공한다.
아래 그림으로 비교를 해보자.
기존의 필터링 방식
카프카 스트림즈를 이용한 필터링 방식
또한 내장 DB 인 Rocks DB를 이용하여 key-value Store 기능도 제공한다.
2. 카프카 스트림즈 예제 코드
간단한 스프링부트 코드로 스트림즈 예제를 실습해보았다.
1. 스트림즈 DSL 라이브러리 추가 (의존성추가)
카프카 스트림즈, 그리고 토픽에서 뽑은 json value를 역직렬화 하기 위해 jackson binder 도 추가해 주었다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version> <!-- Ensure this version matches your Kafka server version -->
</dependency>
<!-- Jackson Databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.2</version>
</dependency>
<!-- Jackson Core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.2</version>
</dependency>
<!-- Jackson Annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.13.2</version>
</dependency>
2. application.yml 에 스트림즈 속성 추가
spring:
kafka:
consumer:
bootstrap-servers: 123.143.255.255:9092
group-id: auto-driving-service
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
streams: // 🎯 여기부터 아래 3줄 추가!!!!
bootstrap-servers: 123.143.255.255:9092
application-id: streams-test
3. 예시 코드 1
value 값의 길이가 3보다 크면 필터링 해서 다른 토픽으로 넣어주는 엄청 간단한 예시 코드로 실습을 해보겠다.
1) 프로듀서 코드
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
/**
* SASL 인증을 위한 JAAS Template
*/
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\\"admin\\" password=\\"admin-secret\\";";
public static Object test() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092"); // kafka host 및 server 설정
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
// 3. message 전달
producer.send(new ProducerRecord<>("directTest", "key1", "1"));
producer.send(new ProducerRecord<>("directTest", "key2", "12"));
producer.send(new ProducerRecord<>("directTest", "key3", "123"));
producer.send(new ProducerRecord<>("directTest", "key4", "1234")); // 이거랑
producer.send(new ProducerRecord<>("directTest", "key5", "12345")); // 이것만 스트림즈로 추출해서 streams-test로 보내고 싶다.
// 종료
producer.flush();
producer.close();
return 1;
}
}
2) 스트림즈 코드
예제 코드이기 때문에,
1개의 필터링 조건만 넣어보려고 한다.
value 값이 3보다 큰 메시지만 뽑아서 streams-test토픽으로 넣어볼 것이다.
아래와 같이 간단하게 작성하면 되는데, 이런 코드를 스트림즈DSL 이라고 부른다.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import java.util.Properties;
@Configuration
@EnableKafka
public class StreamsFilter {
/** SASL 인증을 위한 JAAS Template */
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\\"admin\\" password=\\"admin-secret\\";";
@Bean
public KafkaStreams kafkaStreams() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092"); // kafka host 및 server 설정
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. 스트림즈 빌더 생성
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// 2-1) 소스 프로세서 생성
KStream<String, String> streamLog = streamsBuilder.stream("directTest");
// 2-2) 스트림 프로세서 생성
streamLog.filter((key, value) -> value.length() > 3)
// 2-3) 싱크 프로세서 생성
.to("streams-test");
// 3. 스트림즈 생성
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
kafkaStreams.start();
return kafkaStreams;
}
}
프로듀서로 메시지를 보내고, kafkaUI를 보니
directTest 토픽에는 5개의 메시지가 잘 전송이 되는 것을 확인했다.
그리고 streams-test 토픽에는 내가 스트림즈를 생성해서 3보다 큰 메시지만 필터링해서 넣은 2개의 메시지가 잘 전송이 된 것을 볼 수 있다!!! 😍
4. 예시 코드 2
이제는 조금 복잡하게 value로 아래 형태의 json을 넣어줄 것이다. 여기서 name 만 뽑아서 새로운 stream-name-test 토픽에 넣어주려고 한다.
{
"name" : "hj",
"age" : 20,
"phone" : "01012341234",
"job" : "developer"
}
1) 프로듀서 코드
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
/**
* SASL 인증을 위한 JAAS Template
*/
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";";
public static Object test() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092, 123.143.255.255:9093, 123.143.255.255:9094"); // kafka host 및 server 설정
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
// 3. message 전달
String jsonValue = "{\"name\":\"hj\", \"age\":20, \"phone\":\"01012341234\", \"job\":\"developer\"}";
producer.send(new ProducerRecord<>("stream-all-test", "json-test-key", jsonValue));
// 종료
producer.flush();
producer.close();
return 1;
}
}
2) json 과 매핑할 Dto 생성
import lombok.Data;
@Data
public class StreamsTestDto {
private String name;
private Long age;
private String phone;
private String job;
}
3) 스트림즈 코드
objectMapper 객체를 사용하여 Dto로 변환해주고, 변환한 변수를 value로 넣어줄 것이다.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import java.util.Properties;
@Configuration
@EnableKafka
public class StreamsFilter {
/** SASL 인증을 위한 JAAS Template */
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";";
@Bean
public KafkaStreams kafkaStreams() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092"); // kafka host 및 server 설정
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. 스트림즈 빌더 생성
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// 2-1) 소스 프로세서 생성
KStream<String, String> streamLog = streamsBuilder.stream("stream-all-test");
// 2-2) 스트림 프로세서 생성
streamLog.map((key, value) -> {
try {
ObjectMapper objectMapper = new ObjectMapper();
// JSON 문자열을 StreamsTestDto 객체로 변환
StreamsTestDto dto = objectMapper.readValue(value, StreamsTestDto.class);
// dto의 name 필드를 key-value 쌍으로 반환
return KeyValue.pair("name", dto.getName());
} catch (Exception e) {
return KeyValue.pair("name", null); // 예외 발생 시 key는 "name", value는 null
}
})
// 2-3) 싱크 프로세서 생성
.to("stream-name-test");
// 3. 스트림즈 생성
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
kafkaStreams.start();
return kafkaStreams;
}
}
프로듀서로 메시지를 보내고, kafkaUI를 보니 json 이 value 값으로 잘 들어왔다.
name 만 뽑아서 stream-name-test 토픽으로 새로 넣어준 것이 잘 확인되었다.
이제 카프카 스트림즈를 사용해서 해당 토픽의 일부분만 필터링 해서 다른 토픽으로 메시지를 전송할 수 있는 것을 확인했다.
또한 카프카 스트림즈를 사용하기 전에는 외부 key-value store인 redis 같은 저장소를 사용해야만 했지만
카프카 스트림즈는 내장 key-value store를 제공하기 때문에 간편하게 원하는 메시지만 필터 저장하고 select 문 등을 이용해서 쉽게 꺼내올 수도 있다!!
메시지를 쉽게 필터링 해서 토픽에 넣고, 쉽게 가져오는 매우 강력한 카프카 스트림즈 기능을 알아보았다.
더 상세한 기능은 아래 공식문서를 참고하면 된다.
출처
https://kafka.apache.org/37/documentation/streams/quickstart
https://velog.io/@bbangi/Kafka-Streams
'Infra > Kafka, MQTT' 카테고리의 다른 글
[Kafka] 카프카 컨슈머에서 받은 데이터 MariaDB로 전송하기 (0) | 2024.06.18 |
---|---|
[Kafka] 카프카 브로커 하나씩 다운시킨 후 메시지 전송 테스트 (고가용성 테스트, replication test) (0) | 2024.06.10 |
[Kafka] 이미 생성된 토픽의 replication factor 변경하기 (0) | 2024.05.27 |
[Grafana] Springboot 서버가 꺼지면 Slack 으로 알림받기 (0) | 2024.05.16 |
로컬 VM에 Docker 및 Kafka 설치 (docker-compose.yml작성) (0) | 2024.04.22 |