TP-Link is a network equipment manufacturer based in Shenzhen, China, whose main products are routers. In recent years it has been focusing on IoT home appliances such as smart light bulbs and smart plugs, and it has established a unique position on Amazon thanks to their good cost performance.
[Smart Plug HS105](https://www.amazon.co.jp/Alexa%E8%AA%8D%E5%AE%9A%E5%8F%96%E5%BE%97%E8%A3%BD%E5%93%81%E3%80%91-%E7%9B%B4%E5%B7%AE%E3%81%97%E3%82%B3%E3%83%B3%E3%82%BB%E3%83%B3%E3%83%88-Google%E3%83%9B%E3%83%BC%E3%83%A0%E5%AF%BE%E5%BF%9C-%E9%9F%B3%E5%A3%B0%E3%82%B3%E3%83%B3%E3%83%88%E3%83%AD%E3%83%BC%E3%83%AB-HS105/dp/B078HSBNMT?ref_=ast_sto_dp)
[Smart Light Bulb KL110](https://www.amazon.co.jp/%E3%80%90Amazon-Alexa%E8%AA%8D%E5%AE%9A-%E3%80%91TP-Link-KL110]-Google/dp/B07GC4JR83?ref_=ast_sto_dp&th=1)
This time, using the API, I tried the following from Python and Node.js:
・ Turning devices on and off
・ Getting information such as the on/off state and light bulb brightness
Since these two operations cover many of the uses one can think of for IoT home appliances, **I came away convinced of how widely this can be applied**!
**・ PC**
**・ Raspberry Pi**
**・ TPLink smart plug or light bulb**
This time I tried the following three products:
HS105: Smart plug
KL110: White light bulb
KL130: Color light bulb
First, test on the terminal whether the data can be obtained from TPLink.
Install tplink-smarthome-api (reference)
sudo npm install -g tplink-smarthome-api
Get a list of connected TPLink devices with the following command
tplink-smarthome-api search
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76 ‥ Smart plug
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4 ‥ White light bulb
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063 ‥ Color light bulb
You can see that all three devices have been detected
You can get device settings and OnOff with the following command.
tplink-smarthome-api getSysInfo [IP address of the device]:9999
**・ Example of KL130 (color bulb)**
:
ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ Device settings start here
light_state: {
  on_off: 1,
  mode: 'normal',
  hue: 0,
  saturation: 0,
  color_temp: 2700,
  brightness: 100
},
↑ Device settings end here
is_dimmable: 1,
is_color: 1,
:
The fields appear to mean the following:
on_off: 0 means power off, 1 means power on
hue: Hue (0 in white mode)
color_temp: Color temperature (0 when not in white mode)
brightness: Brightness (in %)
**・ Example of KL110 (white light bulb)**
:
ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ Device settings start here
light_state: {
  on_off: 1,
  mode: 'normal',
  hue: 0,
  saturation: 0,
  color_temp: 2700,
  brightness: 100
},
↑ Device settings end here
is_dimmable: 1,
is_color: 0,
:
The fields appear to mean the following:
on_off: 0 means power off, 1 means power on
hue: Hue (0 in white mode)
saturation: Saturation
color_temp: Color temperature (0 when not in white mode)
brightness: Brightness (in %)
The output is almost the same as for the KL130, but since this is not a color bulb, is_color is 0.
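The field meanings guessed above can be turned into a small helper. The following is only a sketch: the function name `summarize_light_state` is hypothetical, and the rule that a non-zero color_temp means white mode is an assumption based on the observations above, not documented behaviour.

```python
def summarize_light_state(sysinfo: dict) -> dict:
    """Pick the fields discussed above out of a bulb's getSysInfo result."""
    state = sysinfo["light_state"]
    # Assumption: a non-zero color_temp means the bulb is in white mode.
    white_mode = state.get("color_temp", 0) != 0
    return {
        "is_on": state["on_off"] == 1,
        "mode": "white" if white_mode else "color",
        "brightness_percent": state.get("brightness"),
        "color_temp": state.get("color_temp") if white_mode else None,
        "hue": None if white_mode else state.get("hue"),
        "saturation": None if white_mode else state.get("saturation"),
        "supports_color": sysinfo.get("is_color") == 1,
    }


# Sample values copied from the KL130 output shown earlier.
kl130 = {
    "light_state": {"on_off": 1, "mode": "normal", "hue": 0, "saturation": 0,
                    "color_temp": 2700, "brightness": 100},
    "is_dimmable": 1,
    "is_color": 1,
}
summary = summarize_light_state(kl130)
print(summary)
```

Using `.get()` for the optional fields keeps the helper from failing if a bulb omits some of them, which may happen depending on the device state.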
**・ Example of HS105 (smart plug)**
alias: '',
↓ Device settings start here
relay_state: 1,
on_time: 288,
active_mode: 'none',
feature: 'TIM',
updating: 0,
icon_hash: '',
rssi: -52,
led_off: 0,
longitude_i: 1356352,
latitude_i: 348422,
↑ Device settings end here
hwId: '047D‥',
The fields appear to mean the following:
relay_state: 0 means power off, 1 means power on
on_time: Continuous power-on time
rssi: WiFi signal strength
The longitude (longitude_i) and latitude (latitude_i) are also shown, but the mystery deepens because they point to a spot about 5 km away from the actual location.
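As a rough illustration of how the plug fields could be read, here is a sketch with a hypothetical helper `summarize_plug`. Two things are assumptions rather than facts from the output above: that on_time is in seconds, and that longitude_i and latitude_i are degrees multiplied by 10,000.

```python
from datetime import timedelta


def summarize_plug(sysinfo: dict) -> dict:
    """Convert the raw plug fields into friendlier values."""
    return {
        "is_on": sysinfo["relay_state"] == 1,
        # Assumption: on_time is reported in seconds.
        "on_time": str(timedelta(seconds=sysinfo["on_time"])),
        "rssi": sysinfo["rssi"],
        # Assumption: the *_i fields are degrees multiplied by 10,000.
        "latitude": sysinfo["latitude_i"] / 10000,
        "longitude": sysinfo["longitude_i"] / 10000,
    }


# Sample values copied from the HS105 output shown earlier.
hs105 = {"relay_state": 1, "on_time": 288, "rssi": -52,
         "longitude_i": 1356352, "latitude_i": 348422}
plug = summarize_plug(hs105)
print(plug)
```

If the scaling assumption holds, the coordinates have only four decimal places, so they cannot pinpoint a location precisely.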
As shown above, you can get the information you want from the command line! The following chapters describe how to get the information and operate the devices from a program (Node.js and Python).
**Note: If you use Python and do not need the Node.js explanation, skip this chapter and move on to ③**
Install Node.js, referring to here.
On Windows, the global install location of npm is not on the module search path, so Node.js cannot load the module. Set the path by referring to the following: https://qiita.com/shiftsphere/items/5610f692899796b03f99
Find out where npm installs global modules with the command below. (For some reason, on Windows the location seems to differ from the folder reported by the command "npm bin -g".)
npm ls -g
Edit .profile with the following command.
nano /home/[User name]/.profile
Add the following line to the end of the .profile and reboot
export NODE_PATH=[The path examined above]/node_modules
If the path specified by the following command is displayed, it is successful.
printenv NODE_PATH
Create the following script
tplink_test.js
const { Client } = require('tplink-smarthome-api');
const client = new Client();
client.getDevice({ host: '192.168.0.102' }).then(device => {
device.getSysInfo().then(console.log);
});
If you execute the script with the following command, you can get various information in the same way as ①.
node tplink_test.js
Because of my limited JavaScript skills I could not get logging to work well in Node.js, so I changed course and wrote the operation and logging scripts in Python.
Python was a struggle because I could not find documentation as thorough as that for Node.js, but I deciphered the code here and here (tplink-lb130-api/blob/master/lb130.py) and created a script.
Referring to the code above, I created the following four classes.
**TPLink_Common(): Class of functions common to plugs and light bulbs**
**TPLink_Plug(): Class of plug-only functions** (inherits TPLink_Common())
**TPLink_Bulb(): Class of bulb-only functions** (inherits TPLink_Common())
**GetTPLinkData(): Class that gets data using the classes above**
tplink.py
import socket
from struct import pack
import json

#TPLink data acquisition class
class GetTPLinkData():
    #Method for getting plug data
    def get_plug_data(self, ip):
        #Create an instance of the plug operation class
        plg = TPLink_Plug(ip)
        #Get data and convert to dict
        rjson = plg.info()
        rdict = json.loads(rjson)
        return rdict

    #Method for getting light bulb data
    def get_bulb_data(self, ip):
        #Create an instance of the bulb operation class
        blb = TPLink_Bulb(ip)
        #Get data and convert to dict
        rjson = blb.info()
        rdict = json.loads(rjson)
        return rdict
#TPLink bulb & plug common class
class TPLink_Common():
    def __init__(self, ip, port=9999):
        """Default constructor
        """
        self.__ip = ip
        self.__port = port

    #Get the device information (same content as getSysInfo)
    def info(self):
        cmd = '{"system":{"get_sysinfo":{}}}'
        receive = self.send_command(cmd)
        return receive

    #Send an encrypted command over TCP and return the decrypted response
    def send_command(self, cmd, timeout=10):
        try:
            sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock_tcp.settimeout(timeout)
            sock_tcp.connect((self.__ip, self.__port))
            sock_tcp.settimeout(None)
            sock_tcp.send(self.encrypt(cmd))
            data = sock_tcp.recv(2048)
            sock_tcp.close()
            #The first 4 bytes are the length header, so skip them
            decrypted = self.decrypt(data[4:])
            print("Sent: ", cmd)
            print("Received: ", decrypted)
            return decrypted
        except socket.error:
            #Same behavior as quit(): exit with an error message
            raise SystemExit("Could not connect to host " + self.__ip + ":" + str(self.__port))

    #XOR each character with the previous encrypted byte (initial key 171)
    def encrypt(self, string):
        key = 171
        result = pack('>I', len(string))
        for i in string:
            a = key ^ ord(i)
            key = a
            result += bytes([a])
        return result

    #XOR each byte with the previous received byte (initial key 171)
    def decrypt(self, string):
        key = 171
        result = ""
        for i in string:
            a = key ^ i
            key = i
            result += chr(a)
        return result
#TPLink plug operation class
class TPLink_Plug(TPLink_Common):
    def on(self):
        cmd = '{"system":{"set_relay_state":{"state":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"system":{"set_relay_state":{"state":0}}}'
        receive = self.send_command(cmd)

    def ledon(self):
        cmd = '{"system":{"set_led_off":{"off":0}}}'
        receive = self.send_command(cmd)

    def ledoff(self):
        cmd = '{"system":{"set_led_off":{"off":1}}}'
        receive = self.send_command(cmd)

    def set_countdown_on(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":1,"name":"turn on"}}}'
        receive = self.send_command(cmd)

    def set_countdown_off(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":0,"name":"turn off"}}}'
        receive = self.send_command(cmd)

    def delete_countdown_table(self):
        cmd = '{"count_down":{"delete_all_rules":null}}'
        receive = self.send_command(cmd)

    def energy(self):
        cmd = '{"emeter":{"get_realtime":{}}}'
        receive = self.send_command(cmd)
        return receive
#TPLink bulb operation class
class TPLink_Bulb(TPLink_Common):
    def on(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":0}}}'
        receive = self.send_command(cmd)

    def transition_light_state(self, hue: int = None, saturation: int = None, brightness: int = None,
                               color_temp: int = None, on_off: bool = None, transition_period: int = None,
                               mode: str = None, ignore_default: bool = None):
        # copy all given argument name-value pairs as a dict
        # (compare strings with !=, not "is not", which tests object identity)
        d = {k: v for k, v in locals().items() if k != 'self' and v is not None}
        r = {
            'smartlife.iot.smartbulb.lightingservice': {
                'transition_light_state': d
            }
        }
        cmd = json.dumps(r)
        receive = self.send_command(cmd)
        print(receive)

    def brightness(self, brightness):
        self.transition_light_state(brightness=brightness)

    def purple(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=277, saturation=86, color_temp=0, brightness=brightness, transition_period=transition_period)

    def blue(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=240, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def cyan(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=180, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def green(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=120, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def yellow(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=60, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def orange(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=39, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def red(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=0, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def lamp_color(self, brightness = None):
        self.transition_light_state(color_temp=2700, brightness=brightness)
The classes above can be used from Python code as follows (the IP addresses are the ones found in the search earlier; replace them with those of your own devices).
**・ When you want to turn on the power of the light bulb**
TPLink_Bulb('192.168.0.102').on()
**・ When you want to turn off the power of the plug**
TPLink_Plug('192.168.0.101').off()
**・ When you want to turn on the plug after 10 seconds**
TPLink_Plug('192.168.0.101').set_countdown_on(10)
**・ When you want to set the brightness of the light bulb to 10%**
TPLink_Bulb('192.168.0.102').brightness(10)
**・ When you want to make the light bulb red (color light bulb only)**
TPLink_Bulb('192.168.0.103').red()
**・ When you want to get information such as the on/off state of a light bulb**
info = GetTPLinkData().get_bulb_data('192.168.0.102')
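The encrypt and decrypt methods in tplink.py are the least obvious part of the script, so here is a standalone sketch of the same scheme that can be run without a device. Each output byte is the previous output byte XORed with the next input byte, starting from the key 171, and a 4-byte big-endian length header is prepended. The function names here are for illustration only, and this version works on bytes rather than on characters.

```python
from struct import pack, unpack

INITIAL_KEY = 171


def encrypt(plaintext: str) -> bytes:
    """XOR each byte with the previous ciphertext byte, then add a length header."""
    key = INITIAL_KEY
    body = bytearray()
    for byte in plaintext.encode("utf-8"):
        key ^= byte  # the new ciphertext byte also becomes the next key
        body.append(key)
    return pack(">I", len(body)) + bytes(body)


def decrypt(payload: bytes) -> str:
    """Reverse the scheme; payload must not include the 4-byte header."""
    key = INITIAL_KEY
    out = bytearray()
    for byte in payload:
        out.append(key ^ byte)
        key = byte  # the received byte is the key for the next one
    return out.decode("utf-8")


cmd = '{"system":{"get_sysinfo":{}}}'
packet = encrypt(cmd)
(length,) = unpack(">I", packet[:4])
restored = decrypt(packet[4:])
print(length, restored)
```

Because the key depends only on the data itself, this is obfuscation rather than real encryption, which is one reason to keep these devices on a trusted local network.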
Using the last method in the previous chapter, I created a script that logs information about light bulbs and plugs. The structure of the script is the same as [here](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E3%83%A1%E3%82%A4%E3%83%B3%E3%82%B9%E3%82%AF%E3%83%AA%E3%83%97%E3%83%88%E4%BD%9C%E6%88%90), so please read the link.
As in [this article](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E8%A8%AD%E5%AE%9A%E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB), I created the following two configuration files to make management easier.
-ApplianceList.csv: Describes the necessary information for each device
ApplianceList.csv
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2
The columns have the following meanings:
ApplianceName: Device name, used to identify multiple devices of the same type
ApplianceType: Device type
 TPLink_ColorBulb: Color bulb (KL130, etc.)
 TPLink_WhiteBulb: White light bulb (KL110, etc.)
 TPLink_Plug: Smart plug (HS105, etc.)
IP: IP address of the device
Retry: Maximum number of re-executions (the number of retries when acquisition fails; for details, [click here](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E4%B8%8D%E5%85%B7%E5%90%881peripheral%E3%81%AE%E5%88%9D%E6%9C%9F%E5%8C%96%E6%99%82%E3%81%AB%E3%82%A8%E3%83%A9%E3%83%BC%E3%81%8C%E5%87%BA%E3%82%8B))
-config.ini: Specifies the CSV and log output directories
config.ini
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance
If you output both to a shared folder created with samba, you can access them from outside the Raspberry Pi, which is convenient.
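To check that both configuration files parse the way the logger expects, something like the following sketch can be used. It reads the same contents from in-memory strings so that it runs anywhere, and it uses the standard csv module instead of pandas purely to stay self-contained; the variable names are hypothetical.

```python
import configparser
import csv
import io

# Same contents as the two configuration files described above.
CONFIG_TEXT = """\
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance
"""

LIST_TEXT = """\
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2
"""

cfg = configparser.ConfigParser()
cfg.read_string(CONFIG_TEXT)
csv_dir = cfg["Path"]["CSVOutput"]
log_dir = cfg["Path"]["LogOutput"]

# Each row becomes a dict keyed by the header line.
appliances = list(csv.DictReader(io.StringIO(LIST_TEXT)))
for row in appliances:
    row["Retry"] = int(row["Retry"])  # CSV values are strings by default

print(csv_dir, log_dir)
for row in appliances:
    print(row["ApplianceName"], row["ApplianceType"], row["IP"], row["Retry"])
```

With real files, `cfg.read('./config.ini', encoding='utf-8')` and `open(...)` would replace the in-memory strings.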
appliance_data_logger.py
from tplink import GetTPLinkData
import logging
from datetime import datetime, timedelta
import os
import csv
import configparser
import pandas as pd
#Global variables
global masterdate
######TPLink data acquisition######
def getdata_tplink(appliance):
    #Initialize so that the variable exists even if Retry is 0
    applianceValue = None
    #Repeat up to appliance.Retry times when no data can be obtained
    for i in range(appliance.Retry):
        try:
            #For a plug
            if appliance.ApplianceType == 'TPLink_Plug':
                applianceValue = GetTPLinkData().get_plug_data(appliance.IP)
            #For a light bulb
            elif appliance.ApplianceType == 'TPLink_ColorBulb' or appliance.ApplianceType == 'TPLink_WhiteBulb':
                applianceValue = GetTPLinkData().get_bulb_data(appliance.IP)
            else:
                applianceValue = None
        #Log output if an error occurs
        #(bare except on purpose: tplink.py exits with SystemExit when the connection fails)
        except:
            logging.warning(f'retry to get data [loop{str(i)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
            applianceValue = None
            continue
        else:
            break
    #If the value could be obtained, store the data in a dict
    if applianceValue is not None:
        #For a plug
        if appliance.ApplianceType == 'TPLink_Plug':
            data = {
                'ApplianceName': appliance.ApplianceName,
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['relay_state']),
                'OnTime': str(applianceValue['system']['get_sysinfo']['on_time'])
            }
        #For a light bulb
        else:
            data = {
                'ApplianceName': appliance.ApplianceName,
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['light_state']['on_off']),
                'Color': str(applianceValue['system']['get_sysinfo']['light_state']['hue']),
                'ColorTemp': str(applianceValue['system']['get_sysinfo']['light_state']['color_temp']),
                'Brightness': str(applianceValue['system']['get_sysinfo']['light_state']['brightness'])
            }
        return data
    #If it could not be obtained, log output
    else:
        logging.error(f'cannot get data [loop{str(appliance.Retry)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
        return None
######CSV output of data######
def output_csv(data, csvpath):
    appliancename = data['ApplianceName']
    monthstr = masterdate.strftime('%Y%m')
    #Output destination folder name
    outdir = f'{csvpath}/{appliancename}/{masterdate.year}'
    #When the output destination folder does not exist, create a new one
    os.makedirs(outdir, exist_ok=True)
    #Output file path
    outpath = f'{outdir}/{appliancename}_{monthstr}.csv'
    #Create a new output file when it does not exist
    if not os.path.exists(outpath):
        with open(outpath, 'w', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writeheader()
            writer.writerow(data)
    #Add one line when the output file exists
    else:
        with open(outpath, 'a', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writerow(data)
######Main######
if __name__ == '__main__':
    #Get start time
    startdate = datetime.today()
    #Round the start time to the nearest minute
    masterdate = startdate.replace(second=0, microsecond=0)
    if startdate.second >= 30:
        masterdate += timedelta(minutes=1)
    #Read configuration file and device list
    cfg = configparser.ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    df_appliancelist = pd.read_csv('./ApplianceList.csv')
    #Total number of devices and number of successful data acquisitions
    appliance_num = len(df_appliancelist)
    success_num = 0
    #Log initialization
    logname = f"/appliancelog_{str(masterdate.strftime('%y%m%d'))}.log"
    logging.basicConfig(filename=cfg['Path']['LogOutput'] + logname, level=logging.INFO)
    #Dict for holding all acquired data
    all_values_dict = None
    ######Data acquisition for each device######
    for appliance in df_appliancelist.itertuples():
        #Confirm that ApplianceType is a TPLink device
        if appliance.ApplianceType in ['TPLink_Plug','TPLink_ColorBulb','TPLink_WhiteBulb']:
            data = getdata_tplink(appliance)
        #Other than those above
        else:
            data = None
        #When data exists, add it to the dict for holding all data and output CSV
        if data is not None:
            #Create a new dictionary when all_values_dict is None
            if all_values_dict is None:
                all_values_dict = {data['ApplianceName']: data}
            #Add to the existing dictionary when all_values_dict is not None
            else:
                all_values_dict[data['ApplianceName']] = data
            #CSV output
            output_csv(data, cfg['Path']['CSVOutput'])
            #Increment the success count
            success_num+=1
    #Log output of processing end
    logging.info(f'[masterdate{str(masterdate)} startdate{str(startdate)} enddate{str(datetime.today())} success{str(success_num)}/{str(appliance_num)}]')
When you run the script above, the acquired data is written as CSV to the folder specified by "CSVOutput" in the configuration file, with the device name and the year and month in the file name.
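The resulting folder layout is [CSVOutput]/[device name]/[year]/[device name]_[YYYYMM].csv. The sketch below reproduces that layout in a temporary directory so it can be tried without touching the real output folder. The helper `append_row`, the sample row and the date are hypothetical, and it opens the file in append mode in both cases instead of branching as the script above does.

```python
import csv
import os
import tempfile
from datetime import datetime


def append_row(data: dict, csv_root: str, masterdate: datetime) -> str:
    """Append one row to <root>/<name>/<year>/<name>_<YYYYMM>.csv and return the path."""
    name = data["ApplianceName"]
    outdir = os.path.join(csv_root, name, str(masterdate.year))
    os.makedirs(outdir, exist_ok=True)
    outpath = os.path.join(outdir, f"{name}_{masterdate:%Y%m}.csv")
    is_new = not os.path.exists(outpath)
    with open(outpath, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(data.keys()))
        if is_new:
            writer.writeheader()  # header only when the file is first created
        writer.writerow(data)
    return outpath


with tempfile.TemporaryDirectory() as root:
    when = datetime(2020, 5, 17, 12, 30)  # hypothetical timestamp
    row = {"ApplianceName": "TPLink_HS105_Plug_1", "Date_Master": str(when),
           "IsOn": "1", "OnTime": "288"}
    path = append_row(row, root, when)
    append_row(row, root, when)  # the second call appends to the same monthly file
    relative = os.path.relpath(path, root)
    with open(path, newline="") as f:
        lines = f.read().splitlines()

print(relative)
print(lines)
```

One file per device per month keeps each CSV small enough to open comfortably in a spreadsheet.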
This completes the information acquisition.
Since the script can run 24 hours a day on a Raspberry Pi and Python offers more freedom than IFTTT, you can turn all sorts of ideas into reality, for example:
・ Combine with a motion sensor to turn on the power when a person enters
・ Turn off the lights if nobody has been present for 30 minutes or more
・ Automatically switch the brightness of the light bulb depending on the person
And so on.
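As a taste of the first two ideas, the decision logic could look roughly like the sketch below. Everything in it is hypothetical: the function name, the idea that a motion sensor supplies a "last motion" timestamp, and the one-minute window used to treat motion as "someone just entered".

```python
from datetime import datetime, timedelta

IDLE_LIMIT = timedelta(minutes=30)    # turn off after this long without motion
RECENT_WINDOW = timedelta(minutes=1)  # assumed polling interval


def decide_action(light_is_on: bool, last_motion: datetime, now: datetime) -> str:
    """Return 'on', 'off' or 'keep' based on the time since the last motion."""
    idle = now - last_motion
    if light_is_on and idle >= IDLE_LIMIT:
        return "off"
    if not light_is_on and idle < RECENT_WINDOW:
        return "on"
    return "keep"


now = datetime(2020, 5, 17, 21, 0)  # hypothetical current time
print(decide_action(True, now - timedelta(minutes=31), now))
print(decide_action(False, now - timedelta(seconds=10), now))
print(decide_action(True, now - timedelta(minutes=5), now))
```

The returned string would then be mapped to calls such as TPLink_Bulb(ip).on() or TPLink_Bulb(ip).off() from tplink.py, with the current on/off state taken from get_bulb_data().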
I have some things I want to make, so I will write an article again when the production is completed.