Since automatic trading of virtual currency (bitcoin) has become commonplace, I, a Python beginner, generate my own 1-minute bars in order to buy and sell automatically on Coincheck. I couldn't get 1-minute bars from the API published by Coincheck, so I built them myself. After briefly explaining the core logic, I will post the entire source code and the execution results.
First, check the API provided by Coincheck. Coincheck provides a REST API, but some APIs are also offered over WebSocket. Currently, only a limited set of data is available over WebSocket, and the transaction history is part of it.
Therefore, the **transaction history** is used to create the 1-minute bars. Looking at how the transaction history channel is used, the request format is as follows.
REQUEST
{
  "type": "subscribe",
  "channel": "[pair]-trades"
}
The response format is as follows.
RESPONSE
[
  "ID",
  "Trading pair",
  "Order rate",
  "Order amount",
  "Order type"
]
Since we are building 1-minute bars for bitcoin this time, [pair] is btc_jpy, so the request specifies "btc_jpy-trades".
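As a minimal sketch of the two formats above, the helpers below build the subscribe request and turn one response into a dictionary. The names `make_subscribe_request` and `parse_trade` are hypothetical, and the sample message is made up for illustration. The sketch assumes that each response arrives as a JSON array with exactly the five elements listed above, in that order.

```python
import json


def make_subscribe_request(pair):
    """Build the subscribe request for the trades channel of `pair`."""
    return json.dumps({"type": "subscribe", "channel": "{}-trades".format(pair)})


def parse_trade(message):
    """Convert one response (assumed to be a five-element JSON array) to a dict."""
    trade_id, pair, rate, amount, order_type = json.loads(message)
    return {
        "id": int(trade_id),
        "pair": pair,
        "price": float(rate),
        "volume": float(amount),
        "type": order_type,  # expected to be "buy" or "sell"
    }


# Hypothetical sample message, not real market data.
sample = '[2357062, "btc_jpy", "148638.0", "5.0", "sell"]'
trade = parse_trade(sample)
print(make_subscribe_request("btc_jpy"))
print(trade)
```

Parsing with `json.loads` avoids slicing quote and bracket characters off each field by hand, at the cost of depending on the message being valid JSON.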
Also, since WebSocket is used from Python, install websocket-client with pip.
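To create a 1-minute bar, the procedure is basically as follows: each time a trade is received, truncate the current time to the minute. If it matches the timestamp of the latest bar, update that bar's high, low, close, and volume. Otherwise, start a new bar with the trade price.
Below is a code fragment that implements this procedure.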
    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]
        _dt = datetime.now().replace(second=0, microsecond=0)  # Set second and microsecond to 0 to truncate to the minute
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:  # Still within the same minute: update the current bar
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
        else:  # Outside the current minute: add a new bar
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )
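To try the update logic outside the class, the same branching can be written as a plain function. This is only a sketch: `update_candles` is a hypothetical name, and the current time is passed in through a `now` parameter (an assumption made so that the behaviour is reproducible) instead of calling `datetime.now()` inside. The trades are made-up values.

```python
from datetime import datetime


def update_candles(candles, trade, now):
    """Fold one trade into a list of 1-minute bars (the list is modified in place)."""
    mark_ts = now.replace(second=0, microsecond=0).timestamp()
    buy = trade["volume"] if trade["type"] == "buy" else 0
    sell = trade["volume"] if trade["type"] == "sell" else 0
    if candles and candles[-1]["timestamp"] == mark_ts:
        last = candles[-1]  # same minute: update the current bar
        last["high"] = max(last["high"], trade["price"])
        last["low"] = min(last["low"], trade["price"])
        last["close"] = trade["price"]
        last["volume"] += trade["volume"]
        last["buy"] += buy
        last["sell"] += sell
    else:  # first trade or a new minute: start a new bar
        candles.append({
            "timestamp": mark_ts,
            "open": trade["price"],
            "high": trade["price"],
            "low": trade["price"],
            "close": trade["price"],
            "volume": trade["volume"],
            "buy": buy,
            "sell": sell,
        })
    return candles


candles = []
update_candles(candles, {"price": 100.0, "volume": 0.5, "type": "buy"},
               datetime(2019, 12, 19, 3, 30, 5))
update_candles(candles, {"price": 120.0, "volume": 0.25, "type": "sell"},
               datetime(2019, 12, 19, 3, 30, 40))
update_candles(candles, {"price": 110.0, "volume": 1.0, "type": "buy"},
               datetime(2019, 12, 19, 3, 31, 2))
print(len(candles), candles[0]["high"], candles[1]["open"])
```

The first two trades fall in the same minute and share one bar, while the third trade opens a second bar.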
`__update_candle_data` is called every time a trade is received over WebSocket. To receive the transaction history, register a callback. Below is a code fragment.
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                         on_message=self.__on_message,  # Callback that receives trade information
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,  # Callback called immediately after the WebSocket is opened
                                         on_error=self.__on_error)

    def __on_open(self, ws):
        self.ws.send(json.dumps({  # Send the currency pair you want to receive inside this method
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))

    def __on_message(self, ws, message):
        if not self.__connected:
            self.__connected = True
        trade = self.__make_trade(message)
        if not self.__init_flg:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)  # Update the bar data here
With the implementation so far, 1-minute data can basically be created, but a bar is only created when a trade arrives. If there are no trades for a whole minute, the bar for that minute is missing. To prevent such gaps, the data is checked every 30 seconds. Below is the code fragment. Since this processing has to run periodically, it is executed in a separate thread.
        while self.ws.sock and self.ws.sock.connected:
            time.sleep(1)
            if _timer_count < 30:  # Wait 30 seconds
                print("wait until 30")
                _timer_count += 1
                continue
            self.__thread_lock()
            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # Not within 1 minute of the current time
            if last_ts != mark_ts:
                self.__candle.append(  # Create new 1-minute data using the last 1-minute data
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            self.__thread_unlock()
            _timer_count = 0
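The gap-filling step can also be checked in isolation. The sketch below uses a hypothetical helper named `fill_missing_candle` and takes the current time as a `now` parameter (an assumption for reproducibility). Like the fragment above, it only adds a bar for the current minute and does not backfill longer gaps.

```python
from datetime import datetime


def fill_missing_candle(candles, now):
    """Append a flat, zero-volume bar if no bar exists for the current minute.

    Returns True when a bar was added, False otherwise.
    """
    mark_ts = now.replace(second=0, microsecond=0).timestamp()
    last = candles[-1]
    if last["timestamp"] == mark_ts:
        return False  # a bar for this minute already exists
    candles.append({
        "timestamp": mark_ts,
        "open": last["close"],  # carry the previous close forward
        "high": last["close"],
        "low": last["close"],
        "close": last["close"],
        "volume": 0,
        "buy": 0,
        "sell": 0,
    })
    return True


# One made-up bar for 03:30, then two checks.
candles = [{
    "timestamp": datetime(2019, 12, 19, 3, 30).timestamp(),
    "open": 100.0, "high": 120.0, "low": 90.0, "close": 110.0,
    "volume": 1.5, "buy": 1.0, "sell": 0.5,
}]
same_minute = fill_missing_candle(candles, datetime(2019, 12, 19, 3, 30, 45))
next_minute = fill_missing_candle(candles, datetime(2019, 12, 19, 3, 31, 10))
print(same_minute, next_minute, candles[-1]["open"], candles[-1]["volume"])
```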
When I actually ran it, the Coincheck WebSocket was disconnected quite often. When a disconnection occurs, `on_error` is called first. Also, after observing the timing of the disconnections several times, I found that `on_open` may never be called in the first place, and the connection simply times out. It is a problem if 1-minute bars can no longer be built as soon as the connection drops, so when disconnected I want to reconnect while keeping the bar data created so far. Furthermore, if `on_open` is not called, the connection will be dropped 100% of the time, so I want to retry the connection after waiting about 3 seconds. So, add the following processing.
    def __on_error(self, ws, error):
        self.__reconnect()  # Reconnect if an error occurs

    def __candle_thread_terminate(self):
        self.__connected = True  # If the data completion thread is in a busy loop waiting for a connection, make it exit the loop
        time.sleep(1.5)

    def __exit(self):
        self.ws.close()  # Disconnect the WebSocket
        self.__candle_thread_terminate()  # End the data completion thread

    def __reconnect(self):
        self.__exit()  # Explicitly disconnect
        time.sleep(2)
        self.__connect()  # Connect to the Coincheck WebSocket

    # Method executed by the data completion thread
    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:  # Wait while the WebSocket connection is not established (on_message has never been called)
                _error_count += 1
                print("wait 1 sec")
                time.sleep(1)
                if not self.__opened and _error_count > 3:  # Reconnect if on_open has not been called even after waiting 3 seconds
                    self.ws.on_error = None  # Prevent on_error from being called during the reconnect and connecting twice
                    term_thread = threading.Thread(target=lambda: self.__reconnect())  # This thread cannot end unless the reconnect runs in another thread
                    term_thread.start()
                    break
            else:
                break
Basically, the processing follows the comments in the source code, but there was one point where I got stuck. When the connection cannot be established even after waiting 3 seconds, calling `self.__reconnect()` directly from the data completion thread does not work, because that thread cannot terminate itself, so the reconnect has to be executed in a separate thread. If you do not run it in another thread, the number of data completion threads doubles every time the connection is lost, which is a terrifying situation.
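The hand-off described above can be illustrated without any network access. In the sketch below, `Reconnector`, `watchdog`, and `_reconnect` are hypothetical names, and `_reconnect` only sleeps briefly as a stand-in for closing and reopening the WebSocket. The point is that the watching thread starts a helper thread and then returns, so it is free to finish.

```python
import threading
import time


class Reconnector:
    """Toy stand-in for the WebSocket class; no network is involved."""

    def __init__(self):
        self.reconnect_count = 0
        self.reconnected = threading.Event()

    def _reconnect(self):
        time.sleep(0.05)  # stand-in for close, wait, and connect
        self.reconnect_count += 1
        self.reconnected.set()

    def watchdog(self, is_open, max_ticks=3, tick=0.01):
        """Wait for is_open(); hand the reconnect to a new thread on timeout."""
        ticks = 0
        while not is_open():
            ticks += 1
            time.sleep(tick)
            if ticks > max_ticks:
                threading.Thread(target=self._reconnect, daemon=True).start()
                return "handed off"  # this thread can now finish
        return "opened"


r = Reconnector()
results = []
# The connection never opens in this demo, so the watchdog must time out.
watch = threading.Thread(target=lambda: results.append(r.watchdog(lambda: False)))
watch.start()
watch.join(timeout=2)
r.reconnected.wait(timeout=2)
print(results, watch.is_alive(), r.reconnect_count)
```

Because the watchdog returns right after starting the helper thread, only one watching thread exists per connection attempt.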
coincheck_websocket
import websocket
import json
import threading
import time

from datetime import datetime


class CoinCheckWebSocket:

    URL = "wss://ws-api.coincheck.com/"
    MAX_CANDLE_LEN = 24 * 60

    def __init__(self):
        self.__candle = []
        self.__init_flg = False
        self.__f = open("coin1m.log", 'w')
        self.__lock = threading.Lock()
        self.__connect()

    def __connect(self):
        self.__connected = False
        self.__opened = False
        self.ws = websocket.WebSocketApp(CoinCheckWebSocket.URL,
                                         on_message=self.__on_message,
                                         on_close=self.__on_close,
                                         on_open=self.__on_open,
                                         on_error=self.__on_error)
        self.wst = threading.Thread(target=lambda: self.ws.run_forever())
        self.wst.daemon = True
        self.wst.start()

        # Candle Thread
        self._check_candle_thread = threading.Thread(
            target=self.__check_candle, args=("check_candle",)
        )
        self._check_candle_thread.daemon = True
        self._check_candle_thread.start()
        print("check candle thread start")

    def __on_open(self, ws):
        print("open")
        self.ws.send(json.dumps({
            "type": "subscribe",
            "channel": "btc_jpy-trades"
        }))
        self.__opened = True

    def __on_error(self, ws, error):
        print("error")
        print(error)
        self.__reconnect()

    def __on_close(self, ws, *args):
        # *args absorbs the close status code and message that newer
        # websocket-client versions pass to this callback
        print("close, why close")

    def __on_message(self, ws, message):
        #print(message)
        if not self.__connected:
            self.__connected = True

        trade = self.__make_trade(message)
        #print(trade)
        if not self.__init_flg:
            self.__init_candle_data(trade)
            self.__init_flg = True
        else:
            self.__update_candle_data(trade)

    def __candle_thread_terminate(self):
        print("__candle_thread_terminate invoked")
        self.__connected = True  # Because the thread may be stopped in its wait loop
        time.sleep(1.5)

    def __exit(self):
        print("__exit invoked")
        self.ws.close()
        self.__candle_thread_terminate()

    def __reconnect(self):
        print("__reconnect invoked")
        self.__exit()
        time.sleep(2)
        self.__connect()

    def __make_trade(self, message):
        # The message is assumed to be a JSON array:
        # [ID, trading pair, order rate, order amount, order type].
        # Parsing it as JSON avoids cutting off the last digit of the ID,
        # which the previous manual slicing did.
        elements = json.loads(message)
        trade = {
            "id": int(elements[0]),
            "price": float(elements[2]),
            "volume": float(elements[3]),
            "type": elements[4]
        }
        return trade

    def __thread_lock(self):
        _count = 0
        while not self.__lock.acquire(blocking=True, timeout=1):
            _count += 1
            if _count > 3:
                print("lock acquire timeout")
                return False
        return True

    def __thread_unlock(self):
        try:
            self.__lock.release()
        except Exception as e:
            print("lock release error: {}".format(e))
            return False
        return True

    def __format_candle(self, candle):
        dt = datetime.fromtimestamp(candle["timestamp"])
        s_str = "{0:%Y-%m-%d %H:%M:%S}".format(dt)
        fmt_str = "%s %.1f %.1f %.1f %.1f %.6f %.6f %.6f" % (s_str,
                                                             candle["open"],
                                                             candle["high"],
                                                             candle["low"],
                                                             candle["close"],
                                                             candle["volume"],
                                                             candle["buy"],
                                                             candle["sell"],
                                                             )
        return fmt_str

    def _write_log(self, candle):
        fmt_str = self.__format_candle(candle)
        fmt_str += '\r\n'
        self.__f.write(fmt_str)
        self.__f.flush()

    def _print_log(self, candle):
        fmt_str = self.__format_candle(candle)
        print(fmt_str)

    def __init_candle_data(self, trade):
        _dt = datetime.now().replace(second=0, microsecond=0)
        _stamp = _dt.timestamp()
        self.__candle.append(
            {
                "timestamp": _stamp,
                "open": trade["price"],
                "high": trade["price"],
                "low": trade["price"],
                "close": trade["price"],
                "volume": trade["volume"],
                "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                "sell": trade["volume"] if trade["type"] == 'sell' else 0,
            }
        )

    def __update_candle_data(self, trade):
        last_candle = self.__candle[-1]
        _dt = datetime.now().replace(second=0, microsecond=0)
        mark_ts = _dt.timestamp()
        last_ts = last_candle["timestamp"]
        if last_ts == mark_ts:
            print("append")
            last_candle["high"] = max(last_candle["high"], trade["price"])
            last_candle["low"] = min(last_candle["low"], trade["price"])
            last_candle["close"] = trade["price"]
            last_candle["volume"] += trade["volume"]
            last_candle["buy"] += trade["volume"] if trade["type"] == 'buy' else 0
            last_candle["sell"] += trade["volume"] if trade["type"] == 'sell' else 0
            self._print_log(last_candle)
        else:
            print("add new")
            self._write_log(last_candle)
            self.__candle.append(
                {
                    "timestamp": mark_ts,
                    "open": trade["price"],
                    "high": trade["price"],
                    "low": trade["price"],
                    "close": trade["price"],
                    "volume": trade["volume"],
                    "buy": trade["volume"] if trade["type"] == 'buy' else 0,
                    "sell": trade["volume"] if trade["type"] == 'sell' else 0,
                }
            )

    def get_candle(self, type=0):
        # type == 0: completed bars only (the bar still being built is excluded)
        # otherwise: all bars, including the current one
        self.__thread_lock()
        if type == 0:
            candle = self.__candle[:-1]
        else:
            candle = self.__candle[:]
        self.__thread_unlock()
        return candle

    def __check_candle(self, args):
        _error_count = 0
        while True:
            if not self.__connected:
                _error_count += 1
                print("wait 1 sec")
                time.sleep(1)
                if not self.__opened and _error_count > 3:
                    #print("nonono reconnect!!!")
                    self.ws.on_error = None  # Avoid being called twice
                    term_thread = threading.Thread(target=lambda: self.__reconnect())  # This thread cannot end unless the reconnect runs in another thread
                    term_thread.start()
                    break
            else:
                break

        _timer_count = 0
        while self.ws.sock and self.ws.sock.connected:
            time.sleep(1)
            if _timer_count < 30:
                print("wait until 30")
                _timer_count += 1
                continue

            print(">>>>>>check candle")
            self.__thread_lock()
            _dt = datetime.now().replace(second=0, microsecond=0)
            mark_ts = _dt.timestamp()
            last_candle = self.__candle[-1]
            last_ts = last_candle["timestamp"]
            # Not within 1 minute of the current time
            if last_ts != mark_ts:
                print("---->>>>>>> new in check candle")
                self._write_log(last_candle)
                self.__candle.append(
                    {
                        "timestamp": mark_ts,
                        "open": last_candle["close"],
                        "high": last_candle["close"],
                        "low": last_candle["close"],
                        "close": last_candle["close"],
                        "volume": 0,
                        "buy": 0,
                        "sell": 0,
                    }
                )
            # Trim old bars once the list grows past 1.5 times the limit
            if len(self.__candle) > (CoinCheckWebSocket.MAX_CANDLE_LEN * 1.5):
                self.__candle = self.__candle[-CoinCheckWebSocket.MAX_CANDLE_LEN:]

            self.__thread_unlock()
            _timer_count = 0

        print("check candle end")


if __name__ == "__main__":
    chs = CoinCheckWebSocket()

    while True:
        time.sleep(60)
When executed as follows, a file called coin1m.log is generated, and the 1-minute data is written to it.
>python3 coincheck_websocket.py
>cat coin1m.log
2019-12-19 03:30:00 750169.0 750169.0 749685.0 749714.0 1.265700 0.000000 1.265700
2019-12-19 03:31:00 749685.0 750428.0 749685.0 750415.0 0.348400 0.169315 0.179085
2019-12-19 03:32:00 750481.0 750481.0 750152.0 750152.0 0.347950 0.050000 0.297950
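Each log line follows the column order used in `__format_candle`: date and time, open, high, low, close, total volume, buy volume, and sell volume. As a small sketch, a line can be read back into a dictionary as shown below. The helper name `parse_log_line` is hypothetical, and the input is one of the lines shown above.

```python
from datetime import datetime

COLUMNS = ("open", "high", "low", "close", "volume", "buy", "sell")


def parse_log_line(line):
    """Parse one whitespace-separated line of coin1m.log into a dict."""
    parts = line.split()
    bar = {"time": datetime.strptime(parts[0] + " " + parts[1], "%Y-%m-%d %H:%M:%S")}
    bar.update(zip(COLUMNS, map(float, parts[2:])))
    return bar


line = "2019-12-19 03:31:00 749685.0 750428.0 749685.0 750415.0 0.348400 0.169315 0.179085"
bar = parse_log_line(line)
print(bar["time"], bar["open"], bar["close"], bar["volume"])
```

For this line, the buy and sell volumes add up to the total volume, which is a quick sanity check on the logged data.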
I have omitted the detailed explanation of the processing above, but if you find any mistakes or have questions, please do not hesitate to contact me. However, **I'm new to Python**. And **investment is at your own risk** (lol)
I wish Python had explicit types ヽ(´・ω・)ノ