'십시일반'이라는 말이 있듯이, 프로그램 성능을 향상시키기 위해서는 모든 개발 언어에서, multi-threading과 async는 매우 중요합니다. 저 역시, 개발자로서 초기에 매우 많은 시간을 투자하였습니다.
근래에는, 파이썬으로 많은 작업을 하면서 개발을 하는데 무리는 없지만, 간단하게 해당 내용을 정리 및 기록하고자 합니다.
[코드]
- 모든 작업은 동일한 print_numbers()와 print_letters() 함수를 각각 쓰레딩 및 async 방법으로 실행시 성능 테스트
- 0) single_thread 실행을 기준으로 추가적인 1)threading ~5)gevent방법을 비교해보자.
import os
import threading
import concurrent.futures
import multiprocessing
import asyncio
import gevent
import time
##################
# 공통 함수
###
def print_numbers():
for i in range(1, 6):
time.sleep(1)
def print_letters():
for letter in ['A', 'B', 'C', 'D', 'E']:
time.sleep(1)
##################
# 0) single_thread 에서 순차적으로 실행
###
def single_thread_module():
start_time = time.time()
print_numbers()
print_letters()
end_time = time.time()
print(f"Single-thread execution time: {end_time - start_time:.2f} seconds")
##################
# 1) 표준 라이브러리의 threading 모듈을 사용
# - 글로벌 인터프리터 락(GIL)로 인해 CPU 바운드 작업에는 한계가 있음.
###
def threading_module():
start_time = time.time()
# Thread 객체 생성
t1 = threading.Thread(target=print_numbers)
t2 = threading.Thread(target=print_letters)
# Thread 시작
t1.start()
t2.start()
# Thread 료될 때까지 대기
t1.join()
t2.join()
end_time = time.time()
print(f"Threading execution time: {end_time - start_time:.2f} seconds")
##################
# 2) concurrent.futures.ThreadPoolExecutor 사용
# [비교]
# 쓰레드를 직접 관리하는 대신, 작업을 스레드 풀에 제출하고, 비동기적으로 처리
# 자동으로 쓰레드 종료를 관리
###
def concurrent_mudule():
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
future1 = executor.submit(print_numbers)
future2 = executor.submit(print_letters)
concurrent.futures.wait([future1, future2])
end_time = time.time()
print(f"ThreadPoolExecutor execution time: {end_time - start_time:.2f} seconds")
##################
# 3) multiprocessing 사용
# - 파이썬 GIL(Global Interpreter Lock) 특성 때문에 CPU 바운드 작업경우 멀티 쓰레딩 성능 하락이 발생에 대한 해결책!
# 그 이유? 멀티 프로세싱을 하기때문에 간섭(?이 발생하지 않음
###
def multiprocessing_module():
start_time = time.time()
p1 = multiprocessing.Process(target=print_numbers)
p2 = multiprocessing.Process(target=print_letters)
p1.start()
p2.start()
p1.join()
p2.join()
end_time = time.time()
print(f"Multiprocessing execution time: {end_time - start_time:.2f} seconds")
##################
# 4) asyncio 모듈을 사용한 비동기 프로그래맹(멀티 쓰레딩 하님)
# - asyncio는 코루틴을 사용해 비동기적으로 I/O 바운드 작업을 수행(CPU 바운드 작업 비추)
# - 단일 스레드에서 실행되지만, 비동기 방식으로 여러 작업을 동시에 처리(동시성 지원)
###
async def async_print_numbers():
for i in range(1, 6):
await asyncio.sleep(1)
async def async_print_letters():
for letter in ['A', 'B', 'C', 'D', 'E']:
await asyncio.sleep(1)
async def main():
task1 = asyncio.create_task(async_print_numbers())
task2 = asyncio.create_task(async_print_letters())
await task1
await task2
def asyncio_module():
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"Asyncio execution time: {end_time - start_time:.2f} seconds")
##################
# 5) gevent
# - gevent는 코루틴을 기반으로 한 라이브러리로, 네트워크 작업 등에서 동시성을 극대화할
# - 비동기 방식이지만, 코드가 매우 간결해지는 장점
###
def gevent_module():
start_time = time.time()
g1 = gevent.spawn(print_numbers)
g2 = gevent.spawn(print_letters)
gevent.joinall([g1, g2])
end_time = time.time()
print(f"Gevent execution time: {end_time - start_time:.2f} seconds")
if __name__ == '__main__':
print(f'{__file__}')
print(f'{os.path.dirname(__file__)}')
single_thread_module()
threading_module()
concurrent_mudule()
multiprocessing_module()
asyncio_module()
gevent_module()
[실행 결과]
D:\mmPRJ\64B\mmFNAVI\mmThread_Work.py
Single-thread execution time: 10.07 seconds
Threading execution time: 5.03 seconds
ThreadPoolExecutor execution time: 5.04 seconds
Multiprocessing execution time: 5.31 seconds
Asyncio execution time: 5.02 seconds
Gevent execution time: 10.06 seconds
[정리]
- 파이썬에서 멀티쓰레딩을 구현하는 방법은 여러 가지가 있으며, 사용하려는 작업의 성격(예: CPU 바운드, I/O 바운드)에 따라 적합한 방법을 선택해야 합니다.
- GIL을 피하고 CPU 바운드 작업인 경우는 multiprocessing 사용
- asyncio는 단일 스레드에서 비동기 처리를 지원하는 I/O 바운드 작업에 적합
- 테스트 방법이 CPU 바운드 작업이 아닌 단순 지연기 때문에, 멀티쓰레딩/비동기 방식이 비슷한 성능을 보일 것으로 예상되지만, 상황에 따라 다를 수 있습니다.
[경고 전문]
FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.
[의미]
FutureWarning 메시지는 pandas의 DataFrame을 결합할 때, 빈 값(NA 또는 None)이나 전부 NA인 열이 있을 경우 향후 버전에서는 이를 처리하는 방식이 변경될 것이라는 경고입니다. 현재 pandas는 이런 열들을 무시하고 처리하지만, 미래에는 이를 고려해 결과의 데이터 유형(dtype)을 결정할 것입니다.
이 경고를 해결하려면, concat 작업 전에 빈 열이나 모두 NA인 열을 미리 제거하거나 제외해야 합니다. 이를 위해 dropna() 또는 열 필터링을 사용할 수 있습니다.
import pandas as pd
# 여러 개의 DataFrame 예시 (빈 값 또는 전부 NA인 열 포함)
df1 = pd.DataFrame({'A': [1, 2, 3], 'B': [None, None, None]})
df2 = pd.DataFrame({'A': [4, 5, 6], 'B': [None, None, None]})
# DataFrame들을 결합하기 전에 모든-NA 열 제거
df1_cleaned = df1.dropna(axis=1, how='all')
df2_cleaned = df2.dropna(axis=1, how='all')
# 결합
combined_df = pd.concat([df1_cleaned, df2_cleaned], ignore_index=True)
# 결과 출력
print(combined_df)
1) dropna(axis=1, how='all'): 모든 값이 NA인 열을 제거합니다.
axis=1은 열을 기준으로 작업하는 것이고, how='all'은 모든 값이 NA인 열만 제거하라는 의미입니다.
( 부분은 how='any')
2) pd.concat([df1_cleaned, df2_cleaned]): 이후 concat을 사용해 DataFrame을 결합합니다.
협업을 하다 보면, 엉뚱한(?) 문제로 고생을 하는 경우가 있습니다.
이번에 생각지도 못한 부분에 오류(?)가 있어 정리를 합니다.(아래에 제가 사용하는 코드 일부를 발취하였습니다.)
def F_Base_MakeFile(file_path, dataDf, sheetName=Get_strDay()[2:]):
# 파일이 존재하지 않으면 파일 생성
if not os.path.exists(file_path):
dataDf.to_excel(file_path, index=False, sheet_name=sheetName)
print(f"[F_Base_MakeFile]{file_path} 파일 생성: 시트이름: {sheetName}")
else:
# 파일이 존재하면, 엑셀 파일을 불러옴
book = load_workbook(file_path)
if sheetName in book.sheetnames:
# 시트가 존재하면 기존 데이터를 불러옴
existing_df = pd.read_excel(file_path, sheet_name=sheetName)
# 컬럼이 일치하는지 확인
if list(existing_df.columns) == list(dataDf.columns):
# 컬럼이 일치하면 데이터를 추가
with pd.ExcelWriter(file_path, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
dataDf.to_excel(writer, index=False, header=False, startrow=len(existing_df) + 1,sheet_name=sheetName)
print(f"[F_Base_MakeFile]{file_path}파일 시트{sheetName}에 데이터 추가완료.")
else:
raise ValueError(f"[F_Base_MakeFile]Error: 시트{sheetName}컬럼과 입력 DF 컬럼 불일치")
else:
# 시트가 존재하지 않으면 새로운 시트를 생성
with pd.ExcelWriter(file_path, engine='openpyxl', mode='a') as writer:
dataDf.to_excel(writer, index=False, sheet_name=sheetName)
print(f"[F_Base_MakeFile]{file_path} 파일에 새로운 시트{sheetName}생성")
위에 보면 아래와 엑셀 파일 생성하는 부분에 'overlay'부분이 보입니다.
1.'overlay'는 기존 시트를 유지하면서, 새 데이터를 덮어씁니다.
새 데이터프레임의 내용이 기존 시트의 해당 위치에 쓰여지며, 기존 데이터와 겹치는 부분만 업데이트되게 됩니다. 또한 기존 시트의 내용 중에서 새로 작성한 데이터프레임과 겹치지 않는 부분은 그대로 유지됩니다. 예를 들어, 새 데이터프레임에 포함되지 않은 열이나 행은 기존 시트에 그대로 남아있습니다.
2. ' replace'는 기존 시트를 완전히 대체합니다.
지정된 시트 이름이 이미 존재할 경우, 해당 시트의 모든 데이터를 삭제하고 새로운 데이터로 완전히 덮어쓰게 됩니다.
기존 시트의 구조(예: 셀의 서식, 데이터)가 사라지고, 새로 작성한 데이터프레임만 시트에 남게 됩니다.