Continuing from previous, kabu station API provided by kabu.com Securities to individuals will be used from Python. This time, we will receive the distribution by Websocket of the brand in Python. At the same time, we will also introduce the code for registering, deregistering, and deregistering all issues.
import json
import requests
import yaml
# ---
def get_token():
with open('auth.yaml', 'r') as yml:
auth = yaml.safe_load(yml)
url = 'http://localhost:18080/kabusapi/token'
headers = {'content-type': 'application/json'}
payload = json.dumps(
{'APIPassword': auth['PASS'],}
).encode('utf8')
response = requests.post(url, data=payload, headers=headers)
return json.loads(response.text)['Token']
# ---
token = get_token()
EXCHANGES = {
1: 'TSE',
3: 'Nagoya Stock Exchange',
5: 'Fukuoka Stock Exchange',
6: 'Sapporo Stock Exchange',
}
payload = json.dumps({
'Symbols': [
{'Symbol': 8306 ,'Exchange': 1}, # MUFG
{'Symbol': 9433 ,'Exchange': 1}, # KDDI
# ...Up to 50 can be registered
],}).encode('utf8')
url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)
regist_list = json.loads(response.text)
print('Delivery registration brand')
for regist in regist_list['RegistList']:
print("{} {}".format(
regist['Symbol'],
EXCHANGES[regist['Exchange']]))
Only change the URL. The display unit can be diverted.
url = 'http://localhost:18080/kabusapi/unregister'
payload
is no longer needed.
url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)
Continue to receive distribution of registered brands. Exit with Ctrl + C.
Note that .text
is not added to response
which is json.loads
.
import asyncio
import json
import websockets
# ---
async def stream():
uri = 'ws://localhost:18080/kabusapi/websocket'
async with websockets.connect(uri, ping_timeout=None) as ws:
while not ws.closed:
response = await ws.recv()
board = json.loads(response)
print("{} {} {}".format(
board['Symbol'],
board['SymbolName'],
board['CurrentPrice'],
))
loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
loop.run_forever()
except KeyboardInterrupt:
exit()
Since the server side does not implement heartbeat, the argument of ping_timeout = None
is required in websockets.connect
.
[Request] WebSocket ping / pong support Issue#8 https://github.com/kabucom/kabusapi/issues/8
Recommended Posts