Experiment to collect tweets for a long time (Program preparation (5))

Until last time

- [x] Required specifications: OK
- [x] Top priority: OK
- [x] Execution environment setup: OK
- [ ] Implement the additional functions and verify operation
- [ ] Notify errors by DM
- [ ] Function to stop when the time comes

Note: Since I'm writing this in real time, I may go off on tangents or stall along the way.

Adding the other functions

Basically, I've confirmed that the current version runs with almost no problems. The next thing to do, then, is to implement the **second-priority** functions: "end when the deadline comes, so I don't forget it's still running" and "contact me by DM, so I don't fail to notice that it died on an error".

Function to end when the time comes

Checking datetime.now() on every tweet felt like a brute-force approach, but I couldn't think of any other means, and there is no speed problem, so I kept it simple.

    ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 0) - timedelta(hours=9)   #2017-01-16 00:00 JST, shifted to the server's (UTC) clock

Set the cutoff like this, and then in on_status(), which handles each tweet as it arrives,

        if(datetime.now() > ExitLimit):
            print("It's time to finish")
            self.send_message("Ends over time")
            self.StopFlg = True
            return False

stop like this. Strictly speaking, the very last tweet is acquired slightly past the deadline, but I'll let that go. Returning False from on_status() ends the stream; the main loop then checks StopFlg and breaks instead of reconnecting (see the full source below).

Function to notify errors by DM

Looking at Tweepy's streaming code, the listener creates an API() object of its own, but that object apparently can't be used to send DMs (it has no authentication attached). So the main process calls tweepy.API() itself to get a usable object.
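
Concretely, that wiring looks like this (it's the same as in the full source further down):

    auth = tweepy.OAuthHandler(CK, CS)      #consumer key / secret
    auth.set_access_token(AT, AS)           #access token / secret

    listener = Listener()
    listener.API = tweepy.API(auth)         #authenticated API object, used by send_message()

The DM-sending method itself: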

    def send_message(self, message):
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM transmission:' + message)
        else:
            print('Did not send DM')
        return

Let's create a method like this and call it when an event occurs.

Other concerns

What if the DB goes down?

I don't really expect MongoDB alone to conveniently go down on its own, but just in case: if an insert fails, the tweet gets written to a text file instead.

    def insert(self, json_src):
        #Insert the JSON data into Mongo.
        try:
            self.Collection.insert(json_src)        #Store it here
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #MongoDB stopped for some reason
            self.dbErrCnt += 1
            print('Error count: ' + str(self.dbErrCnt))
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB error occurred.')   #Prevent repeated notifications with the same content

            #Fall back to a text file
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Insert failed; wrote it to a file instead')
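
If MongoDB really does die and failed tweets pile up in MongoInsertError.txt, they could be pushed back into the DB afterwards. This is only a rough sketch of that idea, not part of the script; note that the fallback writes str() of a Python dict rather than strict JSON, which is why ast.literal_eval is used instead of json.loads:

    import ast
    from pymongo import MongoClient

    col = MongoClient('mongo', 27017)['TwitterDB']['Twitter']   #HOST / PORT / DB_NAME / COL_NAME from the script
    with open('MongoInsertError.txt') as f:
        for line in f:
            line = line.strip()
            if line:
                col.insert_one(ast.literal_eval(line))          #parse the repr back into a dict and insert it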

That's roughly how the insert handling goes. I looked for a function to check whether the connection was still alive, but nothing quite fit, so I left it out. Incidentally, if MongoDB comes back up, the client reconnects without any reconfiguration and data storage resumes, so this should be enough (probably).
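
For reference, one way to check liveness with PyMongo is the server's ping command. This is only a minimal sketch of the idea I dropped, not something in the production script:

    import pymongo
    from pymongo import MongoClient

    def mongo_alive(client):
        #Return True if the MongoDB server answers a ping
        try:
            client.admin.command('ping')
            return True
        except pymongo.errors.PyMongoError:
            return False

    #Example: mongo_alive(MongoClient('mongo', 27017))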

How to specify keywords?

Since the event (code word) goes by several different keywords, it has to be specified as an OR of multiple keywords.

If I cast the net over that whole range, it should be covered. Some of the terms appear both as hashtags and as plain, untagged text. According to the Twitter Streaming API documentation, untagged Japanese keywords won't match unless they're delimited by spaces and the like, so it's fine even if the untagged ones never hit...

    TRACKWORD = ['#D51, #locomotive, #steam locomotive, #hill steam, #locomotive, locomotive, steam locomotive']

I specify it like this (the keywords here have been altered), and the call itself stays as before:

    stream.filter(track=TRACKWORD)

If you put in too many keywords, spam posts become a worry. In fact, checking with the production keywords, there are a lot of auction bots; even though the whole point is to "save everything", some of it really is just noise (and this is supposedly the quietest period), so I can see the problem clearly.

The bots use specific clients, so it looks like I could check the "source" key in the acquired JSON and delete them all in one pass. Then again, the number of bot posts might itself be interesting material, so I'll put off deleting them for now.
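
If I did decide to delete them later, the idea would look roughly like this. This is only a sketch and not part of the production script; the client name is made up for illustration:

    from pymongo import MongoClient

    client = MongoClient('mongo', 27017)                 #HOST / PORT from the script below
    col = client['TwitterDB']['Twitter']                 #DB_NAME / COL_NAME from the script below

    BOT_SOURCE = 'SomeAuctionBot'                        #hypothetical client name, for illustration only

    #Count the tweets posted from that client (count_documents needs PyMongo 3.7+)
    print(col.count_documents({'source': {'$regex': BOT_SOURCE}}))

    #Uncomment to actually delete them in one pass
    #col.delete_many({'source': {'$regex': BOT_SOURCE}})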

Production source

Here is the completed version, put together by a Python amateur over about a month (roughly a week of actual work).

TweetCrawler.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-

import tweepy
from tweepy.api import API

import pymongo
from pymongo import MongoClient
import json

from datetime import datetime, timedelta

import sys

#Twitter access related variables
CK = ''                            # Consumer Key
CS = ''   # Consumer Secret
AT = ''   # Access Token
AS = ''        # Access Token Secret

TRACKWORD = ['#(Argon)']  #Keywords when operating Public Stream Filter

DM_DEST = ''      #DM destination

#Variables related to MongoDB connection
HOST = 'mongo'      #host
PORT = 27017            #port(Default:27017)
DB_NAME = 'TwitterDB'   #DB name
COL_NAME= 'Twitter'    #Collection name

ExitLimit = datetime(2017, 1, 16, 0, 0, 0, 0) - timedelta(hours=9)   #2017-01-16 00:00 JST, shifted to the server's (UTC) clock

class Listener(tweepy.StreamListener):
    def __init__(self, api=None):   #constructor
        tweepy.StreamListener.__init__(self)    #Parent class constructor
        print('constructor')

        self.StopFlg = False          #Stop flag.
        self.mongo_init()

        self.API = None
        
        self.dbErrCnt = 0
        
        print(ExitLimit)

        return

    def mongo_init(self):           #MongoDB initialization
        try:
            Client = MongoClient(HOST, PORT)
            db = Client[DB_NAME]
            self.Collection = db[COL_NAME]
            print('DB ready')
        except pymongo.errors.PyMongoError as exc:
            #Connection error
            print('DB connection error')
        return

    def on_status(self, status):
        #print('Tweet(' + str(self.TweetCnt) + ')')
        
        self.insert(status._json)
        
        if(datetime.now() > ExitLimit):
            print("It's time to finish")
            self.send_message("Ends over time")
            self.StopFlg = True
            return False

        return True

    def on_error(self, status_code):
        print('Error occurred: ' + str(status_code))
        self.send_message("ERR:" + str(status_code))
        return True

    def on_connect(self):
        print('Connected')

        self.send_message('Connected')
        return

    def on_disconnect(self, notice):
        print('Disconnected:' + str(notice.code))
        self.send_message("DISCCONECT:" + str(notice.code))
        return

    def on_limit(self, track):
        print('Receive limit has occurred:' + str(track))
        self.send_message("RCV_LIMIT:" + str(track))
        return

    def on_timeout(self):
        print('time out')
        self.send_message("TIMEOUT")
        return True

    def on_warning(self, notice):
        print('Warning message:' + str(notice.message))
        self.send_message("WARN:" + str(notice.message))
        return

    def on_exception(self, exception):
        print('Exception error:' + str(exception))
        self.send_message("EXCEPTION:" + str(exception))
        return
        
    def send_message(self, message):
        #Method to send DM
        if(message is None) or (self.API is None):
            return
        if(len(DM_DEST) != 0  and len(message) != 0):
            d = '{:%Y/%m/%d %H:%M:%S}'.format(datetime.now() + timedelta(hours=9))
            mes = "[TWEET CRAWLER]\n" + d + "\n" + message
            
            self.API.send_direct_message(screen_name=DM_DEST, text=mes.encode('utf8'))
            print('DM transmission:' + message)
        else:
            print('Did not send DM')
        return

    def insert(self, json_src):
        #Insert the JSON data into Mongo.
        try:
            self.Collection.insert(json_src)        #Store it here
            self.dbErrCnt = 0
        except pymongo.errors.PyMongoError as exc:
            #Error occurred
            self.dbErrCnt += 1
            print('Error count: ' + str(self.dbErrCnt))
            if(self.dbErrCnt < 3):
                self.send_message('MongoDB error occurred.')   #Prevent repeated notifications with the same content

            #Fall back to a text file
            with open('MongoInsertError.txt','a') as f:
                f.write(str(json_src) + '\n')
                print('Insert failed; wrote it to a file instead')

#Main processing from here
auth = tweepy.OAuthHandler(CK, CS)
auth.set_access_token(AT, AS)

ExitCode = 255

while (True):     #infinite loop
    try:
        listener = Listener()
        stream = tweepy.Stream(auth, listener)
        listener.API = tweepy.API(auth)

        #Select one and uncomment it.
        print(TRACKWORD)
        stream.filter(track=TRACKWORD)
        #stream.sample()
        #stream.userstream()

        #Stop judgment by stop flag
        if(listener.StopFlg == True):
            ExitCode = 0
            break

    except KeyboardInterrupt:
        # CTRL+C
        print('Terminated with CTRL+C.')
        ExitCode = 0
        break
    except:
        print('Error Exit')
        pass    #Ignore all exceptions and loop

sys.exit(ExitCode)

The source is about what you'd expect from an amateur, but it has been run in the (details blurred) environment for stretches of a few minutes up to a few days, and no errors occurred even over the longer runs. So, scary as it is, I'm going to go live with it.

$ nohup python TweetCrawler.py >normal.log 2>error.log &

That's more or less how I'll run it.

Urgently seeking

**Things done well, and things that should be reworked.**

(To be continued.)
