Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 항해99
- MYSQL
- @jsonproperty
- JavaScript
- 스파르타코딩클럽
- 스프링의 정석
- Spring
- CentOS
- 남궁성과 끝까지 간다
- 개인프로젝트
- Spring Security
- java
- WEB SOCKET
- JWT
- 스웨거
- 데이터베이스
- EC2
- 프로그래머스
- AWS
- docker
- DB
- Kafka
- 카프카
- emqx
- visualvm
- 패스트캠퍼스
- 생성자 주입
- 웹개발
- 시큐리티
- 쇼트유알엘
Archives
- Today
- Total
Nellie's Blog
[MQTT,Kafka] EMQX 웹 소켓 사용하여 대시보드에 실시간 데이터 보여주기 (Throughput 테스트) 본문
Infra/Kafka, MQTT
[MQTT,Kafka] EMQX 웹 소켓 사용하여 대시보드에 실시간 데이터 보여주기 (Throughput 테스트)
Nellie Kim 2024. 7. 11. 13:15728x90
차량의 실시간 위치 데이터 (위도, 경도) 를 MQTT를 사용한 웹소켓으로 대시보드에 실시간으로 변하는 데이터를 보여주었다. 카프카 컨슈머에서 받은 데이터를 바로 MQTT로 메시지 전송하였다.
MQTT 프로토콜을 지원하는 메시지 브로커인 EMQX 오픈 소스 브로커를 사용하였다.
실시간으로 데이터를 보여주는 것이 목적이었고, 그 다음으로는 초당 몇건을 보여줄 수 있는지를 테스트 했다.
- 스프링 부트 버전 : 2.5.4
- 자바 버전 : 1.8
- 카프카 버전 : 3.7.0
- MQTT 버전 : 1.2.5
pom.xml
의존성을 아래와 같이 받아준다.
<!-- Eclipse Paho MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
application.yml
MQTT 웹소켓을 아래와 같이 설정한다.
mqtt:
broker:
url: ws://192.168.1.11:8083/mqtt
client:
id: mqttx_425fsgf31
MqttConfig.java
형식적인 MQTT 설정 파일이다.
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
@Configuration
public class MqttConfigTest {
@Value("${mqtt.broker.url}")
private String broker;
@Value("${mqtt.client.id}")
private String clientId;
// MQTT 연결 옵션을 설정하는 Bean입니다.
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{broker}); // 브로커 URI 설정
options.setCleanSession(true); // 클린 세션 설정
return options;
}
// MQTT 클라이언트 팩토리를 설정하는 Bean입니다.
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions()); // 연결 옵션 설정
return factory;
}
// MQTT 메시지 핸들러를 설정하는 Bean입니다.🎯
@Bean
public MqttPahoMessageHandler mqttPahoMessageHandler() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true); // 비동기 메시지 전송 설정
// messageHandler.setDefaultTopic("/topic"); // 기본 토픽 설정
return messageHandler;
}
// MQTT 메시지 수신 어댑터를 설정하는 Bean입니다.
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), "/topic");
adapter.setCompletionTimeout(5000); // 메시지 수신 타임아웃 설정
adapter.setConverter(new DefaultPahoMessageConverter()); // 메시지 변환기 설정
adapter.setQos(1); // QoS 설정
adapter.setOutputChannel(mqttInputChannel()); // 출력 채널 설정
return adapter;
}
// MQTT 메시지를 처리하기 위한 입력 채널을 설정하는 Bean입니다.
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// 수신된 MQTT 메시지를 처리하는 핸들러를 설정하는 Bean입니다.
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 수신된 토픽 가져오기
String payload = message.getPayload().toString(); // 수신된 메시지 페이로드 가져오기
System.out.println("Received message: " + payload + " from topic: " + topic); // 메시지 및 토픽 출력
};
}
}
MqttClientService.java
핵심 코드이다. MQTT에 메시지를 전송하는 메서드이다.
import lombok.RequiredArgsConstructor;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class MqttClientServiceTest {
private final MqttPahoMessageHandler mqttPahoMessageHandler;
// 주어진 topic과 payload를 사용하여 MQTT 메시지를 전송
public void sendMessage(String topic, String payload) {
mqttPahoMessageHandler.handleMessage(
MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic) // MQTT 메시지의 토픽 설정
.build()
);
}
}
KafkaConsumerTest.java
카프카에 해당 토픽을 바라보는 컨슈머안에 MQTT를 적용했다.
mqttClientServiceTest.sendMessage 메서드를 사용하여 메시지를 MQTT에 전송하여 웹소켓으로 화면에 보여준다.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.iabacus.autodriving.common.mqtt.svc.MqttClientServiceTest;
import kr.co.iabacus.autodriving.common.redis.RedisUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerTest {
private final RedisUtils redisUtils;
private final ObjectMapper objectMapper = new ObjectMapper();
private final MqttClientServiceTest mqttClientServiceTest;
@KafkaListener(topics="car-data", groupId="car-data-consumer")
public void carDataConsume(String param) {
try {
// JSON 문자열을 JsonNode 객체로 변환
JsonNode jsonNode = objectMapper.readTree(param);
String obuId = jsonNode.path("obu_ID").asText();
// Redis에 데이터 저장
redisUtils.setData("car-data=" + obuId, param);
Map<String, Object> dataMap = objectMapper.readValue(param, new TypeReference<Map<String, Object>>(){});
Map<String, Object> gpsData = (Map<String, Object>) dataMap.get("gps_data"); // gps_data 부분 추출
log.info("🎈 0.1초마다 들어오는 gps 데이터 🎈 {}", gpsData);
// MQTT 로 메시지 전송 🎯
mqttClientServiceTest.sendMessage("hj_topic", gpsData.toString());
} catch (Exception e) {
e.printStackTrace();
System.out.println("JSON 파싱 중 오류 발생: " + e.getMessage());
}
}
스프링부트 로그
EMQX 대시보드
컨슈머에서 웹소켓에 넣어주어 실시간으로 Payload가 들어오는 것을 확인할 수 있다.
서버 로그로 초당 건수 테스트
EMQX 로깅 설정에서 로그 레벨을 debug 로 하고,
도커 로그 명령어로 메시지를 찍어보았다.
docker logs -f emqx1 | grep Payload
도커 로그에 찍힌 초당 메시지 수를 확인해보았다.
100개 메시지 받는데 소요시간 : 0.91 초
1개 메시지 받는데 소요시간 : 0.009 초
초당 Throuput : 108.85개 /초
'Infra > Kafka, MQTT' 카테고리의 다른 글
[MQTT] MQTTX 사용해서 일정한 주기로 웹소켓 메시지 전송하기 (0) | 2024.08.26 |
---|---|
[MQTT] Kafka → 웹 소켓 메시지 전송 테스트(Vue3으로 화면 만들어 테스트) (0) | 2024.08.09 |
[Kafka] broker scale out/ scale in (카프카 브로커 스케일 아웃/스케일 인) (1) | 2024.07.01 |
[Kafka] 카프카 컨슈머에서 받은 데이터 MariaDB로 전송하기 (0) | 2024.06.18 |
[Kafka] 카프카 브로커 하나씩 다운시킨 후 메시지 전송 테스트 (고가용성 테스트, replication test) (0) | 2024.06.10 |