Fixed formatting

This commit is contained in:
Seb Seager 2021-11-18 13:51:48 -05:00
parent d7f5fa5db1
commit 4b7fea7ec1
4 changed files with 331 additions and 196 deletions

165
bot.py
View file

@ -8,32 +8,34 @@ from dotenv import load_dotenv
from exporter import parse_replies, parse_channel_history
app = Flask(__name__)
load_dotenv(os.path.join(app.root_path, '.env'))
load_dotenv(os.path.join(app.root_path, ".env"))
# chat write interactions
def post_response(response_url, text):
requests.post(response_url, json={'text': text})
requests.post(response_url, json={"text": text})
# pagination handling
def get_at_cursor(url, params, response_url, cursor=None):
if cursor is not None:
params['cursor'] = cursor
params["cursor"] = cursor
r = requests.get(url, params=params)
data = r.json()
try:
if data['ok'] is False:
post_response(response_url, "I encountered an error: %s" % data['error'])
if data["ok"] is False:
post_response(response_url, "I encountered an error: %s" % data["error"])
next_cursor = None
if 'response_metadata' in data and 'next_cursor' in data['response_metadata']:
next_cursor = data['response_metadata']['next_cursor']
if str(next_cursor).strip() == '':
if "response_metadata" in data and "next_cursor" in data["response_metadata"]:
next_cursor = data["response_metadata"]["next_cursor"]
if str(next_cursor).strip() == "":
next_cursor = None
return next_cursor, data
@ -49,9 +51,13 @@ def paginated_get(url, params, response_url, combine_key=None):
while True:
next_cursor, data = get_at_cursor(url, params, response_url, cursor=next_cursor)
try:
result.extend(data) if combine_key is None else result.extend(data[combine_key])
result.extend(data) if combine_key is None else result.extend(
data[combine_key]
)
except KeyError:
post_response(response_url, "Sorry! I got an unexpected response (KeyError).")
post_response(
response_url, "Sorry! I got an unexpected response (KeyError)."
)
if next_cursor is None:
break
@ -60,36 +66,45 @@ def paginated_get(url, params, response_url, combine_key=None):
# GET requests
def channel_history(channel_id, response_url):
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'channel': channel_id,
'limit': 200
"token": os.environ["SLACK_USER_TOKEN"],
"channel": channel_id,
"limit": 200,
}
return paginated_get('https://slack.com/api/conversations.history', params, response_url, combine_key='messages')
return paginated_get(
"https://slack.com/api/conversations.history",
params,
response_url,
combine_key="messages",
)
def user_list(team_id, response_url):
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'limit': 200,
'team_id': team_id
}
params = {"token": os.environ["SLACK_USER_TOKEN"], "limit": 200, "team_id": team_id}
return paginated_get('https://slack.com/api/users.list', params, response_url, combine_key='members')
return paginated_get(
"https://slack.com/api/users.list", params, response_url, combine_key="members"
)
def channel_replies(timestamps, channel_id, response_url):
replies = []
for timestamp in timestamps:
params = {
'token': os.environ['SLACK_USER_TOKEN'],
'channel': channel_id,
'ts': timestamp,
'limit': 200
"token": os.environ["SLACK_USER_TOKEN"],
"channel": channel_id,
"ts": timestamp,
"limit": 200,
}
r = paginated_get('https://slack.com/api/conversations.replies', params, response_url, combine_key='messages')
r = paginated_get(
"https://slack.com/api/conversations.replies",
params,
response_url,
combine_key="messages",
)
replies.append(r)
return replies
@ -97,17 +112,18 @@ def channel_replies(timestamps, channel_id, response_url):
# Flask routes
@app.route('/slack/export-channel', methods=['POST'])
@app.route("/slack/export-channel", methods=["POST"])
def export_channel():
data = request.form
try:
team_id = data['team_id']
team_domain = data['team_domain']
ch_id = data['channel_id']
ch_name = data['channel_name']
response_url = data['response_url']
command_args = data['text']
team_id = data["team_id"]
team_domain = data["team_domain"]
ch_id = data["channel_id"]
ch_name = data["channel_name"]
response_url = data["response_url"]
command_args = data["text"]
except KeyError:
return Response("Sorry! I got an unexpected response (KeyError)."), 200
@ -116,43 +132,53 @@ def export_channel():
export_mode = str(command_args).lower()
exports_subdir = 'exports'
exports_subdir = "exports"
exports_dir = os.path.join(app.root_path, exports_subdir)
file_ext = '.txt' if export_mode == 'text' else '.json'
file_ext = ".txt" if export_mode == "text" else ".json"
filename = "%s-ch_%s-%s%s" % (team_domain, ch_id, str(uuid4().hex)[:6], file_ext)
filepath = os.path.join(exports_dir, filename)
loc = urljoin(request.url_root, 'download/%s' % filename)
loc = urljoin(request.url_root, "download/%s" % filename)
if not os.path.isdir(exports_dir):
os.makedirs(exports_dir, exist_ok=True)
with open(filepath, mode='w') as f:
if export_mode == 'text':
with open(filepath, mode="w") as f:
if export_mode == "text":
num_msgs = len(ch_hist)
sep = '=' * 24
header_str = 'Channel Name: %s\nChannel ID: %s\n%s Messages\n%s\n\n' % (ch_name, ch_id, num_msgs, sep)
data_ch = header_str + parse_channel_history(ch_hist, user_list(team_id, response_url))
sep = "=" * 24
header_str = "Channel Name: %s\nChannel ID: %s\n%s Messages\n%s\n\n" % (
ch_name,
ch_id,
num_msgs,
sep,
)
data_ch = header_str + parse_channel_history(
ch_hist, user_list(team_id, response_url)
)
f.write(data_ch)
else:
json.dump(ch_hist, f, indent=4)
post_response(response_url, "Done! This channel's history is available for download here (note that this link "
"is single-use): %s" % loc)
post_response(
response_url,
"Done! This channel's history is available for download here (note that this link "
"is single-use): %s" % loc,
)
return Response(), 200
@app.route('/slack/export-replies', methods=['POST'])
@app.route("/slack/export-replies", methods=["POST"])
def export_replies():
data = request.form
try:
team_id = data['team_id']
team_domain = data['team_domain']
ch_id = data['channel_id']
ch_name = data['channel_name']
response_url = data['response_url']
command_args = data['text']
team_id = data["team_id"]
team_domain = data["team_domain"]
ch_id = data["channel_id"]
ch_name = data["channel_name"]
response_url = data["response_url"]
command_args = data["text"]
except KeyError:
return Response("Sorry! I got an unexpected response (KeyError)."), 200
@ -160,55 +186,62 @@ def export_replies():
print(ch_id)
ch_hist = channel_history(ch_id, response_url)
print(ch_hist)
ch_replies = channel_replies([x['ts'] for x in ch_hist if 'reply_count' in x], ch_id, response_url)
ch_replies = channel_replies(
[x["ts"] for x in ch_hist if "reply_count" in x], ch_id, response_url
)
export_mode = str(command_args).lower()
exports_subdir = 'exports'
exports_subdir = "exports"
exports_dir = os.path.join(app.root_path, exports_subdir)
file_ext = '.txt' if export_mode == 'text' else '.json'
file_ext = ".txt" if export_mode == "text" else ".json"
filename = "%s-re_%s-%s%s" % (team_domain, ch_id, str(uuid4().hex)[:6], file_ext)
filepath = os.path.join(exports_dir, filename)
loc = urljoin(request.url_root, 'download/%s' % filename)
loc = urljoin(request.url_root, "download/%s" % filename)
if export_mode == 'text':
header_str = 'Threads in: %s\n%s Messages' % (ch_name, len(ch_replies))
if export_mode == "text":
header_str = "Threads in: %s\n%s Messages" % (ch_name, len(ch_replies))
data_replies = parse_replies(ch_replies, user_list(team_id, response_url))
sep = '=' * 24
data_replies = '%s\n%s\n\n%s' % (header_str, sep, data_replies)
sep = "=" * 24
data_replies = "%s\n%s\n\n%s" % (header_str, sep, data_replies)
else:
data_replies = ch_replies
if not os.path.isdir(exports_dir):
os.makedirs(exports_dir, exist_ok=True)
with open(filepath, mode='w') as f:
if export_mode == 'text':
with open(filepath, mode="w") as f:
if export_mode == "text":
f.write(data_replies)
else:
json.dump(data_replies, f, indent=4)
post_response(response_url, "Done! This channel's reply threads are available for download here (note that this "
"link is single-use): %s" % loc)
post_response(
response_url,
"Done! This channel's reply threads are available for download here (note that this "
"link is single-use): %s" % loc,
)
return Response(), 200
@app.route('/download/<filename>')
@app.route("/download/<filename>")
def download(filename):
path = os.path.join(app.root_path, 'exports', filename)
path = os.path.join(app.root_path, "exports", filename)
def generate():
with open(path) as f:
yield from f
os.remove(path)
mimetype = 'text/plain' if os.path.splitext(filename)[-1] == '.txt' else 'application/json'
mimetype = (
"text/plain" if os.path.splitext(filename)[-1] == ".txt" else "application/json"
)
r = app.response_class(generate(), mimetype=mimetype)
r.headers.set('Content-Disposition', 'attachment', filename=filename)
r.headers.set("Content-Disposition", "attachment", filename=filename)
return r
if __name__ == '__main__':
if __name__ == "__main__":
app.run(debug=False)