[인턴] CH03. 스마트유량계 프로젝트 진행 과정에서 마주한 문제와 해결 과정
프로젝트 개요
이전 포스팅인 [인턴] CH01. 프로젝트 소개와 방향성에서 다룬 바와 같이 본 프로젝트인 "인공지능 스마트 유량계 프로젝트"는 인공지능(AI)과 사물인터넷(IoT)를 결합한 개념인 AIoT 기술과 LSTM 기반 알고리즘을 활용해 상수관망 내 유량과 수압 데이터를 예측하고 이상징후를 감지하는 시스템이다. 이를 통해 상수관망의 문제를 신속하게 감지하고 대응할 수 있도록 지원하는 것이 프로젝트의 핵심 목표이다.
기존에 회사가 보유하고 있는 LSTM 코드를 기반으로 다양하고 복잡한 요구사항을 반영한 파이프라인을 구축해야하다보니 진행 과정에서 많은 문제가 발생했다.
이번 포스팅에서는 요구사항 및 문제 상황을 정리하고, 이를 해결하기 위해 적용한 방법과 결과를 공유하고자 한다.
1️⃣ 데이터 손실 문제 발생
시스템의 데이터 수집 방식
아키텍처를 살펴보면 데이터 수신부(Receiver Class)는 시리얼 통신으로 센서로부터 초당 데이터를 수신하고, LSTM 예측(LSTM Prediction Class)은 해당 데이터를 이용하여 1시간 단위로 예측 기능을 수행하는 구조이다.
정리하자면, 센서에서 초당 데이터를 수신하여 1시간 동안 쌓인 3,600개의 데이터를 기반으로 예측을 수행하는 방식이다.
문제 분석
테스트 환경
먼저, 문제 분석을 하기에 앞서 테스트를 진행하기 위해 사용한 서버 사양은 다음과 같다.
- CPU: 10코어
- 메모리: 16GB
- 저장공간: 512GB SSD
또한, 실제 시리얼 통신을 테스트하기 위해 센서 기기를 제작하는 회사에 직접 가야하는 제약사항이 있어 테스트 데이터를 기반으로 시리얼 통신 형태(프로토콜)의 텍스트 파일을 구성하고, 이를 파싱하여 테스트를 진행했다.
동기 방식
처음 구현을 했을 때 하나의 작업이 끝난 후에만 다음 작업을 수행하는 동기 방식으로 진행했다. 즉, 센서에서 데이터를 수신하는 작업과 LSTM 예측 수행 작업이 동시에 이루어지지 않고 순차적으로 진행되었다.
- 센서에서 초당 데이터 수신
- 1시간(3,600개 기준) 데이터를 이용해 LSTM 예측 수행
- LSTM 예측이 끝난 후 다시 새로운 데이터 수신
이 과정에서 문제점은 무엇일까? 예측 수행 중에는 새로운 데이터 수신이 중단되므로, 예측이 오래 걸릴수록 새롭게 들어오는 데이터가 손실될 가능성이 매우 높다.
예측 수행 시간
위에서 언급했던 동기 방식으로 손실 없이 처리해야한다면, 초당 데이터를 예측 수행시간이 최소한 초 단위보다 짧아야 한다.
그런데 이건 실질적으로 말이 되지 않는다. 로그 데이터를 통해서 LSTM 예측 수행시간을 확인하면 다음과 같다.
예측 실행 전 로그
2025-03-14 22:48:28 - INFO - handle_sensor_data - Running LSTM Prediction at 3600 data points
예측 실행 후 로그
2025-03-14 22:48:36 - INFO - run_lstm_prediction - [LSTM Prediction] Time: 8.1081s
평균적으로 예측 수행시간은 8초 정도 소요되었고, 이 시간동안 8개의 데이터가 손실됨을 알 수 있다. 즉, 데이터 수집과 예측이 병렬적으로 이루어지지 않는 구조적 문제로 인해서 데이터 손실이 발생한다.
해결 방안
위의 문제를 해결하기 위한 방안을 한 문장으로 정리하자면 "예측 수행 중에도 센서 데이터가 실시간으로 수신되도록 병렬적인 수행 고려와 예측 시간이 증가하더라도 데이터가 손실되지 않도록 개선하기"라고 할 수 있다.
가장 중요한 것은 예측 수행 중에도 데이터 수신이 지속적으로 이루어지도록 병렬적으로 처리하는 것인데, 이를 위해서는 먼저 CPU Bound와 I/O Bound 작업을 구분하고, 적절한 병렬 처리 방식을 선택해야 한다.
CPU Bound vs I/O Bound
작업 | 설명 | 특성 |
CPU Bound | 연산량이 많아 CPU 사용량이 높은 작업 | 예측, 모델 학습, 계산 |
I/O Bound | 주로 입출력 작업(파일 저장, 네트워크 요청 등)이 중심이 되는 작업 | 데이터 송/수신, 네트워크 요청 |
병렬 처리를 도입하기 위해서는 처리해야 할 작업이 CPU에 의존하는지 또는 I/O에 의존하는 구분해야 한다. 계산 및 비교가 많다면 CPU Bound, 파일 및 네트워크 관련 작업이라면 I/O Bound라고 생각하면 된다.
병렬 처리 방식 비교: 멀티 쓰레딩 / 멀티 프로세싱 / 비동기
방식 | 개념 | 장점 | 단점 | 적용 대상 |
멀티 쓰레딩 | 하나의 프로세스에서 여러 개의 스레드를 실행 | I/O Bound 작업에 적합 | GIL(Global Interpreter Lock) 때문에 CPU Bound 작업에는 효과적이지 않음 |
파일 I/O, 네트워크 요청 등 |
멀티 프로세싱 | 여러 개의 프로세스를 실행하여 병렬 처리 |
CPU Bound 작업에 적합, GIL 영향을 받지 않음 |
프로세스 간 메모리 공유가 어렵고, 생성 비용이 큼 | 머신러닝 모델 예측, 연산량이 많은 작업 |
비동기 | 이벤트 루프를 이용하여 논블로킹 방식으로 실행 | I/O Bound 작업에 적합, 가벼운 처리 가능 | CPU Bound 작업에는 적합하지 않음 | 실시간 데이터 수신, API 요청 등 |
병렬 처리 진행에 필요한 방식은 멀티 쓰레딩, 멀티 프로세싱, 비동기이다. 해당 포스팅에서는 각 방식에 대한 자세한 개념은 생략하고 멀티 쓰레딩 방식을 제외한 멀티 프로세싱 / 비동기 방식을 채택한 이유에 대해 설명하고자 한다.
채택 이유
LSTM 예측
먼저 LSTM 예측에는 멀티 프로세싱을 도입하였는데, 위의 표를 보면 알겠지만 다른 방식들고 달리 멀티 프로세싱은 CPU Bound 작업에 적합여 예측과 같이 연산량이 많은 작업에 적용할 수 있다.
여기서 멀티 쓰레딩과 멀티 프로세싱의 차이점을 잠깐 살펴봐도 좋을 것 같다. 멀티 쓰레딩은 GIL(Global Interpreter Lock)의 영향을 받아 병렬 연산 효과를 보기 어려운 반면 멀티 프로세싱은 GIL의 영향을 받지 않아 연산량이 많은 LSTM 예측에 적합하다.
GIL은 Python 인터프리터가 한 번에 하나의 쓰레드만 실행하도록 제한하는 메커니즘으로 안정성을 목적으로 제한을 하는데, 결국 멀티 쓰레딩을 사용해도 여러 개의 쓰레드가 동시에 실행되지 않고 CPU 작업이 많은 경우 오히려 성능 저하, 즉 병목 현상을 초래한다. 이와 반대로 I/O Bound 작업에서는 GIL이 자주 해제되기 때문에 네트워크 요청, 파일 I/O에서 효율적으로 사용할 수 있는 것이다.
# LSTM 예측 수행(동기)
def run_lstm_prediction(input_csv_path, model_type, output_csv_path):
result = subprocess.run(['python', 'src/lstm/3_Prediction.py', '-f', input_csv_path, '-m', model_type, '-a'],
capture_output=True, text=True)
기존에 동기 방식으로 run_lstm_prediction 함수를 실행하였다면,
import multiprocessing
# LSTM 예측 수행(멀티 프로세싱 도입)
def run_lstm_prediction_parallel(input_data, num_cores=2):
with multiprocessing.Pool(processes=num_cores) as pool:
results = pool.starmap(run_lstm_prediction, input_data)
return results
위와 같이 멀티 프로세싱을 도입하는 방식으로 코드를 수정했다. 여기서 코어 갯수(num_cores)를 두 개로 설정하였는데, 만약 모든 CPU 코어를 사용하고 싶다면 processes=multiprocessing.cpu_count()를 기입하면 된다. 일반적으로 모든 CPU 코어를 사용하는 게 아닌multiprocessing.cpu_count() - 1 혹은 multiprocessing.cpu_count * 0.75를 사용하는데, 데이터 공유 문제인 IPC(Inter-Process Communication) 비용과 컨텍스트 스위칭 비용이 증가하므로 적절한 테스트를 통해 코어 수를 선택하는 것이 중요하다.
2025-03-14 22:48:36 - INFO - run_lstm_prediction - [LSTM Prediction] Time: 4.7926s
운영환경이 CPU가 4코어이므로 테스트 환경에서는 CPU 2코어로 설정하여 최종적으로 평균 4.7초 수행시간이 걸려 기존 8.1초 대비 예측 시간이 41.98% 단축되었다. 예측 시간이 짧아졌기 때문에 데이터 손실 가능성을 줄였는데, 아직 데이터 수신이 동기방식이므로 여전히 문제가 발생한다. 다음은 데이터 수신에 관한 내용을 소개하고자 한다.
데이터 수신
데이터 수신 작업은 연산과 같은 CPU Boud 작업이 아니라 센서로부터 데이터를 받아오는 입출력(I/O) 작업이므로 I/O Bound 성격을 가지고 있다. 즉, 네트워크나 시리얼 포트를 통해 데이터를 수신하는 대기 시간이 길다는 특징이 있다.
따라서 병렬적으로 데이터를 수신해야할 때 멀티 쓰레딩과 비동기 방식 중 어떤 것이 더 적합한지 고려해야 한다.
방식 | 평균 실행시간(sec) | 평균 CPU 사용량(%) |
멀티 쓰레딩 | 1.3033 | 18.69 |
비동기 | 1.3015 | 14.12 |
테스트 진행을 위해 실제 네트워크 요청이 아니라 time.sleep(1)을 사용하여 I/O 대기시간을 1초로 설정한 후 테스트 데이터 1,000줄을 읽고, 동시에 간단한 CPU 연산을 수행하는 환경을 만들었다. 위의 결과를 보면 실행 속도도 그렇게 큰 차이를 보이지 않고, 마찬가지로 CPU 사용량 눈에 띠는 차이는 보이지 않는다.
그러면 어떤 것을 선택해야할까? 운영 환경에서의 안정성, 확장성 및 유지보수성의 이유로 비동기 방식이 더 적합하다는 결론을 내렸다.
- 확장성
- 멀티 쓰레딩은 쓰레드 개수에 제한이 있는데 반해 비동기 방식은 논블로킹(Non-Blocking)방식이므로 많은 작업도 동시에 처리가 가능하여 센서 데이터 수신이 많아질수록 데이터를 안정적으로 처리할 수 있음
- 안정성
- 멀티 쓰레딩 방식은 여러 개의 쓰레드가 공유된 메모리를 사용하므로 Race Condition이나 Deadlock 발생 가능성이 있지만, 비동기 방식은 이벤트 루프를 활용해 순차적으로 실행되므로 동시 실행 시 안정성이 높음
- 유지보수성
- 멀티 쓰레드 방식은 쓰레드 간 동기화 문제로 인해 예상치 못한 오류가 발생할 가능성이 높은데, 비동기 방식은 단일 이벤트 루프에서 실행되므로 코드 흐름 예측이 쉽고 유지보수가 편리함
추가적으로 운영 환경에서는 네트워크 요청으로 데이터를 수신하는 데 CPU를 거의 사용하지 않고 응답을 기다리는 시간이 많기 때문에 이벤트 루프를 활용하는 비동기 방식이 네트워크 요청을 처리하는 데 최적화되어있어 최종적으로 비동기 방식을 채택했다.
이 결과 기존에 발생하던 데이터 손실 없이 모든 데이터를 정상적으로 수신할 수 있게 되었다.
2️⃣ 10분 단위 동기화 처리
도입 배경
시스템은 초 단위로 데이터를 수신받지만, 분 단위로 송신을 진행한다. 즉, 초 단위 데이터 중에서 sec == 0인 데이터인 분(min) 데이터를 이상치 로직과 시(hr)가 정각일 경우에는 예측까지 수행한 후 반환한다. 협력 업체들과 통신 프로토콜에 맞춰서 송신을 진행하도록 구현을 하였는데, 이 중 애플리케이션을 담당하는 업체에서 화면상 표시(동기화)를 10분마다 진행한다고 하여 10개의 1분 데이터를 모아서 10분 주기로 송신하는 추가적인 구현이 필요했다.
생산자 · 소비자 패턴
협력 업체의 요구사항에 처음에는 "어떻게 구현해야할까?"라는 많은 고민을 했다. 단순히 10분마다 한 번씩 DB에서 최근 10개 데이터를 조회 후 전송하는 로직으로 구현해야하나라고 생각했는데, 다른 기능에서도 DB 조회를 빈번하게 수행하고 있어서 최근 10개 데이터를 조회하는 것은 불필요하다고 판단했다.
고민하던 중 데이터 엔지니어링에 관심이 있어 최근에 유심히 본 Kafka의 구조에서 힌트를 얻어 Kafka의 생산자·소비자(Producer-Consumer) 패턴을 적용하기로 결정했다. 이전 포스팅에서 언급했듯이 Kafka를 사용하면 좋겠지만, 제한된 개발 기간 내에 Kafka를 배우기에는 상당한 시간적 비용이 들기 때문에 Queue를 이용한 생산자·소비자 패턴을 직접 구현하기로 결심했다.
방식 | 설명 |
DB 조회 | 10분마다 DB에서 최근 10개 데이터를 조회하여 송신 |
생산자·소비자 패턴 by Queue | 데이터가 발생할 때마다 Queue에 저장하고, 10개씩 모이면 송신 |
해당 테스트 같은 경우 센서 기기와 직접 연결할 기회가 생겨 테스트를 진행했었다.
각 방식의 코드를 먼저 살펴보자
DB 조회 방식
import time
from db.crud import create_table, insert_sensor_data, fetch_last_records
from serial_communication import SensorReceiver
# 시리얼 데이터 수신 객체 생성
sensor_receiver = SensorReceiver(port='COM3', baudrate=115200)
# 시리얼 통신을 통해 데이터를 수신하고 DB에 저장
def read_sensor_data_from_serial():
db_processing_times = []
# 100개 데이터 테스트
for idx in range(100):
data_line = sensor_receiver.read_data()
if not data_line:
continue
# 파싱 진행
parsed_data = sensor_receiver.parse_sensor_data(data_line)
if not parsed_data:
continue
sensor_id, timestamp, flow_rate, pressure = parsed_data
# DB에 저장
insert_sensor_data(timestamp, flow_rate, pressure)
# 10분마다 데이터베이스에서 10개 조회
if (idx + 1) % 10 == 0:
start_time = time.time()
last_data = fetch_last_records()
end_time = time.time()
query_time = end_time - start_time
db_processing_times.append(query_time)
print(f"[Database] Query Time: {query_time:.6f} sec")
time.sleep(0.1) # 센서 데이터 수신 간격 시뮬레이션
avg_db_time = sum(db_processing_times) / len(db_processing_times) if db_processing_times else 0
print(f"\n[Database] Average Query Time: {avg_db_time:.6f} sec")
if __name__ == "__main__":
create_table()
read_sensor_data_from_serial()
코드 내용을 간단히 설명하자면 시리얼 통신을 통해 센서 데이터를 수신 후 파싱 후 DB에 적재를 진행한다. 이후 최신 10개의 데이터를 조회하는 시간을 측정하여 최종적으로 평균 데이터 조회 시간을 얻는다.
생산자 - 소비자 패턴
import queue
import threading
import time
from db.crud import create_table
from serial_communication import SensorReceiver
# Queue 설정 (최대 크기 10)
data_queue = queue.Queue(maxsize=10)
queue_processing_times = []
sensor_receiver = SensorReceiver(port='COM3', baudrate=115200)
# 시리얼 통신을 통해 데이터를 수신하고 Queue에 저장
def producer():
# 100개 데이터 테스트
for idx in range(100):
data_line = sensor_receiver.read_data()
if not data_line:
continue
parsed_data = sensor_receiver.parse_sensor_data(data_line)
if not parsed_data:
continue
sensor_id, timestamp, flow_rate, pressure = parsed_data
data = (timestamp, flow_rate, pressure)
# Queue에 데이터 저장
data_queue.put(data)
# Queue가 가득 차면 소비자 실행
if data_queue.qsize() == 10:
start_time = time.time()
consumer()
end_time = time.time()
batch_time = end_time - start_time
queue_processing_times.append(batch_time)
print(f"[Consumer/Producer] Processing Time: {batch_time:.6f} sec")
time.sleep(0.1) # 센서 데이터 수신 간격 시뮬레이션
avg_queue_time = sum(queue_processing_times) / len(queue_processing_times) if queue_processing_times else 0
print(f"\n[Consumer/Producer] Avg Processing Time: {avg_queue_time:.6f} sec")
# Queue에서 10개 데이터 처리
def consumer():
batch = [data_queue.get() for _ in range(10)]
if __name__ == "__main__":
create_table()
producer_thread = threading.Thread(target=producer)
producer_thread.start()
producer_thread.join()
데이터베이스 조회 방식과 동일하게 시리얼 통신을 통해 센서 데이터를 수신하고 DB에 적재하는 대신 Queue에 저장하여 최대 크기인 10개가 찰 때마다 Consumer가 데이터를 처리하는 내용이다.
위의 두 가지 방식을 통해 평균 수행 시간을 얻을 수 있었는 데 결과는 다음과 같다.
방식 | 평균 수행 시간 |
DB 조회 | 0.000521 sec |
생산자·소비자 패턴 by Queue | 0.000065 sec |
생산자·소비자 패턴 by Queue 방식이 DB 조회 방식보다 8.01배 빠르며, DB 조회 방식 대비 87.52% 성능이 향상됨을 알 수 있다. 이유는 DB에서 데이터를 읽어오는 SELECT 쿼리는 메모리에서 즉시 데이터를 저장하고 소비할 수 있는 생산자·소비자 패턴에 비해 느리기 때문이다. 속도 면에서도 장점 뿐만 아니라, 해당 프로젝트에서는 발생하지 않은 경우이지만 생산자(Producer)가 데이터를 추가하는 동안 소비자(Consumer)는 별도로 데이터를 처리하기 때문에 데이터가 빠르게 쌓이더라도 처리가 지연되지 않은 장점이 있다. 이러한 이유로 Kafka와 같은 기술이 해당 구조를 기반으로 구현되어있는지 알게되었다.
3️⃣ 문제를 해결해나가며 느낀 점
정량적인 테스트를 통한 최적화의 필요성
처음에는 "멀티 프로세싱과 비동기 처리를 적용하면 당연히 성능이 좋아지겠지"라고 생각했지만, 실제 테스트를 진행하다보니 단순한 판단만으로는 최적화를 이야기할 수 없다는 것을 깨달았다. 이번 경험을 통해 성능 최적화는 추측이 아니라 정량적인 분석을 기반으로 이루어져야 한다는 것을 크게 느꼈다. 앞으로는 기능을 구현할 때 "이 방식이 더 좋을 것이다"라는 주관적인 판단을 내리는 것이 아닌 가설을 세우고 직접 테스트를 통해 검증하는 태도를 지니도록 해야겠다.
외부에서 알게 된 인사이트
10분 단위로 데이터를 동기화하는 요구사항을 해결할 때 Kafka의 아키텍처에서 영감을 얻었다. 평소 데이터 엔지니어링에 관심이 많아 우연히 접한 내용이 실제 문제를 해결하는 데 도움을 주었다. 이 경험을 통해 수시로 다양한 기술을 탐색하는 것이 실제 문제 해결에 영향을 미칠 수 있다는 사실을 깨달았다. "이 기술을 현재 내 프로젝트에 어떻게 적용할 수 있을까?"라는 질문을 지속적으로 나에게 던지며 더 나은 해결책을 고민하는 습관을 길러야겠다라고 생각했다.