A common way to use Omron's environmental sensor is to switch it to a Beacon mode without data storage and pick up the observation data carried in the BLE advertisement packets.
This method simplifies processing on the receiving side, but has the disadvantage that data is easily lost.
The method introduced in this article instead operates Omron's environmental sensor in a Beacon mode with data storage, to prevent missing data as much as possible.
In a Beacon mode with data storage, the observation data is not included in the BLE advertisement packets; instead, it is stored in the flash memory inside the environmental sensor.
raspi-wxbeacon2
This is the program I made this time. https://github.com/cnaos/raspi-wxbeacon2
If you run this program regularly, the data will be transferred to InfluxDB, so you can graph it with Grafana or a similar tool.
I have prepared a sample configuration file for systemd under the etc folder. Please rewrite WorkingDirectory and ExecStart to match your environment.
I will explain only the main points. For details, refer to Omron's User's Manual.
Omron's environmental sensor is equipped with a flash memory, which enables recording of observation data at set intervals. In order to record the observation data, the following two are required.
This article assumes that the beacon mode of the environment sensor is the default beacon mode immediately after the power is turned on, 0x08: Event Beacon (ADV).
To check the beacon mode easily, use a Bluetooth scanner app or similar. The sensor is in the expected mode if its short name is "Env" and its device name is displayed as "Env Sensor-BL01".
If you change the measurement interval of the environment sensor, the data recording page position will be reset and the data will be saved from page 0.
Please note that old data will be overwritten.
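To get a feel for how soon old data is overwritten, the retention period can be estimated from the measurement interval. The sketch below assumes the flash memory holds 2048 pages (0 to 2047) of 13 rows (0 to 12) each, which are the limits checked by the sample code later in this article; please confirm the actual capacity in Omron's User's Manual. The function name retention_days is made up for illustration.

```python
# Rough estimate of how long recorded data survives before being overwritten.
# ASSUMPTION: 2048 pages x 13 rows, taken from the range checks in the
# article's sample code, not from the sensor's documentation.
PAGES = 2048
ROWS_PER_PAGE = 13
SECONDS_PER_DAY = 86400


def retention_days(interval_sec: int) -> float:
    """Return the approximate number of days of data the sensor can hold."""
    total_records = PAGES * ROWS_PER_PAGE
    return total_records * interval_sec / SECONDS_PER_DAY


if __name__ == "__main__":
    for interval in (60, 300, 600):
        print(f"interval={interval}s -> about {retention_days(interval):.1f} days")
```

Under these assumptions, a 300-second interval keeps roughly three months of data, so the data should be collected more often than that.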
The main.py command has options for changing the measurement interval and setting the current time. Specify the BLE MAC address of the device for which you want to change the measurement interval and the measurement interval (seconds).
Change measurement interval
./main.py --addr XX:XX:XX:XX:XX:XX --setinterval 300
Replace XX:XX:XX:XX:XX:XX with the BLE MAC address of your device.
Written out in full, the UUIDs of the BLE services and BLE characteristics of Omron's environmental sensor look like
「0C4CXXXX-7700-46F4-AA96-D5E974E32A54」
However, since only the four digits XXXX change, this article takes out only that part and writes a characteristic as, for example,
「Time Information(0x3031)」
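As a small illustration of this naming, the following sketch expands a four-digit ID into the full 128-bit UUID using only the standard library. The helper names omron_uuid and omron_service_uuid are hypothetical; the service helper applies the same 0xFFF0 mask that the constants in this article's sample code use, which is an assumption about how characteristics are grouped under services.

```python
import uuid

# Only the XXXX part of 0C4CXXXX-7700-46F4-AA96-D5E974E32A54 changes.
OMRON_UUID_TEMPLATE = "0C4C{:04X}-7700-46F4-AA96-D5E974E32A54"


def omron_uuid(short_id: int) -> uuid.UUID:
    """Expand a 16-bit ID such as 0x3031 into the full UUID (hypothetical helper)."""
    if not 0 <= short_id <= 0xFFFF:
        raise ValueError(f"short_id out of range: {short_id:#x}")
    return uuid.UUID(OMRON_UUID_TEMPLATE.format(short_id))


def omron_service_uuid(short_id: int) -> uuid.UUID:
    """ASSUMPTION: the owning service ID is the characteristic ID masked with 0xFFF0."""
    return omron_uuid(short_id & 0xFFF0)


if __name__ == "__main__":
    print(omron_uuid(0x3031))          # Time Information
    print(omron_service_uuid(0x3001))  # service that holds Latest Data
```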
The following example reads the Latest Data (0x3001) of the Omron environment sensor. The latest observation data of the environmental sensor can be read from this BLE characteristic.
example/example1.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import struct

from bluepy.btle import Peripheral, UUID

OMRON_LATEST_DATA_UUID = UUID('%08X-7700-46F4-AA96-D5E974E32A54' % (0x0C4C0000 + 0x3001))
OMRON_SENSOR_SERVICE_UUID = UUID('%08X-7700-46F4-AA96-D5E974E32A54' % (0x0C4C0000 + (0xFFF0 & 0x3000)))

parser = argparse.ArgumentParser(description="Get Latest Data from OMRON's environmental sensor")
parser.add_argument("--addr", required=True, type=str, help='Specify the MAC address of the environment sensor')
args = parser.parse_args()

# Connect to the environmental sensor
ble_peripheral = Peripheral()
print(f"connecting... {args.addr}")
ble_peripheral.connect(addr=args.addr, addrType="random")
print(f"ble_peripheral={ble_peripheral}")

# Get the BLE service
service = ble_peripheral.getServiceByUUID(uuidVal=OMRON_SENSOR_SERVICE_UUID)
print(f"service = {service}")

# Get the BLE characteristic
ble_char = service.getCharacteristics(forUUID=OMRON_LATEST_DATA_UUID)[0]
print(f"ble_char = {ble_char}")

# Read the measurement data from Latest Data
raw_data = ble_char.read()
print(f"raw_data = {raw_data}")

# Convert the raw measurement data (little-endian: 1 unsigned byte, 8 signed shorts, 1 unsigned short)
(row_number, temperature, humidity, light, uv_index, pressure, noise, discomfort_index, heat_stroke,
 battery_level) = struct.unpack('<BhhhhhhhhH', raw_data)
temperature /= 100
humidity /= 100
uv_index /= 100
pressure /= 10
noise /= 100
discomfort_index /= 100
heat_stroke /= 100
battery_level /= 1000

# Display the conversion result
print(
    f"temperature = {temperature}, humidity = {humidity}, uv_index={uv_index}, pressure={pressure}, noise={noise}"
    f", discomfort_index={discomfort_index}, heat_stroke={heat_stroke}, battery_level={battery_level}")
Execution result
ble_peripheral=<bluepy.btle.Peripheral object at 0xb657d5b0>
service = Service <uuid=0c4c3000-7700-46f4-aa96-d5e974e32a54 handleStart=23 handleEnd=37>
ble_char = Characteristic <0c4c3001-7700-46f4-aa96-d5e974e32a54>
raw_data = b"\x03\t\t\x03\t\x00\x00\x01\x00\xef'X\x0e-\x1aZ\x06*\x0b"
temperature = 23.13, humidity = 23.07, uv_index=0.01, pressure=1022.3, noise=36.72, discomfort_index=67.01, heat_stroke=16.26, battery_level=2.858
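The conversion step can be checked without a sensor by feeding the raw_data shown in the execution result above into the same struct format. This is a minimal sketch; the function name decode_latest_data and the dictionary keys are chosen here for illustration and are not part of the original program.

```python
import struct

# Same layout as in example1.py: little-endian, 1 unsigned byte,
# 8 signed 16-bit values, 1 unsigned 16-bit value (19 bytes in total).
LATEST_DATA_FORMAT = '<BhhhhhhhhH'

# raw_data copied from the execution result above.
SAMPLE_RAW_DATA = b"\x03\t\t\x03\t\x00\x00\x01\x00\xef'X\x0e-\x1aZ\x06*\x0b"


def decode_latest_data(raw_data: bytes) -> dict:
    """Decode a Latest Data (0x3001) payload into scaled values (illustrative helper)."""
    (row_number, temperature, humidity, light, uv_index, pressure, noise,
     discomfort_index, heat_stroke, battery_level) = struct.unpack(LATEST_DATA_FORMAT, raw_data)
    return {
        'row_number': row_number,
        'temperature': temperature / 100,
        'humidity': humidity / 100,
        'light': light,
        'uv_index': uv_index / 100,
        'pressure': pressure / 10,
        'noise': noise / 100,
        'discomfort_index': discomfort_index / 100,
        'heat_stroke': heat_stroke / 100,
        'battery_level': battery_level / 1000,
    }


if __name__ == "__main__":
    print(decode_latest_data(SAMPLE_RAW_DATA))
```

Decoding the sample bytes reproduces the values printed in the execution result, which confirms the byte order and the scaling factors.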
If the connection to the device fails, the following error will occur.
In case of connection error
Traceback (most recent call last):
File "./example1.py", line 20, in <module>
ble_peripheral.connect(addr=args.addr, addrType="random")
File "/home/cnaos/.local/share/virtualenvs/dev-raspi-wxbeacon2-cfd34nEC/lib/python3.7/site-packages/bluepy/btle.py", line 445, in connect
self._connect(addr, addrType, iface)
File "/home/cnaos/.local/share/virtualenvs/dev-raspi-wxbeacon2-cfd34nEC/lib/python3.7/site-packages/bluepy/btle.py", line 439, in _connect
raise BTLEDisconnectError("Failed to connect to peripheral %s, addr type: %s" % (addr, addrType), rsp)
bluepy.btle.BTLEDisconnectError: Failed to connect to peripheral XX:XX:XX:XX:XX:XX, addr type: random
Since connecting to the BLE device is the step most likely to fail, add retry processing here.
The following example improves the connection part of the Latest Data (0x3001) read. The connect_with_timeout() function performs the connection and the retries in one place.
example/example2.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import struct
import time
from threading import Timer

from bluepy.btle import Peripheral, UUID, BTLEException

OMRON_LATEST_DATA_UUID = UUID('%08X-7700-46F4-AA96-D5E974E32A54' % (0x0C4C0000 + 0x3001))
OMRON_SENSOR_SERVICE_UUID = UUID('%08X-7700-46F4-AA96-D5E974E32A54' % (0x0C4C0000 + (0xFFF0 & 0x3000)))


def connect_with_timeout(ble_peripheral: Peripheral, addr: str, timeout_sec=30, max_retry=5,
                         retry_interval_sec=5) -> None:
    is_connected = False
    for i in range(max_retry):
        try:
            print(f'connecting to {addr} {i + 1}/{max_retry}')
            # Force a disconnect if connect() does not return within timeout_sec
            connect_timer = Timer(timeout_sec, timeout_disconnect, args=[ble_peripheral])
            connect_timer.start()
            ble_peripheral.connect(addr=addr, addrType="random")
        except BTLEException as e:
            print(f'ERROR: try {i + 1}: BTLE Exception while connecting ')
            print(f'ERROR: type:' + str(type(e)))
            print(f'ERROR: args:' + str(e.args))
            time.sleep(retry_interval_sec)
        else:
            is_connected = True
            print(f'connected.')
            break
        finally:
            connect_timer.cancel()
            connect_timer.join()  # Wait until the timer is fully cancelled

    if not is_connected:
        print(f"ERROR: connect failed.")
        raise Exception(F"BTLE connect to {addr} failed.")


def timeout_disconnect(ble_peripheral: Peripheral) -> None:
    print(f'ERROR connect timer expired')
    ble_peripheral.disconnect()


def main():
    parser = argparse.ArgumentParser(description="Get Latest Data from OMRON's environmental sensor")
    parser.add_argument("--addr", required=True, type=str, help='Specify the MAC address of the environment sensor')
    args = parser.parse_args()

    # Connect to the environmental sensor
    ble_peripheral = Peripheral()
    connect_with_timeout(ble_peripheral, addr=args.addr)

    # Get the BLE service
    service = ble_peripheral.getServiceByUUID(uuidVal=OMRON_SENSOR_SERVICE_UUID)

    # Get the BLE characteristic
    ble_char = service.getCharacteristics(forUUID=OMRON_LATEST_DATA_UUID)[0]

    # Read the measurement data from Latest Data
    raw_data = ble_char.read()

    # Convert the raw measurement data
    (row_number, temperature, humidity, light, uv_index, pressure, noise, discomfort_index, heat_stroke,
     battery_level) = struct.unpack('<BhhhhhhhhH', raw_data)
    temperature /= 100
    humidity /= 100
    uv_index /= 100
    pressure /= 10
    noise /= 100
    discomfort_index /= 100
    heat_stroke /= 100
    battery_level /= 1000

    # Display the conversion result
    print(
        f"temperature = {temperature}, humidity = {humidity}, uv_index={uv_index}, pressure={pressure}, noise={noise}"
        f", discomfort_index={discomfort_index}, heat_stroke={heat_stroke}, battery_level={battery_level}")


if __name__ == "__main__":
    main()
Execution result
connecting to XX:XX:XX:XX:XX:XX 1/5
ERROR: try 1: BTLE Exception while connecting
ERROR: type:<class 'bluepy.btle.BTLEDisconnectError'>
ERROR: args:('Failed to connect to peripheral XX:XX:XX:XX:XX:XX, addr type: random', {'rsp': ['stat'], 'state': ['disc'], 'mtu': [0], 'sec': ['low']})
connecting to XX:XX:XX:XX:XX:XX 2/5
ERROR: try 2: BTLE Exception while connecting
ERROR: type:<class 'bluepy.btle.BTLEDisconnectError'>
ERROR: args:('Failed to connect to peripheral XX:XX:XX:XX:XX:XX, addr type: random', {'rsp': ['stat'], 'state': ['disc'], 'mtu': [0], 'sec': ['low']})
connecting to XX:XX:XX:XX:XX:XX 3/5
connected.
temperature = 10.76, humidity = 43.24, uv_index=0.01, pressure=1021.3, noise=32.35, discomfort_index=53.43, heat_stroke=9.57, battery_level=2.654
You can read and write the data directly as shown so far, but it is hard to tell which BLE characteristic a given UUID refers to, and the way the read data is decoded differs for each characteristic. So the code is rewritten to use classes that hold the UUIDs of the Omron environment sensor's BLE characteristics and encode/decode their data.
In the following example, the OmronLatestData class in omron_env_sensor.py is used in the main() function.
example/example3.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys

sys.path.append('../')

import argparse
import time
from threading import Timer

from bluepy.btle import Peripheral, BTLEException

from omron_env_sensor import OmronLatestData


def connect_with_timeout(ble_peripheral: Peripheral, addr: str, timeout_sec=30, max_retry=5,
                         retry_interval_sec=5) -> None:
    is_connected = False
    for i in range(max_retry):
        try:
            print(f'connecting to {addr} {i + 1}/{max_retry}')
            connect_timer = Timer(timeout_sec, timeout_disconnect, args=[ble_peripheral])
            connect_timer.start()
            ble_peripheral.connect(addr=addr, addrType="random")
        except BTLEException as e:
            print(f'ERROR: try {i + 1}: BTLE Exception while connecting ')
            print(f'ERROR: type:' + str(type(e)))
            print(f'ERROR: args:' + str(e.args))
            time.sleep(retry_interval_sec)
        else:
            is_connected = True
            print(f'connected.')
            break
        finally:
            connect_timer.cancel()
            connect_timer.join()  # Wait until the timer is fully cancelled

    if not is_connected:
        print(f"ERROR: connect failed.")
        raise Exception(F"BTLE connect to {addr} failed.")


def timeout_disconnect(ble_peripheral: Peripheral) -> None:
    print(f'ERROR connect timer expired')
    ble_peripheral.disconnect()


def main():
    parser = argparse.ArgumentParser(description="Get Latest Data from OMRON's environmental sensor")
    parser.add_argument("--addr", required=True, type=str, help='Specify the MAC address of the environment sensor')
    args = parser.parse_args()

    # Connect to the environmental sensor
    ble_peripheral = Peripheral()
    connect_with_timeout(ble_peripheral, addr=args.addr)

    # Get the BLE service
    latest_data = OmronLatestData()
    service = ble_peripheral.getServiceByUUID(uuidVal=latest_data.serviceUuid)

    # Get the BLE characteristic
    ble_char = service.getCharacteristics(forUUID=latest_data.uuid)[0]

    # Read the measurement data from Latest Data
    raw_data = ble_char.read()

    # Convert the raw measurement data
    latest_data.parse(raw_data)

    # Display the conversion result
    print(f"latest_data={latest_data.to_dict()}")


if __name__ == "__main__":
    main()
If the update flag of ResponseFlag is 0x01, the page is ready to be read.
If it is 0x00, the page is still being prepared, so start over from reading ResponseFlag.
example/example4.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys

sys.path.append('../')

import argparse
import time
from threading import Timer

from bluepy.btle import Peripheral, BTLEException

from omron_env_sensor import OmronRequestPage, OmronResponseFlag, OmronResponseData


def connect_with_timeout(ble_peripheral: Peripheral, addr: str, timeout_sec=30, max_retry=5,
                         retry_interval_sec=5) -> None:
    is_connected = False
    for i in range(max_retry):
        try:
            print(f'connecting to {addr} {i + 1}/{max_retry}')
            connect_timer = Timer(timeout_sec, timeout_disconnect, args=[ble_peripheral])
            connect_timer.start()
            ble_peripheral.connect(addr=addr, addrType="random")
        except BTLEException as e:
            print(f'ERROR: try {i + 1}: BTLE Exception while connecting ')
            print(f'ERROR: type:' + str(type(e)))
            print(f'ERROR: args:' + str(e.args))
            time.sleep(retry_interval_sec)
        else:
            is_connected = True
            print(f'connected.')
            break
        finally:
            connect_timer.cancel()
            connect_timer.join()  # Wait until the timer is fully cancelled

    if not is_connected:
        print(f"ERROR: connect failed.")
        raise Exception(F"BTLE connect to {addr} failed.")


def timeout_disconnect(ble_peripheral: Peripheral) -> None:
    print(f'ERROR connect timer expired')
    ble_peripheral.disconnect()


def write_request_page(ble_peripheral: Peripheral, page: int, row: int):
    assert 0 <= page <= 2047
    assert 0 <= row <= 12

    request_page = OmronRequestPage()
    ble_service = ble_peripheral.getServiceByUUID(uuidVal=request_page.serviceUuid)
    ble_char = ble_service.getCharacteristics(forUUID=request_page.uuid)[0]

    write_value = request_page.encode_data(page, row)
    print(f'write_request_page(page={page}, row={row}) write_value={write_value}')
    ble_char.write(write_value)


def read_response_flag(ble_peripheral: Peripheral) -> OmronResponseFlag:
    response_flag = OmronResponseFlag()
    ble_service = ble_peripheral.getServiceByUUID(uuidVal=response_flag.serviceUuid)
    ble_char = ble_service.getCharacteristics(forUUID=response_flag.uuid)[0]

    print(f'read_response_flag')
    raw_data = ble_char.read()
    return response_flag.parse(raw_data)


def read_response_data(ble_peripheral: Peripheral):
    response_data = OmronResponseData()
    ble_service = ble_peripheral.getServiceByUUID(uuidVal=response_data.serviceUuid)
    ble_char = ble_service.getCharacteristics(forUUID=response_data.uuid)[0]

    print(f'read_response_data')
    raw_data = ble_char.read()
    return response_data.parse(raw_data)


def main():
    parser = argparse.ArgumentParser(description='Reads the observation data of the specified page from the OMRON environmental sensor.')
    parser.add_argument("--addr", required=True, type=str, help='Specify the MAC address of the environment sensor')
    parser.add_argument("--page", type=int, default=0, help='Page number you want to read')
    parser.add_argument("--row", type=int, default=12, help='Number of page lines you want to read')
    args = parser.parse_args()

    assert 0 <= args.page <= 2047
    assert 0 <= args.row <= 12

    # Connect to the environmental sensor
    ble_peripheral = Peripheral()
    connect_with_timeout(ble_peripheral, addr=args.addr)

    # Write RequestPage
    write_request_page(ble_peripheral, args.page, args.row)

    # Read ResponseFlag (up to 3 attempts)
    is_ready_response_data = False
    for i in range(3):
        response_flag = read_response_flag(ble_peripheral)
        print(f'response_flag({i})={response_flag}')
        if response_flag.update_flag == 0x01:  # Update completed
            is_ready_response_data = True
            break
        elif response_flag.update_flag == 0x00:  # Updating
            continue
        else:  # Update failed
            print(f'ERROR: response flag failed.')
            raise IOError

    if not is_ready_response_data:
        print(f'ERROR: response flag failed.')
        raise IOError

    # Read the data on the specified page, one row per read
    for i in range(args.row + 1):
        response_data = read_response_data(ble_peripheral)
        print(F'response_data[{i}] = {response_data}')

    ble_peripheral.disconnect()


if __name__ == "__main__":
    main()
Execution result
write_request_page(page=1, row=12) write_value=b'\x01\x00\x0c'
read_response_flag
response_flag(0)={'update_flag': 1, 'unix_time': 1607876503, 'datetime': '2020-12-14T01:21:43+09:00'}
read_response_data
response_data[0] = {'row': 12, 'temperature': 14.78, 'humidity': 53.08, 'light': 0, 'uv': 0.02, 'barometric_pressure': 1002.5, 'noise': 33.94, 'discomfort_index': 58.44, 'heat_stroke': 13.62, 'battery_level': 2.858}
read_response_data
response_data[1] = {'row': 11, 'temperature': 14.81, 'humidity': 53.06, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1002.6, 'noise': 31.9, 'discomfort_index': 58.48, 'heat_stroke': 13.64, 'battery_level': 2.858}
read_response_data
response_data[2] = {'row': 10, 'temperature': 14.83, 'humidity': 53.0, 'light': 0, 'uv': 0.02, 'barometric_pressure': 1002.7, 'noise': 33.18, 'discomfort_index': 58.51, 'heat_stroke': 13.66, 'battery_level': 2.858}
read_response_data
response_data[3] = {'row': 9, 'temperature': 14.84, 'humidity': 52.97, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.0, 'noise': 33.18, 'discomfort_index': 58.52, 'heat_stroke': 13.6, 'battery_level': 2.858}
read_response_data
response_data[4] = {'row': 8, 'temperature': 14.85, 'humidity': 52.92, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.0, 'noise': 32.35, 'discomfort_index': 58.54, 'heat_stroke': 13.61, 'battery_level': 2.858}
read_response_data
response_data[5] = {'row': 7, 'temperature': 14.86, 'humidity': 52.86, 'light': 0, 'uv': 0.02, 'barometric_pressure': 1003.3, 'noise': 32.35, 'discomfort_index': 58.55, 'heat_stroke': 13.62, 'battery_level': 2.858}
read_response_data
response_data[6] = {'row': 6, 'temperature': 14.88, 'humidity': 52.77, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.2, 'noise': 33.57, 'discomfort_index': 58.58, 'heat_stroke': 13.63, 'battery_level': 2.858}
read_response_data
response_data[7] = {'row': 5, 'temperature': 14.88, 'humidity': 52.72, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.4, 'noise': 32.77, 'discomfort_index': 58.58, 'heat_stroke': 13.63, 'battery_level': 2.861}
read_response_data
response_data[8] = {'row': 4, 'temperature': 14.9, 'humidity': 52.63, 'light': 0, 'uv': 0.02, 'barometric_pressure': 1003.4, 'noise': 31.58, 'discomfort_index': 58.6, 'heat_stroke': 13.61, 'battery_level': 2.861}
read_response_data
response_data[9] = {'row': 3, 'temperature': 14.92, 'humidity': 52.57, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.6, 'noise': 31.58, 'discomfort_index': 58.63, 'heat_stroke': 13.62, 'battery_level': 2.861}
read_response_data
response_data[10] = {'row': 2, 'temperature': 14.93, 'humidity': 52.48, 'light': 0, 'uv': 0.02, 'barometric_pressure': 1003.7, 'noise': 32.77, 'discomfort_index': 58.64, 'heat_stroke': 13.63, 'battery_level': 2.861}
read_response_data
response_data[11] = {'row': 1, 'temperature': 14.95, 'humidity': 52.4, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.9, 'noise': 32.35, 'discomfort_index': 58.67, 'heat_stroke': 13.65, 'battery_level': 2.865}
read_response_data
response_data[12] = {'row': 0, 'temperature': 14.99, 'humidity': 52.25, 'light': 0, 'uv': 0.01, 'barometric_pressure': 1003.7, 'noise': 32.77, 'discomfort_index': 58.72, 'heat_stroke': 13.67, 'battery_level': 2.865}
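The write_value in the first line of this log hints at how the RequestPage value is laid out. The sketch below reproduces those three bytes with struct, assuming a little-endian unsigned 16-bit page number followed by an unsigned 8-bit row; this layout is inferred from the log line only, and encode_request_page is a hypothetical stand-in, not the actual implementation of OmronRequestPage.encode_data.

```python
import struct


def encode_request_page(page: int, row: int) -> bytes:
    """Build a RequestPage payload (hypothetical helper).

    ASSUMPTION: little-endian uint16 page followed by uint8 row,
    inferred from the logged write_value b'\\x01\\x00\\x0c'.
    """
    if not 0 <= page <= 2047:
        raise ValueError(f"page out of range: {page}")
    if not 0 <= row <= 12:
        raise ValueError(f"row out of range: {row}")
    return struct.pack('<HB', page, row)


if __name__ == "__main__":
    print(encode_request_page(1, 12))
```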
It seems to take about 4 to 5 seconds to read one page of observation data from the environmental sensor, so reading every page each time a connection is made is not realistic. Therefore, the observation data is saved in a DB, and each run reads only the difference from the data acquired on the previous run.
I have not prepared a separate sample for this, so please see the main() function of main.py.
https://github.com/cnaos/raspi-wxbeacon2/blob/master/main.py
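As a rough illustration of the idea only (not a description of what main.py actually does), the pages to fetch on each run could be computed as below. The sketch assumes the pages form a ring of 2048 entries that wraps from 2047 back to 0, that the page read last time is read again because it may have been only partly filled, and that both page numbers are obtained elsewhere; the function name pages_to_read and its parameters are hypothetical.

```python
from typing import List


def pages_to_read(last_read_page: int, latest_page: int, total_pages: int = 2048) -> List[int]:
    """List the pages to fetch, from the page read last time up to the newest one.

    ASSUMPTION: pages wrap around after total_pages - 1. If the sensor has
    lapped the reader by a full ring, this sketch cannot detect it.
    """
    count = (latest_page - last_read_page) % total_pages + 1
    return [(last_read_page + i) % total_pages for i in range(count)]


if __name__ == "__main__":
    print(pages_to_read(10, 12))
    print(pages_to_read(2046, 1))
```

At 4 to 5 seconds per page, reading only these pages keeps each run short as long as the command is executed regularly.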