使用 Python3.8, Pyside2, Pandas
支持歷史回測 (讀取台灣期交所RPT檔案)
實現功能:import numpy | |
import mechanicalsoup | |
import pandas as pd | |
all_df = pd.DataFrame([], columns = ['Fund', 'Date', 'StockNo', 'StockName', 'Money']) | |
all_df.set_index(['Date', 'Fund'], inplace = True) | |
browser = mechanicalsoup.StatefulBrowser( | |
soup_config={'features': 'lxml', 'from_encoding':'utf-8'}, | |
raise_on_404=True, | |
user_agent='Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.3; Win64; x64)' | |
) | |
req ="") | |
def dl_month (): | |
soup = browser.get_current_page() | |
options = soup.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_YM"}).find_all ('option') | |
return [options[-2]["value"], options[-1]["value"]] | |
def dl_page (month, calss_type): | |
form = browser.select_form('#aspnetForm') | |
form.form.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_Comid"})["disabled"] = "disabled" | |
del form.form.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_Class"})["disabled"] | |
form.form.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_Comid1"})["disabled"] = "disabled" | |
form.form.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_Class1"})["disabled"] = "disabled" | |
browser["ctl00$ContentPlaceHolder1$ddlQ_YM"] = month | |
browser["ctl00$ContentPlaceHolder1$rdo1"] = "rbClass" | |
browser["ctl00$ContentPlaceHolder1$ddlQ_Class"] = calss_type | |
form.choose_submit("ctl00$ContentPlaceHolder1$BtnQuery") | |
response = browser.submit_selected() | |
#browser.launch_browser() | |
soup = browser.get_current_page() | |
tag ='table')[3].find_all ('td') | |
tag_len = len (tag) | |
df = pd.DataFrame([], columns = ['Fund', 'StockNo', 'StockName', 'Money']) | |
fund_name = "" | |
idx = 10 | |
while idx < tag_len: | |
if tag[idx].text == "合計": | |
fund_name = "" | |
idx += 2 | |
continue | |
if fund_name == "": | |
fund_name = tag[idx].text.strip() | |
idx += 1 | |
continue | |
stock_type = tag[idx+1].text.strip() | |
if stock_type != "國內上市" and stock_type != "國內上櫃": | |
idx = idx + 9 | |
continue | |
df = df.append ({ | |
'Fund': fund_name, | |
"StockNo" : tag[idx+2].text, | |
"StockName" : tag[idx+3].text, | |
"Money" : numpy.int64 (tag[idx+4].text.replace(',', ''))}, | |
ignore_index=True) | |
idx = idx + 9 | |
df['Date'] = month | |
df.set_index(['Date', 'Fund'], inplace = True) | |
return df | |
months = dl_month () | |
for m in months: | |
dl_page (m, 'AA1') #第一次會失效?? | |
df1 = dl_page (m, 'AA1') | |
df2 = dl_page (m, 'AB1') | |
df3 = dl_page (m, 'AI1') | |
df4 = dl_page (m, 'AH1') | |
all_df = pd.concat([all_df, df1, df2, df3, df4], sort=True) | |
#月別基金變動 | |
print ("月別基金變動") | |
all_df.index.unique(level='Fund') | |
index_fund = all_df.index.unique(level='Fund') | |
for fund in index_fund: | |
df = all_df[all_df.index.get_level_values('Fund').isin([fund])] | |
prev_m = df[df.index.get_level_values('Date').isin([months[0]])] | |
last_m = df[df.index.get_level_values('Date').isin([months[1]])] | |
prev_stock = prev_m['StockNo'].tolist () | |
last_stock = last_m['StockNo'].tolist () | |
#保留持股 | |
intersection =list(set(prev_stock).intersection(last_stock)) | |
print (fund + " 移出前十持股") | |
for s in prev_stock: | |
if s not in intersection: | |
stockname = df[df['StockNo'] == str(s)].StockName[0] | |
print ("["+str(s)+"] "+ stockname) | |
print (fund + " 新增前十持股") | |
for s in last_stock: | |
if s not in intersection: | |
stockname = df[df['StockNo'] == str(s)].StockName[0] | |
print ("["+str(s)+"] "+ stockname) | |
print (fund + " 保留前十持股") | |
for s in intersection: | |
stockname = df[df['StockNo'] == str(s)].StockName[0] | |
print ("["+str(s)+"] "+ stockname) | |
#個股增減變動 | |
print ("個股增減變動") | |
index_stock = sorted (list(set(all_df['StockNo'].tolist ()))) | |
prev_m = all_df[all_df.index.get_level_values('Date').isin([months[0]])] | |
last_m = all_df[all_df.index.get_level_values('Date').isin([months[1]])] | |
for s in index_stock: | |
stockname = all_df[all_df['StockNo'] == str(s)].StockName[0] | |
prev_data = prev_m[prev_m['StockNo'] == str(s)] | |
prev_money = 0 | |
if not prev_data.empty: | |
prev_money = prev_data.Money.sum() | |
last_data = last_m[last_m['StockNo'] == str(s)] | |
last_money = 0 | |
if not last_data.empty: | |
last_money = last_data.Money.sum() | |
if prev_money <= 0: | |
print ("["+str(s)+"] "+ stockname + " 新增十大持股") | |
elif last_money <= 0: | |
print ("["+str(s)+"] "+ stockname + " 移出十大持股") | |
elif last_money > prev_money: | |
print ("["+str(s)+"] "+ stockname + " 十大持股增加") | |
else: | |
print ("["+str(s)+"] "+ stockname + " 十大持股減少") |
# -*- coding: utf-8 -*- | |
import wx, time | |
import wx.lib.anchors as anchors | |
from ctypes import byref, POINTER, windll | |
from comtypes import IUnknown, GUID | |
from comtypes.client import GetModule, GetBestInterface, GetEvents | |
user32 = windll.user32 | |
atl = windll.atl | |
import queue as queue | |
q = queue.Queue () | |
class switch(object): | |
def __init__(self, value): | |
self.value = value | |
self.fall = False | |
def __iter__(self): | |
"""Return the match method once, then stop""" | |
yield self.match | |
raise StopIteration | |
def match(self, *args): | |
"""Indicate whether or not to enter a case suite""" | |
if self.fall or not args: | |
return True | |
elif self.value in args: # changed for v1.5, see below | |
self.fall = True | |
return True | |
else: | |
return False | |
class Job: | |
def __init__ (self, do_type, do_value = 0): | |
self.do_type = do_type | |
self.do_value = do_value | |
q.put (self) | |
def DoJob(Bot, x): | |
for case in switch(x.do_type): | |
if case(Job.STOCK_LOGIN): | |
Bot.login () | |
break | |
if case(Job.STOCK_USERDEFINE): | |
Bot.user_define () | |
break | |
class YuantaOrdEvents(object): | |
def __init__(self, parent): | |
self.parent = parent | |
def OnLogonS(self, this, TLinkStatus, AccList, Casq, Cast): | |
print ('OnLogonS {},{},{},{}'.format (TLinkStatus, AccList, Casq, Cast)) | |
if TLinkStatus == 2: | |
datas = AccList.split(';') | |
for data in datas: | | (data) | | (Casq.split('=')[1]) | |
def OnReportQuery(self, this, RowCount, Results): | |
print ('OnReportQuery') | |
def OnDealQuery(self, this, RowCount, Results): | |
print ('OnDealQuery') | |
def OnUserDefinsFuncResult(self, this, RowCount, Results, WorkID): | |
print ('OnUserDefinsFuncResult {},{},{}'.format(RowCount, WorkID, Results)) | |
def OnOrdRptF(self, this, Omkt, Mktt, Cmbf, Statusc, Ts_Code, Ts_Msg, Bhno, AcNo, | |
Suba, Symb, Scnam, O_Kind, O_Type, Buys, S_Buys, O_Prc, O_Qty, Work_Qty, Kill_Qty, | |
Deal_Qty, Order_No, T_Date, O_Date, O_Time, | |
O_Src, O_Lin, A_Prc, Oseq_No, Err_Code, | |
Err_Msg, R_Time, D_Flag): | |
print ('OnOrdRptF') | |
def OnOrdMatF(self, this, Omkt, Buys, Cmbf, Bhno, | |
AcNo, Suba, Symb, Scnam, O_Kind, | |
S_Buys, O_Prc, A_Prc, O_Qty, Deal_Qty, | |
T_Date, D_Time, Order_No, O_Src, O_Lin, | |
Oseq_No): | |
print ('OnOrdMatF') | |
def OnOrdResult(self, this, ID, result): | |
print ('OnOrdResult') | |
def OnRfOrdRptRF(self, this, Exchange, Omkt, Statusc, Ts_Code, | |
Ts_Msg, Bhno, AcNo, Suba, Symb, | |
Scnam, O_Kind, Buys, S_Buys, PriceType, | |
O_Prc1, O_Prc2, O_Qty, Work_Qty, Kill_Qty, | |
Deal_Qty, Order_No, O_Date, O_Time, O_Src, | |
O_Lin, A_Prc, Oseq_No, Err_Code, Err_Msg, | |
R_Time, D_Flag): | |
print ('OnRfOrdRptRF') | |
def OnRfOrdMatRF(self, this, Exchange, Omkt, Bhno, AcNo, | |
Suba, Symb, Scnam, O_Kind, Buys, | |
S_Buys, PriceType, O_Prc1, O_Prc2, A_Prc, | |
O_Qty, Deal_Qty, O_Date, D_Time, Order_No, | |
O_Src, O_Lin, Oseq_No): | |
print ('OnRfOrdMatRF') | |
def OnRfOrdResult(self, this, ID, result): | |
print ('OnRfOrdResult') | |
def OnRfReportQuery(self, this, RowCount, Results): | |
print ('OnRfReportQuery') | |
def OnRfDealQuery(self, this, RowCount, Results): | |
print ('OnRfReportQuery') | |
class YuantaOrdWapper: | |
def __init__(self, handle, bot): | | = bot | |
Iwindow = POINTER(IUnknown)() | |
Icontrol = POINTER(IUnknown)() | |
Ievent = POINTER(IUnknown)() | |
res = atl.AtlAxCreateControlEx("Yuanta.YuantaOrdCtrl.64", handle, None, | |
byref(Iwindow), | |
byref(Icontrol), | |
byref(GUID()), | |
Ievent) | |
self.YuantaOrd = GetBestInterface(Icontrol) | |
self.YuantaOrdEvents = YuantaOrdEvents(self) | |
self.YuantaOrdEventsConnect = GetEvents(self.YuantaOrd, self.YuantaOrdEvents) | |
class StockBot: | |
def __init__(self, botuid, account, pwd): | |
self.Yuanta = YuantaOrdWapper (botuid, self) | |
self.Account = account | |
self.Pwd = pwd | |
self.YuantaAccount = [] | |
self.YuantaSN = [] | |
def login (self): | |
self.Yuanta.YuantaOrd.SetFutOrdConnection(self.Account, self.Pwd, '', '80') | |
print ('login') | |
def user_define (self): | |
vars = self.YuantaAccount[0].split('-') | |
Params = "Func=FA003|bhno={}|acno={}|suba=|type=1|currency=TWD".format (vars[1], vars[2]) | |
ret = self.Yuanta.YuantaOrd.UserDefinsFunc (Params, "FA003") | |
print ("user_define {}".format (ret)) | |
class MyApp(wx.App): | |
def MainLoop(self, run_func): | |
# Create an event loop and make it active. If you are | |
# only going to temporarily have a nested event loop then | |
# you should get a reference to the old one and set it as | |
# the active event loop when you are done with this one... | |
evtloop = wx.GUIEventLoop() | |
old = wx.EventLoop.GetActive() | |
wx.EventLoop.SetActive(evtloop) | |
# This outer loop determines when to exit the application, | |
# for this example we let the main frame reset this flag | |
# when it closes. | |
while self.keepGoing: | |
# At this point in the outer loop you could do | |
# whatever you implemented your own MainLoop for. It | |
# should be quick and non-blocking, otherwise your GUI | |
# will freeze. | |
# call_your_code_here() | |
run_func () | |
while not q.empty(): | |
next_job = q.get() | |
DoJob (Bot, next_job) | |
# This inner loop will process any GUI events | |
# until there are no more waiting. | |
while evtloop.Pending(): | |
evtloop.Dispatch() | |
# Send idle events to idle handlers. You may want to | |
# throttle this back a bit somehow so there is not too | |
# much CPU time spent in the idle handlers. For this | |
# example, I'll just snooze a little... | |
time.sleep(0.10) | |
evtloop.ProcessIdle() | |
wx.EventLoop.SetActive(old) | |
def OnInit(self): | |
self.keepGoing = True | |
return True | |
def run_job(): | |
while not q.empty(): | |
next_job = q.get() | |
DoJob (Bot, next_job) | |
if __name__ == "__main__": | |
app=MyApp() | |
frame = wx.Frame(None,wx.ID_ANY,"Hello") | |
frame.Show (False) | |
Bot = StockBot(frame.Handle, 'ACCOUNT', 'PWD') | |
Job(Job.STOCK_LOGIN) | |
app.MainLoop (run_job) | |
