Infra/Kafka, MQTT

[MQTT] Kafka → 웹 소켓 메시지 전송 테스트(Vue3으로 화면 만들어 테스트)

Nellie Kim 2024. 8. 9. 13:07
728x90

카프카에서 웹소켓으로 메시지를 전송하여 실시간 처리를 해야한다. 

지금 나는 백엔드 개발자인데, 프론트에 내가 만든 mqtt 정보 (url, 인증정보, 포트 등) 을 제공해주어야 한다. 

 

백엔드 단에서는 테스트가 모두 완료되었지만, 

제공하기 전에, 내가 직접 vue 프론트 프로젝트를 만들어서 내가 보낸 카프카 메시지를 잘 받아오는지 확인하고 싶었다. 

 

백엔드 간단한 코드와 vue 코드를 직접 작성하고, 테스트 하는 과정을 정리했다.  

 

 

사용한 기술 및 버전 

  • 스프링 부트 : 2.5.4
  • 자바 : 1.8
  • 카프카  : 3.7.0
  • MQTT : 1.2.5 (vue 에서는 5.9.1)
  • vue  : 3.2.13

 

백엔드 코드

1. 차량 ID 별로 MQTT 토픽 생성하여 전송

먼저 카프카 컨슈머에서 데이터를 param으로 받고,

objectMapper.readTree(param) 으로 만들어서 차량 아이디를 꺼내준다.

그리고 메시지를 보낼 때 /topic/{꺼낸 차량 아이디} 로 보내준다.

 

카프카 컨슈머에서 mqtt로 메시지를 전송하는데, 차량별로 각각의 토픽을 만들어서 전송해주려고 한다. (차량 5개일 때, 토픽도 5개)

그래서 Depth 있게 /topic/{차량id} 이런 식으로 토픽을 생성하려고 한다.

(ex. /topic/obu_id_1, /topic/obu_id_2, /topic/obu_id_3 … )

 

KafkaConsumerTest

    @KafkaListener(topics = "red-data", groupId = "red-data-consumer")
    public void redDataConsume(String param) {
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            JsonNode jsonNode = objectMapper.readTree(param);
            String obuId = jsonNode.get("obu_ID").asText();
            System.out.println("😃 : "+obuId);

            // 🎯 차량 별 토픽으로 MQTT 메시지 전송 !!
            mqttClientServiceTest.sendMessage("/topic/"+obuId, param);

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("JSON 파싱 중 오류 발생: " + e.getMessage());
        }
    }

 

2. MqttConfigTest 

 

MqttPahoMessageDrivenChannelAdapter(메시지 수신 담당) 에 수신할 토픽을 /topic → /topic/# (와일드카드사용) 로 수정하여 , /topic 하위의 모든 토픽을 수신하도록 하였다. 나머지는 건드리지 않았다.

 

그리고 이 코드는 프론트와 관계가 없다. 프론트에 보여줄 용도가 아닌 백엔드에서 확인할 용도이다. 

(지금 글과는 무관한 코드라는 뜻)

 

@Configuration
public class MqttConfigTest {
    @Value("${mqtt.broker.url}")
    private String broker;

    @Value("${mqtt.client.sub-id}")
    private String clientSubId;

    @Value("${mqtt.client.pub-id}")
    private String clientPubId;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{broker}); // 브로커 URI 설정
        options.setCleanSession(true); // 클린 세션 설정 (클라이언트가 브로커에 다시 연결될 때 이전 세션의 모든 상태 정보 (ex. 구독, 메시지큐 등)가 삭제되고 새로운 세션이 시작 됨
        options.setKeepAliveInterval(120); // Keep Alive 설정 (초 단위)
        options.setUserName("test"); // 사용자 이름 설정
        options.setPassword("test".toCharArray()); // 비밀번호 설정
        options.setAutomaticReconnect(true); // 자동 재연결 설정
        options.setConnectionTimeout(30); // 연결 타임아웃 설정 (초 단위)

        return options;
    }


    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions()); // 연결 옵션 설정
        return factory;
    }


    @Bean
    public MqttPahoMessageHandler mqttPahoMessageHandler() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientPubId, mqttClientFactory());
        messageHandler.setAsync(true); // 비동기 메시지 전송 설정
        messageHandler.setDefaultTopic("/topic"); // 기본 토픽 설정
        messageHandler.setDefaultQos(0); // QoS 설정 (0, 1, 2) -> 실시간 데이터는 보통 0을 사용

        return messageHandler;
    }


    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
        
        // 🎯 /topic/하위로 들어오는 모든 토픽 수신
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientSubId, mqttClientFactory(), "/topic/#");

        adapter.setCompletionTimeout(10000); // 메시지 수신 타임아웃 설정
        adapter.setConverter(new DefaultPahoMessageConverter()); // 메시지 변환기 설정
        adapter.setQos(1); // QoS 설정
        adapter.setOutputChannel(mqttInputChannel()); // 출력 채널 설정
        adapter.setRecoveryInterval(60000); // 재연결 간격 설정 (1분)

        return adapter;
    }
    

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }


    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 수신된 토픽 가져오기
            String payload = message.getPayload().toString(); // 수신된 메시지 페이로드 가져오기

            System.out.println("페이로드 : " + payload + "/  토픽 : " + topic); // 메시지 및 토픽 출력
        };
    }
}

 

 

프론트엔드 코드

1. Vue Component 컴포넌트 작성

 

src/components/MqttComponent.vue 파일을 생성하고 코드 작성. 이 컴포넌트는 MQTT 브로커에 연결하고, 특정 토픽을 구독하며, 수신된 메시지를 화면에 표시하는 역할을 한다.

일단, 하드코딩으로 /topic/obu_id_test_data를 구독하게 코딩했다.

프론트에서는 대시보드에서 차량 리스트에서 특정 차량을 클릭하면, 특정 차량의 id를 받아서 /topic/{차량id} 로 구독하면 된다.

지금은 프론트에서 잘 표시가 되는지 테스트만 하는 거니까, 하드코딩으로만 테스트 했다.

코드는 간단하다. 크게 보면 아래 두가지 로직만 있으면 된다.

 

  1. MQTT 클라이언트 생성
  2. 토픽 구독
<template>
  <div>
    <h1>MQTT Messages</h1>
    <h2>test</h2>
    <div v-if="messages.length">
      <ul>
        <li v-for="(msg, index) in messages" :key="index">
          {{ msg }}
        </li>
      </ul>
    </div>
    <div v-else>
      <p>아직 수신된 메시지가 없습니다.</p>
    </div>
  </div>
</template>

<script>
import mqtt from "mqtt";

export default {
  data() {
    return {
      client: null, // MQTT 클라이언트 
      messages: [], // 받을 메시지 리스트 
    };
  },
  mounted() {
    // 🎯 1. MQTT 연결 옵션 지정
    const options = {
      host: "192.168.15.15",
      port: 8083,
      protocol: "ws", // WebSocket protocol
      username: "test",
      password: "test",
    };

    // 🎯 2. MQTT client 생성 (MQTT 연결 및 유저 정보 입력)
    this.client = mqtt.connect(`ws://${options.host}:${options.port}/mqtt`, {
      username: options.username,
      password: options.password,
    });

    // 🎯 3. MQTT broker에 연결
    this.client.on("connect", () => {
      console.log("MQTT broker에 연결되었습니다.");

      // 3-1) 특정 토픽을 구독 
      this.client.subscribe(`/topic/obu_id_test_data`, (err) => { // obu_id_test_data 는 하드코딩된 상태. 프론트에서 수정해주어야 한다!! 
        if (err) {
          console.error("Subscription error:", err);
        } else {
          console.log("topic을 구독합니다.");
        }
      });
    });

    // 🎯 4. 메시지가 들어왔을 때 처리 로직 (화면에 보여줄 메시지 리스트에 push)
    this.client.on("message", (topic, message) => {
      // Convert the message buffer to string
      const msg = message.toString();
      console.log(`토픽으로부터 메시지 수신 ${topic}: ${msg}`);
      this.messages.push(msg); // 받은 메시지를 messages 리스트에 추가
    });

    // 🎯 5. 에러 처리 
    this.client.on("error", (error) => {
      console.error("Connection error:", error);
    });
  },
  beforeUnmount() {
    // Clean up on component destroy
    if (this.client) {
      this.client.end();
    }
  },
};
</script>

<style scoped>
/* Add your styles here */
</style>

 

2. App.vue 수정

 

메인 화면인 src/App.vue 파일을 수정하여 위에서 만든 MqttComponent를 추가해준다.

 

<template>
  <img alt="Vue logo" src="./assets/logo.png" />
  <!-- <HelloWorld msg="Welcome to Your Vue.js App" /> -->
  <MqttComponent msg="Welcome to Your Vue.js App" /> // 🎯 추가 !
</template>

<script>
// import HelloWorld from "./components/HelloWorld.vue";
import MqttComponent from "./components/MqttComponent.vue"; // 🎯 추가 !

export default {
  name: "App",
  components: {
    // HelloWorld,
    MqttComponent, // 🎯 추가 !
  },
};
</script>

<style>
#app {
  font-family: Avenir, Helvetica, Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  text-align: center;
  color: #2c3e50;
  margin-top: 60px;
}
</style>

 

 

기본 Vue 페이지. 

 

 

 

개발자 도구 콘솔을 보니

실행하기만 해도, 브로커 연결 및 토픽 구독이 진행된다.

 

 

프로듀서로 100건을 보내보자. 포스트맨 Send!!

 

 

포스트맨으로 카프카에 메시지를 쏴봤다.

실시간으로 100건이 순식간에 카프카 → MQTT (웹소켓용) 를 거쳐 화면에 보여지는 것을 확인할 수 있다..!

(EMQX 라는 웹소켓을 지원하는 MQTT 브로커를 사용하여 웹소켓을 이용하는 방식으로 진행했다)

 

 

 

아주 순식간에 100건이 쌓였다.

실시간 소켓의 위력은 대단한 것 같다!!