It is s_rae. I am studying programming hard.
As a practice of application development, I made an interface that can operate the robot. https://github.com/samanthanium/dullahan_local You can see the source code at.
Continuing from the previous post, this time I'll explain the source a bit. Last time: https://qiita.com/s_rae/items/d808c3eccd8e173dc55b
Since it was made for hobby purposes, I will omit the error handling.
We use this technology for this project. --Django framework
Django mainly served web pages over HTTP, managed users in sessions and worked on the sqlite database.
By the way, a Django project is made up of multiple'apps' (code that can be reused for other projects).
This project --dullahan (project main app) --pages (for Home, About, Login web pages) --control_system (page for sending commands and registering devices) --bluetooth_interface (including Consumer and function of Bluetooth device)
It is composed of ‘app’.
The reason I used Django is that it has many features and can be created quickly. I also wanted to practice Python.
Django Channels Django Channels is a library created for the Django framework. It's possible to add features such as Websocket to the Django framework.
Channels is a mechanism that uses the'Consumer'class to process WebSocket messages. You can do something similar to view in the Django framework.
Pybluez Pybluez is a library created to use the Bluetooth function of the system in Python. I won't explain the Pybluez code here, but you can write the code in a similar way to the socket module.
User logs in-> Add device-> Send command to device I will explain the flow of feeling.
pages/views.py
#...
def log_in(req):
form = AuthenticationForm()
if req.method == 'POST':
form = AuthenticationForm(data=req.POST)
if form.is_valid():
login(req, form.get_user())
return redirect(reverse('get_home'))
else:
return render(req, 'login.html', {'form': form})
return render(req, 'login.html', {'form': form})
#...
def sign_up(req):
form = UserCreationForm()
if req.method == 'POST':
form = UserCreationForm(data=req.POST)
if form.is_valid():
form.save()
return redirect(reverse('log_in'))
else:
print(form.errors)
return render(req, 'signup.html', {'form': form})
Here is the function for user login. You can easily log in using the method "form.get_user" included in Django's Forms.
After that
@login_required
You can check the current user by using the decorator in view.
Next is the screen to search for and record the surrounding Bluetooth devices.
control_system/views.py
from bluetooth_interface import bt_functions as bt
#...
@login_required
def device_search_ajax(req):
try:
if req.method == "GET":
nearby_devices = json.dumps(bt.search())
return JsonResponse({'nearby_devices':nearby_devices}, status=200)
else:
#...
except:
#Error handle
#...
When searching for Bluetooth devices around you, use an Ajax request.
bt.search is a function to search for Bluetooth devices using Pybluez.
This Kawa uses Websocket instead of HTTP to exchange messages with the robot.
control_system/consumers.py
#...
class CommandConsumer(WebsocketConsumer):
def connect(self):
#Find the model of the device selected by the user
self.device_id = self.scope['url_route']['kwargs']['device_id']
self.device_group_name = 'command_%s' % self.device_id
device = Device.objects.get(id=self.device_id)
#Allow consumers for Bluetooth to communicate serially with devices
async_to_sync(self.channel_layer.send)('bt-process', {
'type': 'bt_connect',
'group_name': self.device_group_name,
'uuid': device.uuid,
'device' : device.id,
}
)
#Add user to group
async_to_sync(self.channel_layer.group_add)(
self.device_group_name,
self.channel_name
)
self.accept()
#...
def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
device_id = text_data_json['device']
#Send a command to the device
async_to_sync(self.channel_layer.send)('bt-process', {
'type': 'bt_send_serial',
'device': device_id,
'message': message,
}
)
#...
#Save and forward reply messages received from Bluetooth devices
def command_message(self, event):
message = event['message']
device_id = event['device']
#Save to device model with timestamp
device = Device.objects.get(id=device_id)
new_history = CommandHistory(device=device, command=message )
new_history.save()
command_history = {}
all_history = device.history.all()
for command in all_history:
command_history[command.timestamp.strftime('%H:%M:%S')] = command.command + '\n'
#Send command history to user
self.send(text_data=json.dumps({
'message': command_history,
'device': device_id
}))
This is a little complicated, but to explain it briefly
--Connect opens Websocket communication with the user and enables communication with the Bluetooth device. ‘Bt-process’ is a consumer for Bluetooth devices that is currently running in the background. I feel like trying to use ‘bt_connect’ of ‘bt-process’.
--Receive sends a command to the device contained in the message to the robot when the user sends a message using the web page. This also sends a message to ‘bt-process’ in the same way as above.
--command_message saves the reply received from the robot in the robot's database model. The message has a Many-to-one relationship with the robot model. It is a form that is connected using the id of the robot.
Next is the consumer class for communicating with the robot.
bluetooth_interface/consumers.py
#...
from bluetooth_interface import bt_functions
#...
class BTConsumer(SyncConsumer):
def bt_connect(self, message):
#Remember to be able to send a message to the user
self.device_group_name = message['group_name']
self.sock = {}
#Bluetooth communication port number
self.port_num = 1
#Make a communication socket
sock, msg = bt_functions.connect(message['uuid'], self.port_num)
#Record socket with device id
if sock != 1:
self.sock = {message['device']:[sock, self.port_num]}
#Send communication status to user
async_to_sync(self.channel_layer.group_send)(
self.device_group_name,
{
'type': 'command_message',
'device': message['device'],
'message': msg,
}
)
#...
def bt_send_serial(self, event):
#Take the device id and message requested by the user
device_id = int(event['device'])
message = event['message']
#Select a socket to communicate with the device and send a message
if device_id in self.sock.keys():
result = bt_functions.send_serial(self.sock[device_id][0], message)
#Send a reply message to the user
async_to_sync(self.channel_layer.group_send)(
self.device_group_name,
{
'type': 'command_message',
'device': device_id,
'message': result,
}
)
#...
I was quite confused about how to do it here. As a result
--Create a consumer that allows the robot to send messages (sensor data, replies to commands, etc.) at any time --Added a function that allows the user to send a message to the robot at any time.
I made it possible.
Django Channels uses the Channels Layer to allow communication between Consumer classes and processes.
So even if the user logs out or does other work, they are always connected to the robot in another process.
I haven't created it yet, but I can always get sensor data from robots and IoT devices in this way.
See the Django Channels documentation for more details. https://channels.readthedocs.io/en/latest/topics/channel_layers.html https://channels.readthedocs.io/en/latest/topics/worker.html
I think it would have been easier to create a project with Node.js, which is easy to use with Async.
But I used Django and Django Channels because I wanted to take advantage of many of Django's built-in features (security, user management, Forms, etc.).
It took about 2 weeks to make it so far. There were many new things, so it was mostly time to read and study documents.
Thank you for reading this time as well.