Recevoir le websocket de l'API kabu station ® en Python

Aperçu

Dans la continuité de précédent, au Kabucom Securities utilisera l'API de la station kabu fournie aux individus de Python. Cette fois, la marque sera livrée par Websocket en Python. En même temps, nous présenterons également le code pour l'enregistrement, la désinscription et la désinscription de tous les problèmes.

environnement

Forfaits supplémentaires

code

Enregistrement de la marque

import json
import requests
import yaml

# ---

def get_token():
    with open('auth.yaml', 'r') as yml:
        auth = yaml.safe_load(yml)

    url = 'http://localhost:18080/kabusapi/token'
    headers = {'content-type': 'application/json'}
    payload = json.dumps(
        {'APIPassword': auth['PASS'],}
        ).encode('utf8')

    response = requests.post(url, data=payload, headers=headers)

    return json.loads(response.text)['Token']

# ---

token = get_token()

EXCHANGES = {
    1: 'TSE',
    3: 'Certificat de nom',
    5: 'Fortune',
    6: 'Certificat de facture',
}

payload = json.dumps({
    'Symbols': [
        {'Symbol': 8306 ,'Exchange': 1},  # MUFG
        {'Symbol': 9433 ,'Exchange': 1},  # KDDI
        # ...Jusqu'à 50 peuvent être enregistrés
    ],}).encode('utf8')

url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)

regist_list = json.loads(response.text)

print('Marque d'enregistrement de la livraison')
for regist in regist_list['RegistList']:
    print("{} {}".format(
        regist['Symbol'],
        EXCHANGES[regist['Exchange']]))

Annulation de l'enregistrement de la marque

Changez uniquement l'URL. L'unité d'affichage peut être détournée.

url = 'http://localhost:18080/kabusapi/unregister'

Annuler tous les stocks enregistrés

payload n'est plus nécessaire.

url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)

Affichage de livraison

Continuer à recevoir la distribution des marques déposées. Quittez avec Ctrl + C. Notez que «.text» n'est pas ajouté à «response» qui est «json.loads».

import asyncio
import json
import websockets

# ---

async def stream():
    uri = 'ws://localhost:18080/kabusapi/websocket'

    async with websockets.connect(uri, ping_timeout=None) as ws:
        while not ws.closed:
            response = await ws.recv()
            board = json.loads(response)
            print("{} {} {}".format(
                board['Symbol'],
                board['SymbolName'],
                board['CurrentPrice'],
            ))

loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
    loop.run_forever()
except KeyboardInterrupt:
    exit()

Puisque le côté serveur n'implémente pas le battement de cœur, l'argument de ping_timeout = None est requis dans websockets.connect.

[Demande] Support ping / pong WebSocket   Issue#8 https://github.com/kabucom/kabusapi/issues/8

Recommended Posts

Recevoir le websocket de l'API kabu station ® en Python
Utilisez l'API kabu Station® de Python
API Evernote en Python
API C en Python 3
Essayez d'utiliser l'API kabu station de au Kabucom Securities
kabu Station® API - wrapper Python mis à jour pour l'API PUSH
Hit API de Mastodon en Python
Recevoir des arguments d'exécution dans Python 2.7
Jugement d'équivalence d'objet en Python
API Blender Python dans Houdini (Python 3)
Implémentation du tri rapide en Python
Touchons l'API de Netatmo Weather Station avec Python. #Python #Netatmo
Manipulation des pixels d'image en Python
Obtenir l'API arXiv en Python
Frappez l'API Sesami en Python
Diviser timedelta dans la série Python 2.7
Échappement automatique des paramètres MySQL en python
Gestion des fichiers JSON en Python
Implémentation du jeu de vie en Python
Affichage de la forme d'onde audio en Python
Créez Gmail en Python sans utiliser l'API
Accédez à l'API Web en Python
Implémentez rapidement l'API REST en Python
La loi des nombres en python
Implémentation du tri original en Python
Brouillage réversible d'entiers en Python
Accéder à l'API Twitter avec Python
Fonctionnement de la souris à l'aide de l'API Windows en Python
Essayez d'utiliser l'API Wunderlist en Python
Vérifiez le comportement du destroyer en Python
Pratique d'utilisation de ceci en Python (mauvais)
Théorie générale de la relativité en Python: Introduction
Essayez d'utiliser l'API Kraken avec Python
Arborescence de sortie des fichiers en Python
Afficher une liste d'alphabets en Python 3
Comparaison des modules de conversion japonais en Python3
Vérifier et recevoir le port série en Python (vérification du port)
Résumé de diverses instructions for en Python
Tweet à l'aide de l'API Twitter en Python
Obtenez les données de l'API Google Fit en Python
Le résultat de l'installation de python sur Anaconda
Obtenez des données Youtube en Python à l'aide de l'API Youtube Data
Développement et déploiement de l'API REST en Python à l'aide de Falcon Web Framework
Modèles Gang of Four (GoF) en Python
Essayez rapidement l'API Face de Microsoft en Python
Principes de base pour exécuter NoxPlayer en Python
Remplacement en bloc des chaînes dans les tableaux Python
Recevez une liste des résultats du traitement parallèle en Python avec starmap
Projet Euler # 16 "Somme des pouvoirs" en Python
Traffic Safety-kun: Reconnaissance des panneaux de signalisation en Python
Résumé des méthodes intégrées, etc. de la liste Python
Utilisation d'opérateurs non logiques de ou en python
À la recherche du FizzBuzz le plus rapide en Python
Exemple pratique d'architecture hexagonale en Python
Projet Euler # 17 "Nombre de caractères" en Python
Equation de mouvement à double pendule en python
Débarrassez-vous des images DICOM en Python
Statut de chaque système de traitement Python en 2020
Connectez-vous à l'API Websocket de Coincheck depuis Python
Projet Euler # 1 "Multiple de 3 et 5" en Python