728x90
카프카 스트림즈로 필터링된 토픽을 컨슈밍하는 컨슈머를 만들고, 그 컨슈머에서 받은 데이터를 DB로 insert하도록 하는 기능을 구현했다.
카프카 커넥트로 하려고 했으나, 번거로워서 직접 컨슈머 코드 안에서 dao를 사용하여 db에 접근하여 insert 하도록 했다.
전체 코드이다.
프로듀서
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
/**
* SASL 인증을 위한 JAAS Template
*/
private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\\"admin\\" password=\\"admin-secret\\";";
public static Object test() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092, 123.143.255.255:9093, 123.143.255.255:9094"); // kafka host 및 server 설정
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin1", "admin1");
prop.put("sasl.jaas.config", jaasConfig);
// 2. producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
// 3. message 전달
// 시작 시간 기록
long startTime = System.nanoTime();
String id = "id";
String brand = "brand";
for (int i = 0; i < 10; i++) {
String jsonValue = "{ \\"carId\\": \\"" + id+ "-"+ i + "\\", \\"carData\\": { \\"cleanData\\": true, \\"locationData\\": 123, \\"carBrand\\": \\"" + brand + i + "\\" } }";
producer.send(new ProducerRecord<>("stream-all-test4", "car-test4-"+ i, jsonValue)); // ⭐ 데이터 보낼 토픽 이름
}
// 종료 시간 기록
long endTime = System.nanoTime();
// 실행 시간 계산 및 출력 (단위: milliseconds)
long duration = (endTime - startTime) / 1000000; // 나노초를 밀리초로 변환
System.out.println("🍀 프로듀서 실행 시간: " + duration + "ms");
// 종료
producer.flush();
producer.close();
return 1;
}
}
디비용 토픽으로 쏴주는 스트림즈
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import java.util.Properties;
/**
* [ 스트림즈 2 ]
* stream-all-test4 토픽에서 정제 후 stream-car-test4-db 토픽으로 보내는 스트림즈
*/
@Bean
public KafkaStreams kafkaStreamsForDB() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092"); // kafka host 및 server 설정
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test4-db"); // ⭐ application id
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 기본값이 none 인듯..?
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. 스트림즈 빌더 생성
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// 2-1) 소스 프로세서 생성
KTable<String, String> streamLog = streamsBuilder.table("stream-all-test4"); // "stream-all-test" 토픽에서 KTable 생성 // ⭐ 데이터 꺼낼 토픽 이름
// 2-2) 스트림 프로세서 생성
// 시작 시간 기록
long startTime = System.nanoTime();
streamLog.toStream().map((key, value) -> { // KTable을 KStream으로 변환하고, key-value 맵핑
try {
ObjectMapper objectMapper = new ObjectMapper();
// JSON 문자열을 StreamsTestDto 객체로 변환
StreamsTestDto dto = objectMapper.readValue(value, StreamsTestDto.class);
// dto의 name 필드를 key-value 쌍으로 반환
// return KeyValue.pair("name", dto.getName());
return KeyValue.pair(dto.getCarId(), dto.getCarData().getCarBrand()); //Key: carId , Value: 브랜드
} catch (Exception e) {
e.printStackTrace();
return KeyValue.pair(-1, "error"); // 예외 발생 시 key -1, value "error" 반환
}
})
// 2-3) 싱크 프로세서 생성: 변환된 데이터를 "stream-name-test" 토픽으로 전송
.to("stream-car-test4-db"); // ⭐ 데이터 보낼 토픽 이름 🎯
// 종료 시간 기록
long endTime = System.nanoTime();
// 실행 시간 계산 및 출력 (단위: milliseconds)
long duration = (endTime - startTime) / 1000000; // 나노초를 밀리초로 변환
System.out.println("🍀 스트림즈 필터링 실행 시간: " + duration + "ms");
// 3. KafkaStreams 객체 생성 및 시작
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
kafkaStreams.start();
return kafkaStreams;
}
}
컨슈머
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KafkaConsumerTest {
private long startTime; // 전체 처리 시작 시간
private int messageCount; // 처리한 메시지 수
private final KafkaTestDao kafkaTestDao;
@KafkaListener(topics="stream-car-test4-db", groupId="consumer-test4-db") //⭐ 그룹 id
public void testForDB(String param) {
if (messageCount == 0) {
// 첫 번째 메시지일 때 전체 처리 시작 시간 기록
startTime = System.currentTimeMillis();
}
System.out.println("🍎 stream-car-test4-db 토픽을 바라보는 컨슈머가 가져온 메시지 🍎 " + param);
kafkaTestDao.regCarInfo(param); // 🎯🎯🎯🎯🎯🎯🎯 db 에 저장!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
messageCount++;
// 모든 메시지를 처리한 후에 실행 시간 출력
if (messageCount >= 9) {
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
System.out.println("🍀 messageCount - db: " + messageCount);
System.out.println("🍀 전체 메시지 컨슈밍 시간 - db: " + duration + "ms");
}
}
}
KafkaTestDao
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface KafkaTestDao {
// stream-car-test4-db 토픽 컨슈머에 insert
Integer regCarInfo(String param);
}
kafkaTest.xml
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="kr.co.common.kafka.KafkaTestDao">
<insert id="regCarInfo" parameterType="java.lang.String">
INSERT INTO TB_KAFKA_TEST (
BRAND_NAME
) VALUES (
#{param}
)
</insert>
</mapper>
포스트맨으로 프로듀서 api를 호출하여 10건을 보냈다. 스트림즈로 브랜드만 필터링되어 db에 10건이 잘 저장이 되는 것을 확인할 수 있었다!!! 😊
'Infra > Kafka, MQTT' 카테고리의 다른 글
[MQTT,Kafka] EMQX 웹 소켓 사용하여 대시보드에 실시간 데이터 보여주기 (Throughput 테스트) (0) | 2024.07.11 |
---|---|
[Kafka] broker scale out/ scale in (카프카 브로커 스케일 아웃/스케일 인) (1) | 2024.07.01 |
[Kafka] 카프카 브로커 하나씩 다운시킨 후 메시지 전송 테스트 (고가용성 테스트, replication test) (0) | 2024.06.10 |
[Kafka] 스프링 부트에 Kafka Streams 적용하기 (1) | 2024.06.07 |
[Kafka] 이미 생성된 토픽의 replication factor 변경하기 (0) | 2024.05.27 |