Infra/Kafka, MQTT

[MQTT,Kafka] EMQX 웹 소켓 사용하여 대시보드에 실시간 데이터 보여주기 (Throughput 테스트)

Nellie Kim 2024. 7. 11. 13:15
728x90

차량의 실시간 위치 데이터 (위도, 경도) 를 MQTT를 사용한 웹소켓으로 대시보드에 실시간으로 변하는 데이터를 보여주었다. 카프카 컨슈머에서 받은 데이터를 바로 MQTT로 메시지 전송하였다. 

MQTT 프로토콜을 지원하는 메시지 브로커인 EMQX 오픈 소스 브로커를 사용하였다.

 

실시간으로 데이터를 보여주는 것이 목적이었고, 그 다음으로는 초당 몇건을 보여줄 수 있는지를 테스트 했다.  

 

  • 스프링 부트 버전 : 2.5.4
  • 자바 버전 : 1.8
  • 카프카 버전 : 3.7.0
  • MQTT 버전 : 1.2.5

 

pom.xml

의존성을 아래와 같이 받아준다. 

<!-- Eclipse Paho MQTT Client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

 

application.yml

MQTT 웹소켓을 아래와 같이 설정한다. 

mqtt:
  broker:
    url: ws://192.168.1.11:8083/mqtt
  client:
    id: mqttx_425fsgf31

 

 

MqttConfig.java

형식적인 MQTT 설정 파일이다.

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

@Configuration
public class MqttConfigTest {

    @Value("${mqtt.broker.url}")
    private String broker;

    @Value("${mqtt.client.id}")
    private String clientId;

    // MQTT 연결 옵션을 설정하는 Bean입니다.
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{broker}); // 브로커 URI 설정
        options.setCleanSession(true); // 클린 세션 설정
        return options;
    }

    // MQTT 클라이언트 팩토리를 설정하는 Bean입니다.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions()); // 연결 옵션 설정
        return factory;
    }

    // MQTT 메시지 핸들러를 설정하는 Bean입니다.🎯
    @Bean
    public MqttPahoMessageHandler mqttPahoMessageHandler() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true); // 비동기 메시지 전송 설정
//        messageHandler.setDefaultTopic("/topic"); // 기본 토픽 설정
        return messageHandler;
    }

    // MQTT 메시지 수신 어댑터를 설정하는 Bean입니다.
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), "/topic");
        adapter.setCompletionTimeout(5000); // 메시지 수신 타임아웃 설정
        adapter.setConverter(new DefaultPahoMessageConverter()); // 메시지 변환기 설정
        adapter.setQos(1); // QoS 설정
        adapter.setOutputChannel(mqttInputChannel()); // 출력 채널 설정
        return adapter;
    }

    // MQTT 메시지를 처리하기 위한 입력 채널을 설정하는 Bean입니다.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // 수신된 MQTT 메시지를 처리하는 핸들러를 설정하는 Bean입니다.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 수신된 토픽 가져오기
            String payload = message.getPayload().toString(); // 수신된 메시지 페이로드 가져오기
            System.out.println("Received message: " + payload + " from topic: " + topic); // 메시지 및 토픽 출력
        };
    }
}

 

MqttClientService.java

핵심 코드이다. MQTT에 메시지를 전송하는 메서드이다.

import lombok.RequiredArgsConstructor;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class MqttClientServiceTest {
    private final MqttPahoMessageHandler mqttPahoMessageHandler;

    // 주어진 topic과 payload를 사용하여 MQTT 메시지를 전송
    public void sendMessage(String topic, String payload) {
        mqttPahoMessageHandler.handleMessage(
                MessageBuilder.withPayload(payload)
                        .setHeader(MqttHeaders.TOPIC, topic) // MQTT 메시지의 토픽 설정
                        .build()
        );
    }
}

 

KafkaConsumerTest.java

카프카에 해당 토픽을 바라보는 컨슈머안에 MQTT를 적용했다. 

mqttClientServiceTest.sendMessage 메서드를 사용하여 메시지를 MQTT에 전송하여 웹소켓으로 화면에 보여준다.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.iabacus.autodriving.common.mqtt.svc.MqttClientServiceTest;
import kr.co.iabacus.autodriving.common.redis.RedisUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerTest {
    private final RedisUtils redisUtils;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final MqttClientServiceTest mqttClientServiceTest;

    @KafkaListener(topics="car-data", groupId="car-data-consumer")
    public void carDataConsume(String param) {
        try {
            // JSON 문자열을 JsonNode 객체로 변환
            JsonNode jsonNode = objectMapper.readTree(param);

            String obuId = jsonNode.path("obu_ID").asText();

            // Redis에 데이터 저장
            redisUtils.setData("car-data=" + obuId, param);

            Map<String, Object> dataMap = objectMapper.readValue(param, new TypeReference<Map<String, Object>>(){});

            Map<String, Object> gpsData = (Map<String, Object>) dataMap.get("gps_data"); // gps_data 부분 추출

            log.info("🎈 0.1초마다 들어오는 gps 데이터 🎈 {}", gpsData);
            
            // MQTT 로 메시지 전송 🎯
            mqttClientServiceTest.sendMessage("hj_topic", gpsData.toString());

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("JSON 파싱 중 오류 발생: " + e.getMessage());
        }
    }

 

스프링부트 로그

 

EMQX 대시보드 

컨슈머에서 웹소켓에 넣어주어 실시간으로 Payload가 들어오는 것을 확인할 수 있다.

 

서버 로그로 초당 건수 테스트

EMQX 로깅 설정에서 로그 레벨을 debug 로 하고, 

 

도커 로그 명령어로 메시지를 찍어보았다.

 docker logs -f emqx1 | grep Payload

 

 

도커 로그에 찍힌 초당 메시지 수를 확인해보았다. 

 

100개 메시지 받는데 소요시간 : 0.91 초 
1개 메시지 받는데 소요시간 : 0.009 초

초당 Throuput : 108.85개 /초