기존에 작성한 유저에게 원하는 데이터를 보내는 로직에서 비효율성을 느끼는 부분은 크게 두 가지입니다

  1. 모든 실시간 데이터를 Kafka로부터 받아 Redis에 저장하고 있는 방식: 모든 데이터를 그대로 Redis에 저장하고 사용자 구독 정보를 추가로 Redis에 유지하는 방식**(Pub/Sub 메커니즘을 통해 특정 채널에 데이터를 전송하는 방식)**은 중복되는 작업과 메모리 사용이 증가할 수 있습니다. 특히 139개 종목의 모든 데이터를 관리하면서 각 종목에 대한 유저 구독 정보까지 Redis에서 관리하는 것은 비효율적일 수 있습니다.
  2. 데이터의 중복 처리와 실시간 필터링: 현재 로직에서는 Kafka로부터 데이터를 수신할 때 모든 데이터를 일단 Redis에 저장하고 그 후에 유저별 구독 정보를 확인하여 데이터를 필터링하는 방식인데, 이는 두 번의 필터링 및 저장을 요구하는 구조로 성능에 부담을 줄 수 있습니다.

구조

고객이 해당 종목(채널) 에 대해 구독을 하게되면 그에 맞는 유저 정보만 Redis 에 저장하여 웹소켓을 이용하여 해당 종목을 구독하는 유저에게만 해당 실시간 데이터 정보를 송신하여 구조 성능에 부담을 줄이고 Redis 에 실시간 데이터를 저장하지 않는 방법으로 (Redis pub/sub 특징) Redis 관리의 비효율성을 줄일 수 있었습니다.

ClientSubscribeService

해당 코드는 Redis 메시지 리스너 컨테이너(redisMessageListenerContainer)에 특정 리스너(coinRedisSubscriberListener)를 특정 채널(marketCode)에 구독시킨다는 의미입니다.

    public void subscribeUserToMarket(Long userId, String marketCode) {
        // 유저가 이미 구독 중인지 확인
        Boolean isAlreadySubscribed = redisTemplate.hasKey("market:" + marketCode + ":subscribers");

        // Redis에서 종목별로 구독한 유저 관리 (유저 ID를 종목 코드에 추가)
        redisTemplate.opsForSet().add("market:" + marketCode + ":subscribers", String.valueOf(userId));
        log.info("User {} subscribed to market {}", String.valueOf(userId), marketCode);

        // 채널(해당 종목)이 아직 구독되지 않았을 경우에만 Redis Pub/Sub 채널 구독을 추가
        if (!Boolean.TRUE.equals(isAlreadySubscribed)) {
            redisMessageListenerContainer.addMessageListener(coinRedisSubscriberListener, new ChannelTopic(marketCode));
            log.info("Subscribed to Redis channel for market: {}", marketCode);
        }
    }

CoinMarketDataConsumer

발행된 채널에 실시간 데이터를 보내는 로직입니다.

@Slf4j
@Service
@RequiredArgsConstructor
public class CoinMarketDataConsumer {

    private final ObjectMapper objectMapper;
    private final RedisTemplate<String, Object> redisTemplate;
    private final CheckChannelInRedis checkChannelInRedis;

    // Kafka 메시지 수신 및 처리
    @KafkaListener(topics = "upbit-data", groupId = "coin-group")
    public void listener(ConsumerRecord<String, String> data) {
        log.info("Received coin message: {}", data.toString());

        try {
            // JSON 데이터를 객체로 변환
            Map<String, String> parsedData = objectMapper.readValue(data.value(), Map.class);

            // 특정 필드가 있는지 확인하고 처리
            if (parsedData.containsKey("market")) {
                String marketCode = parsedData.get("market");
                log.info("Market Code: {}", marketCode);

                // 유저 구독 정보를 확인하여 필요한 종목의 데이터만 처리
                if (checkChannelInRedis.isMarketSubscribed(marketCode)) {
                    Map<String, Object> marketData = new HashMap<>();
                    marketData.put("tradePrice", parsedData.get("tradePrice"));
                    marketData.put("signedChangePrice", parsedData.get("signedChangePrice"));
                    marketData.put("signedChangeRate", parsedData.get("signedChangeRate"));

                    // Redis Pub/Sub으로 데이터 전송
                    redisTemplate.convertAndSend(marketCode, marketData);
                    log.info("Coin Channel has been successfully published for market: {}", marketCode);
                } else {
                    log.info("No subscribers for coin market: {}, skipping Redis publish.", marketCode);
                }
            }
        } catch (Exception e) {
            // JSON 역직렬화 실패 처리
            log.error("Failed to deserialize message", e);
        }
    }

}

RedisListenerConfig

두개의 stock, coin 모듈의 MessageListener 를 MessageListenerAdapter 로 통합하여 MessageListener 를 Bean 으로 자동으로 수집하는 방식으로 변경하여 추후에 MessageListener 가 확장될 경우에 유지 보수 측면에서 이점을 가져오고자 하였습니다.


@Configuration
@Slf4j
@RequiredArgsConstructor
public class RedisListenerConfig {

    private final ApplicationContext applicationContext;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
        RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);

        // 모든 등록된 MessageListenerAdapter를 패턴과 함께 추가
        messageListenerAdapters().forEach((pattern, adapter) -> {
            container.addMessageListener(adapter, new PatternTopic(pattern));
        });

        return container;
    }

    /**
     * Map<String, MessageListenerAdapter>를 수동으로 정의하지 않고,
     * Spring에서 모든 MessageListener Bean을 자동으로 수집하는 방식으로 변경:
     * @return
     */
    @Bean
    public Map<String, MessageListenerAdapter> messageListenerAdapters() {
        // 모든 MessageListener를 가져와서 MessageListenerAdapter로 감싸기
        return applicationContext.getBeansOfType(MessageListener.class)
            .entrySet()
            .stream()
            .collect(Collectors.toMap(
                entry -> entry.getKey().equalsIgnoreCase("coinRedisSubscriberListener") ? "krw-*" : "other-*",
                entry -> new MessageListenerAdapter(entry.getValue(), "onMessage")
            ));
    }
}

구독 기반 WebSocket

사용자ID 기반 WebSocket을 통한 구독된 종목의 실시간 전달