기존에 작성한 유저에게 원하는 데이터를 보내는 로직에서 비효율성을 느끼는 부분은 크게 두 가지입니다
고객이 해당 종목(채널) 에 대해 구독을 하게되면 그에 맞는 유저 정보만 Redis 에 저장하여 웹소켓을 이용하여 해당 종목을 구독하는 유저에게만 해당 실시간 데이터 정보를 송신하여 구조 성능에 부담을 줄이고 Redis 에 실시간 데이터를 저장하지 않는 방법으로 (Redis pub/sub 특징) Redis 관리의 비효율성을 줄일 수 있었습니다.
해당 코드는 Redis 메시지 리스너 컨테이너(redisMessageListenerContainer
)에 특정 리스너(coinRedisSubscriberListener
)를 특정 채널(marketCode
)에 구독시킨다는 의미입니다.
public void subscribeUserToMarket(Long userId, String marketCode) {
// 유저가 이미 구독 중인지 확인
Boolean isAlreadySubscribed = redisTemplate.hasKey("market:" + marketCode + ":subscribers");
// Redis에서 종목별로 구독한 유저 관리 (유저 ID를 종목 코드에 추가)
redisTemplate.opsForSet().add("market:" + marketCode + ":subscribers", String.valueOf(userId));
log.info("User {} subscribed to market {}", String.valueOf(userId), marketCode);
// 채널(해당 종목)이 아직 구독되지 않았을 경우에만 Redis Pub/Sub 채널 구독을 추가
if (!Boolean.TRUE.equals(isAlreadySubscribed)) {
redisMessageListenerContainer.addMessageListener(coinRedisSubscriberListener, new ChannelTopic(marketCode));
log.info("Subscribed to Redis channel for market: {}", marketCode);
}
}
발행된 채널에 실시간 데이터를 보내는 로직입니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class CoinMarketDataConsumer {
private final ObjectMapper objectMapper;
private final RedisTemplate<String, Object> redisTemplate;
private final CheckChannelInRedis checkChannelInRedis;
// Kafka 메시지 수신 및 처리
@KafkaListener(topics = "upbit-data", groupId = "coin-group")
public void listener(ConsumerRecord<String, String> data) {
log.info("Received coin message: {}", data.toString());
try {
// JSON 데이터를 객체로 변환
Map<String, String> parsedData = objectMapper.readValue(data.value(), Map.class);
// 특정 필드가 있는지 확인하고 처리
if (parsedData.containsKey("market")) {
String marketCode = parsedData.get("market");
log.info("Market Code: {}", marketCode);
// 유저 구독 정보를 확인하여 필요한 종목의 데이터만 처리
if (checkChannelInRedis.isMarketSubscribed(marketCode)) {
Map<String, Object> marketData = new HashMap<>();
marketData.put("tradePrice", parsedData.get("tradePrice"));
marketData.put("signedChangePrice", parsedData.get("signedChangePrice"));
marketData.put("signedChangeRate", parsedData.get("signedChangeRate"));
// Redis Pub/Sub으로 데이터 전송
redisTemplate.convertAndSend(marketCode, marketData);
log.info("Coin Channel has been successfully published for market: {}", marketCode);
} else {
log.info("No subscribers for coin market: {}, skipping Redis publish.", marketCode);
}
}
} catch (Exception e) {
// JSON 역직렬화 실패 처리
log.error("Failed to deserialize message", e);
}
}
}
두개의 stock, coin 모듈의 MessageListener 를 MessageListenerAdapter 로 통합하여 MessageListener 를 Bean 으로 자동으로 수집하는 방식으로 변경하여 추후에 MessageListener 가 확장될 경우에 유지 보수 측면에서 이점을 가져오고자 하였습니다.
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RedisListenerConfig {
private final ApplicationContext applicationContext;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 모든 등록된 MessageListenerAdapter를 패턴과 함께 추가
messageListenerAdapters().forEach((pattern, adapter) -> {
container.addMessageListener(adapter, new PatternTopic(pattern));
});
return container;
}
/**
* Map<String, MessageListenerAdapter>를 수동으로 정의하지 않고,
* Spring에서 모든 MessageListener Bean을 자동으로 수집하는 방식으로 변경:
* @return
*/
@Bean
public Map<String, MessageListenerAdapter> messageListenerAdapters() {
// 모든 MessageListener를 가져와서 MessageListenerAdapter로 감싸기
return applicationContext.getBeansOfType(MessageListener.class)
.entrySet()
.stream()
.collect(Collectors.toMap(
entry -> entry.getKey().equalsIgnoreCase("coinRedisSubscriberListener") ? "krw-*" : "other-*",
entry -> new MessageListenerAdapter(entry.getValue(), "onMessage")
));
}
}