This is a Python wrapper around Rocket.Chat's REST API.
Rocket.Chat exposes separate REST endpoints for public and private channels, and the caller normally has to pick the right one (which is tedious). So I tried to make it possible for users to get information without having to be aware of that distinction.
The principle relies on whether a usable response comes back. When the combination is wrong, that is:
-- a public channel queried through the private API, or
-- a private channel queried through the public API,
no usable response is returned. By running the storage step only when a response is present, both endpoints can simply be tried in turn and the results merged.
#!/opt/anaconda3/bin/python3
# -*- coding: utf-8 -*-
'''RocketChat Channel maintenance
Manage Rocket Chat channels
Todo:
*Only Redmine and RocketChat yet. Make the same for other OSS
def __init__(self, HEADERS, URL):
def _getChannelPublicMap(self):
def _getChannelPrivateMap(self):
def exchangeMapkeyToList(self, map):
def getChannelMap(self):
def getChannelUserMap(self, list_channelname):
def getDifftimeLastUpdateSec(self, _targetTime):
def _getChannel_id(self, channelname):
def sendMessageToRocketChat(self, channel, msg):
def closeTargetChannel(self, roomname):
def _ISOtimeToDatetime(self, target):
def _CreateMapFromChannelIDtoChannelname(self):
def _CreateMapFromChannelnameToChannelID(self, self._CreateMapFromChannelIDtoChannelname()):
def _judgeRocketChatMessage(self, target_date, limit):
def _JudgeDeleteChannelMessages(self, roomname, LIMIT):
def JudgeDeleteChannelMessages(self, LIMIT):
'''
################################################
# library
################################################
import dateutil
import json
import pandas as pd
import requests
import sys
from datetime import date
from datetime import datetime
from datetime import timedelta
from dateutil import parser
from pprint import pprint
from pytz import timezone
################################################
#Get environment variables
################################################
################################################
# RocketChatChannelManager
################################################
class RocketChatChannelManager(object):
    '''Manage RocketChat channels through the REST API.

    RocketChat uses different endpoints for public channels
    (``channels.*``) and private channels (``groups.*``). The public
    methods of this class try both endpoints and keep whichever one
    answers successfully, so callers do not need to know which kind of
    channel they are dealing with.

    Attributes:
        HEADERS (dict): HTTP headers sent with every request.
        URL (str): Base URL of the RocketChat server.
        TIMEOUT (int): Seconds to wait for the server per request.
    '''

    # Without a timeout `requests` can block forever on a dead server.
    TIMEOUT = 30

    # Column labels of the DataFrames built by the message-judging methods.
    _MESSAGE_COLUMNS = ['Channel', 'MSG_ID', 'Update time',
                        'Target to be deleted', 'MSG']

    def __init__(self, HEADERS, URL):
        '''Store the headers and base URL shared by every REST call.

        Args:
            HEADERS (dict): HTTP headers passed to every request.
            URL (str): Base URL of the RocketChat server.
        Raises:
            TypeError: If HEADERS is not a dict or URL is not a str.
        '''
        self._checkType('HEADERS', HEADERS, dict)
        self._checkType('URL', URL, str)
        self.HEADERS = HEADERS
        self.URL = URL

    ################################################
    # internal helpers
    ################################################
    @staticmethod
    def _checkType(name, value, expected):
        '''Raise TypeError when `value` is not an instance of `expected`.'''
        if not isinstance(value, expected):
            raise TypeError(
                f'Arguments: {name} type is incorrect '
                f'{expected.__name__}<-> {type(value)}')

    def _callApi(self, method, api, **kwargs):
        '''Issue one HTTP request and return the response, or None on failure.

        Transport errors (connection refused, timeout, ...) are reported on
        stdout and turned into None so callers can decide how to degrade.
        '''
        try:
            return requests.request(method, api,
                                    headers=self.HEADERS,
                                    timeout=self.TIMEOUT,
                                    **kwargs)
        except requests.exceptions.RequestException as e:
            print(f'API execution error: {api}')
            print(f'Error: {e}')
            return None

    @staticmethod
    def _jsonBody(response):
        '''Return the response body as a dict, or {} if it is not a JSON object.'''
        try:
            body = response.json()
        except ValueError:
            # Body was not valid JSON (e.g. an HTML error page).
            return {}
        return body if isinstance(body, dict) else {}

    def _getUpdatedAtMap(self, endpoint, key):
        '''Build {room name: _updatedAt} from a room-listing endpoint.

        Args:
            endpoint (str): API method name, e.g. 'channels.list'.
            key (str): Key of the room list in the JSON body.
        Returns:
            dict on success (possibly empty), False on a transport error.
        '''
        response = self._callApi('get', f'{self.URL}/api/v1/{endpoint}')
        if response is None:
            return False
        # An error payload has no room list; treat it as "no rooms".
        rooms = self._jsonBody(response).get(key, [])
        return {room['name']: room['_updatedAt'] for room in rooms}

    ################################################
    # channel listing
    ################################################
    def _getChannelPublicMap(self):
        '''Map every public channel name to its last update time.

        Returns:
            dict: {channel name: '_updatedAt' value}, or False when the
            request itself failed.
        Examples:
            >>> map = self._getChannelPublicMap()
        '''
        return self._getUpdatedAtMap('channels.list', 'channels')

    def _getChannelPrivateMap(self):
        '''Map every private channel name to its last update time.

        Returns:
            dict: {channel name: '_updatedAt' value}, or False when the
            request itself failed.
        Examples:
            >>> map = self._getChannelPrivateMap()
        '''
        return self._getUpdatedAtMap('groups.listAll', 'groups')

    def exchangeMapkeyToList(self, map):
        '''Return the keys of a dict as a list.

        Args:
            map (dict): Any dictionary.
        Returns:
            list: The keys of `map` in iteration order.
        Raises:
            TypeError: If `map` is not a dict.
        '''
        self._checkType('map', map, dict)
        return list(map)

    def getChannelMap(self):
        '''Map every channel name, public and private, to its last update time.

        Returns:
            dict: {channel name: '_updatedAt' value}. Empty when neither
            listing could be obtained.
        Examples:
            >>> map_ = R.getChannelMap()
        '''
        merged = {}
        for partial in (self._getChannelPublicMap(),
                        self._getChannelPrivateMap()):
            # A failed listing is False and an empty one is {}; skip both.
            if partial:
                merged.update(partial)
        return merged

    def getChannelUserMap(self, list_channelname):
        '''Map each given channel name to the usernames of its members.

        Both the public and the private members endpoint are queried for
        every channel; only a successful response is stored.

        Args:
            list_channelname (list): Channel names to look up.
        Returns:
            dict: {channel name: [username, ...]}. Channels for which no
            endpoint answered successfully are absent. False when a
            request failed at the transport level.
        Raises:
            TypeError: If `list_channelname` is not a list.
        Examples:
            >>> map = R.getChannelUserMap(['aaaa', 'bbbb'])
        '''
        self._checkType('list_channelname', list_channelname, list)
        APIS = [f'{self.URL}/api/v1/channels.members',
                f'{self.URL}/api/v1/groups.members']
        # Page size requested from the server; the original author
        # assumed a channel never exceeds 1000 members.
        COUNT = '1000'
        _map = {}
        for channel in list_channelname:
            params = (('roomName', channel), ('count', COUNT),)
            for api in APIS:
                response = self._callApi('get', api, params=params)
                if response is None:
                    return False
                if not response:
                    # Error status: presumably the endpoint for the other
                    # channel type, so try the next one.
                    continue
                members = self._jsonBody(response).get('members', [])
                _map[channel] = [f'{m["username"]}' for m in members]
        return _map

    ################################################
    # time helpers
    ################################################
    def getDifftimeLastUpdateSec(self, _targetTime):
        '''Return the seconds elapsed between `_targetTime` and now.

        Args:
            _targetTime (str): Time string parseable by dateutil,
                e.g. an ISO 8601 timestamp.
        Returns:
            float: now - target, in seconds.
        Raises:
            TypeError: If `_targetTime` is not a str.
        Examples:
            >>> R.getDifftimeLastUpdateSec('2021-01-20T00:23:10.256Z')
        '''
        self._checkType('_targetTime', _targetTime, str)
        jst_now = datetime.now(timezone('Asia/Tokyo'))
        return (jst_now - self._ISOtimeToDatetime(_targetTime)).total_seconds()

    def _ISOtimeToDatetime(self, target):
        '''Parse a time string and return it as an Asia/Tokyo datetime.

        Args:
            target (str): Time string, e.g. '2021-01-20T00:23:10.256Z'.
        Returns:
            datetime: The same instant converted to Asia/Tokyo.
        Examples:
            >>> self._ISOtimeToDatetime('2021-01-20T00:23:10.256Z')
        '''
        return parser.parse(target).astimezone(timezone('Asia/Tokyo'))

    def _judgeRocketChatMessage(self, target_date, limit):
        '''Tell whether a message is older than the retention period.

        Args:
            target_date (datetime): Timestamp of the message.
            limit (int): Retention period in days.
        Returns:
            bool: True when more than `limit` days lie between the
            message date and today (deletion candidate), else False.
        Examples:
            >>> self._judgeRocketChatMessage(target_datetime, 10)
        '''
        # Take "today" in the timezone of the timestamp so both dates are
        # on the same calendar; a naive timestamp falls back to local time.
        today = datetime.now(target_date.tzinfo).date()
        return (today - target_date.date()) > timedelta(limit)

    ################################################
    # single-channel operations
    ################################################
    def _getChannel_id(self, channelname):
        '''Look up the room ID that belongs to a channel name.

        Args:
            channelname (str): Channel name.
        Returns:
            str: The room '_id', or False when the request failed or the
            server did not report success.
        Raises:
            TypeError: If `channelname` is not a str.
        Examples:
            >>> R._getChannel_id('general')
        '''
        self._checkType('channelname', channelname, str)
        response = self._callApi('get',
                                 f'{self.URL}/api/v1/rooms.info',
                                 params={'roomName': channelname})
        if response is None:
            return False
        body = self._jsonBody(response)
        if not body.get('success'):
            return False
        return body.get('room', {}).get('_id', False)

    def sendMessageToRocketChat(self, channel, msg):
        '''Post a text message to an existing channel.

        Args:
            channel (str): Channel name.
            msg (str): Message text.
        Returns:
            bool: True when the server answered with a success status.
            False when the channel does not exist, the request failed,
            or the server answered with an error status.
        Raises:
            TypeError: If `channel` or `msg` is not a str.
        Examples:
            >>> R.sendMessageToRocketChat('general', 'Hello')
        '''
        self._checkType('channel', channel, str)
        self._checkType('msg', msg, str)
        # Only post when the channel can be resolved.
        if not self._getChannel_id(channel):
            print(f'The specified channel does not exist: {channel}')
            return False
        payload = {'channel': channel, 'text': msg}
        response = self._callApi('post',
                                 f'{self.URL}/api/v1/chat.postMessage',
                                 data=json.dumps(payload))
        if response is None:
            return False
        pprint(f'Status code: {response.status_code}')
        return response.ok

    def closeTargetChannel(self, roomname):
        '''Delete one channel, whether it is public or private.

        Args:
            roomname (str): Name of the channel to delete.
        Returns:
            The 'success' value reported by the first endpoint that
            answered successfully, or False when the channel is unknown
            or neither endpoint succeeded.
        Raises:
            TypeError: If `roomname` is not a str.
        Examples:
            >>> R.closeTargetChannel('Test channel')
        '''
        self._checkType('roomname', roomname, str)
        room_id = self._getChannel_id(roomname)
        if not room_id:
            # Do not send a delete request carrying a bogus room ID.
            print(f'The specified channel does not exist: {roomname}')
            return False
        APIS = [f'{self.URL}/api/v1/channels.delete',
                f'{self.URL}/api/v1/groups.delete']
        payload = json.dumps({'roomId': room_id})
        for API in APIS:
            response = self._callApi('post', API, data=payload)
            if response is not None and response:
                return self._jsonBody(response).get('success', False)
        return False

    ################################################
    # ID <-> name maps
    ################################################
    def _CreateMapFromChannelIDtoChannelname(self):
        '''Build a map from room ID to channel name for all channels.

        Issues one ID lookup per channel; channels whose ID cannot be
        resolved are left out.

        Returns:
            dict: {room ID: channel name}.
        Examples:
            >>> id_to_name = self._CreateMapFromChannelIDtoChannelname()
        '''
        channelMap = {}
        for name in self.getChannelMap():
            channel_id = self._getChannel_id(name)
            if channel_id:
                channelMap[channel_id] = name
        return channelMap

    def _CreateMapFromChannelnameToChannelID(self, map_id_to_name=None):
        '''Build a map from channel name to room ID.

        Args:
            map_id_to_name (dict, optional): An existing
                {room ID: channel name} map to invert. When omitted it is
                built with _CreateMapFromChannelIDtoChannelname().
        Returns:
            dict: {channel name: room ID}.
        Examples:
            >>> name_to_id = self._CreateMapFromChannelnameToChannelID()
        '''
        if map_id_to_name is None:
            map_id_to_name = self._CreateMapFromChannelIDtoChannelname()
        return {name: channel_id
                for channel_id, name in map_id_to_name.items()}

    ################################################
    # message retention
    ################################################
    def _JudgeDeleteChannelMessages(self, roomname, LIMIT):
        '''List the messages of one channel with a deletion-candidate flag.

        Args:
            roomname (str): Channel name to inspect.
            LIMIT (int): Retention period in days.
        Returns:
            DataFrame: Columns ['Channel', 'MSG_ID', 'Update time',
            'Target to be deleted', 'MSG'], one row per message returned
            by the server. Empty (with the same columns) when the channel
            is unknown or has no messages.
        Raises:
            TypeError: If `roomname` is not a str or LIMIT is not an int.
        Examples:
            >>> _df = self._JudgeDeleteChannelMessages(key, LIMIT)
        Note:
            NOTE(review): no 'count' parameter is sent, so only the
            server's default page of messages is inspected -- confirm
            whether paging is needed.
        '''
        self._checkType('roomname', roomname, str)
        self._checkType('LIMIT', LIMIT, int)
        APIS = [f'{self.URL}/api/v1/channels.messages',
                f'{self.URL}/api/v1/groups.messages']
        rows = []
        channel_id = self._getChannel_id(roomname)
        if channel_id:
            # Must be a tuple of pairs; a bare ('roomId', id) is not.
            params = (('roomId', channel_id),)
            for API in APIS:
                pprint(f'API={API}')
                response = self._callApi('get', API, params=params)
                if response is None:
                    continue
                pprint(f'response={response}')
                if not response:
                    continue
                messages = self._jsonBody(response).get('messages', [])
                pprint(len(messages))
                for message in messages:
                    updated = self._ISOtimeToDatetime(message['_updatedAt'])
                    # Every message was requested for this room, so the
                    # channel name is `roomname`; no ID->name map needed.
                    rows.append([roomname,
                                 message['_id'],
                                 updated,
                                 self._judgeRocketChatMessage(updated, LIMIT),
                                 message.get('msg', '')])
        # Passing `columns` keeps the frame valid even with zero rows.
        return pd.DataFrame(rows, columns=self._MESSAGE_COLUMNS)

    def JudgeDeleteChannelMessages(self, LIMIT):
        '''Collect the flagged messages of every channel in one DataFrame.

        Args:
            LIMIT (int): Retention period in days.
        Returns:
            DataFrame: The per-channel frames from
            _JudgeDeleteChannelMessages stacked with a fresh 0..n-1 index.
        Raises:
            TypeError: If LIMIT is not an int.
        '''
        self._checkType('LIMIT', LIMIT, int)
        frames = [self._JudgeDeleteChannelMessages(name, LIMIT)
                  for name in self.getChannelMap()]
        if not frames:
            # pd.concat refuses an empty list.
            return pd.DataFrame(columns=self._MESSAGE_COLUMNS)
        # One concat at the end avoids re-copying the frame per channel.
        return pd.concat(frames, axis=0, ignore_index=True)
Recommended Posts