Implementation example of LINE BOT server for actual operation

Overview

Considering a LINE BOT server that can be actually operated, asynchronous processing is required as written by yoichiro6642 at the following reference URL. Reference URL: LINE BOT server architecture that is safe even if a large number of messages come

Sequence Diagram.png

I wrote a LINE BOT server (skeleton) according to the above, with the aim of being able to withstand a large number of messages in a small to medium scale environment. The last "Reduce the number of API calls" (specify multiple MIDs in message transmission to reduce the number of PI calls) is not implemented. The environment used is as follows.

I had the option of Amazon API Gateway + Lambda + DynamoDB, but I thought that Node.js + MongoDB + Python could implement a lightweight Dispatcher & jobWorker with less overhead.

For Queue, RabbitMQ, memcached, Redis, etc. were considered, but MongoDB was used for the following reasons.

--I want a trigger that can kick the process when it is added to the Queue instead of polling. --MongoDB can be used as a single (not necessarily single) master to use oplog, and can be used as a trigger by monitoring oplog. --After all, a high-speed DB is required to store and refer to the information for each received MID.

Prerequisite knowledge

Implementation example

Preparing MongoDB

I specified about replication and oplogSizeMB.

text:mongod.line_bot.conf


systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.line_bot.log
storage:
  dbPath: /var/lib/mongo/line_bot
  journal:
    enabled: true
processManagement:
  fork: false  # fork and run in background
  pidFilePath: /var/run/mongodb/mongod.line_bot.pid  # location of pidfile
net:
  port: 27017
  bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
replication:
  oplogSizeMB: 3072

Launch Mongod

Start in master mode.

$ mongod --master -f mongod.line_bot.conf

Creating a collection

The collection is capped so that you don't have to worry about capacity growth.

create_collection


#!/bin/bach -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
  capped: true,
  size: 1048576000 // 1GB
});
EOF

BOT Server(Node.js) frontDesk.js receives a message from LINE Server and returns an immediate response.

frontDesk.js


// Settings of the this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt  = {
    "caKey"  : "/etc/letsencrypt/live/xxx/privkey.pem",
    "caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
    "caCa"   : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";

// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb   = "line_bot";
var mongoCol  = "recvq";

var express= require('express'),
bodyParser = require('body-parser'),
log4js     = require('log4js'),
https      = require('https'),
fs         = require('fs'),
mongo      = require('mongodb'),
path       = require('path');

var accept = require(__dirname+'/accept');
var app    = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
    if (!rc) {
        console.log("set_col.rc:"+rc);
        local.db.close();
        process.exit(1);
    }
    console.log("Connected succesfully to "+mongoUrl);
});

// handle a request
app.post(allowPath, function(req, res, next) {
    local['acceptTime'] = new Date().getTime();  // record accept time(ms)

    // response ASAP
    res.status(200).send("OK");
    res.end();

    accept.post_callback(req, res, next);  // Handle the request
});

// server certificate authority
var httpsOpt = {
    key:  fs.readFileSync(httpsOpt.caKey),
    cert: fs.readFileSync(httpsOpt.caCert),
    ca:   fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
    console.log('Listening on port '+httpsPort+'...'); 
}).on('error', function(err) {
    if (err.errno === 'EADDRINUSE') {
        console.log('This program is already running.');
    } else {
        console.log(err);
    }
    process.exit(1);
});

function set_col(local, url, callback) {
    // Use connect method to connect to the MongoServer
    MongoClient.connect(url, function(err, db) {
        if (err) {
            console.log("MongoDB connection error."); console.log(err);
            process.exit(1);
        }
        local['db'] = db;

        local.db.collection(mongoCol, function(err, collection) {
            if (err) {
                console.log("MongoDB collection error."); console.log(err);
                process.exit(1);
            }
            local.db['collection'] = collection;
            callback(true, local, url);
        });
    });
}

After that, use accept.js to verify the signature and register it in MongoDB.

accept.js


var crypto = require('crypto');
var assert = require('assert');

exports.post_callback = function(req, res) {
    //Check for signature
    if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
        console.log("400. Bad Request. The request does not have a x-line-channelsignature");
        return;
    }

    //Check for the result of request
    if ((! req.body) ||
        (! req.body['result'])) {
        console.log("400. Bad Request. The request does not have result");
        return;
    }
    var result_num = req.body.result.length;

    //HTTP body with channelSecret sha256 encryption,Ask for base64 digest.
    var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
    computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");

    //Compare signatures and confirm legitimacy
    if (req.headers["x-line-channelsignature"] != computedSignature) {
        console.log("400. Bad Request. The x-line-channelsignature is wrong.");
        return;
    }

    //Enter the time you received
    for (var i=0; i<Object.keys(req.body.result).length; i++) {
        req.body.result[i]['acceptTime'] = local['acceptTime'];
    }

    //Register message in MongoDB
    local.db.collection.insertMany(req.body.result, function(err, r) {
        assert.equal(null, err);
        assert.equal(result_num, r.insertedCount);

        toQueueTime = new Date().getTime() - local['acceptTime'];
        console.log("necessary time to store to queue: "+toQueueTime+" ms");

        return;
    });

}

Dispatcher & jobWorker Implemented in Python multithreading. The jobWorker thread waits for threading.Event () with wait () when it is created. The trigger thread monitors oplog with ts and starts processing when it is added to the Queue. Allocate the contents of the read Queue to a free jobWorker thread, set Event and let jobWorker start processing.

I'm aware of thread references and updates to lists and variables, so I don't lock them ... I was planning to do this, but when I accessed the LINE API server in multiple threads, an error regarding the number of simultaneous connections occurred. So, to access the LINE API server from jobWorker, we use exclusive lock with acquire (). Since the document does not describe that area, I set it to 1 multiplex and access interval of 100ms. I'm new to Python multithreading, so please point out any mistakes.

dispatcher.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Settings of the this program
NumOfThread       = 20
searchInterval    = 100000  # uSec
mainSleepInterval = 60      # Sec

# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb   = "line_bot";
mongoCol  = "recvq";

import os,os.path
import sys
import threading
import time
import json
import pymongo
from   pymongo.cursor import CursorType
from   datetime import datetime
import datetime
import jobWorker

usleep = lambda x: time.sleep(x/1000000.0)  #Microsecond sleep


#####worker thread
def workerThread(tt):
    tno = str(tt[0])
    while True:
        tt[2].clear()  #Clear Event and wait until Evant occurs
        tt[3] = 'w'
        tt[2].wait()
        if verbose:  #Waiting ends. Start processing
            print '\nworker['+tno+']: wake up'
        
        #Call the actual processing function here
        jobWorker.jobWorker(verbose, tno, tt[4]['o'])


#####MongoDB trigger thread
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
    dbCol = db + '.' + col
    c = pymongo.MongoClient(host, port)
    # Uncomment this for master/slave.
    oplog = c.local.oplog['$main']
    # Uncomment this for replica sets.
    #oplog = c.local.oplog.rs
    
    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']
    
    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                #Regularly{h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'}Returns
                #Is op:'n'Is just information. ignore.
                if doc['ns']==dbCol and doc['op']!='n':
                    #Find free threads
                    i = tchain[last]
                    while t[i][3] != 'w':
                        i = tchain[i]
                        if i == tchain[last]:  #If you look for one lap
                            usleep(searchInterval)
                    
                    t[i][4] = doc  #Free thread t[n][4]Store data in
                    t[i][3] = 'r'
                    t[i][2].set()  # t[n]Processing start instruction
                    last = i
                # Work with doc here
                ts = doc['ts']
        print "got out of a while corsor.alive loop"


#######################################################################

# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
    verbose = True
elif len(sys.argv)!=1:
    print "Usage: %s [-v]" % (sys.argv[0],)
    quit()

#worker thread management data creation&worker thread creation
# [ThreadNo, ThreadObj ,EvantObj, status,Data to pass to the thread]
#   (status ' ':in preparation, 'w':Waiting / free, 'r':Running)
#  :
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
    t[i][0] = i                  # Thread No.
    t[i][2] = threading.Event()  #Evant object generation
    t[i][3] = ' '                # is_running
    #worker thread creation
    t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
                               args=(t[i],))
    t[i][1].setDaemon(True)

# Thread list of circulation
tc = [0 for i in range(NumOfThread)]  #The value is the next thread No.
for i in range(NumOfThread):
    tc[i] = i+1
tc[i] = 0  # make a list of circulation

lastThread = i  #Last thread used.Next is tc[lastThread]Use the second thread.

#worker thread start
for i in range(NumOfThread):
    t[i][1].start()

#Wait for wait state after starting worker thread
yetAllThread = True
while yetAllThread:
    for i in range(NumOfThread):
        if t[i][3] == ' ':
            break
        else:
            usleep(100)  #Monitoring interval is 0.1 millisecond
    if i == NumOfThread-1:
        yetAllThread = False
    else:
        usleep(100)  #Monitoring interval is 0.1 millisecond

#MongoDB trigger thread generation
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start()  #start

#main thread
while True:
    time.sleep(mainSleepInterval)

jobWorker.py is the thread that does the actual processing. This is a sample that only returns parrots according to the type of content to be sent. Please note that the method of taking MID (from) differs depending on the opType.

jobWorker.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()

# Settings of the LINE API Server
lineApiHost      = "trialbot-api_line_me"
accessIntervalMS = 100  # ms
getProfilesUrl   = "https://trialbot-api.line.me/v1/profiles"
postUrl          = "https://trialbot-api.line.me/v1/events"
getContentUrl    = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header =  {
    "Content-Type"                : "application/json; charser=UTF-8",
    "X-Line-ChannelID"            : "xxxxxxxxxx",
    "X-Line-ChannelSecret"        : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
    "to"          : [],
    "toChannel"   : 1383378250,
    "eventType"   : "138311608800106203",
    "content"     : {}
}

import threading
import time
import datetime
import json

usleep = lambda x: time.sleep(x/1000000.0)  #Microsecond sleep

#Lock related to avoid simultaneous access to LINE API server from multiple jobWorker threads
globalLock = {}            #Lock for each connection destination
globalLastAccessTime = {}  #Last access time for each connection destination
loadTime = int(time.time()*1000)

def jobWorker(verbose, mynum, recvBody):
    global globalLock
    global globalLastAccessTime
    
    #Initial lock settings for each connection destination
    if not globalLock.has_key(lineApiHost):
        globalLock[lineApiHost] = threading.Lock()
    #Initial setting of last access time for each connection destination
    if not globalLastAccessTime.has_key(lineApiHost):
        globalLastAccessTime[lineApiHost] = loadTime
    
    if verbose:
        recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
        print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
        print recvBody
    
    opType = recvBody['content'].get('opType')
    
    # blocked from user
    if opType == 8:
        #MID of block user from user management(recvBody['content']['params'][0])Delete
        print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
        return
    
    #Copy the Body part of POST
    postBody = {}
    postBody['to'] = ''
    postBody['toChannel'] = postBodyTemplate['toChannel']
    postBody['eventType'] = postBodyTemplate['eventType']
    postBody['content'] = {}
    
    #Message reply destination
    if opType==4:  # New user
        postBody['to'] = [ recvBody['content']['params'][0] ]
    else:
        postBody['to'] = [ recvBody['content']['from'] ]
    
    # New user
    if opType==4:
        #Get user profile
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = 'Welcome!'
        #Profile should be added to the user management DB with the user's MID
        print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
        print json.dumps(userProfile, sort_keys = True, indent=4)
    
    #Processing according to the message
    contentType = recvBody['content'].get('contentType')
    resText = ''
    if contentType == 1:  # Text
        resText = u'Yes,'+recvBody['content']['text']+u',is not it.'
    elif contentType == 2:  # Image
        resText = u'It's a photo...'
    elif contentType == 3:  # Video
        resText = u'It's a video...'
    elif contentType == 4:  # Audio
        resText = u'It's a voice message...'
    elif contentType == 7:  # Location
        resText = u'It's location information...'
        if verbose:
            print recvBody['content']['text'].encode('utf-8')
            print recvBody['content']['location']['title'].encode('utf-8')
            print recvBody['content']['location']['address'].encode('utf-8')
    elif contentType == 8:  # Sticker
        resText = u'It's a stamp'
        if verbose:
            print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
    elif contentType == 10: # Contact
        # Contact(contentType==10)Get the mid profile of contentMetadata
        resText = recvBody['content']['contentMetadata']['displayName']+u'It's your contact information'
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
        contactProfile = json.loads(result.text)
        if verbose:
            print '\ncontactProfile: ' + str(contactProfile)
    
    #Send response message
    if resText:
        #Get user profile(Originally registered in the DB at the time of user registration, acquired as needed)
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = userProfile['contacts'][0]['displayName'] + u'Mr.' + resText
        if verbose:
            print '\nprofile: ' + str(userProfile)
        
        #Outgoing message is text(ContentType=1).Besides, Image,Video,Audio,Location,Sticker,multiple messages,You can send rich messages
        postBody['content'] = {
            'contentType': 1,
            'toType'     : 1,
            'text'       : resText
        }
        if verbose:
            print '\nworker['+mynum+'] ' + postUrl
            print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
            print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)
        
        #Send Messege
        r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
    
    return


#LINE API server access
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
    import requests
    global globalLock
    global globalLastAccessTime
    
    globalLock[host].acquire()  # Lock
    
    #How long should I wait if I have a certain amount of time to access the LINE API server?
    currentTime = int(time.time()*1000)
    remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
    
    if verbose:
        print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
        print 'worker['+mynum+'] remain='+str(remain)+' ms'
    
    # wait accessIntervalMS from last access
    if remain > 0:
        usleep(remain*1000)
    
    if method=='get':
        if body:
            payload = { 'mids': body }
            r = requests.get(url, params=payload, headers=header)
        else:
            if verbose:
                print url, header
            r = requests.get(url, headers=header)
    else:
        r = requests.post(url, data=json.dumps(body), headers=header)
    
    if verbose and r.status_code!=200:
        print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
        print 'worker['+mynum+'] response: ' + r.text
    
    globalLastAccessTime[host] = int(time.time()*1000)
    
    globalLock[host].release()  # release
    return r

Summary

I think we were able to implement the Dispatcher & jowWorker skeleton, a lightweight Queue mechanism that can be used even when a large number of messages come. In the initial state of 64bit CentOS 7, the upper limit of the number of threads in the entire system is 30118, but if it is ..5000 threads, the generation will fail. (... I don't need that much) Such a mechanism is necessary not only for BOT servers but also for efficient delivery of large amounts of mail using multiple SMTP servers.

If you want to make the jobWorker side a different language application, you can use it by making it a microservice or changing it to communicate with pipe in another process. If you want to distribute the load with this mechanism, you can make MongoDB another server or Sharding, or bring "4.consurring message" or later to another machine. If you want to distribute job workers more than that, it is better to make it a microservice or use another request broker mechanism.

Recommended Posts

Implementation example of LINE BOT server for actual operation
Build API server for checking the operation of front implementation with python3 and Flask
Implementation of Scale-space for SIFT
An implementation of ArcFace for TensorFlow
Notify LINE of train operation information
Implementation example of hostile generation network (GAN) by keras [For beginners]
Create a LINE BOT with Minette for Python
Format one line of json for easy viewing
Code for checking the operation of Python Matplotlib
Implementation of Deep Learning model for image recognition
Simple implementation example of one kind of data augmentation