Processing IoT time series data: aggregation in Azure Time Series Insights

Overview

Azure Time Series Insights acts as a temporary store for time series data. IoT data is usually time series data, and because it comes from sensors, it has to be processed before it can be visualized or analyzed. In particular, time series data often needs to be cleaned up for missing values, uneven time intervals, noise, and so on. A temporary store that can output the processed data at regular time intervals is therefore very useful.
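To make "output at regular time intervals" concrete, here is a minimal local sketch in Python 3 that averages irregularly spaced readings into 1-minute buckets. It only illustrates the idea and does not call Time Series Insights; the function name `average_per_minute` and the sample readings are made up for this example.

```python
from collections import defaultdict
from datetime import datetime, timezone


def average_per_minute(readings):
    """Average (timestamp, value) readings into 1-minute buckets.

    Returns a list of (bucket_start, mean, count) sorted by bucket_start.
    Minutes without any reading are simply absent from the result.
    """
    buckets = defaultdict(list)
    for ts, value in readings:
        # Truncate the timestamp to the start of its minute.
        buckets[ts.replace(second=0, microsecond=0)].append(value)
    return [
        (start, sum(values) / len(values), len(values))
        for start, values in sorted(buckets.items())
    ]


# Hypothetical, irregularly spaced temperature readings.
readings = [
    (datetime(2017, 8, 8, 0, 0, 5, tzinfo=timezone.utc), 25.0),
    (datetime(2017, 8, 8, 0, 0, 40, tzinfo=timezone.utc), 26.0),
    (datetime(2017, 8, 8, 0, 2, 10, tzinfo=timezone.utc), 27.0),
]
result = average_per_minute(readings)
for start, mean, count in result:
    print(start.isoformat(), mean, count)
```

The two readings in the first minute collapse into one averaged row, and the minute with no readings produces no row, which is the kind of gap a downstream step would still have to handle.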

ts1.png

Azure Time Series Insights can perform many kinds of aggregation through its API and a JSON-based query language. In this article I call the API from Python, using the environment created in the previous article.

ts2.png

The Azure API uses OAuth 2.0 for authentication. The steps are performed in the following order.

Register in the directory and issue the key

Register the application and make a note of the following three items.

A: Directory ID

B: Application ID

C: key

Grant the registered application access rights to the resource (service). Configure this on the Time Series Insights page in the portal.

keypub.jpg

Issue an access token through the API

URL-encode the items you noted in the previous step, put them in the body of the message, and POST it to the API URL over HTTPS.

# Python 2 (urllib2); replace the A, B and C placeholders with your own values
import urllib
import urllib2

AUTH_API_URL = "https://login.windows.net/Directory ID of A/oauth2/token?api-version=1.0"
token_query = {
    'grant_type': 'client_credentials',
    'resource': 'https://api.timeseries.azure.com/',
    'client_id': 'B application ID',
    'client_secret': 'C key'
}
# Passing a body makes urllib2 send the request as a POST
token_request = urllib2.Request(AUTH_API_URL, urllib.urlencode(token_query))
token_request.add_header('Content-Type', 'application/x-www-form-urlencoded')
response = urllib2.urlopen(token_request)
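For readers on Python 3, where `urllib2` no longer exists, the same request body can be built with `urllib.parse.urlencode`. The following is a sketch only: the helper names `build_token_request` and `authorization_header` are hypothetical, the IDs and key are dummy values, and the sample reply is made up. The endpoint and form fields are the ones used in the snippet above, and it runs without any network access.

```python
import json
from urllib.parse import urlencode

# Same endpoint as in the article, with the directory ID as a parameter.
AUTH_URL_TEMPLATE = "https://login.windows.net/{directory_id}/oauth2/token?api-version=1.0"


def build_token_request(directory_id, client_id, client_secret):
    """Return (url, body) for the client-credentials token request."""
    url = AUTH_URL_TEMPLATE.format(directory_id=directory_id)
    fields = {
        "grant_type": "client_credentials",
        "resource": "https://api.timeseries.azure.com/",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    # urlencode percent-escapes reserved characters; a POST body must be bytes.
    return url, urlencode(fields).encode("ascii")


def authorization_header(response_text):
    """Build the Authorization value from the token endpoint's JSON reply."""
    token = json.loads(response_text)
    return "Bearer " + token["access_token"]


# Dummy values standing in for items A, B and C.
url, body = build_token_request("my-directory-id", "my-app-id", "s3cr3t/key=")
print(url)
print(body.decode("ascii"))

# Made-up reply for illustration; a real reply carries more fields.
sample_reply = '{"access_token": "abc123"}'
print(authorization_header(sample_reply))
```

In a real call, the pair would be passed to `urllib.request.Request(url, data=body)` and sent with `urllib.request.urlopen`, and the reply text would come from the response's `read()`.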

Create Time Series Insights and record the FQDN

See here for how to create the environment. Make a note of its FQDN as item D.

fqdn.jpg

Use the service with the access token

If the steps above succeed, an access token is returned. Use that token to access the aggregation feature of Time Series Insights.

Aggregation is executed by defining a query in JSON format and sending it to the WebSocket API. The API reference is here.

The query is written in JSON. The reference is here.

Below is an example query that aggregates temperature and humidity. The values are averaged in 1-minute units, which makes the time interval of the output constant.

{
  "content": {
    "searchSpan": {
      "from": "2017-08-08T00:00:00.000Z",
      "to": "2017-08-08T02:00:00.000Z"
    },
    "aggregates": [
      {
        "dimension": {
          "dateHistogram": {
            "input": {
              "builtInProperty": "$ts"
            },
            "breaks": { "size": "1m" }
          }
        },
        "measures": [
          {
            "avg": {
              "input": {
                "property": "temperature",
                "type": "Double"
              }
            }
          },
          {
            "avg": {
              "input": {
                "property": "humidity",
                "type": "Double"
              }
            }
          },
          {
            "count": { }
          }
        ]
      }
    ]
  },
  "headers": {
    "Authorization": "Set an access token"
  }
}
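If the time span, bucket size or measured properties change often, the same query can be assembled in Python instead of being edited by hand. This is a sketch in Python 3 that reproduces the structure of the JSON above; the function name `build_aggregate_query` and its parameters are hypothetical, and the token value is a placeholder.

```python
import json


def build_aggregate_query(start, end, bucket_size, properties, access_token):
    """Build the aggregates query as a Python dict.

    One "avg" measure is generated per property name, followed by a
    "count" measure, mirroring the JSON query in the article.
    """
    measures = [
        {"avg": {"input": {"property": name, "type": "Double"}}}
        for name in properties
    ]
    measures.append({"count": {}})
    return {
        "content": {
            "searchSpan": {"from": start, "to": end},
            "aggregates": [
                {
                    "dimension": {
                        "dateHistogram": {
                            "input": {"builtInProperty": "$ts"},
                            "breaks": {"size": bucket_size},
                        }
                    },
                    "measures": measures,
                }
            ],
        },
        "headers": {"Authorization": "Bearer " + access_token},
    }


query = build_aggregate_query(
    "2017-08-08T00:00:00.000Z",
    "2017-08-08T02:00:00.000Z",
    "1m",
    ["temperature", "humidity"],
    "abc123",  # placeholder token
)
print(json.dumps(query, indent=2))
```

The result of `json.dumps(query)` is the text that would be sent over the WebSocket connection.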

The following Python code sends the query and prints the result as CSV.

# Python 2 (urllib2); replace the A, B, C and D placeholders with your own values
import json
import ssl
import urllib
import urllib2

import websocket  # provided by the websocket-client package

AUTH_API_URL = "https://login.windows.net/Directory ID of A/oauth2/token?api-version=1.0"
REST_API_URL = "wss://FQDN of D.env.timeseries.azure.com/aggregates?api-version=2016-12-12"

try:
    token_query = {
        'grant_type': 'client_credentials',
        'resource': 'https://api.timeseries.azure.com/',
        'client_id': 'B application ID',
        'client_secret': 'C key'
    }
    token_request = urllib2.Request(AUTH_API_URL, urllib.urlencode(token_query))
    token_request.add_header('Content-Type', 'application/x-www-form-urlencoded')
    response = urllib2.urlopen(token_request)
    result_token = response.read()
    token = json.loads(result_token)

    # Read the JSON query from a file
    with open('c:\\local\\query4.json') as query_file:
        request_query = json.load(query_file)
    # Set the access token in the query
    request_query['headers']['Authorization'] = "Bearer " + token['access_token']

    # Note: ssl.CERT_NONE disables TLS certificate verification
    wSocket = websocket.create_connection(REST_API_URL, sslopt={"cert_reqs": ssl.CERT_NONE})
    wSocket.send(json.dumps(request_query))
    result_wSocket = wSocket.recv()
    wSocket.close()

    # Convert the received JSON to a Python object
    dataset = json.loads(result_wSocket)
    aggregate = dataset['content'][0]
    print("timestamp,temperature,humidity")
    # Each timestamp in 'dimension' has a matching row of values in 'measures'
    for dimension, measures in zip(aggregate['dimension'], aggregate['measures']):
        print("{0},{1},{2}".format(dimension, measures[0], measures[1]))
except urllib2.HTTPError as e:
    print("HTTP Error: {0} - {1}".format(e.code, e.reason))
except urllib2.URLError as e:
    print("URL Error: {0}".format(e.reason))
except Exception as e:
    print("General Exception: {0}".format(e))
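The CSV step can also be tried without a live connection. The sketch below, in Python 3, uses the standard `csv` module and assumes the response layout that the script above reads: `dataset['content'][0]` holds a `dimension` list of timestamps and a parallel `measures` list with one row of values per timestamp. The function name `aggregates_to_csv` and the sample data are made up for illustration.

```python
import csv
import io


def aggregates_to_csv(dataset, columns):
    """Render one aggregate result as CSV text.

    Assumes the layout the article's script reads: dataset["content"][0]
    holds a "dimension" list of timestamps and a parallel "measures" list
    with one row of values per timestamp.
    """
    aggregate = dataset["content"][0]
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["timestamp"] + list(columns))
    for timestamp, values in zip(aggregate["dimension"], aggregate["measures"]):
        # Keep only as many values as there are named columns.
        writer.writerow([timestamp] + list(values[: len(columns)]))
    return out.getvalue()


# Made-up sample in the assumed shape (avg temperature, avg humidity, count).
sample = {
    "content": [
        {
            "dimension": ["2017-08-08T00:00:00Z", "2017-08-08T00:01:00Z"],
            "measures": [[25.5, 60.0, 2], [26.0, 61.5, 3]],
        }
    ]
}
csv_text = aggregates_to_csv(sample, ["temperature", "humidity"])
print(csv_text)
```

Using `csv.writer` rather than string concatenation means that values containing commas or quotes would be escaped correctly.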
