728x90
회사에서 서비스를 운영하며 생긴 스프링부트 에러 부분을 정리해보았다.
문제 상황
1) 실제로 카프카에 들어오는 데이터
외부 회사에서 들어오는 데이터이다.
{
"obu_ID": "obu_id_test_data",
"vehicle_num": "2242",
"service_module_data": {
"startup_mode": false,
"auto_mode": true,
// 중략....
"LDC_on": true, // 문제가 되는 부분
"PDU_on": false, // 문제가 되는 부분
// 중략....
}
}
2) 자바 DTO 클래스
들어오는 데이터를 ObjectMapper로 자바객체로 변환해주기 위해 만든 Dto 클래스이다.
@Data
public class ServiceModuleData { // 서비스 모듈 데이터
private boolean startup_mode; // 시작 모드
private boolean auto_mode; // 자동 모드
//....(중략)
private boolean LDC_on; // LDC 켜기 🎯 문제되는 부분!
private boolean PDU_on; // PDU 켜기 🎯 문제되는 부분!
//....(중략)
}
3) 카프카 스트림즈 클래스
여기서 또 주의할 점은, KeyValue.pair 해서 key-value 로 넣을 때는,
객체로 넣지 말고, 아래처럼 이렇게 String으로 바꿔서 넣어야 한다! (당연한거지만 ^^;;)
return KeyValue.pair(dto.getObu_ID(), objectMapper.writeValueAsString(dto.getInvehicle_data()));
/**
* 카프카 스트림즈 생성하고, 특정 토픽 데이터를 필터링 -> 다른 토픽으로 저장하는 클래스 (정제용)
*/
@Configuration
@EnableKafka
public class StreamsFilter {
/**
* [ 스트림즈 1 ]
* stream-all-test 토픽에서 정제 후 stream-car-test 토픽으로 보내는 스트림즈
*/
@Bean
public KafkaStreams kafkaStreams() {
// 1. 설정 세팅
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "123.143.255.255:9092");
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-filter");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 1-1) SASL 설정 추가 (프로듀서 생성 전에 추가해야 함)
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "SCRAM-SHA-256");
String jaasConfig = String.format(JAAS_TEMPLATE, "admin", "admin-secret");
prop.put("sasl.jaas.config", jaasConfig);
// 2. 스트림즈 빌더 생성
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// 2-1) 소스 프로세서 생성
KTable<String, String> streamLog = streamsBuilder.table("all-data"); // "all-data" 토픽에서 KTable 생성
// 2-2) 스트림 프로세서 생성
streamLog.toStream().map((key, value) -> { // KTable을 KStream으로 변환하고, key-value 맵핑
try {
ObjectMapper objectMapper = new ObjectMapper();
// StreamsTestDto dto = objectMapper.readValue(value, StreamsTestDto.class);
VehicleData dto = objectMapper.readValue(value, VehicleData.class);
// dto의 name 필드를 key-value 쌍으로 반환
// return KeyValue.pair("name", dto.getName()); // 이렇게 해도 에러 발생한다!!
return KeyValue.pair(dto.getObu_ID(), objectMapper.writeValueAsString(dto.getInvehicle_data())); // 🎯 이렇게 String으로 바꿔서 넣어야 한다!
} catch (Exception e) {
e.printStackTrace();
return KeyValue.pair("error-key", "error-value");
}
})
// 2-3) 싱크 프로세서 생성: 변환된 데이터를 "stream-name-test" 토픽으로 전송
.to("car-data");
// 3. KafkaStreams 객체 생성 및 시작
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
kafkaStreams.start();
return kafkaStreams;
}
}
이렇게만 하고 돌려보면,
UnrecognizedPropertyException 이 짠하고 나타난다! ;;
해결 방법
@JsonProperty 를 사용해서 해결 !
@Data
public class ServiceModuleData { // 서비스 모듈 데이터
private boolean startup_mode; // 시작 모드
private boolean auto_mode; // 자동 모드
//....(중략)
@JsonProperty("LDC_on") // 추가 🎯
private boolean LDC_on;
@JsonProperty("PDU_on") // 추가 🎯
private boolean PDU_on;
//....(중략)
}
바로 해결된다.
물론 사용하기 위해 Jackson 라이브러리인 com.fasterxml.jackson.core 패키지를 dependencies에 추가해주고 사용하면 된다!
<!-- Jackson Core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.2</version>
</dependency>