Considering a LINE BOT server that can be actually operated, asynchronous processing is required as written by yoichiro6642 at the following reference URL. Reference URL: LINE BOT server architecture that is safe even if a large number of messages come
I wrote a LINE BOT server (skeleton) according to the above, with the aim of being able to withstand a large number of messages in a small to medium scale environment. The last item, "Reduce the number of API calls" (specifying multiple MIDs in one message transmission to reduce the number of API calls), is not implemented. The environment used is as follows.
I had the option of Amazon API Gateway + Lambda + DynamoDB, but I thought that Node.js + MongoDB + Python could implement a lightweight Dispatcher & jobWorker with less overhead.
For Queue, RabbitMQ, memcached, Redis, etc. were considered, but MongoDB was used for the following reasons.
--I want a trigger that can kick the process when it is added to the Queue instead of polling. --MongoDB can be used as a single (not necessarily single) master to use oplog, and can be used as a trigger by monitoring oplog. --After all, a high-speed DB is required to store and refer to the information for each received MID.
I specified about replication and oplogSizeMB.
text:mongod.line_bot.conf
# mongod.line_bot.conf — mongod configuration for the LINE BOT queue DB.
# (Indentation reconstructed; YAML requires it.)
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.line_bot.log
storage:
  dbPath: /var/lib/mongo/line_bot
  journal:
    enabled: true  # journaling on, for durability of queued messages
processManagement:
  fork: false  # do NOT fork; run in the foreground
  pidFilePath: /var/run/mongodb/mongod.line_bot.pid  # location of pidfile
net:
  port: 27017
  bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
replication:
  oplogSizeMB: 3072  # the oplog is tailed as the queue trigger; size it generously
Start in master mode.
$ mongod --master -f mongod.line_bot.conf
The collection is capped so that you don't have to worry about capacity growth.
#!/bin/bash -v
# create_collection — create the capped "recvq" receive-queue collection so
# that queue storage cannot grow without bound (oldest entries get overwritten).
# Fix: shebang said "/bin/bach" (typo), which would fail to execute.
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
    capped: true,
    size: 1048576000 // 1,048,576,000 bytes (about 1GB)
});
EOF
BOT Server(Node.js) frontDesk.js receives a message from LINE Server and returns an immediate response.
frontDesk.js
// frontDesk.js — BOT server front desk (Node.js). Receives callback POSTs
// from the LINE server, replies 200 immediately, then hands the request to
// accept.js, which validates the signature and queues it into MongoDB.
// Settings of this program
var httpsPort = 443;
var allowPath = "/callback";
// TLS material paths (Let's Encrypt); the files are read further below when
// the HTTPS server options are built.
var httpsOpt = {
  "caKey" : "/etc/letsencrypt/live/xxx/privkey.pem",
  "caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
  "caCa" : "/etc/letsencrypt/live/xxx/chain.pem"
};
// NOTE(review): `local` is an implicit global (no var/let) — accept.js relies
// on reading local.channelSecret, local.acceptTime and local.db from it.
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb = "line_bot";
var mongoCol = "recvq";
var express= require('express'),
  bodyParser = require('body-parser'),
  log4js = require('log4js'),
  https = require('https'),
  fs = require('fs'),
  mongo = require('mongodb'),
  path = require('path');
var accept = require(__dirname+'/accept');
var app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
// Connect and cache the queue collection on `local` before serving requests.
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
  if (!rc) {
    console.log("set_col.rc:"+rc);
    local.db.close();
    process.exit(1);
  }
  console.log("Connected succesfully to "+mongoUrl);
});
// handle a request
app.post(allowPath, function(req, res, next) {
  // NOTE(review): acceptTime lives on the shared `local` object, so two
  // overlapping requests overwrite each other's timestamp — confirm whether
  // per-request storage (e.g. on req) was intended.
  local['acceptTime'] = new Date().getTime(); // record accept time(ms)
  // respond ASAP so the LINE server does not retry; the real work is async
  res.status(200).send("OK");
  res.end();
  accept.post_callback(req, res, next); // Handle the request
});
// server certificate authority
// NOTE(review): this redeclares `var httpsOpt`. It works because `var` hoists
// to a single binding and the path values are read before the reassignment,
// but a distinct name would be clearer.
var httpsOpt = {
  key: fs.readFileSync(httpsOpt.caKey),
  cert: fs.readFileSync(httpsOpt.caCert),
  ca: fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
  console.log('Listening on port '+httpsPort+'...');
}).on('error', function(err) {
  if (err.errno === 'EADDRINUSE') {
    console.log('This program is already running.');
  } else {
    console.log(err);
  }
  process.exit(1);
});
// Connect to MongoDB, look up the receive-queue collection and cache both on
// `local`. On success invokes callback(true, local, url); on any failure the
// process exits, since the front desk cannot operate without its queue.
// Fix: the collection-lookup error path previously exited without closing the
// freshly opened connection.
//
// @param {Object} local - shared state object; gets .db (and .db.collection) set
// @param {String} url - MongoDB connection URL
// @param {Function} callback - called as callback(true, local, url) once ready
function set_col(local, url, callback) {
  // Use connect method to connect to the MongoServer
  MongoClient.connect(url, function(err, db) {
    if (err) {
      console.log("MongoDB connection error."); console.log(err);
      process.exit(1);
    }
    local['db'] = db;
    local.db.collection(mongoCol, function(err, collection) {
      if (err) {
        console.log("MongoDB collection error."); console.log(err);
        db.close(); // release the connection before exiting
        process.exit(1);
      }
      // NOTE(review): this shadows the driver's db.collection() method with
      // the collection object itself; accept.js depends on local.db.collection
      // being the collection, so the hack is kept as-is.
      local.db['collection'] = collection;
      callback(true, local, url);
    });
  });
}
After that, use accept.js to verify the signature and register it in MongoDB.
accept.js
var crypto = require('crypto');
var assert = require('assert');
exports.post_callback = function(req, res) {
//Check for signature
if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
console.log("400. Bad Request. The request does not have a x-line-channelsignature");
return;
}
//Check for the result of request
if ((! req.body) ||
(! req.body['result'])) {
console.log("400. Bad Request. The request does not have result");
return;
}
var result_num = req.body.result.length;
//HTTP body with channelSecret sha256 encryption,Ask for base64 digest.
var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");
//Compare signatures and confirm legitimacy
if (req.headers["x-line-channelsignature"] != computedSignature) {
console.log("400. Bad Request. The x-line-channelsignature is wrong.");
return;
}
//Enter the time you received
for (var i=0; i<Object.keys(req.body.result).length; i++) {
req.body.result[i]['acceptTime'] = local['acceptTime'];
}
//Register message in MongoDB
local.db.collection.insertMany(req.body.result, function(err, r) {
assert.equal(null, err);
assert.equal(result_num, r.insertedCount);
toQueueTime = new Date().getTime() - local['acceptTime'];
console.log("necessary time to store to queue: "+toQueueTime+" ms");
return;
});
}
Dispatcher & jobWorker Implemented in Python multithreading. The jobWorker thread waits for threading.Event () with wait () when it is created. The trigger thread monitors oplog with ts and starts processing when it is added to the Queue. Allocate the contents of the read Queue to a free jobWorker thread, set Event and let jobWorker start processing.
I'm aware of thread references and updates to lists and variables, so I don't lock them ... I was planning to do this, but when I accessed the LINE API server in multiple threads, an error regarding the number of simultaneous connections occurred. So, to access the LINE API server from jobWorker, we use exclusive lock with acquire (). Since the document does not describe that area, I set it to 1 multiplex and access interval of 100ms. I'm new to Python multithreading, so please point out any mistakes.
dispatcher.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# dispatcher.py — tails the MongoDB oplog for inserts into the receive queue
# and dispatches each queued message to a free jobWorker thread.
# Settings of this program
NumOfThread = 20            # number of pre-spawned worker threads
searchInterval = 100000     # uSec to sleep when no worker thread is free
mainSleepInterval = 60      # Sec; idle interval of the main thread
# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb = "line_bot";
mongoCol = "recvq";
import os,os.path
import sys
import threading
import time
import json
import pymongo
from pymongo.cursor import CursorType
from datetime import datetime
# NOTE(review): this shadows the `datetime` class imported on the previous
# line with the module — confirm which of the two is actually intended.
import datetime
import jobWorker
usleep = lambda x: time.sleep(x/1000000.0)  # microsecond sleep
##### worker thread
def workerThread(tt):
    """Worker loop: block on the per-thread Event until the dispatcher stores
    a job in tt[4], process it, then go back to waiting.

    tt is this thread's management record:
    [threadNo, threadObj, EventObj, status, data], where status is
    ' ' (starting), 'w' (waiting/free) or 'r' (running).
    Reads the module-global `verbose` flag set by the startup code.
    """
    tno = str(tt[0])
    while True:
        tt[2].clear()  # clear the Event and wait until it is set again
        tt[3] = 'w'    # advertise this thread as free to the dispatcher
        tt[2].wait()
        if verbose:    # waiting ended; start processing
            print '\nworker['+tno+']: wake up'
        # Call the actual processing function; tt[4] is the oplog document and
        # tt[4]['o'] the inserted queue entry.
        jobWorker.jobWorker(verbose, tno, tt[4]['o'])
##### MongoDB trigger thread
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
    """Tail the oplog and hand each insert on db.col to a free worker thread.

    t              -- worker management table (see workerThread)
    tchain         -- circular chain: tchain[i] is the thread number after i
    last           -- thread number used last; the search starts after it
    searchInterval -- uSec to sleep after scanning one full lap with no free thread
    host/port/db/col -- MongoDB location and queue collection
    """
    dbCol = db + '.' + col
    c = pymongo.MongoClient(host, port)
    # Uncomment this for master/slave.
    oplog = c.local.oplog['$main']
    # Uncomment this for replica sets.
    #oplog = c.local.oplog.rs
    # Start tailing strictly after the newest existing oplog entry.
    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']
    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                # Entries like {h:0,ts:Timestamp(nn..,1),o:{},v:2,ns:'',op:'n'}
                # are periodic informational no-ops; ignore them.
                if doc['ns']==dbCol and doc['op']!='n':
                    # Find a free thread by walking the circular chain.
                    i = tchain[last]
                    while t[i][3] != 'w':
                        i = tchain[i]
                        if i == tchain[last]:  # searched one full lap
                            usleep(searchInterval)
                    t[i][4] = doc       # store the oplog doc for worker t[i]
                    t[i][3] = 'r'       # mark it running
                    t[i][2].set()       # wake worker t[i]
                    last = i
                # work with doc here
                ts = doc['ts']
        print "got out of a while corsor.alive loop"
#######################################################################
# Check the command-line parameters (-v enables verbose output)
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
    verbose = True
elif len(sys.argv)!=1:
    print "Usage: %s [-v]" % (sys.argv[0],)
    quit()
# Worker-thread management table creation & worker thread creation.
# Each row: [ThreadNo, ThreadObj, EventObj, status, data to pass to the thread]
# (status ' ': in preparation, 'w': waiting/free, 'r': running)
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
    t[i][0] = i                  # Thread No.
    t[i][2] = threading.Event()  # Event object generation
    t[i][3] = ' '                # status: still in preparation
    # worker thread creation
    t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
        args=(t[i],))
    t[i][1].setDaemon(True)
# Circular chain of threads: the value is the next thread No.
tc = [0 for i in range(NumOfThread)]
for i in range(NumOfThread):
    tc[i] = i+1
tc[i] = 0       # close the circle: the last thread points back to 0
lastThread = i  # last thread used; tc[lastThread] is tried next
# worker thread start
for i in range(NumOfThread):
    t[i][1].start()
# Wait until every worker thread has reached its waiting ('w') state.
# NOTE(review): indentation of this loop was reconstructed from a
# whitespace-mangled source — confirm against the original. The intent is to
# poll every 0.1 ms until no thread still has status ' '.
yetAllThread = True
while yetAllThread:
    for i in range(NumOfThread):
        if t[i][3] == ' ':
            break  # a thread is still initializing; rescan
        else:
            usleep(100)  # monitoring interval is 0.1 millisecond
            if i == NumOfThread-1:
                yetAllThread = False
    else:
        usleep(100)  # monitoring interval is 0.1 millisecond
# MongoDB trigger thread generation
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start()  # start
# main thread: nothing left to do but stay alive for the daemon threads
while True:
    time.sleep(mainSleepInterval)
jobWorker.py is the thread that does the actual processing. This is a sample that simply parrots the message back, with a reply that depends on the type of content received. Please note that the way to obtain the MID (the `from` field) differs depending on the opType.
jobWorker.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# jobWorker.py — worker-thread side of the dispatcher: builds and sends a
# parrot (echo) reply that matches the content type of each received message.
# Note of caution:
# - This program runs as one of the dispatcher's threads.
# - Must not do exit(),quit()
# - Please use only return()
# Settings of the LINE API Server
# Fixes: hostname was garbled as "trialbot-api_line_me"; Content-Type header
# had "charser" instead of "charset".
lineApiHost = "trialbot-api.line.me"  # also the key of the per-host lock maps
accessIntervalMS = 100  # ms; minimum interval between LINE API accesses
getProfilesUrl = "https://trialbot-api.line.me/v1/profiles"
postUrl = "https://trialbot-api.line.me/v1/events"
getContentUrl = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header = {
    "Content-Type" : "application/json; charset=UTF-8",
    "X-Line-ChannelID" : "xxxxxxxxxx",
    "X-Line-ChannelSecret" : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
# Template of the POST body for a "send message" event
postBodyTemplate = {
    "to" : [],
    "toChannel" : 1383378250,
    "eventType" : "138311608800106203",
    "content" : {}
}
import threading
import time
import datetime
import json
usleep = lambda x: time.sleep(x/1000000.0)  # microsecond sleep
# Lock bookkeeping to avoid simultaneous access to the LINE API server from
# multiple jobWorker threads
globalLock = {}            # one Lock per destination host
globalLastAccessTime = {}  # last access time (ms) per destination host
loadTime = int(time.time()*1000)
def jobWorker(verbose, mynum, recvBody):
    """Process one queued message and reply with a parrot message matched to
    the received content type.

    verbose  -- True to dump debug information to stdout
    mynum    -- worker thread number as a string (used in log prefixes)
    recvBody -- one receive-queue document (the 'o' field of the oplog doc)

    Note: the field holding the sender's MID differs per opType.
    Fixes: several u'It's ...' literals had unescaped quotes (syntax errors);
    the 'Welcome!' reply for new users was clobbered by an unconditional
    resText = '' placed after it; dict.has_key() replaced with `in`.
    """
    global globalLock
    global globalLastAccessTime
    # Lazily create the per-host lock.
    # NOTE(review): this check-then-set is itself racy across threads (two
    # workers could each create a Lock) — consider moving it to module load.
    if lineApiHost not in globalLock:
        globalLock[lineApiHost] = threading.Lock()
    # Initial setting of the last access time for this host
    if lineApiHost not in globalLastAccessTime:
        globalLastAccessTime[lineApiHost] = loadTime
    if verbose:
        recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
        print('worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept')
        print(recvBody)
    opType = recvBody['content'].get('opType')
    # blocked by the user
    if opType == 8:
        # the blocking user's MID (recvBody['content']['params'][0]) should be
        # deleted from the user management data
        print('please delete user "'+recvBody['content']['params'][0]+'" from management data.')
        return
    # Copy the body template for the POST
    postBody = {}
    postBody['to'] = ''
    postBody['toChannel'] = postBodyTemplate['toChannel']
    postBody['eventType'] = postBodyTemplate['eventType']
    postBody['content'] = {}
    # Message reply destination
    if opType==4:  # new user: MID arrives in params[0]
        postBody['to'] = [ recvBody['content']['params'][0] ]
    else:
        postBody['to'] = [ recvBody['content']['from'] ]
    # Reply text; stays empty (= no reply sent) unless a branch below sets it.
    resText = ''
    # New user
    if opType==4:
        # Get the user profile
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = 'Welcome!'
        # The profile should be added to the user management DB with the MID
        print('please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.')
        print(json.dumps(userProfile, sort_keys = True, indent=4))
    # Build the reply according to the received content type
    contentType = recvBody['content'].get('contentType')
    if contentType == 1:  # Text
        resText = u'Yes,'+recvBody['content']['text']+u',is not it.'
    elif contentType == 2:  # Image
        resText = u"It's a photo..."
    elif contentType == 3:  # Video
        resText = u"It's a video..."
    elif contentType == 4:  # Audio
        resText = u"It's a voice message..."
    elif contentType == 7:  # Location
        resText = u"It's location information..."
        if verbose:
            print(recvBody['content']['text'].encode('utf-8'))
            print(recvBody['content']['location']['title'].encode('utf-8'))
            print(recvBody['content']['location']['address'].encode('utf-8'))
    elif contentType == 8:  # Sticker
        resText = u"It's a stamp"
        if verbose:
            print(recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8'))
    elif contentType == 10:  # Contact
        # For a contact, fetch the profile of the mid in contentMetadata
        resText = recvBody['content']['contentMetadata']['displayName']+u"It's your contact information"
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
        contactProfile = json.loads(result.text)
        if verbose:
            print('\ncontactProfile: ' + str(contactProfile))
    # Send the response message
    if resText:
        # Get the user profile (would normally be cached in the DB at
        # registration time; fetched on demand here)
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = userProfile['contacts'][0]['displayName'] + u'Mr.' + resText
        if verbose:
            print('\nprofile: ' + str(userProfile))
        # The outgoing message is text (contentType=1); Image, Video, Audio,
        # Location, Sticker, multiple and rich messages are also possible.
        postBody['content'] = {
            'contentType': 1,
            'toType' : 1,
            'text' : resText
        }
        if verbose:
            print('\nworker['+mynum+'] ' + postUrl)
            print('worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4))
            print('worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4))
        # Send Message
        r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
    return
# LINE API server access
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
    """Serialized access to the LINE API server.

    Holds the per-host lock for the whole request so that only one worker
    thread talks to the API at a time, and enforces a minimum interval of
    accessIntervalMS between consecutive requests to the same host.

    method -- 'get' (body, if truthy, is sent as the `mids` query parameter);
              anything else performs a JSON POST of body.
    Returns the `requests` response object.
    Fix: the lock was not released if the HTTP request raised, which would
    deadlock every worker thread; the critical section is now try/finally.
    """
    import requests
    global globalLock
    global globalLastAccessTime
    globalLock[host].acquire()  # serialize access per host
    try:
        # How much longer must we wait to honour the access interval?
        currentTime = int(time.time()*1000)
        remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
        if verbose:
            print('worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host]))
            print('worker['+mynum+'] remain='+str(remain)+' ms')
        # wait accessIntervalMS from the last access
        if remain > 0:
            usleep(remain*1000)
        if method=='get':
            if body:
                payload = { 'mids': body }
                r = requests.get(url, params=payload, headers=header)
            else:
                if verbose:
                    print(url, header)
                r = requests.get(url, headers=header)
        else:
            r = requests.post(url, data=json.dumps(body), headers=header)
        if verbose and r.status_code!=200:
            print('worker['+mynum+'] HTTP status code: ' + str(r.status_code))
            print('worker['+mynum+'] response: ' + r.text)
        globalLastAccessTime[host] = int(time.time()*1000)
    finally:
        globalLock[host].release()  # always release, even if the request raised
    return r
I think we were able to implement the Dispatcher & jobWorker skeleton, a lightweight Queue mechanism that can be used even when a large number of messages come. In the initial state of 64-bit CentOS 7, the upper limit on the number of threads in the entire system is 30118, but in practice thread creation started failing at around 5000 threads. (...I don't need that many anyway.) Such a mechanism is necessary not only for BOT servers but also for efficient delivery of large amounts of mail using multiple SMTP servers.
If you want to make the jobWorker side an application in a different language, you can do so by turning it into a microservice or changing it to communicate with the other process over a pipe. If you want to distribute the load with this mechanism, you can move MongoDB to another server or shard it, or move the "consuming messages" step and everything after it to another machine. If you want to distribute the job workers beyond that, it is better to use microservices or another request-broker mechanism.
Recommended Posts