Enregistrez des éléments dans AWS IoT à l'aide du kit SDK AWS IoT Python. Quand il y a beaucoup de choses, il est difficile de s'enregistrer à chaque fois sur la console.
En même temps que l'enregistrement de l'élément, procédez comme suit.
--Créez une stratégie à attacher au certificat. (Procédure omise)
import boto3
import json
import os
class AWSIoT():
#Certificat, nom du fichier de clé
FILENAME_PUBLIC_KEY = 'public_key.pem'
FILENAME_PRIVATE_KEY = 'private_key.pem'
FILENAME_CERT = 'cert.pem'
def __init__(self, dirpath_cert):
#Instancier la classe à utiliser
self.client_iot = boto3.client('iot')
self.client_iotdata = boto3.client('iot-data')
#Répertoire de stockage des certificats
self.DIRPATH_CERT = dirpath_cert
return
def enroll_thing(self, thing_name, dict_attr, group_name, property_desired, property_reported, policy):
'''
Enregistrer des éléments dans AWS IoT
'''
#Enregistrer des éléments dans AWS IoT ("attribut"Enregistrer des informations sur des éléments
self.__create_thing(thing_name, dict_attr)
#Ajouter des éléments enregistrés au groupe
self.client_iot.add_thing_to_thing_group(thingGroupName=group_name, thingName=thing_name)
#Enregistrer les informations dans l'ombre de l'appareil
self.__update_shadow(thing_name, property_desired, property_reported)
#Émettre et enregistrer le certificat / la clé de l'appareil
response = self.__create_keys_and_cert(thing_name)
#Joindre la stratégie au certificat
self.client_iot.attach_policy(policyName=policy, target=response['certificateArn'])
#Lier le certificat à l'appareil
self.client_iot.attach_thing_principal(thingName=thing_name, principal=response['certificateArn'])
return
def __create_thing(self, thingname, dict_attr):
'''
Enregistrer des éléments dans AWS IoT ("attribut"Enregistrer des informations sur des éléments
'''
#Générer des informations d'enregistrement (attribut)
attributePayload = self.__create_attribute_payload(dict_attr)
#Enregistrer des choses
self.client_iot.create_thing(
thingName=thingname,
attributePayload=attributePayload
)
return
def __create_attribute_payload(self, dict_attr):
'''
Générer des informations d'enregistrement (attribut)
'''
attributePayload = {
'attributes': dict_attr
}
return attributePayload
def __update_shadow(self, thing_name, property_desired, property_reported):
'''
Enregistrer les informations dans l'ombre de l'appareil
'''
#Formatage des informations de version pour écrire dans l'ombre de l'appareil
payload = self.__create_payload(property_desired, property_reported)
#Enregistrer les informations dans l'ombre de l'appareil
self.client_iotdata.update_thing_shadow(
thingName=thing_name,
payload=payload
)
return
def __create_payload(self, property_desired, property_reported):
'''
Formatage des informations de version pour écrire dans l'ombre de l'appareil
'''
payload = json.dumps({'state':
{"desired": {"property": property_desired},
"reported": {"property": property_reported}}})
return payload
def __create_keys_and_cert(self, thing_name):
'''
Émettre et enregistrer le certificat / la clé de l'appareil
'''
#Émettre un certificat et une clé
response = self.client_iot.create_keys_and_certificate(setAsActive=True)
#Générer un chemin de répertoire de destination
dirpath_save = self.DIRPATH_CERT + thing_name + '/'
#Écrire dans un fichier et enregistrer
self.__write_to_file(dirpath_save, self.FILENAME_PUBLIC_KEY, response['keyPair']['PublicKey'])
self.__write_to_file(dirpath_save, self.FILENAME_PRIVATE_KEY, response['keyPair']['PrivateKey'])
self.__write_to_file(dirpath_save, self.FILENAME_CERT, response['certificatePem'])
return response
def __write_to_file(self, dirpath, filename, contents):
'''
Écrire dans un fichier
'''
os.makedirs(dirpath, exist_ok=True)
filepath = dirpath + filename
with open(filepath, mode='w') as f:
f.write(contents)
return
--Définir les informations d'enregistrement
#Le nom de la chose
thing_name = 'ThingName'
#Attribut de chose (clé d'attribut:valeur)
dict_attr = {'BuildingName':'hogehoge_building', 'Floor':'6'}
#Le nom du groupe auquel appartient l'objet
group_name = 'hogehoge_group'
#Informations à enregistrer dans le shadow d'appareil
temp_desired = 26
temp_reported = 22
#Politique à joindre au certificat
policy = 'policy_thermometer'
#Chemin du répertoire pour stocker le certificat et la clé
dirpath_cert = './cert/'
--Instaurer et exécuter la classe
awsiot = AWSIoT(dirpath_cert)
awsiot.enroll_thing(thing_name, dict_attr, group_name, temp_desired, temp_reported, policy)
L'appareil a été enregistré. Les attributs sont également enregistrés correctement.
L'ombre est également correctement enregistrée. ("Delta" est créé automatiquement. Les détails sont omis.)
Le certificat est également lié correctement,
Une politique est attachée au certificat.
Je suis un très débutant, alors j'apprécierais que vous puissiez souligner et commenter même les plus petites choses. Je fais Twitter → @shin_job
Recommended Posts