Nellie's Blog

[Kafka] 카프카 컨슈머에서 받은 데이터 MariaDB로 전송하기 본문

Infra/Kafka, MQTT

[Kafka] 카프카 컨슈머에서 받은 데이터 MariaDB로 전송하기

Nellie Kim 2024. 6. 18. 14:24
728x90

카프카 스트림즈로 필터링된 토픽을 컨슈밍하는 컨슈머를 만들고, 그 컨슈머에서 받은 데이터를 DB로 insert하도록 하는 기능을 구현했다. 

 

카프카 커넥트로 하려고 했으나, 번거로워서 직접 컨슈머 코드 안에서 dao를 사용하여 db에 접근하여 insert 하도록 했다. 

 

전체 코드이다. 

 

프로듀서

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {
    /**
     * SASL 인증을 위한 JAAS Template
     */
    private static final String JAAS_TEMPLATE = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\\"admin\\" password=\\"admin-secret\\";";

    public static Object test() {
        // 1. 설정 세팅
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092, 123.143.255.255:9093, 123.143.255.255:9094"); // kafka host 및 server 설정
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.mechanism", "SCRAM-SHA-256");
        String jaasConfig = String.format(JAAS_TEMPLATE, "admin1", "admin1");
        prop.put("sasl.jaas.config", jaasConfig);

        // 2. producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

        // 3. message 전달

        // 시작 시간 기록
        long startTime = System.nanoTime();

        String id = "id";
        String brand = "brand";
        for (int i = 0; i < 10; i++) {
            String jsonValue = "{ \\"carId\\": \\"" + id+ "-"+ i + "\\", \\"carData\\": { \\"cleanData\\": true, \\"locationData\\": 123, \\"carBrand\\": \\"" + brand + i + "\\" } }";
            producer.send(new ProducerRecord<>("stream-all-test4", "car-test4-"+ i, jsonValue)); // ⭐ 데이터 보낼 토픽 이름
        }
        // 종료 시간 기록
        long endTime = System.nanoTime();

        // 실행 시간 계산 및 출력 (단위: milliseconds)
        long duration = (endTime - startTime) / 1000000;  // 나노초를 밀리초로 변환
        System.out.println("🍀 프로듀서 실행 시간: " + duration + "ms");

        // 종료
        producer.flush();
        producer.close();
        return 1;
    }
}

 

 

디비용 토픽으로 쏴주는 스트림즈

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

import java.util.Properties;
    /**
     * [ 스트림즈 2 ]
     * stream-all-test4 토픽에서 정제 후 stream-car-test4-db 토픽으로 보내는 스트림즈
     */
    @Bean
    public KafkaStreams kafkaStreamsForDB() {
        // 1. 설정 세팅
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092"); // kafka host 및 server 설정
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test4-db"); // ⭐ application id
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 기본값이 none 인듯..?

        // 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.mechanism", "SCRAM-SHA-256");
        String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
        prop.put("sasl.jaas.config", jaasConfig);

        // 2. 스트림즈 빌더 생성
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        // 2-1) 소스 프로세서 생성
        KTable<String, String> streamLog = streamsBuilder.table("stream-all-test4"); // "stream-all-test" 토픽에서 KTable 생성 // ⭐ 데이터 꺼낼 토픽 이름

        // 2-2) 스트림 프로세서 생성

        // 시작 시간 기록
        long startTime = System.nanoTime();

        streamLog.toStream().map((key, value) -> { // KTable을 KStream으로 변환하고, key-value 맵핑
                    try {
                        ObjectMapper objectMapper = new ObjectMapper();

                        // JSON 문자열을 StreamsTestDto 객체로 변환
                        StreamsTestDto dto = objectMapper.readValue(value, StreamsTestDto.class);

                        // dto의 name 필드를 key-value 쌍으로 반환
//                        return KeyValue.pair("name", dto.getName());
                        return KeyValue.pair(dto.getCarId(), dto.getCarData().getCarBrand()); //Key: carId , Value: 브랜드

                    } catch (Exception e) {
                        e.printStackTrace();
                        return KeyValue.pair(-1, "error"); // 예외 발생 시 key -1, value "error" 반환
                    }
                })
                // 2-3) 싱크 프로세서 생성: 변환된 데이터를 "stream-name-test" 토픽으로 전송
                .to("stream-car-test4-db"); // ⭐ 데이터 보낼 토픽 이름 🎯

        // 종료 시간 기록
        long endTime = System.nanoTime();

        // 실행 시간 계산 및 출력 (단위: milliseconds)
        long duration = (endTime - startTime) / 1000000;  // 나노초를 밀리초로 변환
        System.out.println("🍀 스트림즈 필터링 실행 시간: " + duration + "ms");

        // 3. KafkaStreams 객체 생성 및 시작
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
        kafkaStreams.start();

        return kafkaStreams;
    }
}

 

 

컨슈머

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaConsumerTest {

    private long startTime;  // 전체 처리 시작 시간
    private int messageCount;  // 처리한 메시지 수
    private final KafkaTestDao kafkaTestDao;

    @KafkaListener(topics="stream-car-test4-db", groupId="consumer-test4-db") //⭐ 그룹 id
    public void testForDB(String param) {
        if (messageCount == 0) {
            // 첫 번째 메시지일 때 전체 처리 시작 시간 기록
            startTime = System.currentTimeMillis();
        }

        System.out.println("🍎 stream-car-test4-db 토픽을 바라보는 컨슈머가 가져온 메시지 🍎 " + param);
        kafkaTestDao.regCarInfo(param); // 🎯🎯🎯🎯🎯🎯🎯 db 에 저장!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

        messageCount++;

        // 모든 메시지를 처리한 후에 실행 시간 출력
        if (messageCount >= 9) {
            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;
            System.out.println("🍀 messageCount - db: " + messageCount);
            System.out.println("🍀 전체 메시지 컨슈밍 시간 - db: " + duration + "ms");
        }
    }
}

 

 

KafkaTestDao

import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface KafkaTestDao {

    // stream-car-test4-db 토픽 컨슈머에 insert
    Integer regCarInfo(String param);
}

 

 

kafkaTest.xml

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="kr.co.common.kafka.KafkaTestDao">
	<insert id="regCarInfo" parameterType="java.lang.String">
		INSERT INTO TB_KAFKA_TEST (
		BRAND_NAME
		) VALUES (
		#{param}
		)
	</insert>
</mapper>

 

 

포스트맨으로 프로듀서 api를 호출하여 10건을 보냈다. 스트림즈로 브랜드만 필터링되어 db에 10건이 잘 저장이 되는 것을 확인할 수 있었다!!! 😊