元大證劵API 也是使用 ComTypes
目前串接進度:串接到 Open 呼叫 Login 沒有反應
使用需修改
1. GetModule 需要修改目錄位置
2. 由於使用 x32 的 dll, 所以需要使用 python 32位元的版本
3. 帳號/密碼使用元大給的測試帳/密
目前串接進度:串接到 Open 呼叫 Login 沒有反應
使用需修改
1. GetModule 需要修改目錄位置
2. 由於使用 x32 的 dll, 所以需要使用 python 32位元的版本
3. 帳號/密碼使用元大給的測試帳/密
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import sys, os, time | |
import ctypes, comtypes, pythoncom | |
from ctypes import * | |
from comtypes.client import GetModule, CreateObject | |
from comtypes.client import ShowEvents, GetEvents, PumpEvents | |
import queue as queue | |
q = queue.Queue () | |
class switch(object): | |
def __init__(self, value): | |
self.value = value | |
self.fall = False | |
def __iter__(self): | |
"""Return the match method once, then stop""" | |
yield self.match | |
raise StopIteration | |
def match(self, *args): | |
"""Indicate whether or not to enter a case suite""" | |
if self.fall or not args: | |
return True | |
elif self.value in args: # changed for v1.5, see below | |
self.fall = True | |
return True | |
else: | |
return False | |
class Job: | |
STOCK_OPEN = 1 | |
STOCK_CLOSE = 2 | |
STOCK_LOGIN = 3 | |
STOCK_WATCH = 4 | |
def __init__ (self, do_type): | |
self.do_type = do_type | |
def DoJob(Bot, x): | |
for case in switch(x.do_type): | |
print ("DoJob: " + str (x.do_type)) | |
if case(Job.STOCK_OPEN): | |
Bot.open () | |
break | |
if case(Job.STOCK_CLOSE): | |
time.sleep(100) | |
Bot.close () | |
break | |
if case(Job.STOCK_LOGIN): | |
Bot.login () | |
break | |
class PolaisB2BTraderEvents(object): | |
def __init__(self, parent): | |
self.parent = parent | |
def OnResponse(self, intmark, dwIndex, strIndex, Handle, aryValue): | |
print ("OnResponse: " + aryValue) | |
for case in switch(intmark): | |
if case(0): | |
if aryValue == "Is Connected!!": | |
j = Job(Job.STOCK_LOGIN) | |
q.put (j) | |
break | |
if case(1): | |
print ("OnResponse: intmark 1") | |
break | |
class YuantaWapper: | |
def __init__(self, uid, bot): | |
self.uid = uid | |
self.bot = bot | |
path = os.path.join("c:\\B2BAPI_User_Docs", "PolarisB2BAPI.dll") | |
PolarisB2BAPI = GetModule(path) | |
import comtypes.gen.PolarisB2BAPI as PolarisB2BAPI | |
self.PolarisB2BAPI = PolarisB2BAPI | |
self.PolaisB2BTrader = CreateObject(PolarisB2BAPI.PolaisB2BTrader) | |
self.PolaisB2BTraderEvents = PolaisB2BTraderEvents (self) | |
self.PolaisB2BTraderEventsConnect = GetEvents (self.PolaisB2BTrader, self.PolaisB2BTraderEvents) | |
class StockBot: | |
def __init__(self, botuid, account, pwd, acc_type): | |
self.Yuanta = YuantaWapper (botuid, self) | |
self.Account = account | |
self.Pwd = pwd | |
self.AccType = acc_type | |
def open (self): | |
self.Yuanta.PolaisB2BTrader.Open () | |
def close (self): | |
self.Yuanta.PolaisB2BTrader.Close () | |
def login (self): | |
#from struct import pack | |
intFunID = int(0) #總帳登入 | |
abyAccount = (self.AccType + self.Account).ljust(22) | |
retValue = self.Yuanta.PolaisB2BTrader.Login (intFunID, abyAccount, self.Pwd) | |
print ('login: ' + str(retValue)) | |
if __name__ == "__main__": | |
comtypes.CoInitialize() | |
Bot = StockBot(0, '98875005091', '1234', 'S') | |
j = Job(Job.STOCK_OPEN) | |
q.put (j) | |
while 1: | |
while not q.empty(): | |
next_job = q.get() | |
DoJob (Bot, next_job) | |
pythoncom.PumpWaitingMessages() |
4 則留言:
你好,
我用上面的程式來取得報價資料,
發現它常常跑幾分鐘就卡住不會動了,
請問有什麼想法嗎?
你好
pythoncom.PumpWaitingMessages() 這個有一直呼叫嗎?
像群益是會一直從 event 回傳報價資料
因為我也沒有串元大報價,所以不知道他報價的方式
不過現在元大有新的 Smart API 看說明文件應該也可以取得股票報價
而且元大自已也有給 Smart API Python 的報價串接
https://www.yuantafutures.com.tw/file-repository/content/smartAPI/page8-1.html
提供你參考
你好,
請問我在使用元大Smart API時,想要一次報價多個商品,但它只能手動輸入。
是否要根據元大的行情API中AddMktReg(),寫一個script來操作?
另外您的程式中,PolarisB2BAPI這是元大提供的是嗎?
因為我在元大提供的行情跟交易API中,都沒有看到這個
謝謝
請問有試出login return 1的結果嗎,我用測試帳號怎麼試都是return 0,但用他們的測試程式是可以登入的。
張貼留言