slack-exporter/exporter.py
2020-12-29 16:37:57 -06:00

280 lines
9.8 KiB
Python

import os
import sys
import requests
import json
from datetime import datetime
import argparse
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), '.env'))
# pagination handling
def get_at_cursor(url, params, cursor=None):
if cursor is not None:
params['cursor'] = cursor
r = requests.get(url, params=params)
if r.status_code != 200:
print("ERROR: %s %s" % (r.status_code, r.reason))
sys.exit(1)
d = r.json()
try:
if d['ok'] is False:
print("I encountered an error: %s" % d)
sys.exit(1)
next_cursor = None
if 'response_metadata' in d and 'next_cursor' in d['response_metadata']:
next_cursor = d['response_metadata']['next_cursor']
if str(next_cursor).strip() == '':
next_cursor = None
return next_cursor, d
except KeyError as e:
print("Something went wrong: %s." % e)
return None, []
def paginated_get(url, params, combine_key=None):
next_cursor = None
result = []
while True:
next_cursor, data = get_at_cursor(url, params, cursor=next_cursor)
result.extend(data) if combine_key is None else result.extend(data[combine_key])
if next_cursor is None:
break
return result
# GET requests
def channel_list(team_id=None):
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'team_id': team_id,
'types': 'public_channel,private_channel,mpim,im',
'limit': 200
}
return paginated_get('https://slack.com/api/conversations.list', params, combine_key='channels')
def channel_history(channel_id):
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'channel': channel_id,
'limit': 200
}
return paginated_get('https://slack.com/api/conversations.history', params, combine_key='messages')
def user_list(team_id=None):
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'limit': 200,
'team_id': team_id
}
return paginated_get('https://slack.com/api/users.list', params, combine_key='members')
def channel_replies(timestamps, channel_id):
replies = []
for timestamp in timestamps:
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'channel': channel_id,
'ts': timestamp,
'limit': 200
}
replies.append(paginated_get('https://slack.com/api/conversations.replies', params, combine_key='messages'))
return replies
# parsing
def parse_channel_list(channels, users):
result = ''
for channel in channels:
ch_id = channel['id']
ch_name = channel['name'] if 'name' in channel else ''
ch_private = 'private ' if 'is_private' in channel and channel['is_private'] else ''
if 'is_im' in channel and channel['is_im']:
ch_type = 'direct_message'
elif 'is_mpim' in channel and channel['is_mpim']:
ch_type = 'multiparty-direct_message'
elif 'group' in channel and channel['is_group']:
ch_type = 'group'
else:
ch_type = 'channel'
if 'creator' in channel:
ch_ownership = 'created by %s' % name_from_uid(channel['creator'], users)
elif 'user' in channel:
ch_ownership = 'with %s' % name_from_uid(channel['user'], users)
else:
ch_ownership = ''
ch_name = ' %s:' % ch_name if ch_name.strip() != '' else ch_name
result += '[%s]%s %s%s %s\n' % (ch_id, ch_name, ch_private, ch_type, ch_ownership)
return result
def name_from_uid(user_id, users, real=False):
for user in users:
if user['id'] == user_id:
return user['real_name'] if real else user['name']
return '[null user]'
def name_from_ch_id(channel_id, channels):
for channel in channels:
if channel['id'] == channel_id:
return (channel['user'], 'Direct Message') if 'user' in channel else (channel['name'], 'Channel')
return '[null channel]'
def parse_user_list(users):
result = ''
for u in users:
entry = '[%s] %s (%s), %s' % (u['id'], u['name'], u['real_name'], u['tz'])
u_type = ''
if 'is_admin' in u and u['is_admin']:
u_type += 'admin|'
if 'is_owner' in u and u['is_owner']:
u_type += 'owner|'
if 'is_primary_owner' in u and u['is_primary_owner']:
u_type += 'primary_owner|'
if 'is_restricted' in u and u['is_restricted']:
u_type += 'restricted|'
if 'is_ultra_restricted' in u and u['is_ultra_restricted']:
u_type += 'ultra_restricted|'
if 'is_bot' in u and u['is_bot']:
u_type += 'bot|'
if 'is_app_user' in u and u['is_app_user']:
u_type += 'app_user|'
u_type = u_type[:-1] if u_type.endswith('|') else u_type
entry += ', ' if u_type.strip() != '' else ''
entry += '%s\n' % u_type
result += entry
return result
def parse_channel_history(msgs, users, check_thread=False):
if 'messages' in msgs:
msgs = msgs['messages']
messages = [x for x in msgs if x['type'] == 'message'] # files are also messages
body = ''
for msg in messages:
if 'user' in msg:
usr = {'name': name_from_uid(msg['user'], users), 'real_name': name_from_uid(msg['user'], users, True)}
else:
usr = {'name': '', 'real_name': 'none'}
timestamp = datetime.fromtimestamp(round(float(msg['ts']))).strftime('%m-%d-%Y %H:%M:%S')
text = msg['text'] if msg['text'].strip() != "" else "[no message content]"
for u in [x['id'] for x in users]:
text = str(text).replace('<@%s>' % u, '<@%s> (%s)' % (u, name_from_uid(u, users)))
entry = "Message at %s\nUser: %s (%s)\n%s" % (timestamp, usr['name'], usr['real_name'], text)
if 'reactions' in msg:
rxns = msg['reactions']
entry += "\nReactions: " + ', '.join('%s (%s)' % (x['name'], ', '.join(
name_from_uid(u, users) for u in x['users'])) for x in rxns)
if 'files' in msg:
files = msg['files']
entry += "\nFiles:\n" + '\n'.join(' - %s, %s' % (f['name'], f['url_private_download']) for f in files)
entry += '\n\n%s\n\n' % ('*' * 24)
if check_thread and 'parent_user_id' in msg:
entry = '\n'.join('\t%s' % x for x in entry.split('\n'))
body += entry.rstrip('\t') # get rid of any extra tabs between trailing newlines
return body
def parse_replies(threads, users):
body = ''
for thread in threads:
body += parse_channel_history(thread, users, check_thread=True)
body += '\n'
return body
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-o', help="Directory in which to save output files (if left blank, prints to stdout)")
parser.add_argument('--lc', action='store_true', help="List all conversations in your workspace")
parser.add_argument('--lu', action='store_true', help="List all users in your workspace")
parser.add_argument('--json', action='store_true', help="Give the requested output in raw JSON format (no parsing)")
parser.add_argument('-c', action='store_true', help="Get history for all accessible conversations")
parser.add_argument('-r', action='store_true', help="Get reply threads for all accessible conversations")
a = parser.parse_args()
ts = str(datetime.strftime(datetime.now(), '%m-%d-%Y_%H%M%S'))
def save(data, filename):
if a.o is None:
print(data)
else:
out_dir_parent = os.path.abspath(os.path.expanduser(os.path.expandvars(a.o)))
out_dir = os.path.join(out_dir_parent, 'slack_export_%s' % ts)
filename = filename + '.json' if a.json else filename + '.txt'
os.makedirs(out_dir, exist_ok=True)
full_filepath = os.path.join(out_dir, filename)
print("Writing output to %s" % full_filepath)
with open(full_filepath, mode='w') as f:
if a.json:
json.dump(data, f, indent=4)
else:
f.write(data)
def save_replies(channel_hist, channel_id, users):
ch_replies = channel_replies([x['ts'] for x in channel_hist if 'reply_count' in x], channel_id)
if a.json:
data_replies = ch_replies
else:
ch_name, ch_type = name_from_ch_id(ch_id, ch_list)
header_str = 'Threads in %s: %s\n%s Messages' % (ch_type, ch_name, len(ch_replies))
data_replies = parse_replies(ch_replies, users)
sep = '=' * 24
data_replies = '%s\n%s\n\n%s' % (header_str, sep, data_replies)
save(data_replies, 'channel-replies_%s' % channel_id)
if a.lc:
data = channel_list() if a.json else parse_channel_list(channel_list(), user_list())
save(data, 'channel_list')
if a.lu:
data = user_list() if a.json else parse_user_list(user_list())
save(data, 'user_list')
if a.c:
ch_list = channel_list()
users = user_list()
for ch_id in [x['id'] for x in ch_list]:
ch_hist = channel_history(ch_id)
if a.json:
data_ch = ch_hist
else:
data_ch = parse_channel_history(ch_hist, users)
ch_name, ch_type = name_from_ch_id(ch_id, ch_list)
header_str = '%s Name: %s' % (ch_type, ch_name)
sep = '=' * 24
data_ch = 'Channel ID: %s\n%s\n%s Messages\n%s\n\n' % (ch_id, header_str, len(ch_hist), sep) + data_ch
save(data_ch, 'channel_%s' % ch_id)
if a.r:
save_replies(ch_hist, ch_id, users)
# elif, since we want to avoid asking for channel_history twice
elif a.r:
for ch_id in [x['id'] for x in channel_list()]:
save_replies(channel_history(ch_id), ch_id, user_list())