- [x] Required specifications: OK
- [x] Top priority: OK
- [x] Execution environment setup: OK
- [ ] Verification and operation of additional features
- [ ] Error notification by DM
- [ ] Automatic stop when the time comes
Note: I'm writing this in real time, so it may wander off track or stall along the way.
So far, operation has been confirmed to be almost trouble-free. The next step is therefore to implement the **second-priority** features: "stop automatically when the deadline arrives, so the crawler isn't left running and forgotten" and "send a DM on errors, so an abnormal exit doesn't go unnoticed".
Calling datetime.now() for every tweet felt crude, but I couldn't think of another approach and it caused no speed problems, so I kept it simple.
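```python
# Deadline: 2017-01-16 00:00 JST; subtracting 9 hours expresses it in UTC,
# which is presumably what the server clock runs on
ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 0) - timedelta(hours=9)
```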
With the deadline set like this, the check goes into on_status(), which handles each tweet as it is received:
```python
if datetime.now() > ExitLimit:
    print("It's time to finish")
    self.send_message("Ends over time")
    self.StopFlg = True
    return False  # Returning False from on_status() disconnects the stream
```
This stops the stream. Strictly speaking, the last tweet is received after the deadline, but that's acceptable.
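The `- timedelta(hours=9)` in ExitLimit converts the JST deadline to UTC, so it only works if the server clock runs in UTC. If the host were set to JST, the crawler would stop 9 hours early. A timezone-aware comparison avoids depending on the host's zone. Below is a minimal sketch assuming Python 3.2+ and a deadline meant in JST. `JST`, `EXIT_LIMIT` and `is_past_deadline` are hypothetical names and are not part of the crawler.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical names; the deadline is assumed to be meant in Japan Standard Time.
JST = timezone(timedelta(hours=9))
EXIT_LIMIT = datetime(2017, 1, 16, 0, 0, 0, tzinfo=JST)


def is_past_deadline(now=None, limit=EXIT_LIMIT):
    """Return True once `now` is strictly later than `limit`.

    Both values are timezone-aware, so the comparison holds
    whatever zone the host clock is set to.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now > limit


# 14:59 UTC on Jan 15 is 23:59 JST, one minute before the deadline.
print(is_past_deadline(datetime(2017, 1, 15, 14, 59, tzinfo=timezone.utc)))
```

In on_status(), `if is_past_deadline():` would then replace the `datetime.now() > ExitLimit` check.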
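Tweepy's streaming module has an API object of its own (`StreamListener` falls back to a plain `API()` when none is passed in). However, the object obtained this way apparently can't be used to send DMs, presumably because it is created without authentication. That's why the main process calls `tweepy.API(auth)` itself to get an API object.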
```python
def send_message(self, message):
    # Do nothing if there is no message or no authenticated API object yet
    if message is None or self.API is None:
        return
    if len(DM_DEST) != 0 and len(message) != 0:
        # Timestamp in JST (the server clock is UTC, so add 9 hours)
        d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
        mes = "[TWEET CRAWLER]\n" + d + "\n" + message
        self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
        print('DM transmission:' + message)
    else:
        print('Did not send DM')
```
Define a method like this and call it whenever an event occurs.
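Moving the text formatting out of send_message() lets you check it without a Twitter account. The sketch below uses a hypothetical helper, `format_dm`, that reproduces the message layout above: a header line, a JST timestamp, then the message. It assumes Python 3 and that timestamps should be shown in Japan time. The actual `send_direct_message` call stays in the crawler.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))  # assumption: timestamps are shown in Japan time


def format_dm(message, now=None):
    """Build a DM body in the same layout as send_message():
    header line, JST timestamp, then the message.
    Returns None when there is nothing to send."""
    if not message:
        return None
    if now is None:
        now = datetime.now(timezone.utc)
    stamp = '{:%Y/%m/%d %H:%M:%S}'.format(now.astimezone(JST))
    return "[TWEET CRAWLER]\n" + stamp + "\n" + message


print(format_dm("Connected", datetime(2017, 1, 15, 3, 4, 5, tzinfo=timezone.utc)))
```

Converting an aware UTC time with `astimezone()` does the same job as the `+ timedelta(hours=9)` in send_message(), without assuming anything about the host clock.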
It's unlikely that MongoDB alone would conveniently go down, but just in case: if an insert fails, the tweet is written to a text file instead.
```python
def insert(self, json_src):
    # Insert the tweet JSON into MongoDB.
    try:
        # insert() is deprecated in PyMongo 3 and removed in 4; insert_one() stores one document
        self.Collection.insert_one(json_src)
        self.dbErrCnt = 0
    except pymongo.errors.PyMongoError:
        # MongoDB stopped for some unknown reason
        self.dbErrCnt += 1
        print('MongoDB error #' + str(self.dbErrCnt))
        if self.dbErrCnt < 3:
            self.send_message('MongoDB error occurred.')  # Avoid sending the same DM over and over
        # Fall back to appending the tweet to a text file (the with block closes it)
        with open('MongoInsertError.txt', 'a') as f:
            f.write(str(json_src) + '\n')
        print('Insert failed, wrote to file')
```
That's about it. I looked for a function to check whether the connection is still alive, but didn't find one I liked, so I left it out. Incidentally, once MongoDB comes back up, the client reconnects on its own without any reconfiguration and storage resumes, so this should be fine (probably).
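The notification throttling inside insert() works like this: a DM is sent only for the first two consecutive failures, and the counter resets on the next successful insert. That rule can be pulled out into a small class and tested on its own. `ErrorThrottle` is a hypothetical name, and this is a sketch of the same counting rule rather than code from the crawler.

```python
class ErrorThrottle:
    """Permit a notification only for the first `max_notices` consecutive failures.

    Same rule as dbErrCnt in insert(): `if dbErrCnt < 3` notifies on the
    1st and 2nd failure, and a successful insert resets the count.
    """

    def __init__(self, max_notices=2):
        self.max_notices = max_notices
        self.count = 0

    def success(self):
        """Call after a successful insert; resets the failure streak."""
        self.count = 0

    def failure(self):
        """Call after a failed insert; returns True if a DM should be sent."""
        self.count += 1
        return self.count <= self.max_notices


throttle = ErrorThrottle()
print([throttle.failure() for _ in range(4)])
```

Inside insert(), `if throttle.failure(): self.send_message(...)` would take the place of the counter check.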
Since the event (code name) goes by several keywords, the filter has to match any of them, i.e. an OR of multiple keywords.
Covering that set of keywords should catch most posts. Some keywords are used both as hashtags and as plain words. According to the Twitter Streaming API documentation, untagged Japanese keywords won't match unless they are separated by spaces or similar delimiters, so it may be fine to leave the untagged keywords out...
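The Streaming API documentation describes the matching rule roughly as follows. Commas separate alternatives (OR). Spaces inside one phrase mean every term must appear (AND). Case is ignored. The sketch below approximates that rule for illustration only. `matches_track` is a hypothetical function that splits tweets on whitespace, so it ignores Twitter's handling of punctuation, URLs and hashtag/plain-word equivalence. It does show why an untagged Japanese keyword written without surrounding spaces is not found.

```python
def matches_track(text, track):
    """Rough model of the `track` parameter: OR across commas, AND across spaces.

    Tweets are tokenized on whitespace only, which is a simplification.
    """
    tokens = set(text.lower().split())
    for phrase in track.split(','):
        terms = phrase.lower().split()
        if terms and all(term in tokens for term in terms):
            return True
    return False


print(matches_track('Saw the #D51 today', '#D51,locomotive'))
# '機関車' means "locomotive"; with no space after it, the whole sentence is one token
print(matches_track('機関車が来た', '機関車'))
print(matches_track('機関車 が来た', '機関車'))
```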
```python
TRACKWORD = ['#D51, #locomotive, #steam locomotive, #hill steam, #locomotive, locomotive, steam locomotive']
```

Specify it like this (the keywords shown here have been altered from the real ones).
```python
stream.filter(track=TRACKWORD)
```
The call itself stays like this. With too many keywords, spam becomes a worry. In fact, when I tested with the production keywords there were lots of auction bots. I now fully understand the feeling of "the spec says save everything, but this is a nuisance" (and this is supposedly the quietest period).
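As far as I can tell from tweepy 3.x's source, `Stream.filter()` joins the `track` list with commas before sending it. A single comma-separated string and a list of single keywords should therefore end up equivalent. Keeping one keyword per element makes duplicates and stray spaces easier to spot. A space matters here: it turns "# D51" into the two-term AND phrase "#" + "D51". The sketch below uses a hypothetical helper, `clean_track_words`. It only trims spaces around each keyword; spaces inside a keyword are left alone.

```python
def clean_track_words(words):
    """Split comma-separated entries, trim surrounding spaces,
    and drop empty strings and duplicates (keeping first-seen order).
    Spaces inside a keyword are kept, since they mean AND to the API."""
    cleaned = []
    for entry in words:
        for word in entry.split(','):
            word = word.strip()
            if word and word not in cleaned:
                cleaned.append(word)
    return cleaned


TRACKWORD = clean_track_words(['#D51, #locomotive, #locomotive, locomotive'])
print(TRACKWORD)
```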
Bots post from specific clients, so it seems they could be removed in one go by checking the "source" key of the fetched JSON. Then again, the number of bot posts could itself be useful material, so I'll skip deleting them for now.
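In the tweet JSON, `source` holds an HTML link whose text is the client name, for example `<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>`. The sketch below counts posts per client and separates tweets from a given set of clients, which keeps bot posts available for counting instead of deleting them. `client_name`, `count_by_client`, `split_by_client` and the client name `AuctionBot` are hypothetical.

```python
import re
from collections import Counter

# Matches the link text inside the HTML stored in the tweet's "source" field.
_LINK_TEXT = re.compile(r'>([^<]*)</a>')


def client_name(tweet):
    """Return the posting client's name, or the raw source string if it is not a link."""
    source = tweet.get('source', '')
    match = _LINK_TEXT.search(source)
    return match.group(1) if match else source


def count_by_client(tweets):
    """Count tweets per client name."""
    return Counter(client_name(t) for t in tweets)


def split_by_client(tweets, bot_clients):
    """Split tweets into (others, from_bot_clients) by client name."""
    others, bots = [], []
    for tweet in tweets:
        (bots if client_name(tweet) in bot_clients else others).append(tweet)
    return others, bots


tweets = [
    {'source': '<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>'},
    {'source': '<a href="http://example.com/" rel="nofollow">AuctionBot</a>'},
    {'source': '<a href="http://example.com/" rel="nofollow">AuctionBot</a>'},
]
print(count_by_client(tweets))
others, bots = split_by_client(tweets, {'AuctionBot'})
print(len(others), len(bots))
```

Once the data is in MongoDB, the same idea could be expressed as a query on the `source` field, for example a `$regex` filter passed to PyMongo's `Collection.find()` or `delete_many()`.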
Here is the finished version, written by a Python amateur over the course of a month (about a week of actual work).
TweetCrawler.py
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
from datetime import datetime, timedelta

import pymongo
from pymongo import MongoClient
import tweepy

# Twitter access related variables
CK = ''  # Consumer Key
CS = ''  # Consumer Secret
AT = ''  # Access Token
AS = ''  # Access Token Secret
TRACKWORD = ['#(Argon)']  # Keywords used with the Public Stream filter
DM_DEST = ''  # DM destination (screen name)

# Variables related to MongoDB connection
HOST = 'mongo'  # host
PORT = 27017  # port (default: 27017)
DB_NAME = 'TwitterDB'  # DB name
COL_NAME = 'Twitter'  # Collection name

# Deadline: 2017-01-16 00:00 JST, shifted by 9 hours for a UTC server clock
ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 0) - timedelta(hours=9)


class Listener(tweepy.StreamListener):
    def __init__(self, api=None):  # constructor
        tweepy.StreamListener.__init__(self, api)  # Parent class constructor
        print('constructor')
        self.StopFlg = False  # Stop flag
        self.mongo_init()
        self.API = None  # Authenticated tweepy.API, set by the main process
        self.dbErrCnt = 0  # Consecutive MongoDB error count
        print(ExitLimit)

    def mongo_init(self):  # MongoDB initialization
        try:
            client = MongoClient(HOST, PORT)
            db = client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB ready')
        except pymongo.errors.PyMongoError:
            # Connection error
            print('DB connection error')

    def on_status(self, status):
        self.insert(status._json)
        if datetime.now() > ExitLimit:
            print("It's time to finish")
            self.send_message("Ends over time")
            self.StopFlg = True
            return False  # Disconnect the stream
        return True

    def on_error(self, status_code):
        print('Error occurred: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True  # Keep the stream running

    def on_connect(self):
        print('Connected')
        self.send_message('Connected')

    def on_disconnect(self, notice):
        # notice is the raw "disconnect" dict sent by the stream
        print('Disconnected:' + str(notice.get('code')))
        self.send_message("DISCONNECT:" + str(notice.get('code')))

    def on_limit(self, track):
        print('Receive limit has occurred:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))

    def on_timeout(self):
        print('time out')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        # notice is the raw "warning" dict sent by the stream
        print('Warning message:' + str(notice.get('message')))
        self.send_message("WARN:" + str(notice.get('message')))

    def on_exception(self, exception):
        print('Exception error:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))

    def send_message(self, message):
        # Method to send a DM
        if message is None or self.API is None:
            return
        if len(DM_DEST) != 0 and len(message) != 0:
            # Timestamp in JST (the server clock is UTC, so add 9 hours)
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM transmission:' + message)
        else:
            print('Did not send DM')

    def insert(self, json_src):
        # Insert the tweet JSON into MongoDB.
        try:
            self.Collection.insert_one(json_src)  # Store here
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError:
            # Error occurred
            self.dbErrCnt += 1
            print('MongoDB error #' + str(self.dbErrCnt))
            if self.dbErrCnt < 3:
                self.send_message('MongoDB error occurred.')  # Prevent repeated DMs with the same content
            # Export to a file (the with block closes it)
            with open('MongoInsertError.txt', 'a') as f:
                f.write(str(json_src) + '\n')
            print('Insert failed, wrote to file')


# Main processing from here
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255
while True:  # Infinite loop
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)  # Authenticated API object used for DMs
        # Use exactly one of the three calls below.
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()
        # Stop if on_status() set the stop flag
        if listener.StopFlg:
            ExitCode = 0
            break
    except KeyboardInterrupt:
        # CTRL+C
        print('Exiting on CTRL+C.')
        ExitCode = 0
        break
    except Exception:
        print('Error, restarting the stream')
        pass  # Ignore all other exceptions and loop

sys.exit(ExitCode)
```
It's amateurish code, but it has to run for real (no rehearsal) for anywhere from a few minutes to a few days. It has also run for a while without a single error, which is a little unnerving in itself, but I'll go with this.
```
$ nohup python TweetCrawler.py >normal.log 2>error.log &
```
I'll let it run like this for a while.
**Highlights and points to rework**
(To be continued.)