코인(업비트)과 주식(한국투자증권)의 실시간 데이터를 수집하기 위해 WebSocket을 사용하였습니다. 실시간으로 들어오는 데이터를 빠르게 처리하고 안정적으로 전달하기 위해 WebSocket 클라이언트를 구현하였습니다. 수집된 데이터는 Kafka에 적재하여 실시간 데이터를 누락없이 처리할 수 있도록 구성하였습니다.
WebSocket 클라이언트를 통해 업비트와 한국투자증권 서버에 연결하여 실시간 데이터를 수집합니다. WebSocket 핸들러를 통해 수신된 데이터를 실시간으로 파싱하여 Kafka로 전송합니다. 또한, 주식 데이터와 코인 데이터를 각각 처리할 수 있는 WebSocket 설정을 통해 구조적인 유연성을 제공하였습니다.
업비트와 한국투자증권의 데이터를 Kafka로 전송하기 위해 Kafka Producer 설정을 구성하였습니다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 중복 전송 방지
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 데이터 순서 보장
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
ENABLE_IDEMPOTENCE_CONFIG
와 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
설정을 통해 데이터의 무결성을 보장하며, 순서가 중요한 데이터 스트림에서 신뢰성을 제공합니다.업비트 서버에 WebSocket 연결을 설정하고, 수신된 데이터를 처리하여 Kafka로 전송하는 로직입니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class UpbitWebSocketService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private WebSocketSession webSocketSession;
// 웹소켓 연결을 설정하는 메서드
public void connectWebSocket() {
if (webSocketSession != null && webSocketSession.isOpen()) {
log.info("WebSocket is already connected.");
return;
}
StandardWebSocketClient client = new StandardWebSocketClient();
client.doHandshake(new UpbitWebSocketHandler(), "wss://api.upbit.com/websocket/v1");
log.info("Upbit WebSocket connection initiated.");
}
// WebSocket 연결 해제
public void disconnectWebSocket() throws IOException {
if (webSocketSession != null && webSocketSession.isOpen()) {
try {
webSocketSession.close();
log.info("WebSocket connection closed.");
} catch (Exception e) {
log.error("Failed to close WebSocket: {}", e.getMessage());
}
} else {
log.info("No active WebSocket connection to close.");
}
}
// WebSocket 핸들러 클래스 정의
private class UpbitWebSocketHandler extends BinaryWebSocketHandler {
@Override
public void afterConnectionEstablished(org.springframework.web.socket.WebSocketSession session) throws Exception {
log.info("WebSocket connected");
// 웹소켓 세션을 저장
webSocketSession = session;
// 종목 코드 리스트
List<String> marketCodes = getMarketCodes();
if (marketCodes.isEmpty()) {
log.error("종목 코드를 찾을 수 없습니다.");
session.close(CloseStatus.NORMAL); // 연결 종료
return;
}
log.info("원화 마켓 종목 개수: {}", marketCodes.size());
// 종목 코드 리스트를 포함하여 메시지 전송
String ticketValue = UUID.randomUUID().toString();
String message = "[{\\"ticket\\":\\"" + ticketValue + "\\"},{\\"type\\":\\"ticker\\",\\"codes\\":" + marketCodes.toString() + "},{\\"format\\":\\"DEFAULT\\"}]";
session.sendMessage(new TextMessage(message));
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
String payload = new String(message.getPayload().array(), StandardCharsets.UTF_8);
Map<String, Object> parsedData = parseWebSocketMessage(payload);
try {
String jsonMessage = objectMapper.writeValueAsString(parsedData);
String marketCode = String.valueOf(parsedData.get("market"));
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("upbit-data", marketCode, jsonMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Message sent successfully to partition=[{}], offset=[{}] for stockCode={}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
marketCode);
} else {
log.error("Message failed to send for stockCode={} due to: {}", marketCode, ex.getMessage());
}
});
} catch (JsonProcessingException e) {
log.error("Failed to serialize message", e);
}
}
private Map<String, Object> parseWebSocketMessage(String payload) {
try {
Map<String, Object> result = objectMapper.readValue(payload, Map.class);
Map<String, Object> parsedData = new HashMap<>();
parsedData.put("market", result.get("code"));
parsedData.put("tradePrice", result.get("trade_price"));
parsedData.put("signedChangePrice", result.get("signed_change_price"));
parsedData.put("signedChangeRate", result.get("signed_change_rate"));
return parsedData;
} catch (Exception e) {
log.error("Failed to parse WebSocket message: {}", e.getMessage(), e);
return new HashMap<>();
}
}
}
private List<String> getMarketCodes() {
String url = "<https://api.upbit.com/v1/market/all>";
try {
ResponseEntity<List<Map<String, Object>>> response = restTemplate.exchange(
url,
HttpMethod.GET,
null,
new ParameterizedTypeReference<List<Map<String, Object>>>() {}
);
List<Map<String, Object>> marketList = response.getBody();
return marketList.stream()
.filter(market -> market.get("market").toString().startsWith("KRW"))
.map(market -> market.get("market").toString())
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Failed to fetch market codes: {}", e.getMessage(), e);
return List.of();
}
}
}
connectWebSocket()
메서드를 통해 업비트 WebSocket 서버와 연결하며, 연결 후 종목 코드와 관련된 데이터를 수신합니다. UpbitWebSocketHandler
클래스는 WebSocket 이벤트를 처리하는 핸들러로, 연결이 수립되면 종목 코드 리스트를 서버에 요청합니다.parseWebSocketMessage()
메서드는 WebSocket으로부터 수신한 메시지를 파싱하여 필요한 필드를 추출하고, 이를 Kafka를 통해 다른 시스템으로 전송하기 위해 JSON으로 변환합니다.한국투자증권 서버에 WebSocket 연결을 설정하고, 수신된 데이터를 처리하여 Kafka로 전송하는 로직입니다.
@Slf4j
@Service
@RequiredArgsConstructor
public class StockWebSocketService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private WebSocketSession webSocketSession;
// WebSocket 연결
public void connectWebSocket(String approvalKey, String custType, String trType, ApprovalRequest approvalRequest) {
if (webSocketSession != null && webSocketSession.isOpen()) {
log.info("WebSocket is already connected.");
return;
}
StandardWebSocketClient client = new StandardWebSocketClient();
client.doHandshake(new StockWebSocketHandler(approvalKey, custType, trType, approvalRequest), "ws://ops.koreainvestment.com:21000");
log.info("WebSocket connection initiated.");
}
// WebSocket 연결 해제
public void disconnectWebSocket() throws IOException {
if (webSocketSession != null && webSocketSession.isOpen()) {
try {
webSocketSession.close();
log.info("WebSocket connection closed.");
} catch (Exception e) {
log.error("Failed to close WebSocket: {}", e.getMessage());
}
} else {
log.info("No active WebSocket connection to close.");
}
}
// WebSocket 핸들러 클래스 정의
private class StockWebSocketHandler extends TextWebSocketHandler {
private final String approvalKey;
private final String custType;
private final String trType;
private final ApprovalRequest approvalRequest;
public StockWebSocketHandler(String approvalKey, String custType, String trType, ApprovalRequest approvalRequest) {
this.approvalKey = approvalKey;
this.custType = custType;
this.trType = trType;
this.approvalRequest = approvalRequest;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("WebSocket connected");
// 웹소켓 세션을 저장
webSocketSession = session;
// 코스피100 종목코드 리스트
String[] stockCodes = {
"282330", "097950", "005830", "383220", "078930", "007070", "009540",
"267250", "329180", "011200", "105560", "030200", "033780", "003550",
"034220", "051900", "373220", "032640", "011070", "066570", "051910",
"035420", "005940", "005490", "010950", "034730", "011790", "302440",
"326030", "402340", "361610", "096770", "017670", "000660", "035250",
"010130", "001570", "011780", "000270", "024110", "251270", "003490",
"454910", "241560", "034020", "004990", "011170", "138040", "006800",
"028050", "006400", "028260", "207940", "032830", "018260", "009150",
"005930", "010140", "016360", "029780", "000810", "068270", "055550",
"002790", "090430", "450080", "036570", "066970", "271560", "316140",
"000100", "035720", "323410", "377300", "021240", "259960", "022100",
"047050", "003670", "086790", "352820", "036460", "071050", "015760",
"161390", "047810", "042700", "008930", "128940", "018880", "180640",
"009830", "012450", "042660", "000720", "086280", "012330", "004020",
"005380", "008770"
};
for (String stockCode : stockCodes) {
approvalRequest.setTr_key(stockCode);
// 요청 데이터 생성
String sendData = createRequestData(approvalKey, custType, trType, approvalRequest);
// WebSocket 서버로 메시지 전송
session.sendMessage(new TextMessage(sendData));
log.info("Sent message for stock code {}: {}", stockCode, sendData);
}
}
// 요청 데이터를 JSON 형태로 생성
private String createRequestData(String approvalKey, String custType, String trType, ApprovalRequest approvalRequest) {
return "{\\n" +
" \\"header\\": {\\n" +
" \\"approval_key\\": \\"" + approvalKey + "\\",\\n" +
" \\"custtype\\": \\"" + custType + "\\",\\n" +
" \\"tr_type\\": \\"" + trType + "\\",\\n" +
" \\"content-type\\": \\"utf-8\\"\\n" +
" },\\n" +
" \\"body\\": {\\n" +
" \\"input\\": {\\n" +
" \\"tr_id\\": \\"" + approvalRequest.getTr_id() + "\\",\\n" +
" \\"tr_key\\": \\"" + approvalRequest.getTr_key() + "\\"\\n" +
" }\\n" +
" }\\n" +
"}";
}
// WebSocket 서버에서 메시지 수신 처리
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("Received message: {}", payload);
// PINGPONG 또는 SUBSCRIBE SUCCESS 메세지 필터링
// 해당 메시지를 특별한 처리 없이 로그에 기록만 하고 실제 데이터 처리 로직을 실행하지 않고 반환
if (payload.contains("PINGPONG") || payload.contains("SUBSCRIBE SUCCESS")) {
log.info("PINGPONG 또는 SUBSCRIBE SUCCESS 메세지입니다.");
return;
}
String jsonData = parseStockData(payload);
if (jsonData != null) {
// 수신된 JSON 데이터를 Kafka로 전송
try {
// JSON 데이터를 Map으로 변환
Map<String, Object> parsedData = objectMapper.readValue(jsonData, Map.class);
// Map에서 stockCode 추출
String stockCode = String.valueOf(parsedData.get("stockCode"));
// Kafka로 메시지 전송 및 파티션과 오프셋 로깅 처리
CompletableFuture< SendResult<String, String>> future = kafkaTemplate.send(StockTopic.STOCK_DATA.getTopic(), stockCode, jsonData);
// 비동기적으로 Kafka 전송 결과 처리
future.whenComplete((result, ex) -> {
if (ex == null) {
// 메시지 전송 성공, 파티션 및 오프셋 로깅
log.info("Message sent successfully to partition=[{}], offset=[{}]",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} else {
// 메시지 전송 실패
log.error("Message failed to send due to: {}", ex.getMessage());
}
});
} catch (Exception e) {
log.error("Failed to send message to Kafka: {}", e.getMessage());
}
}
}
// 실시간 데이터를 파싱하여 필요한 정보만 추출 후 JSON 형식으로 변환
private String parseStockData(String rawData) {
try {
// "|" 문자로 데이터를 분리
// 수신된 데이터는 여러 부분으로 구성되어 있으므로 "|"를 기준으로 나눔
String[] parts = rawData.split("\\\\|");
// 분리된 데이터가 예상한 부분 수보다 적으면 잘못된 데이터 형식으로 간주하고 로그 출력
if (parts.length < 4) {
log.error("Invalid data format.");
return null;
}
// 주식 체결 데이터 추출
// parts[3]에 체결 데이터가 포함되어 있어서 이 걸 "^"로 다시 분리하여 세부 데이터를 추출
String[] tradeData = parts[3].split("\\\\^");
// 체결 데이터가 충분하지 않을 경우 로그에 에러 메시지를 남기고 종료
if (tradeData.length < 6) {
log.error("Insufficient trade data.");
return null;
}
// 체결 데이터에서 종목코드, 현재가, 전일 대비 가격, 전일 대비율 추출
String stockCode = tradeData[0]; // 종목코드
String currentPrice = tradeData[2]; // 현재가
String priceChange = tradeData[4]; // 전일 대비 가격
String changeRate = tradeData[5]; // 전일 대비율
// 추출된 데이터 로그
log.info("종목코드: {}, 현재가: {}, 전일 대비 가격: {}, 전일 대비율: {}%",
stockCode, currentPrice, priceChange, changeRate);
// JSON 형식으로 변환할 Map 생성
Map<String, Object> stockDataMap = new HashMap<>();
stockDataMap.put("stockCode", stockCode);
stockDataMap.put("currentPrice", currentPrice);
stockDataMap.put("priceChange", priceChange);
stockDataMap.put("changeRate", changeRate);
// Map을 JSON 문자열로 변환
return objectMapper.writeValueAsString(stockDataMap);
} catch (Exception e) {
// 파싱 중 오류
log.error("Error parsing stock data: {}", e.getMessage());
return null;
}
}
// WebSocket 연결 종료 처리
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
log.info("WebSocket connection closed: sessionId={}, status={}", session.getId(), status);
webSocketSession = null;
}
}
}