This article continues from the previous one, "Discord Bot with recording function starting with Python: (4) Playing music files".
In this article, as preparation for implementing voice recording, we call the voice communication API directly through the Discord API to understand how the communication works.
We plan to write 7 articles in total, and have finished writing up to 5 articles.
As you can tell from the fact that the beginner tag has been removed, the process below is somewhat tedious and mainly involves working with the lower layers.
A schematic diagram of communication when sending and receiving audio with Discord is shown below.
Since this is only a schematic diagram, a detailed explanation is omitted; it is enough to grasp that the connection is established through several steps.
In this article we implement this flow without using discord.py and look closely at how Discord audio is sent and received.
All of the information below is based on the official references (Voice Connection Gateway and Regular Gateway).
Discord has two Gateways: the normal Gateway, and a voice Gateway that sends and receives voice-related information. To obtain the endpoint URL of the voice Gateway, you first authenticate with the normal Gateway and then tell it which voice channel to connect to; the information needed for the voice connection is then sent back.
First, create a script to try WebSocket connection with Python.
op10 Hello
import json
import asyncio
import aiohttp
from pprint import pprint


class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            loop = asyncio.get_event_loop()
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        loop.create_task(self.receive_data())

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print(packet)
                    print('==End of connection==')
                    break
                pprint(json.loads(packet.data))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ws = Gateway(loop)
    loop.run_forever()
When this is executed, the coroutine `receive_data`, which connects to the Gateway and prints each message received from it, is scheduled by the `create_task` call in the `Gateway` constructor and starts running. The Discord Gateway then sends the following data.
{'d': {'_trace': ['["gateway-prd-main-xwmj",{"micros":0.0}]'],
'heartbeat_interval': 41250},
'op': 10,
's': None,
't': None}
Data sent from Discord is represented with `op` and `d`, and occasionally `t`. `op` stores the type of the message, and `d` stores its body. `t` is normally `None`, but when more detail needs to be conveyed it is a string that names the specific event.
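As a minimal sketch of the payload structure described above, the helper below splits a decoded Gateway message into its three parts. The names `describe_payload` and `OPCODE_NAMES` are illustrative and not part of any library, and the table lists only opcodes that appear in this article.

```python
import json

# Opcode names for the normal Gateway; only the ones used in this article.
OPCODE_NAMES = {
    0: 'Dispatch',
    1: 'Heartbeat',
    2: 'Identify',
    4: 'Voice State Update',
    10: 'Hello',
    11: 'Heartbeat ACK',
}


def describe_payload(raw):
    """Return (opcode name, event name, data) for a raw JSON Gateway message."""
    msg = json.loads(raw)
    op = msg.get('op')
    name = OPCODE_NAMES.get(op, f'unknown ({op})')
    return name, msg.get('t'), msg.get('d')


hello = '{"op": 10, "d": {"heartbeat_interval": 41250}, "s": null, "t": null}'
print(describe_payload(hello))
```

Keeping the opcode table in one place makes the printed logs easier to read than bare numbers, though the `if op == ...` checks used in the rest of this article work just as well.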
op1 Heartbeat
Here, `op = 10`. This is a message called Hello, which, as the name implies, is the first data sent when you connect. The important field in Hello is `heartbeat_interval`. Here it is 41250, which means you must send a simple message called Heartbeat every 41250 milliseconds (41.25 seconds) to tell the Gateway that the connection is still alive. Create a class that inherits `threading.Thread` as a helper that performs this Heartbeat processing. By writing the desired processing in the `run` function and calling the `start` function on the instance, the processing runs in another thread.
import json
import asyncio
import aiohttp
import threading
from pprint import pprint


class HeartbeatHandler(threading.Thread):
    def __init__(self, ws, interval):
        self.ws = ws
        self.interval = interval
        self.stop_ev = threading.Event()
        super().__init__()

    def run(self):
        self.send()
        while not self.stop_ev.wait(self.interval):
            self.send()

    def send(self):
        data = self.get_payload()
        asyncio.run_coroutine_threadsafe(
            self.ws.socket.send_json(data),
            self.ws.loop
        )
        print('==Send==')
        print(data)

    def stop(self):
        self.stop_ev.set()

    def get_payload(self):
        raise NotImplementedError


class GatewayHeartbeat(HeartbeatHandler):
    def __init__(self, ws, interval):
        super().__init__(ws, interval)

    def get_payload(self):
        return {'op': 1, 'd': None}


class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        self.loop.create_task(self.receive_data())

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print(packet)
                    print('==End of connection==')
                    break
                print('==Receive==')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))
            if hasattr(self, 'heartbeat'):
                self.heartbeat.stop()

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ws = Gateway(loop)
    loop.run_forever()
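Conveniently, asyncio provides a function named exactly for this situation, `run_coroutine_threadsafe`, so we use it to submit the send coroutine to the event loop from the heartbeat thread. When this is executed, the exchange between the two sides is printed roughly every 40 seconds.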
==Receive==
{'d': {'_trace': ['["gateway-prd-main-w7j9",{"micros":0.0}]'],
'heartbeat_interval': 41250},
'op': 10,
's': None,
't': None}
==Send==
{'op': 1, 'd': None}
==Receive==
{'d': None, 'op': 11, 's': None, 't': None}
==Send==
{'op': 1, 'd': None}
==Receive==
{'d': None, 'op': 11, 's': None, 't': None}
...
If you do not send Heartbeats, the Discord Gateway closes the connection after about 40 seconds. Even if you do send them, Discord may still ask you to reconnect. For now, we do not handle reconnection here.
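The same periodic send can also be written without a separate thread. The following is a rough sketch of an asyncio-only alternative, not the approach used in this article: `heartbeat_loop` and its parameters are hypothetical names, and `fake_send` stands in for something like `socket.send_json` so the example runs without a network connection.

```python
import asyncio


async def heartbeat_loop(send_json, interval, stop_event):
    """Send an op1 Heartbeat immediately, then every `interval` seconds,
    until `stop_event` is set."""
    while True:
        await send_json({'op': 1, 'd': None})
        try:
            # Wake up early if a stop is requested during the wait.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
            return
        except asyncio.TimeoutError:
            continue


async def demo():
    sent = []

    async def fake_send(payload):  # stand-in for socket.send_json
        sent.append(payload)

    stop_event = asyncio.Event()
    # A very short interval is used here only to keep the demo fast.
    task = asyncio.create_task(heartbeat_loop(fake_send, 0.01, stop_event))
    await asyncio.sleep(0.05)
    stop_event.set()
    await task
    return sent


sent = asyncio.run(demo())
print(len(sent), sent[0])
```

Because everything stays on the event loop, this variant does not need `run_coroutine_threadsafe`; the thread-based class is kept in the rest of the article.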
op2 Identify
Next, you need to send the bot token to identify the connection to the Gateway. This is sent with op2; in addition to the bot token, simple connection information is added to `properties` in the payload. If you operate a large-scale bot that uses Sharding, additional processing is required, but here we assume a small-scale bot that does not use Sharding.
class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        self.loop.create_task(self.receive_data())
        self.identified = asyncio.Event()

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print('==End of connection==')
                    print(packet)
                    break
                print('==Receive==')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))
            if hasattr(self, 'heartbeat'):
                self.heartbeat.stop()

    async def identify(self):
        payload = {
            'op': 2,
            'd': {
                'token': 'BOT_TOKEN',
                'properties': {
                    '$os': 'linux',
                    '$browser': 'python',
                    '$device': 'python',
                },
                'v': 3
            }
        }
        print('==Send==')
        print(payload)
        await self.socket.send_json(payload)
        self.identified.set()

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return
Sending the bot token authenticates the bot, after which information about the bot and the servers it has joined is received. It is easy to overlook among all the data, but when a message with `op = 0` and `t = READY` arrives, the Gateway is ready for two-way communication. The `session_id` inside `d` is needed later for the voice connection, so save it.
{'d': {
...
'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
...
},
'op': 0,
's': 1,
't': 'READY'}
op4 Gateway Voice State Update
To obtain the information needed to connect to the voice gateway, send a message with `op = 4` to the normal gateway.
With `op = 4`, you specify the IDs of the server and voice channel along with your own mute state. In response, you receive the endpoint URL of the voice gateway used by that server.
class Gateway:
    ...

    async def voice_state_update(self):
        payload = {
            'op': 4,
            'd': {
                'guild_id': '705...',
                'channel_id': '706...',
                "self_mute": False,  # whether to mute the microphone
                "self_deaf": False,  # whether to mute the speaker
            }
        }
        print('==Send==')
        print(payload)
        await self.socket.send_json(payload)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return
        if op == 0:
            if t == 'READY':
                self.session_id = d['session_id']
                await self.voice_state_update()
When this is executed, the bot joins the voice channel and the following two messages are received.
==Receive==
{'d': {'channel_id': '706...',
'deaf': False,
'guild_id': '705...',
'member': ...,
'mute': False,
'self_deaf': False,
'self_mute': False,
'self_video': False,
'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
'suppress': False,
'user_id': '743...'},
'op': 0,
's': 3,
't': 'VOICE_STATE_UPDATE'}
==Receive==
{'d': {'endpoint': 'japan396.discord.media:80',
'guild_id': '705...',
'token': '0123456789abcdef'},
'op': 0,
's': 4,
't': 'VOICE_SERVER_UPDATE'}
The `endpoint` in VOICE_SERVER_UPDATE, the second message, is the endpoint of the voice gateway, and `token` is used as the authentication token.
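As a small sketch of how the `endpoint` value can be turned into a WebSocket URL, the helper below drops the `:80` suffix seen in the output above and appends a version query. The function name `voice_gateway_url` is hypothetical, and handling only the `:80` suffix is an assumption based on that single sample.

```python
def voice_gateway_url(endpoint, version=4):
    """Build the voice Gateway WebSocket URL from the `endpoint` field of
    VOICE_SERVER_UPDATE, dropping a trailing ':80' port if present."""
    host = endpoint[:-3] if endpoint.endswith(':80') else endpoint
    return f'wss://{host}/?v={version}'


print(voice_gateway_url('japan396.discord.media:80'))
# wss://japan396.discord.media/?v=4
```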
op3 Heartbeat
From here, communication with the voice gateway starts.
Create a new class for WebSocket communication with the endpoint obtained earlier.
class Gateway:
    ...

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return
        if op == 0:
            if t == 'READY':
                self.session_id = d['session_id']
                await self.voice_state_update()
            if t == 'VOICE_SERVER_UPDATE':
                self.voice_endpoint = d['endpoint']
                self.token = d['token']
                self.voice_gw = VoiceGateway(self, self.loop)


class VoiceGateway:
    def __init__(self, gateway, loop=None):
        self.gateway = gateway
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = f'wss://{gateway.voice_endpoint.replace(":80", "")}/?v=4'
        self.loop.create_task(self.receive_data())
        self.identified = asyncio.Event()

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print('**End of connection**')
                    print(packet)
                    break
                print('**Receive**')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))

    async def handle_message(self, msg):
        pass
If this is executed and it works well, the Heartbeat interval will be returned as in the case of the first Gateway.
==Receive==
{'d': ...
'op': 0,
's': 5,
't': 'VOICE_SERVER_UPDATE'}
**Receive**
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}
To keep the connection alive, this time send a Heartbeat with `op = 3`. A timestamp is sent as the data.
import json
import asyncio
import aiohttp
import threading
import time  # <- added
from pprint import pprint


class VoiceGatewayHeartbeat(HeartbeatHandler):
    def __init__(self, ws, interval):
        super().__init__(ws, interval)

    def get_payload(self):
        # Set 'd' to the current time in microseconds (nanoseconds // 1000)
        return {'op': 3, 'd': time.time_ns() // 1000}


class VoiceGateway:
    ...

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            return
This Heartbeat must only be started after the bot has been authenticated, so at this point the instance is created but not started.
op0 Identify
To authenticate with the voice gateway, send a payload containing the server ID, the bot's user ID, `session_id`, and `token` with `op = 0`.
class VoiceGateway:
    ...

    async def identify(self):
        payload = {
            'op': 0,
            'd': {
                'token': self.gateway.token,
                'user_id': '743853432007557210',
                'server_id': '705052322761277540',
                'session_id': self.gateway.session_id,
            }
        }
        print('**Send**')
        print(payload)
        await self.socket.send_json(payload)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return
If authentication succeeds, you receive `op2 Ready`. `ip` and `port` are the address used to obtain voice data, `modes` lists the voice encryption methods supported by Discord, and `ssrc` is the SSRC identifier.
**Receive**
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}
**Send**
{'op': 0, 'd': {'token': '871d40956f7cf34a', 'user_id': '743853432007557210', 'server_id': '705052322761277540', 'session_id': 'c412a670dbed864b559a25009459f15a'}}
==Send==
{'op': 3, 'd': 1598314493140616}
**Receive**
{'d': {'experiments': ['bwe_conservative_link_estimate',
'bwe_remote_locus_client'],
'ip': '123.123.123.123',
'modes': ['aead_aes256_gcm',
'xsalsa20_poly1305_lite',
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305'],
'port': 50004,
'ssrc': 364117},
'op': 2}
**Receive**
{'d': 1598314493140616, 'op': 6}
==Send==
{'op': 3, 'd': 1598314506891112}
**Receive**
{'d': 1598314506891112, 'op': 6}
Voice data is obtained over a UDP connection to the IP from the previous exchange. However, your own address and port are obfuscated by NAT (see IP discovery in the official reference), so you first need to learn the address and port that are visible from the outside. To do so, send the following UDP packet to the server at `ip` and `port`.
Field | Description | Size |
---|---|---|
Type | 0x1 | 2 bytes |
Length | 70 | 2 bytes |
SSRC | Unsigned integer | 4 bytes |
IP address | ASCII string (padded with 0x0 null characters; all zeros when sending) | 64 bytes |
Port | Unsigned integer (0 when sending) | 2 bytes |
When this is sent, a 74-byte packet with the same layout comes back with the IP and port fields filled in, so read the IP and port from that packet.
import json
import asyncio
import aiohttp
import threading
import time
import socket  # <- added
import struct  # <- added
from pprint import pprint


class VoiceGateway:
    ...

    async def ip_discovering(self):
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp.setblocking(False)
        packet = bytearray(74)
        packet[:2] = struct.pack('>H', 1)
        packet[2:4] = struct.pack('>H', 70)
        packet[4:8] = struct.pack('>I', self.ssrc)
        self.udp.sendto(bytes(packet), (self.ip, self.port))
        data = await self.loop.sock_recv(self.udp, 2048)
        self.external_ip, self.external_port = struct.unpack_from(
            '>64sH', data, 8
        )
        self.external_ip = self.external_ip.decode(encoding='ascii').rstrip('\x00')
        print(self.external_ip, self.external_port)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return
        if op == 2:
            self.ip = d['ip']
            self.port = d['port']
            self.modes = d['modes']
            self.ssrc = d['ssrc']
            await self.ip_discovering()
The `struct` module (standard library) is used to build the UDP packet. When this is executed, the UDP response is received through the event loop and the IP and port are printed to the console.
**Receive**
{'d': ...,
'op': 2}
201.158.201.158 54345
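The packet layout from the table can also be expressed as a single `struct` format string. The sketch below builds the request and parses a response without any networking; the function names are hypothetical, and the response bytes are fabricated for illustration rather than captured from a real server.

```python
import struct

# Layout from the table above: type (2), length (2), SSRC (4), IP (64), port (2).
DISCOVERY_FORMAT = '>HHI64sH'


def build_discovery_request(ssrc):
    """Build the 74-byte request; the IP and port fields are left as zeros."""
    return struct.pack(DISCOVERY_FORMAT, 1, 70, ssrc, b'', 0)


def parse_discovery_response(data):
    """Return (ip, port) from a 74-byte response."""
    _type, _length, _ssrc, ip, port = struct.unpack(DISCOVERY_FORMAT, data[:74])
    return ip.rstrip(b'\x00').decode('ascii'), port


request = build_discovery_request(364117)
# A fabricated response for illustration only, not real server output.
response = struct.pack(DISCOVERY_FORMAT, 2, 70, 364117, b'203.0.113.5', 54345)
print(len(request), parse_discovery_response(response))
```

The `64s` field is padded with null bytes automatically when packing, which matches the padding rule in the table.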
The reason for going through such a tedious process is to obtain the key for decrypting the encrypted voice. By sending the external IP and port obtained here to the voice gateway, you receive the decryption key in the reply. Voice encryption uses a library called libsodium; in Python, encryption and decryption with libsodium are available through the PyNaCl package.
op1 Select Protocol
This step obtains the key used with libsodium. For `mode` in the op1 payload, you must choose one of the encryption methods from the `modes` received in `op2` earlier; here we consistently use `xsalsa20_poly1305`. When `op1` is sent, `op4 Session Description` is sent in reply. Its payload contains the decryption key, so extract it.
class VoiceGateway:
    ...

    async def select_protocol(self):
        payload = {
            'op': 1,
            'd': {
                'protocol': 'udp',
                'data': {
                    'address': self.external_ip,
                    'port': self.external_port,
                    'mode': 'xsalsa20_poly1305'
                }
            }
        }
        print('**Send**')
        print(payload)
        await self.socket.send_json(payload)

    async def receive_audio_packet(self):
        while True:
            data = await self.loop.sock_recv(self.udp, 2048)
            print('**Voice reception**')
            print(data)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return
        if op == 2:
            self.ip = d['ip']
            self.port = d['port']
            self.modes = d['modes']
            self.ssrc = d['ssrc']
            await self.ip_discovering()
            await self.select_protocol()
        if op == 4:
            self.secret_key = d['secret_key']
            self.loop.create_task(self.receive_audio_packet())
After `op4` is received, voice data starts arriving on the UDP socket, so `create_task` is used to start a task that receives it.
**Send**
{'op': 1, 'd': {'protocol': 'udp', 'data': {'address': '106.73.199.128', 'port': 42057, 'mode': 'xsalsa20_poly1305'}}}
**Receive**
{'d': {'audio_codec': 'opus',
...
'mode': 'xsalsa20_poly1305',
'secret_key': [244,
157,
...
214],
'video_codec': None},
'op': 4}
**Voice reception**
b'\x81\xc9\x00\x07\x00\x07\xdd(\x9fI\xb9\xd6\x00G\xce\xa2\xa4\x85M[\xed\xd3\x0fu\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xca\xa9\xec'
**Voice reception**
b'\x81\xc9\x00\x07\x00\x07\xdd(\x00\x9c^\x83\x90\xc5V\xafX\xff\x14\x97\xf5\xf1/\xad\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xcb\xa9\x02'
**Voice reception**
b'\x81\xc9\x00\x07\x00\x07\xdd(j\x88B\\O\xd0\rs`\xc1_\x92\xc6\xe6\xe7=\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc8\xa9\xfd'
**Voice reception**
b'\x81\xc9\x00\x07\x00\x07\xdd(\x05\x02\xf56\x8a\x13\x9e\xc2\xb6\x8c,\xe6r5\x0e\n\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc9\xa9\x14'
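In the output above, `secret_key` arrives as a list of integers, while byte-oriented libraries generally expect a `bytes` object. A minimal sketch of the conversion follows; the helper name is hypothetical, the 32-byte check assumes the `xsalsa20_poly1305` mode chosen earlier, and the example values are placeholders rather than a real key.

```python
def secret_key_to_bytes(secret_key):
    """Convert the list of integers from op4 into a bytes key."""
    key = bytes(secret_key)
    if len(key) != 32:
        raise ValueError(f'expected a 32-byte key, got {len(key)} bytes')
    return key


# Placeholder values for illustration; a real key comes from op4.
example = list(range(32))
key = secret_key_to_bytes(example)
print(len(key))
```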
The protocols used to send and receive Discord audio are RTP and RTCP. Each packet that carries voice data is an RTP packet holding 20 ms of audio, while RTCP packets carry supplementary information about that audio.
To distinguish RTP from RTCP, look at the second byte of the packet. According to the protocol definition, the second byte of an RTCP packet is in the range 200 to 204, so that value identifies it.
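The check on the second byte can be sketched as follows. The function name `is_rtcp` is illustrative; the sample bytes are the first eight bytes of a packet from the output above.

```python
def is_rtcp(packet):
    """Return True if the second byte falls in the RTCP range 200-204."""
    return len(packet) >= 2 and 200 <= packet[1] <= 204


# First bytes of a packet from the output above: the second byte is 0xc9 = 201.
sample = b'\x81\xc9\x00\x07\x00\x07\xdd('
print(is_rtcp(sample))  # True
```

By this check, the packets printed in the output above are RTCP packets, not RTP packets carrying audio.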
To calculate the RTP header length, look at X (the 4th bit of the 1st byte) and CC (the 5th to 8th bits of the 1st byte). I will not explain the role of each bit, but the header length in bytes can be calculated as follows.
If $X = 0$: $12 + 4 \times CC$
If $X = 1$: $16 + 4 \times CC + 4 \times len(EX\_header)$
Here len(EX_header) is the length of the additional (extension) header in 32-bit words, stored in the 2 bytes starting at byte $14 + 4 \times CC$.
For details, please refer to the table on Wikipedia.
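A minimal sketch of this header length calculation, based on the generic RTP header layout in RFC 3550, is shown below. The function name is hypothetical and the two test packets are hand-made for illustration; how Discord treats the extension data in practice is not covered here.

```python
import struct


def rtp_header_length(packet):
    """Return the RTP header length in bytes, following RFC 3550."""
    first = packet[0]
    has_extension = bool(first & 0x10)  # X: 4th bit of the 1st byte
    csrc_count = first & 0x0F           # CC: 5th-8th bits of the 1st byte
    length = 12 + 4 * csrc_count
    if has_extension:
        # The extension length (in 32-bit words) sits 2 bytes into the
        # extension header, i.e. at offset 14 + 4 * CC.
        (ext_words,) = struct.unpack_from('>H', packet, length + 2)
        length += 4 + 4 * ext_words
    return length


# Hand-made packets: one without an extension, one with a 1-word extension.
plain = b'\x80\x78' + bytes(10)
extended = b'\x90\x78' + bytes(10) + b'\xbe\xde\x00\x01' + bytes(4)
print(rtp_header_length(plain), rtp_header_length(extended))  # 12 20
```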
This time it is enough to obtain only the Timestamp in the RTP header, which is the time the voice was sent, so refer to the [API Reference](https://discord.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice-voice-packet-structure) and extract the 4 bytes at offsets 4 to 7.
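Reading the timestamp is a single `struct` call. In this sketch the function name is hypothetical, and the header is a hand-made 12-byte example, not captured data.

```python
import struct


def rtp_timestamp(packet):
    """Return the 32-bit big-endian timestamp stored at byte offsets 4-7."""
    (timestamp,) = struct.unpack_from('>I', packet, 4)
    return timestamp


# Hand-made 12-byte header: sequence 1, timestamp 960, SSRC 364117.
header = struct.pack('>BBHII', 0x80, 0x78, 1, 960, 364117)
print(rtp_timestamp(header))  # 960
```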
Now you can retrieve the audio data for the time being.
From this information, it is possible to separate the payload and header from RTP packets, but there are still some problems.
- The data received is encrypted.
- The audio received is streaming audio in the Opus format.
For the former, the key has already been obtained, so decryption can be performed with it. The latter requires slightly more involved processing: a C library called libopus must be made available from Python, and by calling its decoding function the audio can be saved as ordinary WAV data.
If you manage to clear these two, you will be able to save the audio data. Next time, I will extend the existing discord.py and save the audio data based on the knowledge gained from this lower layer on how to retrieve the data.