J'ai une idée que les informations SNS telles que Twitter peuvent être utilisées pour surveiller le risque d'apparition de nouveaux clusters d'infection par le virus corona, et j'aimerais collecter des tweets par recherche de mots-clés tels que "La soirée potable d'aujourd'hui". Étant donné que l'API de recherche gratuite ne peut tracer les tweets qu'il y a une semaine, nous allons créer un mécanisme pour collecter automatiquement des données chaque jour, en envisageant la possibilité de les utiliser pour de futures recherches.
Si vous avez une bonne idée d'un mot de recherche pouvant être utilisé pour évaluer le risque de développer un nouveau cluster infecté par le virus corona, veuillez commenter! </ b>
Auteur https://developer.twitter.com/en/docs/twitter-api Site de commentaires très facile à comprendre https://gaaaon.jp/blog/twitterapi Il est triste que cet article n'ait même pas atteint le "code d'autosuffisance essayé" dans le lien ci-dessus, donc dans certains cas, l'article peut rester privé.
Exécutez la commande suivante nomikai_tweets.py
.
# coding: utf-8
# nomikai_tweets.py
import pandas as pd
import json
import schedule
from time import sleep
from requests_oauthlib import OAuth1Session
import datetime
from datetime import date, timedelta
import pytz
def convert_to_datetime(datetime_str):
"""
Conversion de format de date et d'heure
"""
tweet_datetime = datetime.datetime.strptime(datetime_str,'%a %b %d %H:%M:%S %z %Y')
return(tweet_datetime)
def job():
"""
Programme qui se répète en main
"""
#Mot-clé de recherche à l'exclusion des retweets
keyword = "Boire à exclure aujourd'hui:retweets"
#Enregistrer le répertoire Créer d'abord
DIR = 'nomikai/'
#Informations obtenues par communication API et enregistrement des développeurs
Consumer_key = 'bT*****************'
Consumer_secret = 've*****************'
Access_token = '25*****************'
Access_secret = 'NT*****************'
url = "https://api.twitter.com/1.1/search/tweets.json"
twitter = OAuth1Session(Consumer_key, Consumer_secret, Access_token, Access_secret)
#Paramètres utilisés pour la collecte
max_id = -1
count = 100
params = {'q' : keyword, 'count' : count, 'max_id' : max_id, 'lang' : 'ja', 'tweet_mode' : 'extended'}
#Préparation pour comparer le traitement de la date utc et jst en japonais
today =datetime.datetime.now(pytz.timezone('Asia/Tokyo'))
today_beggining_of_day = today.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_beggining_of_day = today_beggining_of_day - timedelta(days=1)
yesterday_str = datetime.datetime.strftime(yesterday_beggining_of_day, '%Y-%m-%d')
#Correspond à l'enregistrement dans l'instruction DF while qui stocke les informations de tweet
columns = ['time', 'user.id', 'user.location', 'full_text', 'user.followers_count', 'user.friends_count', 'user.description', 'id']
df = pd.DataFrame(index=[], columns=columns)
while(True):
if max_id != -1: #Revenez au tweet qui a déjà stocké l'identifiant du tweet
params['max_id'] = max_id - 1
req = twitter.get(url, params = params)
if req.status_code == 200: #Si tu peux l'obtenir normalement
search_timeline = json.loads(req.text)
if search_timeline['statuses'] == []: #Après avoir pris tous les tweets
break
for tweet in search_timeline['statuses']:
#Heure du Tweet UTC
tweet_datetime = convert_to_datetime(tweet['created_at'])
#Ignorer sinon le tweet d'hier
in_jst_yesterday = today_beggining_of_day > tweet_datetime >= yesterday_beggining_of_day
if not in_jst_yesterday: #Ignorer sinon le tweet d'hier
continue
#Magasin à DF
record = pd.Series([tweet_datetime,
tweet['user']['id'],
tweet['user']['location'],
tweet['full_text'],
tweet['user']['followers_count'],
tweet['user']['friends_count'],
tweet['user']['description'],
tweet['id'],
],index=df.columns)
df = df.append(record, ignore_index=True)
max_id = search_timeline['statuses'][-1]['id']
else: #Attendez 15 minutes si vous êtes bloqué dans les restrictions de fréquence d'accès
print("Total", df.shape[0], "tweets were extracted", sep=" ")
print('wainting for 15 min ...')
sleep(15*60)
#sauvegarder
df = df.set_index("time")
df.index = df.index.tz_convert('Asia/Tokyo')
df.to_pickle(DIR + yesterday_str + keyword +".pkl")
df.to_csv(DIR + yesterday_str + keyword +".csv")
print(today, "Total", df.shape[0], "tweets were extracted!\nnext start at 01:00 tommorow")
def main():
print("start at 01:00 tommorow")
#Courir à 01h00 tous les jours
schedule.every().day.at("01:00").do(job)
while True:
schedule.run_pending()
sleep(1)
if __name__ == '__main__':
main()
Je souhaite analyser dès que les données sont collectées. En effet, il n'est pas possible d'évaluer même les changements périodiques en fonction du jour de la journée en utilisant uniquement les données passées pour une semaine.
J'ai appris que je devais être prudent dans le traitement de l'heure japonaise et de l'heure standard afin de collecter des données tous les jours.
Notez que datetime.datetime.now ()
dépend de l'environnement dans lequel le programme s'exécute, donc exécuter cette source sur une machine dans un autre pays ne fonctionnera pas correctement. Il en va de même pour schedule.every (). Day.at (" 01: 00 "). Do (job)
.
Parmi les tweets qui incluaient le passé «aujourd'hui» et «boire un verre» qui pouvaient être extraits, «en ligne» était inclus dans environ 10%. De plus, de nombreux utilisateurs de Twitter n'aiment pas les buveurs d'entreprise.
Recommended Posts