일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- visualvm
- 웹개발
- java
- 프로그래머스
- 스파르타코딩클럽
- 데이터베이스
- docker
- 카프카
- 개인프로젝트
- 항해99
- JWT
- 생성자 주입
- CentOS
- EC2
- 패스트캠퍼스
- MYSQL
- 스프링의 정석
- DB
- AWS
- 남궁성과 끝까지 간다
- Kafka
- Spring Security
- 시큐리티
- Spring
- 쇼트유알엘
- 스웨거
- WEB SOCKET
- emqx
- @jsonproperty
- JavaScript
- Today
- Total
Nellie's Blog
[MQTT] Kafka → 웹 소켓 메시지 전송 테스트(Vue3으로 화면 만들어 테스트) 본문
카프카에서 웹소켓으로 메시지를 전송하여 실시간 처리를 해야한다.
지금 나는 백엔드 개발자인데, 프론트에 내가 만든 mqtt 정보 (url, 인증정보, 포트 등) 을 제공해주어야 한다.
백엔드 단에서는 테스트가 모두 완료되었지만,
제공하기 전에, 내가 직접 vue 프론트 프로젝트를 만들어서 내가 보낸 카프카 메시지를 잘 받아오는지 확인하고 싶었다.
백엔드 간단한 코드와 vue 코드를 직접 작성하고, 테스트 하는 과정을 정리했다.
사용한 기술 및 버전
- 스프링 부트 : 2.5.4
- 자바 : 1.8
- 카프카 : 3.7.0
- MQTT : 1.2.5 (vue 에서는 5.9.1)
- vue : 3.2.13
백엔드 코드
1. 차량 ID 별로 MQTT 토픽 생성하여 전송
먼저 카프카 컨슈머에서 데이터를 param으로 받고,
objectMapper.readTree(param) 으로 만들어서 차량 아이디를 꺼내준다.
그리고 메시지를 보낼 때 /topic/{꺼낸 차량 아이디} 로 보내준다.
카프카 컨슈머에서 mqtt로 메시지를 전송하는데, 차량별로 각각의 토픽을 만들어서 전송해주려고 한다. (차량 5개일 때, 토픽도 5개)
그래서 Depth 있게 /topic/{차량id} 이런 식으로 토픽을 생성하려고 한다.
(ex. /topic/obu_id_1, /topic/obu_id_2, /topic/obu_id_3 … )
KafkaConsumerTest
@KafkaListener(topics = "red-data", groupId = "red-data-consumer")
public void redDataConsume(String param) {
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(param);
String obuId = jsonNode.get("obu_ID").asText();
System.out.println("😃 : "+obuId);
// 🎯 차량 별 토픽으로 MQTT 메시지 전송 !!
mqttClientServiceTest.sendMessage("/topic/"+obuId, param);
} catch (Exception e) {
e.printStackTrace();
System.out.println("JSON 파싱 중 오류 발생: " + e.getMessage());
}
}
2. MqttConfigTest
MqttPahoMessageDrivenChannelAdapter(메시지 수신 담당) 에 수신할 토픽을 /topic → /topic/# (와일드카드사용) 로 수정하여 , /topic 하위의 모든 토픽을 수신하도록 하였다. 나머지는 건드리지 않았다.
그리고 이 코드는 프론트와 관계가 없다. 프론트에 보여줄 용도가 아닌 백엔드에서 확인할 용도이다.
(지금 글과는 무관한 코드라는 뜻)
@Configuration
public class MqttConfigTest {
@Value("${mqtt.broker.url}")
private String broker;
@Value("${mqtt.client.sub-id}")
private String clientSubId;
@Value("${mqtt.client.pub-id}")
private String clientPubId;
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{broker}); // 브로커 URI 설정
options.setCleanSession(true); // 클린 세션 설정 (클라이언트가 브로커에 다시 연결될 때 이전 세션의 모든 상태 정보 (ex. 구독, 메시지큐 등)가 삭제되고 새로운 세션이 시작 됨
options.setKeepAliveInterval(120); // Keep Alive 설정 (초 단위)
options.setUserName("test"); // 사용자 이름 설정
options.setPassword("test".toCharArray()); // 비밀번호 설정
options.setAutomaticReconnect(true); // 자동 재연결 설정
options.setConnectionTimeout(30); // 연결 타임아웃 설정 (초 단위)
return options;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions()); // 연결 옵션 설정
return factory;
}
@Bean
public MqttPahoMessageHandler mqttPahoMessageHandler() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientPubId, mqttClientFactory());
messageHandler.setAsync(true); // 비동기 메시지 전송 설정
messageHandler.setDefaultTopic("/topic"); // 기본 토픽 설정
messageHandler.setDefaultQos(0); // QoS 설정 (0, 1, 2) -> 실시간 데이터는 보통 0을 사용
return messageHandler;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
// 🎯 /topic/하위로 들어오는 모든 토픽 수신
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientSubId, mqttClientFactory(), "/topic/#");
adapter.setCompletionTimeout(10000); // 메시지 수신 타임아웃 설정
adapter.setConverter(new DefaultPahoMessageConverter()); // 메시지 변환기 설정
adapter.setQos(1); // QoS 설정
adapter.setOutputChannel(mqttInputChannel()); // 출력 채널 설정
adapter.setRecoveryInterval(60000); // 재연결 간격 설정 (1분)
return adapter;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 수신된 토픽 가져오기
String payload = message.getPayload().toString(); // 수신된 메시지 페이로드 가져오기
System.out.println("페이로드 : " + payload + "/ 토픽 : " + topic); // 메시지 및 토픽 출력
};
}
}
프론트엔드 코드
1. Vue Component 컴포넌트 작성
src/components/MqttComponent.vue 파일을 생성하고 코드 작성. 이 컴포넌트는 MQTT 브로커에 연결하고, 특정 토픽을 구독하며, 수신된 메시지를 화면에 표시하는 역할을 한다.
일단, 하드코딩으로 /topic/obu_id_test_data를 구독하게 코딩했다.
프론트에서는 대시보드에서 차량 리스트에서 특정 차량을 클릭하면, 특정 차량의 id를 받아서 /topic/{차량id} 로 구독하면 된다.
지금은 프론트에서 잘 표시가 되는지 테스트만 하는 거니까, 하드코딩으로만 테스트 했다.
코드는 간단하다. 크게 보면 아래 두가지 로직만 있으면 된다.
- MQTT 클라이언트 생성
- 토픽 구독
<template>
<div>
<h1>MQTT Messages</h1>
<h2>test</h2>
<div v-if="messages.length">
<ul>
<li v-for="(msg, index) in messages" :key="index">
{{ msg }}
</li>
</ul>
</div>
<div v-else>
<p>아직 수신된 메시지가 없습니다.</p>
</div>
</div>
</template>
<script>
import mqtt from "mqtt";
export default {
data() {
return {
client: null, // MQTT 클라이언트
messages: [], // 받을 메시지 리스트
};
},
mounted() {
// 🎯 1. MQTT 연결 옵션 지정
const options = {
host: "192.168.15.15",
port: 8083,
protocol: "ws", // WebSocket protocol
username: "test",
password: "test",
};
// 🎯 2. MQTT client 생성 (MQTT 연결 및 유저 정보 입력)
this.client = mqtt.connect(`ws://${options.host}:${options.port}/mqtt`, {
username: options.username,
password: options.password,
});
// 🎯 3. MQTT broker에 연결
this.client.on("connect", () => {
console.log("MQTT broker에 연결되었습니다.");
// 3-1) 특정 토픽을 구독
this.client.subscribe(`/topic/obu_id_test_data`, (err) => { // obu_id_test_data 는 하드코딩된 상태. 프론트에서 수정해주어야 한다!!
if (err) {
console.error("Subscription error:", err);
} else {
console.log("topic을 구독합니다.");
}
});
});
// 🎯 4. 메시지가 들어왔을 때 처리 로직 (화면에 보여줄 메시지 리스트에 push)
this.client.on("message", (topic, message) => {
// Convert the message buffer to string
const msg = message.toString();
console.log(`토픽으로부터 메시지 수신 ${topic}: ${msg}`);
this.messages.push(msg); // 받은 메시지를 messages 리스트에 추가
});
// 🎯 5. 에러 처리
this.client.on("error", (error) => {
console.error("Connection error:", error);
});
},
beforeUnmount() {
// Clean up on component destroy
if (this.client) {
this.client.end();
}
},
};
</script>
<style scoped>
/* Add your styles here */
</style>
2. App.vue 수정
메인 화면인 src/App.vue 파일을 수정하여 위에서 만든 MqttComponent를 추가해준다.
<template>
<img alt="Vue logo" src="./assets/logo.png" />
<!-- <HelloWorld msg="Welcome to Your Vue.js App" /> -->
<MqttComponent msg="Welcome to Your Vue.js App" /> // 🎯 추가 !
</template>
<script>
// import HelloWorld from "./components/HelloWorld.vue";
import MqttComponent from "./components/MqttComponent.vue"; // 🎯 추가 !
export default {
name: "App",
components: {
// HelloWorld,
MqttComponent, // 🎯 추가 !
},
};
</script>
<style>
#app {
font-family: Avenir, Helvetica, Arial, sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
text-align: center;
color: #2c3e50;
margin-top: 60px;
}
</style>
기본 Vue 페이지.
개발자 도구 콘솔을 보니
실행하기만 해도, 브로커 연결 및 토픽 구독이 진행된다.
프로듀서로 100건을 보내보자. 포스트맨 Send!!
포스트맨으로 카프카에 메시지를 쏴봤다.
실시간으로 100건이 순식간에 카프카 → MQTT (웹소켓용) 를 거쳐 화면에 보여지는 것을 확인할 수 있다..!
(EMQX 라는 웹소켓을 지원하는 MQTT 브로커를 사용하여 웹소켓을 이용하는 방식으로 진행했다)
아주 순식간에 100건이 쌓였다.
실시간 소켓의 위력은 대단한 것 같다!!
'Infra > Kafka, MQTT' 카테고리의 다른 글
[Kafka] Kafka Producer 압축 방법을 어떤 걸 써야할까? (gzip, snappy, lz4, zstd 성능 비교 테스트) (0) | 2024.11.07 |
---|---|
[MQTT] MQTTX 사용해서 일정한 주기로 웹소켓 메시지 전송하기 (0) | 2024.08.26 |
[MQTT,Kafka] EMQX 웹 소켓 사용하여 대시보드에 실시간 데이터 보여주기 (Throughput 테스트) (0) | 2024.07.11 |
[Kafka] broker scale out/ scale in (카프카 브로커 스케일 아웃/스케일 인) (1) | 2024.07.01 |
[Kafka] 카프카 컨슈머에서 받은 데이터 MariaDB로 전송하기 (0) | 2024.06.18 |