GitHub網址:
https://github.com/kwedr/kchartgame
使用 Python3.8, Pyside2, Pandas
實現功能:閃電下單介面
觸價功能
支持歷史回測 (讀取台灣期交所RPT檔案)
隨機開檔功能
DDE即時模擬盤
歷史盤行情加速
歷史盤回到指定時間
下單標點位
輸出點位圖檔
我寫程式,所以我存在
GitHub網址:
https://github.com/kwedr/kchartgame
使用 Python3.8, Pyside2, Pandas
實現功能:
import numpy
import mechanicalsoup
import pandas as pd

# Accumulator for every downloaded holding row, indexed by (Date, Fund).
all_df = pd.DataFrame(
    [],
    columns=['Fund', 'Date', 'StockNo', 'StockName', 'Money'],
)
all_df.set_index(['Date', 'Fund'], inplace=True)

# One stateful browser session is shared by all helper functions below.
_soup_options = {'features': 'lxml', 'from_encoding': 'utf-8'}
_agent = 'Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.3; Win64; x64)'
browser = mechanicalsoup.StatefulBrowser(
    soup_config=_soup_options,
    raise_on_404=True,
    user_agent=_agent,
)
# Load the query page once so the form can be filled in afterwards.
req = browser.open("https://www.sitca.org.tw/ROC/Industry/IN2629.aspx")
def dl_month():
    """Return the values of the last two entries of the month drop-down.

    Reads the page currently loaded in the module-level ``browser`` and looks
    up the ``ctl00$ContentPlaceHolder1$ddlQ_YM`` <select> element.

    Returns:
        list: ``[second_to_last_value, last_value]`` taken from the
        <option> tags of that selector.
    """
    page = browser.get_current_page()
    month_select = page.find("select", {"name": "ctl00$ContentPlaceHolder1$ddlQ_YM"})
    choices = month_select.find_all('option')
    return [choices[position]["value"] for position in (-2, -1)]
def dl_page(month, calss_type):
    """Query one month / fund class and return its domestic stock holdings.

    Submits the ``#aspnetForm`` form of the page held by the module-level
    ``browser`` and parses the <td> cells of the fourth <table> in the
    response.

    Args:
        month: Value for the ``ddlQ_YM`` month selector (see ``dl_month``).
        calss_type: Value for the ``ddlQ_Class`` fund-class selector, e.g.
            ``'AA1'``.  The misspelled parameter name is kept so existing
            callers keep working.

    Returns:
        pandas.DataFrame indexed by ``('Date', 'Fund')`` with the columns
        ``StockNo``, ``StockName`` and ``Money``.  Only rows whose type cell
        is "國內上市" or "國內上櫃" are kept.
    """
    prefix = "ctl00$ContentPlaceHolder1$"
    form = browser.select_form('#aspnetForm')

    # Query by fund class: the class selector is enabled and the other three
    # selectors are marked disabled (presumably so they are not submitted).
    form.form.find("select", {"name": prefix + "ddlQ_Comid"})["disabled"] = "disabled"
    del form.form.find("select", {"name": prefix + "ddlQ_Class"})["disabled"]
    form.form.find("select", {"name": prefix + "ddlQ_Comid1"})["disabled"] = "disabled"
    form.form.find("select", {"name": prefix + "ddlQ_Class1"})["disabled"] = "disabled"

    browser[prefix + "ddlQ_YM"] = month
    browser[prefix + "rdo1"] = "rbClass"
    browser[prefix + "ddlQ_Class"] = calss_type
    form.choose_submit(prefix + "BtnQuery")
    browser.submit_selected()

    cells = browser.get_current_page().select('table')[3].find_all('td')
    cell_count = len(cells)

    # Rows are collected in a plain list and turned into a DataFrame once:
    # DataFrame.append was removed in pandas 2.0 and copied the whole frame
    # on every call.
    rows = []
    fund_name = ""
    idx = 10  # parsing starts at the 11th cell, as in the original layout
    while idx < cell_count:
        if cells[idx].text == "合計":
            # Subtotal row closes the current fund; the next cell group
            # starts with a new fund name.
            fund_name = ""
            idx += 2
            continue
        if fund_name == "":
            fund_name = cells[idx].text.strip()
            idx += 1
            continue
        stock_type = cells[idx + 1].text.strip()
        if stock_type in ("國內上市", "國內上櫃"):
            rows.append({
                'Fund': fund_name,
                'StockNo': cells[idx + 2].text,
                'StockName': cells[idx + 3].text,
                'Money': numpy.int64(cells[idx + 4].text.replace(',', '')),
            })
        # Every holding occupies nine cells, whether it was kept or skipped.
        idx += 9

    df = pd.DataFrame(rows, columns=['Fund', 'StockNo', 'StockName', 'Money'])
    df['Date'] = month
    df.set_index(['Date', 'Fund'], inplace=True)
    return df
months = dl_month()
for m in months:
    # NOTE: the original author observed that the first query for a month
    # seems to fail (reason unknown), so one throw-away request is sent first.
    dl_page(m, 'AA1')
    monthly_frames = [
        dl_page(m, fund_class)
        for fund_class in ('AA1', 'AB1', 'AI1', 'AH1')
    ]
    all_df = pd.concat([all_df] + monthly_frames, sort=True)
# Month-over-month changes in each fund's top-ten holdings.
print("月別基金變動")
index_fund = all_df.index.unique(level='Fund')
for fund in index_fund:
    df = all_df[all_df.index.get_level_values('Fund') == fund]
    fund_dates = df.index.get_level_values('Date')
    prev_stock = df[fund_dates == months[0]]['StockNo'].tolist()
    last_stock = df[fund_dates == months[1]]['StockNo'].tolist()

    # First name seen for every stock number of this fund.  This replaces the
    # per-stock ``.StockName[0]`` lookup, which relied on the deprecated
    # positional fallback of Series[int] on a non-integer index.
    stock_names = {}
    for stock_no, stock_name in zip(df['StockNo'], df['StockName']):
        stock_names.setdefault(stock_no, stock_name)

    # Holdings present in both months.
    intersection = set(prev_stock).intersection(last_stock)

    print(fund + " 移出前十持股")
    for s in prev_stock:
        if s not in intersection:
            print("[" + str(s) + "] " + stock_names[s])

    print(fund + " 新增前十持股")
    for s in last_stock:
        if s not in intersection:
            print("[" + str(s) + "] " + stock_names[s])

    print(fund + " 保留前十持股")
    # Walk the previous month's order (de-duplicated) so the output is
    # deterministic instead of following set iteration order.
    for s in dict.fromkeys(prev_stock):
        if s in intersection:
            print("[" + str(s) + "] " + stock_names[s])
# Change of each stock's total amount across all funds between the two months.
print("個股增減變動")
index_stock = sorted(set(all_df['StockNo'].tolist()))
all_dates = all_df.index.get_level_values('Date')
prev_m = all_df[all_dates == months[0]]
last_m = all_df[all_dates == months[1]]
for s in index_stock:
    # .iloc[0] picks the first matching row by position; plain [0] on this
    # MultiIndex-ed Series relied on a deprecated positional fallback.
    stockname = all_df[all_df['StockNo'] == str(s)].StockName.iloc[0]

    prev_data = prev_m[prev_m['StockNo'] == str(s)]
    prev_money = prev_data.Money.sum() if not prev_data.empty else 0
    last_data = last_m[last_m['StockNo'] == str(s)]
    last_money = last_data.Money.sum() if not last_data.empty else 0

    label = "[" + str(s) + "] " + stockname
    if prev_money <= 0:
        print(label + " 新增十大持股")
    elif last_money <= 0:
        print(label + " 移出十大持股")
    elif last_money > prev_money:
        print(label + " 十大持股增加")
    elif last_money < prev_money:
        print(label + " 十大持股減少")
    else:
        # Equal totals used to be reported as a decrease.
        print(label + " 十大持股不變")
# -*- coding: utf-8 -*-
import wx, time
import wx.lib.anchors as anchors
from ctypes import byref, POINTER, windll
from comtypes import IUnknown, GUID
from comtypes.client import GetModule, GetBestInterface, GetEvents
import queue as queue

# Win32 libraries used to host the ActiveX control.
user32 = windll.user32
atl = windll.atl

# Pending Job objects; filled by Job.__init__ and drained by the main loop.
q = queue.Queue()
class switch(object):
    """C-style ``switch``/``case`` helper.

    Usage::

        for case in switch(value):
            if case(1):
                ...
                break
            if case(2, 3):
                ...
                break
            if case():      # default
                ...

    Once a case has matched, every later ``case(...)`` call returns True
    (fall-through) until the loop is left with ``break``.
    """

    def __init__(self, value):
        self.value = value
        self.fall = False

    def __iter__(self):
        """Yield the match method once, then stop."""
        yield self.match
        # Falling off the end finishes the generator.  An explicit
        # ``raise StopIteration`` here is turned into RuntimeError by
        # PEP 479 (Python 3.7+) whenever no case breaks out of the loop.

    def match(self, *args):
        """Return True if the case suite should be entered.

        A call without arguments is the default case and always matches.
        """
        if self.fall or not args:
            return True
        if self.value in args:
            self.fall = True
            return True
        return False
class Job:
    """A unit of work for the bot; creating one puts it on the global queue ``q``."""

    # Job type codes understood by DoJob.
    STOCK_LOGIN = 1
    STOCK_USERDEFINE = 2

    def __init__(self, do_type, do_value=0):
        """Store the job type and optional payload, then enqueue this job."""
        self.do_type, self.do_value = do_type, do_value
        q.put(self)
def DoJob(Bot, x):
    """Dispatch one queued job to the matching bot method.

    Args:
        Bot: Object providing ``login()`` and ``user_define()``.
        x: Job whose ``do_type`` selects the action.

    Unknown job types are ignored.  The earlier ``switch``-based dispatch
    ran into the helper's ``raise StopIteration`` in that case, which
    Python 3.7+ reports as RuntimeError.
    """
    if x.do_type == Job.STOCK_LOGIN:
        Bot.login()
    elif x.do_type == Job.STOCK_USERDEFINE:
        Bot.user_define()
class YuantaOrdEvents(object):
    """Event sink for the Yuanta order control.

    Every ``On...`` method is a callback whose parameter list has to stay
    exactly as declared; the handlers only log the event name, except
    ``OnLogonS`` which also records the account list and queues a follow-up
    job.
    """

    def __init__(self, parent):
        # parent is the wrapper object; its ``bot`` attribute is used below.
        self.parent = parent

    def OnLogonS(self, this, TLinkStatus, AccList, Casq, Cast):
        """Log the logon result; on status 2 store accounts and queue a query."""
        print('OnLogonS {},{},{},{}'.format(TLinkStatus, AccList, Casq, Cast))
        if TLinkStatus == 2:
            datas = AccList.split(';')
            for data in datas:
                self.parent.bot.YuantaAccount.append(data)
                # NOTE(review): the source lost its indentation; the serial
                # number is assumed to be recorded once per account - confirm.
                self.parent.bot.YuantaSN.append(Casq.split('=')[1])
            Job(Job.STOCK_USERDEFINE)

    def OnReportQuery(self, this, RowCount, Results):
        print('OnReportQuery')

    def OnDealQuery(self, this, RowCount, Results):
        print('OnDealQuery')

    def OnUserDefinsFuncResult(self, this, RowCount, Results, WorkID):
        print('OnUserDefinsFuncResult {},{},{}'.format(RowCount, WorkID, Results))

    def OnOrdRptF(self, this, Omkt, Mktt, Cmbf, Statusc, Ts_Code, Ts_Msg, Bhno, AcNo,
                  Suba, Symb, Scnam, O_Kind, O_Type, Buys, S_Buys, O_Prc, O_Qty, Work_Qty, Kill_Qty,
                  Deal_Qty, Order_No, T_Date, O_Date, O_Time,
                  O_Src, O_Lin, A_Prc, Oseq_No, Err_Code,
                  Err_Msg, R_Time, D_Flag):
        print('OnOrdRptF')

    def OnOrdMatF(self, this, Omkt, Buys, Cmbf, Bhno,
                  AcNo, Suba, Symb, Scnam, O_Kind,
                  S_Buys, O_Prc, A_Prc, O_Qty, Deal_Qty,
                  T_Date, D_Time, Order_No, O_Src, O_Lin,
                  Oseq_No):
        print('OnOrdMatF')

    def OnOrdResult(self, this, ID, result):
        print('OnOrdResult')

    def OnRfOrdRptRF(self, this, Exchange, Omkt, Statusc, Ts_Code,
                     Ts_Msg, Bhno, AcNo, Suba, Symb,
                     Scnam, O_Kind, Buys, S_Buys, PriceType,
                     O_Prc1, O_Prc2, O_Qty, Work_Qty, Kill_Qty,
                     Deal_Qty, Order_No, O_Date, O_Time, O_Src,
                     O_Lin, A_Prc, Oseq_No, Err_Code, Err_Msg,
                     R_Time, D_Flag):
        print('OnRfOrdRptRF')

    def OnRfOrdMatRF(self, this, Exchange, Omkt, Bhno, AcNo,
                     Suba, Symb, Scnam, O_Kind, Buys,
                     S_Buys, PriceType, O_Prc1, O_Prc2, A_Prc,
                     O_Qty, Deal_Qty, O_Date, D_Time, Order_No,
                     O_Src, O_Lin, Oseq_No):
        print('OnRfOrdMatRF')

    def OnRfOrdResult(self, this, ID, result):
        print('OnRfOrdResult')

    def OnRfReportQuery(self, this, RowCount, Results):
        print('OnRfReportQuery')

    def OnRfDealQuery(self, this, RowCount, Results):
        # Previously logged 'OnRfReportQuery' (copy-paste slip), which made
        # the two events indistinguishable in the output.
        print('OnRfDealQuery')
class YuantaOrdWapper:
    """Hosts the Yuanta order ActiveX control and connects its event sink."""

    def __init__(self, handle, bot):
        """Create the control inside the window ``handle``.

        Args:
            handle: Native handle of the window that hosts the control.
            bot: Owner object; the event handlers reach it via ``self.bot``.

        Raises:
            OSError: If ``AtlAxCreateControlEx`` reports a failure HRESULT.
        """
        self.bot = bot
        Iwindow = POINTER(IUnknown)()
        Icontrol = POINTER(IUnknown)()
        Ievent = POINTER(IUnknown)()
        res = atl.AtlAxCreateControlEx("Yuanta.YuantaOrdCtrl.64", handle, None,
                                       byref(Iwindow),
                                       byref(Icontrol),
                                       byref(GUID()),
                                       Ievent)
        # windll functions return a plain C int, so a failure HRESULT shows
        # up as a negative number; fail here instead of continuing with a
        # null control pointer.
        if res < 0:
            raise OSError(
                "AtlAxCreateControlEx failed for Yuanta.YuantaOrdCtrl.64 "
                "(HRESULT 0x{:08X})".format(res & 0xFFFFFFFF))
        self.YuantaOrd = GetBestInterface(Icontrol)
        self.YuantaOrdEvents = YuantaOrdEvents(self)
        # The connection object is kept on the instance so it stays referenced.
        self.YuantaOrdEventsConnect = GetEvents(self.YuantaOrd, self.YuantaOrdEvents)
class StockBot:
    """Bot that logs in to the order server and issues an FA003 query."""

    def __init__(self, botuid, account, pwd):
        """Create the control wrapper and remember the credentials.

        Args:
            botuid: Window handle passed on to ``YuantaOrdWapper``.
            account: Login account.
            pwd: Login password.
        """
        self.Yuanta = YuantaOrdWapper(botuid, self)
        self.Account, self.Pwd = account, pwd
        # Filled by the logon event handler.
        self.YuantaAccount = []
        self.YuantaSN = []

    def login(self):
        """Open the futures order connection with the stored credentials."""
        order_ctrl = self.Yuanta.YuantaOrd
        order_ctrl.SetFutOrdConnection(self.Account, self.Pwd, 'api.yuantafutures.com.tw', '80')
        print('login')

    def user_define(self):
        """Send the FA003 user-defined query for the first stored account."""
        # The account string is '-'-separated; fields 1 and 2 are sent as
        # ``bhno`` and ``acno``.
        fields = self.YuantaAccount[0].split('-')
        request = f"Func=FA003|bhno={fields[1]}|acno={fields[2]}|suba=|type=1|currency=TWD"
        outcome = self.Yuanta.YuantaOrd.UserDefinsFunc(request, "FA003")
        print(f"user_define {outcome}")
class MyApp(wx.App):
    """wx application with a hand-written main loop that also runs queued jobs."""

    def OnInit(self):
        """Arm the main-loop flag; the loop runs while ``keepGoing`` is true."""
        self.keepGoing = True
        return True

    def MainLoop(self, run_func):
        """Run GUI event processing interleaved with ``run_func``.

        Args:
            run_func: Callable invoked once per pass of the outer loop.  It
                should return quickly, otherwise the GUI stops responding.
        """
        # Install a fresh event loop, remembering the previous one so it can
        # be re-activated when this loop ends.
        loop = wx.GUIEventLoop()
        previous = wx.EventLoop.GetActive()
        wx.EventLoop.SetActive(loop)

        while self.keepGoing:
            run_func()

            # Dispatch whatever jobs are queued at this point to the global Bot.
            while not q.empty():
                DoJob(Bot, q.get())

            # Handle every GUI event that is currently waiting.
            while loop.Pending():
                loop.Dispatch()

            # Short pause before idle processing so the loop does not spin.
            time.sleep(0.10)
            loop.ProcessIdle()

        wx.EventLoop.SetActive(previous)
def run_job():
    """Drain the global job queue, handing each job to the global ``Bot``."""
    while not q.empty():
        DoJob(Bot, q.get())
if __name__ == "__main__":
    app = MyApp()
    # A hidden frame; its native handle is what the bot's control is created in.
    frame = wx.Frame(None, wx.ID_ANY, "Hello")
    frame.Show(False)
    # Bot stays a module-level name: MainLoop and run_job read it as a global.
    Bot = StockBot(frame.Handle, 'ACCOUNT', 'PWD')
    # Queue the login job; it is executed on the first pass of the main loop.
    Job(Job.STOCK_LOGIN)
    app.MainLoop(run_job)
# -*- coding: utf-8 -*-
import wx, time
import wx.lib.anchors as anchors
from ctypes import byref, POINTER, windll
from comtypes import IUnknown, GUID
from comtypes.client import GetModule, GetBestInterface, GetEvents
import queue as queue

# Win32 libraries used to host the ActiveX control.
user32 = windll.user32
atl = windll.atl

# Pending Job objects; filled by Job.__init__ and drained by the main loop.
q = queue.Queue()
class switch(object):
    """C-style ``switch``/``case`` helper.

    Usage::

        for case in switch(value):
            if case(1):
                ...
                break
            if case(2, 3):
                ...
                break
            if case():      # default
                ...

    Once a case has matched, every later ``case(...)`` call returns True
    (fall-through) until the loop is left with ``break``.
    """

    def __init__(self, value):
        self.value = value
        self.fall = False

    def __iter__(self):
        """Yield the match method once, then stop."""
        yield self.match
        # Falling off the end finishes the generator.  An explicit
        # ``raise StopIteration`` here is turned into RuntimeError by
        # PEP 479 (Python 3.7+) whenever no case breaks out of the loop.

    def match(self, *args):
        """Return True if the case suite should be entered.

        A call without arguments is the default case and always matches.
        """
        if self.fall or not args:
            return True
        if self.value in args:
            self.fall = True
            return True
        return False
class Job:
    """A unit of work for the bot; creating one puts it on the global queue ``q``."""

    # Job type codes understood by DoJob.
    STOCK_LOGIN = 1
    STOCK_WATCH = 2

    def __init__(self, do_type, do_value=0):
        """Store the job type and optional payload, then enqueue this job."""
        self.do_type, self.do_value = do_type, do_value
        q.put(self)
def DoJob(Bot, x):
    """Dispatch one queued job to the matching bot method.

    Args:
        Bot: Object providing ``login()`` and ``watch(value)``.
        x: Job whose ``do_type`` selects the action; ``do_value`` is passed
            to ``watch``.

    Unknown job types are ignored.  The earlier ``switch``-based dispatch
    ran into the helper's ``raise StopIteration`` in that case, which
    Python 3.7+ reports as RuntimeError.
    """
    if x.do_type == Job.STOCK_LOGIN:
        Bot.login()
    elif x.do_type == Job.STOCK_WATCH:
        Bot.watch(x.do_value)
class YuantaQuoteEvents(object):
    """Event sink for the Yuanta quote control.

    Each ``On...`` method is a callback; parameter lists are kept exactly as
    declared.  Handlers log to stdout, and ``OnMktStatusChange`` additionally
    queues a watch job when the status equals 2.
    """

    def __init__(self, parent):
        self.parent = parent

    def OnMktStatusChange(self, this, Status, Msg, ReqType):
        """Log the status change; on status 2 queue a watch job for ReqType."""
        print(f'OnMktStatusChange {ReqType},{Msg},{Status}')
        if Status != 2:
            return
        Job(Job.STOCK_WATCH, ReqType)

    def OnRegError(self, this, symbol, updmode, ErrCode, ReqType):
        print(f'OnRegError {ReqType},{ErrCode},{symbol},{updmode}')

    def OnGetMktData(self, this, PriType, symbol, Qty, Pri, ReqType):
        print('OnGetMktData')

    def OnGetMktQuote(self, this, symbol, DisClosure, Duration, ReqType):
        print('OnGetMktQuote')

    def OnGetMktAll(self, this, symbol, RefPri, OpenPri, HighPri, LowPri, UpPri, DnPri, MatchTime, MatchPri, MatchQty, TolMatchQty,
                    BestBuyQty, BestBuyPri, BestSellQty, BestSellPri, FDBPri, FDBQty, FDSPri, FDSQty, ReqType):
        """Print one line with request type, match time, price fields and volume."""
        summary = f'{ReqType} {MatchTime} c:{MatchPri} o:{OpenPri} h:{HighPri} l:{LowPri} v:{TolMatchQty}'
        print(summary)

    def OnGetDelayClose(self, this, symbol, DelayClose, ReqType):
        print('OnGetDelayClose')

    def OnGetBreakResume(self, this, symbol, BreakTime, ResumeTime, ReqType):
        print('OnGetBreakResume')

    def OnGetTradeStatus(self, this, symbol, TradeStatus, ReqType):
        print('OnGetTradeStatus')

    def OnTickRegError(self, this, strSymbol, lMode, lErrCode, ReqType):
        print('OnTickRegError')

    def OnGetTickData(self, this, strSymbol, strTickSn, strMatchTime, strBuyPri, strSellPri, strMatchPri, strMatchQty, strTolMatQty,
                      strMatchAmt, strTolMatAmt, ReqType):
        print('OnGetTickData')

    def OnTickRangeDataError(self, this, strSymbol, lErrCode, ReqType):
        print('OnTickRangeDataError')

    def OnGetTickRangeData(self, this, strSymbol, strStartTime, strEndTime, strTolMatQty, strTolMatAmt, ReqType):
        print('OnGetTickRangeData')

    def OnGetTimePack(self, this, strTradeType, strTime, ReqType):
        print(f'OnGetTimePack {strTradeType},{strTime}')

    def OnGetDelayOpen(self, this, symbol, DelayOpen, ReqType):
        print('OnGetDelayOpen')

    def OnGetFutStatus(self, this, symbol, FunctionCode, BreakTime, StartTime, ReopenTime, ReqType):
        print('OnGetFutStatus')

    def OnGetLimitChange(self, this, symbol, FunctionCode, StatusTime, Level, ExpandType, ReqType):
        print('OnGetLimitChange')
class YuantaQuoteWapper:
    """Hosts the Yuanta quote ActiveX control and connects its event sink."""

    def __init__(self, handle, bot):
        """Create the control inside the window ``handle``.

        Args:
            handle: Native handle of the window that hosts the control.
            bot: Owner object, stored as ``self.bot``.

        Raises:
            OSError: If ``AtlAxCreateControlEx`` reports a failure HRESULT.
        """
        self.bot = bot
        Iwindow = POINTER(IUnknown)()
        Icontrol = POINTER(IUnknown)()
        Ievent = POINTER(IUnknown)()
        res = atl.AtlAxCreateControlEx("YUANTAQUOTE.YuantaQuoteCtrl.1", handle, None,
                                       byref(Iwindow),
                                       byref(Icontrol),
                                       byref(GUID()),
                                       Ievent)
        # windll functions return a plain C int, so a failure HRESULT shows
        # up as a negative number; fail here instead of continuing with a
        # null control pointer.
        if res < 0:
            raise OSError(
                "AtlAxCreateControlEx failed for YUANTAQUOTE.YuantaQuoteCtrl.1 "
                "(HRESULT 0x{:08X})".format(res & 0xFFFFFFFF))
        self.YuantaQuote = GetBestInterface(Icontrol)
        self.YuantaQuoteEvents = YuantaQuoteEvents(self)
        # The connection object is kept on the instance so it stays referenced.
        self.YuantaQuoteEventsConnect = GetEvents(self.YuantaQuote, self.YuantaQuoteEvents)
class StockBot:
    """Bot that logs in to the quote servers and registers symbol 1101."""

    def __init__(self, botuid, account, pwd):
        """Create the control wrapper and remember the credentials.

        Args:
            botuid: Window handle passed on to ``YuantaQuoteWapper``.
            account: Login account.
            pwd: Login password.
        """
        self.Yuanta = YuantaQuoteWapper(botuid, self)
        self.Account, self.Pwd = account, pwd

    def login(self):
        """Log on to both quote sessions with the stored credentials."""
        # Per the original note: the T session uses port 80/443 and the T+1
        # session port 82/442; reqType=1 is the T session, reqType=2 the
        # T+1 session.
        quote_ctrl = self.Yuanta.YuantaQuote
        for port, req_type, last_arg in (('80', 1, 0), ('82', 2, 1)):
            quote_ctrl.SetMktLogon(self.Account, self.Pwd, '203.66.93.84', port, req_type, last_arg)
        print('login')

    def watch(self, ret_type):
        """Register symbol '1101' for the given request type and log the result."""
        outcome = self.Yuanta.YuantaQuote.AddMktReg('1101', "4", ret_type, 0)
        print(f"AddMktReg {outcome}")
class MyApp(wx.App):
    """wx application with a hand-written main loop that also runs queued jobs."""

    def OnInit(self):
        """Arm the main-loop flag; the loop runs while ``keepGoing`` is true."""
        self.keepGoing = True
        return True

    def MainLoop(self, run_func):
        """Run GUI event processing interleaved with ``run_func``.

        Args:
            run_func: Callable invoked once per pass of the outer loop.  It
                should return quickly, otherwise the GUI stops responding.
        """
        # Install a fresh event loop, remembering the previous one so it can
        # be re-activated when this loop ends.
        loop = wx.GUIEventLoop()
        previous = wx.EventLoop.GetActive()
        wx.EventLoop.SetActive(loop)

        while self.keepGoing:
            run_func()

            # Dispatch whatever jobs are queued at this point to the global Bot.
            while not q.empty():
                DoJob(Bot, q.get())

            # Handle every GUI event that is currently waiting.
            while loop.Pending():
                loop.Dispatch()

            # Short pause before idle processing so the loop does not spin.
            time.sleep(0.10)
            loop.ProcessIdle()

        wx.EventLoop.SetActive(previous)
def run_job():
    """Drain the global job queue, handing each job to the global ``Bot``."""
    while not q.empty():
        DoJob(Bot, q.get())
if __name__ == "__main__":
    app = MyApp()
    # A hidden frame; its native handle is what the bot's control is created in.
    frame = wx.Frame(None, wx.ID_ANY, "Hello")
    frame.Show(False)
    # ACCOUNT / PWD were bare, undefined names here and raised NameError.
    # Quoted placeholders are used instead - replace them with real
    # credentials.  Bot stays a module-level name because MainLoop and
    # run_job read it as a global.
    Bot = StockBot(frame.Handle, 'ACCOUNT', 'PWD')
    # Queue the login job; it is executed on the first pass of the main loop.
    Job(Job.STOCK_LOGIN)
    app.MainLoop(run_job)