웹스쿨

공시뉴스 초단위로 불러오기, 쓰레드 문제 해결 본문

개인 프로젝트/파이썬 뉴스매매

공시뉴스 초단위로 불러오기, 쓰레드 문제 해결

마스터욱 2023. 3. 29. 00:47
반응형

 

깊은 삽질이 하나 끝났습니다.

쓰레드를 돌리는데 300회 전후반으로 계속 프로그램이 에러없이 꺼지는 문제가 발생했습니다.

결론적으로 해결은 Timer 사용으로 해결봤습니다.

 

소스공개 

import sys
import lib as lib
#import kiwoom as ki

from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QAxContainer import *

#import pandas as pd
from pprint import pprint
import re
import requests
from threading import Timer, Thread, Event

import json
from collections import OrderedDict

class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)

def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
self.thread.cancel()

def start(self):
self.thread.start()

def cancel(self):
self.thread.cancel()

class MyWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("뉴스매매(Makeby 욱)")
self.setGeometry(300, 300, 1200, 700)
self.setTable()

#self.t = perpetualTimer(1, self.startNews)
#self.t.start()

#timer = QTimer()
#timer.timeout.connect(self.startNews)
#timer.start(1000)
self.current_timer = ""
self.start_timer()

self.apiUrl = "APIURL 은 비밀"

self.thread_cnt = 0
self.last_title = ""

#키움개인정보
self.user_id = "dev84" #테스트
#self.user_id = ""

#print(sys.version)

#키움증권
self.kiwoom = QAxWidget("KHOPENAPI.KHOpenAPICtrl.1")

self.kiwoom.OnEventConnect.connect(self.event_connect)
#self.kiwoom.event_connect_loop = QEventLoop()
#self.kiwoom.event_connect_loop.exec_()
#print(self.kiwoom)
#ki_instance = ki.Kiwoom()

self.btn1 = QPushButton("로그인", self)
self.btn1.move(1000, 20)
#btn1.clicked.connect(ki_instance.btn1_clicked)
self.btn1.clicked.connect(self.btn1_clicked)

btn2 = QPushButton("상태체크", self)
btn2.move(1100, 20)
#btn2.clicked.connect(ki_instance.btn2_clicked)
btn2.clicked.connect(self.btn2_clicked)

'''
self.ed = QLineEdit()
self.ed.move(700, 50)
self.ed.setText("홍길동") # 텍스트 쓰기
text = self.ed.text() # 텍스트 읽기
self.ed.setPlaceholderText("이름을 입력하시오") # Watermark로 텍스트 표시
self.ed.selectAll() # 텍스트 모두 선택
#ed.setReadOnly(True)# 에디트는 읽기 전용으로
#e.setEchoMode(QLineEdit.Password)# Password 스타일 에디트
'''
keywordLabel = QLabel("종목&키워드 매칭(종목코드:키워드1,키워드2,키워드3...)", self)
keywordLabel.setGeometry(510, 50, 500, 50)

self.keywordbtn = QPushButton("매칭 저장", self)
self.keywordbtn.move(1100, 60)
self.keywordbtn.clicked.connect(self.keywordbtn_clicked)

self.textEdit = QTextEdit(self)
self.textEdit.resize(670, 250)
self.textEdit.move(510, 100)


logLabel = QLabel("로그", self)
logLabel.setGeometry(510, 370, 500, 50)

self.textEdit2 = QTextEdit(self)
self.textEdit2.resize(670, 250)
self.textEdit2.move(510, 420)
self.textEdit2.setReadOnly(True)

"""""
btn1 = QPushButton("Click me", self)
btn1.move(20, 20)
btn1.clicked.connect(self.btn1_clicked)
"""

def start_timer(self):
if self.current_timer:
self.current_timer.stop()
self.current_timer.deleteLater()

self.current_timer = QTimer()
self.current_timer.timeout.connect(self.startNews)
self.current_timer.setSingleShot(True)
self.current_timer.start(1000)

def keywordbtn_clicked(self):
#print("저장")
content = self.textEdit.toPlainText()
#print(content)
#print(self.user_id)

post_data = {"type": "keyword_save", "stock_id": self.user_id, "content": content}
r = requests.post(self.apiUrl, data=post_data)
result_json = r.text #{"result":"OK"}
#print(result_json)

# JSON 디코딩
dict = json.loads(result_json)
#print("result = " + dict['result']) #OK
if dict['result'] == "OK":
w = QWidget() # The QWidget widget is the base class
w.setWindowTitle('키워드저장버튼')
w.resize(400, 200)
result = QMessageBox.information(w, "Information", "저장완료")

#if result == QMessageBox.Ok:
# myTextbox.setText("Clicked OK on Information.")

def btn1_clicked(self):
#print("로그인 버튼 클릭")
ret = self.kiwoom.dynamicCall("CommConnect()")
if ret == 0:
self.statusBar().showMessage("로그인 창 열기 성공")
#self.btn1.setText('로그인 정보보기')
#self.getLoginInfo()
else:
self.statusBar().showMessage("로그인 창 열기 실패")
#print(ret)

def btn1_clicked_logined(self):
print("ok")

#로그인 상태 확인
def btn2_clicked(self):
if self.kiwoom.dynamicCall("GetConnectState()") == 0:
self.statusBar().showMessage("Not connected")
else:
self.statusBar().showMessage("Connected")

#로그인 유저 정보 호출
def getLoginInfo(self, type):
#print("getLoginInfo")
info = self.kiwoom.dynamicCall("GetLoginInfo(QString)", type)
#print(info)
return info

#로그인 후 실행되는 함수
def event_connect(self, code):
if code == 0:
self.statusBar().showMessage("로그인 성공")
self.user_id = self.getLoginInfo("USER_ID")
self.btn1.setText(self.user_id + "님 로그인중");
#버튼 이벤트 해제해야 하는데 일단 스킵
else:
self.statusBar().showMessage("로그인 실패")

#print("event_connect 호출됨")
#self.event_connect_loop.exit()

def setTable(self):
#self.setGeometry(5,5,200,200)

newsLabel = QLabel("실시간 뉴스", self)
newsLabel.setGeometry(5, 5, 500, 50)

self.tableWidget = QTableWidget(self)

#self.tableWidget.resize(500, 500)
self.tableWidget.setGeometry(5, 50, 500, 500)

self.tableWidget.setRowCount(20)
self.tableWidget.setColumnCount(2)

#self.tableWidget.setVerticalHeaderLabels(['1','2'])
self.tableWidget.setHorizontalHeaderLabels(['제목', '시간'])

#데이터 넣기
#self.setTableWidgetData()
#self.startNews()

def startNews(self):
self.thread_cnt = self.thread_cnt + 1
print("뉴스 쓰레드 가동(" + str(self.thread_cnt) + ")")

#뉴스 쓰레드 가동(283) 에서
#Process finished with exit code -1073740940 (0xC0000374) 에러발생
#Process finished with exit code -1073741819 (0xC0000005)
#Process finished with exit code -1073741819 (0xC0000005)

#뉴스 저장하기
#self.setNews2()

#뉴스 가져오기
#self.getNews()

#뉴스 쓰레드 가동(281) 에서
#Process finished with exit code -1073740940 (0xC0000374)
self.getSetNews()

#self.t = perpetualTimer(1, self.startNews)
#self.t.start()

#timer = QTimer()
#timer.timeout.connect(self.startNews)
#timer.start(1000)

self.start_timer()

def setTableWidgetData(self):
self.tableWidget.setItem(0, 0, QTableWidgetItem("(0,0)"))
self.tableWidget.setItem(0, 1, QTableWidgetItem("(0,1)"))
self.tableWidget.setItem(1, 0, QTableWidgetItem("(1,0)"))
self.tableWidget.setItem(1, 1, QTableWidgetItem("(1,1)"))

"""
def btn1_clicked(self):
QMessageBox.about(self, "message", "clicked")
"""

#API 서버에서 뉴스크롤링 실행시키고, 바로 가져오기
def getSetNews(self):
post_data = {"type":"getset"}
r = requests.post(self.apiUrl, data=post_data)
result_json = r.text #{"result":"OK"}

# JSON 디코딩
dict = json.loads(result_json)
#print("getsetNews = " + dict['result'])

#print(dict['list'])
cnt = 0
#self.tableWidget.setRowCount(20)

for row in dict['list']:
if cnt == 0:
if self.last_title == row['title']:
break
else:
#print(row['title'] + " " + row['time'])
self.last_title = row['title']
#for i in range(0, 20):
# self.tableWidget.setItem(i, 0, QTableWidgetItem("1"))
# self.tableWidget.setItem(i, 1, QTableWidgetItem("1"))

title = row['title'][0:40]
self.tableWidget.setItem(cnt, 0, QTableWidgetItem(title))
self.tableWidget.setItem(cnt, 1, QTableWidgetItem(row['time']))

#데이터 갱신을 위해 포커스를 주자
self.tableWidget.setCurrentCell(cnt, 0)
self.tableWidget.setCurrentCell(cnt, 1)
cnt = cnt + 1

#각 열크기 맞춤
self.tableWidget.resizeColumnsToContents()
self.tableWidget.resizeRowsToContents()


#API 서버에서 뉴스크롤링 실행시키기
def setNews2(self):
post_data = {"type":"set"}
r = requests.post(self.apiUrl, data=post_data)
result_json = r.text #{"result":"OK"}

# JSON 디코딩
dict = json.loads(result_json)
print("setNews2 = " + dict['result'])


#파이썬 자체에서 크롤링
def setNews(self):
#공시정보 가져오기
#lib.get_financial_statements('http://dart.fss.or.kr/api/search.json?auth=5d7fe977e575fb2ec15661d0a1556f40793237f6')
outhtml = lib.get_financial_statements('크롤링 주소는 비밀')
#print(outhtml)
#output = re.match(r'(?s).*<!-- List BBS Block Start -->(.*)<!-- //List BBS Block End -->.*', outhtml, re.M|re.I)
#print(output.group(1))
#output = re.match(r'(?s).*<div class="newListArea">(.*)</div>(.*)</div>.*', output.group(1), re.M|re.I)
output = re.match(r'(?s).*<!-- %%LIST data%% -->(.*)<!-- %%ENDLIST data%% -->.*', outhtml, re.M | re.I)
output_data = ''
if output is not None:
output_data = output.group(1)
#print(output)
else:
print("젠장... 비었음...")

#print(output_data)
outlist = re.split('</tr>', output_data)
#pprint(outlist)

cnt = 0
all_json = OrderedDict()
for row in outlist:
#date = lib.get_between_string(row, '<div id="date_0">', '</div>')
#print(row)
row_list = re.split('<td', row)
#pprint(row_list)
#print(row_list[2])
#print(lib.strip_tags(row_list[3]))
#print(row_list[3])
date = ''
time = ''
title = ''
try:
row_json = OrderedDict()
tmp = re.match(r'(?s).*<div id="date_.*">(.*)</div></td>.*', row_list[1])
date = tmp.group(1)

tmp = re.match(r'(?s).*<div id="time_.*">(.*)</div></td>.*', row_list[2])
time = tmp.group(1)

tmp = re.match(r'(?s).*<div id=\'title_.*\'>(.*)</div></a>.*', row_list[3])
title = tmp.group(1)
title = title.replace('&nbsp; ','')

row_json['date'] = date
row_json['time'] = time
row_json['title'] = title

all_json[str(cnt)] = row_json
except:
'''
print("끝줄에러")
'''

'''
test = ""
test += "date = " + date + " || "
test += "time = " + time + " || "
test += "title = " + title + " || "
print(str(cnt) + " = " + test)
'''
cnt = cnt + 1


#pprint(all_json)
jsonString = json.dumps(all_json)
post_data = {'data': jsonString, "type": "news_toss"}
#print(jsonString)
#pprint(post_data)

#디비에 저장
r = requests.post(self.apiUrl, data=post_data)
result_json = r.text #{"result":"OK"}


def getNews(self):
post_data = {"type": "get"}
r = requests.post(self.apiUrl, data=post_data)
result_json = r.text #{"result":"OK"}
#print("result_json = " + result_json)

# JSON 디코딩
dict = json.loads(result_json)

# Dictionary 데이타 체크
print("getNews = " + dict['result'])

#print(dict['list'])
cnt = 0
#self.tableWidget.setRowCount(20)

for row in dict['list']:
if cnt == 0:
if self.last_title == row['title']:
break
else:
#print(row['title'] + " " + row['time'])
self.last_title = row['title']
#for i in range(0, 20):
# self.tableWidget.setItem(i, 0, QTableWidgetItem("1"))
# self.tableWidget.setItem(i, 1, QTableWidgetItem("1"))

title = row['title'][0:40]
self.tableWidget.setItem(cnt, 0, QTableWidgetItem(title))
self.tableWidget.setItem(cnt, 1, QTableWidgetItem(row['time']))

#데이터 갱신을 위해 포커스를 주자
self.tableWidget.setCurrentCell(cnt, 0)
self.tableWidget.setCurrentCell(cnt, 1)
cnt = cnt + 1

#각 열크기 맞춤
self.tableWidget.resizeColumnsToContents()
self.tableWidget.resizeRowsToContents()


if __name__ == "__main__":
app = QApplication(sys.argv)
myWindow = MyWindow()
myWindow.show()
app.exec_()

이 게시글은
https://webschool.kr/?v=board_view&board_key=30&idx=426
에서 작성한 글입니다. 소스코드의 경우 해당 블로그에서 이뿌게 노출이 되지 않을 수 있사오니, 위 링크로 들어오셔서 보시길 바랍니다.

반응형