"""Export users, conversations, threads and files from a Slack workspace.

The module is a command-line tool: run it with ``--help`` for the available
options. The Slack token is read from the ``SLACK_USER_TOKEN`` environment
variable (optionally loaded from a ``.env`` file next to this script).
"""
import argparse
import json
import os
import re
import sys
from abc import ABC, abstractmethod  # noqa: F401  (kept from the original import list)
from dataclasses import asdict, dataclass
from datetime import datetime
from time import sleep
from timeit import default_timer  # noqa: F401  (kept from the original import list)
from typing import Any, Dict, Iterator, List, Optional, Tuple  # noqa: F401

import requests
from dotenv import load_dotenv
from pathvalidate import sanitize_filename

# Matches a plain user mention such as "<@U012ABC>"; the group is the user ID.
_MENTION_RE = re.compile(r"<@([^<>]+)>")

# Visual separator written between messages in the text exports.
_SEPARATOR = "*" * 24


class SlackAPIError(Exception):
    """Raised when the Slack API answers with an HTTP error or ``ok: false``."""


@dataclass
class SlackConfig:
    """Settings for the Slack API client."""

    # Token sent as a Bearer credential with every request.
    user_token: str
    # Extra seconds added on top of the server-provided Retry-After delay.
    additional_sleep_time: int = 2
    # Seconds to wait for a single HTTP response before giving up.
    request_timeout: float = 30.0

    @classmethod
    def from_env(cls, env_path: str = ".env") -> "SlackConfig":
        """Build a config from the environment.

        Args:
            env_path: Path of an optional dotenv file, relative to the
                directory containing this script.

        Raises:
            ValueError: If ``SLACK_USER_TOKEN`` is missing or empty.
        """
        env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), env_path)
        if os.path.isfile(env_file):
            load_dotenv(env_file)

        token = os.environ.get("SLACK_USER_TOKEN")
        if not token:
            raise ValueError("Brak SLACK_USER_TOKEN w zmiennych środowiskowych")
        return cls(token)


class SlackAPI:
    """Thin client for the Slack Web API with rate-limit and paging support."""

    def __init__(self, config: SlackConfig):
        self.config = config
        self.headers = {"Authorization": f"Bearer {config.user_token}"}
        self.base_url = "https://slack.com/api"

    def get_data(self, endpoint: str, params: dict) -> requests.Response:
        """Send one GET request, retrying for as long as the API answers 429.

        Args:
            endpoint: API method name appended to the base URL.
            params: Query parameters for the request.

        Returns:
            The first response whose status code is not 429. Other error
            statuses are returned as-is for the caller to inspect.
        """
        url = f"{self.base_url}/{endpoint}"
        attempt = 0

        while True:
            response = requests.get(
                url,
                headers=self.headers,
                params=params,
                timeout=self.config.request_timeout,
            )
            attempt += 1

            if response.status_code != 429:
                return response

            # A missing or malformed Retry-After header must not abort the
            # export; fall back to a one-second base delay.
            try:
                retry_after = int(response.headers.get("Retry-After", 1))
            except ValueError:
                retry_after = 1
            sleep_time = retry_after + self.config.additional_sleep_time
            print(
                f"Limit zapytań przekroczony. "
                f"Ponowna próba za {sleep_time} sekund (próba {attempt})."
            )
            sleep(sleep_time)

    @staticmethod
    def _parse_response(response: requests.Response) -> Dict:
        """Return the decoded JSON body, raising on HTTP or API-level errors."""
        if response.status_code != 200:
            raise SlackAPIError(f"Błąd API: {response.status_code} {response.reason}")

        data = response.json()
        if not data.get("ok", False):
            raise SlackAPIError(f"Błąd Slack API: {data}")
        return data

    def paginated_get(self, endpoint: str, params: dict,
                      combine_key: Optional[str] = None) -> List[Dict]:
        """Fetch every page of a cursor-paginated endpoint.

        Args:
            endpoint: API method name.
            params: Base query parameters; the dict is not modified.
            combine_key: Key of the list to collect from each page. When
                omitted, the whole decoded page is collected instead.

        Returns:
            The concatenated items (or the list of pages when ``combine_key``
            is not given).

        Raises:
            SlackAPIError: On a non-200 status or an ``ok: false`` payload.
        """
        # Work on a copy so the caller's dict never picks up a stale cursor.
        query = dict(params)
        result: List[Dict] = []

        while True:
            data = self._parse_response(self.get_data(endpoint, query))

            if combine_key:
                result.extend(data.get(combine_key) or [])
            else:
                result.append(data)

            metadata = data.get("response_metadata") or {}
            next_cursor = (metadata.get("next_cursor") or "").strip()
            if not next_cursor:
                return result
            query["cursor"] = next_cursor

    def get_users(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all members reported by ``users.list``."""
        params: Dict[str, Any] = {"limit": 200}
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("users.list", params, "members")

    def get_channels(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all public, private, group-DM and DM conversations."""
        params: Dict[str, Any] = {
            "types": "public_channel,private_channel,mpim,im",
            "limit": 200,
        }
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("conversations.list", params, "channels")

    def get_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                            latest: Optional[str] = None) -> List[Dict]:
        """Return the messages of one conversation.

        Args:
            channel_id: Conversation ID.
            oldest: Optional lower bound, passed through as ``oldest``.
            latest: Optional upper bound, passed through as ``latest``.
        """
        params: Dict[str, Any] = {"channel": channel_id, "limit": 200}
        if oldest:
            params["oldest"] = oldest
        if latest:
            params["latest"] = latest
        return self.paginated_get("conversations.history", params, "messages")

    def get_replies(self, channel_id: str, thread_ts: str) -> List[Dict]:
        """Return the messages of the thread rooted at ``thread_ts``."""
        params = {"channel": channel_id, "ts": thread_ts, "limit": 200}
        return self.paginated_get("conversations.replies", params, "messages")

    def get_files(self, channel: Optional[str] = None) -> List[Dict]:
        """Return all files, optionally restricted to one conversation.

        ``files.list`` pages by ``page``/``count`` and reports the page total
        under ``paging`` rather than handing out a cursor, so it cannot go
        through :meth:`paginated_get`.
        """
        params: Dict[str, Any] = {"count": 100}
        if channel:
            params["channel"] = channel

        files: List[Dict] = []
        page = 1
        while True:
            params["page"] = page
            data = self._parse_response(self.get_data("files.list", params))
            batch = data.get("files") or []
            files.extend(batch)

            paging = data.get("paging") or {}
            # Stop on the last reported page, or on an empty page as a guard
            # against looping forever if the paging block is absent.
            if not batch or page >= int(paging.get("pages", 1)):
                return files
            page += 1


@dataclass
class SlackUser:
    """A workspace member, reduced to the fields the exporter prints."""

    id: str
    name: str
    real_name: Optional[str] = None
    display_name: Optional[str] = None
    is_admin: bool = False
    is_owner: bool = False
    is_primary_owner: bool = False
    is_restricted: bool = False
    is_ultra_restricted: bool = False
    is_bot: bool = False
    is_app_user: bool = False
    tz: Optional[str] = None

    # Boolean attributes reported as role labels by format(), in output order.
    _ROLE_FLAGS = (
        "is_admin", "is_owner", "is_primary_owner", "is_restricted",
        "is_ultra_restricted", "is_bot", "is_app_user",
    )

    @classmethod
    def from_dict(cls, data: Dict) -> "SlackUser":
        """Build a user from one ``users.list`` member dict."""
        profile = data.get("profile") or {}
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            real_name=profile.get("real_name"),
            display_name=profile.get("display_name"),
            is_admin=data.get("is_admin", False),
            is_owner=data.get("is_owner", False),
            is_primary_owner=data.get("is_primary_owner", False),
            is_restricted=data.get("is_restricted", False),
            is_ultra_restricted=data.get("is_ultra_restricted", False),
            is_bot=data.get("is_bot", False),
            is_app_user=data.get("is_app_user", False),
            tz=data.get("tz"),
        )

    def get_display_name(self) -> str:
        """Return the first non-empty of real name, display name and name."""
        return self.real_name or self.display_name or self.name or "[brak nazwy]"

    def format(self) -> str:
        """Return a one-line summary: ID, name, real name, timezone, roles."""
        parts = [f"[{self.id}] {self.name}"]

        if self.real_name:
            parts.append(f"({self.real_name})")

        if self.tz:
            parts.append(self.tz)

        # Strip the "is_" prefix so that e.g. is_admin is reported as "admin".
        roles = [flag[3:] for flag in self._ROLE_FLAGS if getattr(self, flag)]
        if roles:
            parts.append("|".join(roles))

        return ", ".join(parts)


class SlackChannel:
    """A conversation (channel, group, DM or group DM)."""

    def __init__(self, data: Dict):
        self.id = data["id"]
        self.name = data.get("name", "")
        self.is_private = data.get("is_private", False)
        self.is_im = data.get("is_im", False)
        self.is_mpim = data.get("is_mpim", False)
        self.is_group = data.get("is_group", False)
        self.creator_id = data.get("creator")
        self.user_id = data.get("user")

    @property
    def type(self) -> str:
        """Return the conversation kind derived from the ``is_*`` flags."""
        if self.is_im:
            return "direct_message"
        if self.is_mpim:
            return "multiparty-direct_message"
        if self.is_group:
            return "group"
        return "channel"

    def format(self, users: Dict[str, SlackUser]) -> str:
        """Return a one-line summary of the conversation.

        Args:
            users: Known users keyed by ID, used to resolve the creator or
                the other party of the conversation to a name.
        """
        parts = [f"[{self.id}]"]

        if self.name:
            # No leading space here: the parts are joined with one below.
            parts.append(f"{self.name}:")

        if self.is_private:
            parts.append("private")

        parts.append(self.type)

        if self.creator_id and self.creator_id in users:
            parts.append(f"created by {users[self.creator_id].name}")
        elif self.user_id and self.user_id in users:
            parts.append(f"with {users[self.user_id].name}")

        return " ".join(parts)


@dataclass
class SlackFile:
    """A file reference: its ID, name and private URL."""

    id: str
    name: str
    url_private: str

    @classmethod
    def from_dict(cls, data: Dict) -> "SlackFile":
        """Build a file reference from one ``files.list`` entry."""
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            url_private=data.get("url_private", ""),
        )


class SlackMessage:
    """A single message together with the user directory used to render it."""

    def __init__(self, data: Dict, users: Dict[str, SlackUser]):
        text = data.get("text")
        self.timestamp = float(data["ts"])
        self.text = "[no message content]" if text is None else text
        self.user_id = data.get("user")
        self.reactions = data.get("reactions") or []
        self.files = data.get("files") or []
        self.users = users
        self.has_thread = "reply_count" in data
        self.parent_user_id = data.get("parent_user_id")

    def _annotate_mentions(self, text: str) -> str:
        """Append ``(name)`` after every ``<@ID>`` mention of a known user."""
        def annotate(match):
            user = self.users.get(match.group(1))
            return f"{match.group(0)} ({user.name})" if user else match.group(0)

        # One pass over the text instead of one str.replace per known user.
        return _MENTION_RE.sub(annotate, text)

    def format(self, indent: bool = False) -> str:
        """Render the message as a text block followed by a separator.

        Args:
            indent: When true, prefix every line of the block with a tab.
        """
        # datetime.fromtimestamp without a tz yields local time.
        timestamp = datetime.fromtimestamp(round(self.timestamp)).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        author = self.users.get(self.user_id)
        user_info = f"{author.name} ({author.get_display_name()})" if author else "none"

        lines = [
            f"Message at {timestamp}",
            f"User: {user_info}",
            self._annotate_mentions(self.text),
        ]

        if self.reactions:
            reaction_parts = []
            for reaction in self.reactions:
                names = [
                    self.users[uid].name
                    for uid in reaction.get("users", [])
                    if uid in self.users
                ]
                reaction_parts.append(
                    f"{reaction.get('name', '[unknown]')} ({', '.join(names)})"
                )
            lines.append("Reactions: " + ", ".join(reaction_parts))

        if self.files:
            lines.append("Files:")
            for file in self.files:
                file_id = file.get("id", "[no id]")
                if "name" in file and "url_private_download" in file:
                    lines.append(
                        f" - [{file_id}] {file['name']}, {file['url_private_download']}"
                    )
                else:
                    lines.append(f" - [{file_id}] [deleted, oversize, or unavailable file]")

        message = "\n".join(lines)
        if indent:
            message = "\n".join("\t" + line for line in message.split("\n"))

        return message + "\n\n" + _SEPARATOR + "\n\n"


class SlackExporter:
    """Loads users and conversations once, then writes the requested exports."""

    def __init__(self, config: SlackConfig, output_dir: Optional[str] = None):
        """Create the exporter and load users and conversations.

        Args:
            config: API client settings.
            output_dir: Parent directory for the export; a timestamped
                subdirectory is created inside it. When empty or ``None``,
                exports are written to standard output instead.
        """
        self.api = SlackAPI(config)
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        self.output_dir = self._set_output_dir(output_dir)
        self.users: Dict[str, SlackUser] = self._load_users()
        self.channels: List[SlackChannel] = self._load_channels()

    def _set_output_dir(self, path: Optional[str]) -> Optional[str]:
        """Create and return the export directory, or ``None`` for stdout."""
        if not path:
            return None
        parent_dir = os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
        output_dir = os.path.join(parent_dir, f"slack_export_{self.timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    def _load_users(self) -> Dict[str, SlackUser]:
        """Fetch all users and index them by ID."""
        return {u["id"]: SlackUser.from_dict(u) for u in self.api.get_users()}

    def _load_channels(self) -> List[SlackChannel]:
        """Fetch all conversations."""
        return [SlackChannel(ch) for ch in self.api.get_channels()]

    def _require_output_dir(self) -> str:
        """Return the output directory or raise if the exporter has none."""
        if not self.output_dir:
            raise ValueError("Opcja --files wymaga określenia katalogu wyjściowego (-o)")
        return self.output_dir

    def _save_data(self, data: Any, filename: str, as_json: bool = False):
        """Write ``data`` to ``<output_dir>/<filename>.<ext>`` or to stdout.

        Args:
            data: A JSON-serialisable object when ``as_json`` is true,
                otherwise a string.
            filename: Base file name without extension.
            as_json: Selects JSON output and the ``.json`` extension.
        """
        if not self.output_dir:
            if as_json:
                json.dump(data, sys.stdout, indent=4)
            else:
                # Text exports are written verbatim, not as a JSON string.
                sys.stdout.write(data)
            sys.stdout.write("\n")
            return

        ext = "json" if as_json else "txt"
        filepath = os.path.join(self.output_dir, f"{filename}.{ext}")

        print(f"Zapisywanie do {filepath}")
        with open(filepath, mode="w", encoding="utf-8") as f:
            if as_json:
                json.dump(data, f, indent=4)
            else:
                f.write(data)

    def export_channel_list(self, as_json: bool = False):
        """Export the list of conversations."""
        if as_json:
            data: Any = [dict(vars(ch)) for ch in self.channels]
        else:
            data = "\n".join(ch.format(self.users) for ch in self.channels)
        self._save_data(data, "channel_list", as_json)

    def export_user_list(self, as_json: bool = False):
        """Export the list of users."""
        if as_json:
            data: Any = [asdict(u) for u in self.users.values()]
        else:
            data = "\n".join(u.format() for u in self.users.values())
        self._save_data(data, "user_list", as_json)

    def _find_channel(self, channel_id: str) -> Optional[SlackChannel]:
        """Return the loaded conversation with this ID, if any."""
        return next((ch for ch in self.channels if ch.id == channel_id), None)

    def export_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export the messages of one conversation.

        With ``as_json`` the raw API messages are written; otherwise a text
        report with a short header followed by the formatted messages.
        """
        history = self.api.get_channel_history(channel_id, oldest, latest)

        if as_json:
            data: Any = history
        else:
            messages = [SlackMessage(msg, self.users) for msg in history]
            channel = self._find_channel(channel_id)

            header = f"Channel ID: {channel_id}\n"
            if channel:
                header += f"{channel.type.title()} Name: {channel.name}\n"
            header += f"{len(messages)} Messages\n{_SEPARATOR}\n\n"

            data = header + "".join(msg.format() for msg in messages)

        self._save_data(data, f"channel_{channel_id}", as_json)

    def export_channel_replies(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export the threads of one conversation.

        Every history message carrying ``reply_count`` is treated as a thread
        root and its replies are fetched with one extra request per thread.
        """
        history = self.api.get_channel_history(channel_id, oldest, latest)

        all_replies: List[Dict] = []
        for msg in history:
            if "reply_count" in msg:
                all_replies.extend(self.api.get_replies(channel_id, msg["ts"]))

        if as_json:
            data: Any = all_replies
        else:
            messages = [SlackMessage(msg, self.users) for msg in all_replies]
            channel = self._find_channel(channel_id)

            header = f"Threads in {channel.type if channel else 'channel'}: "
            header += f"{channel.name if channel else channel_id}\n"
            header += f"{len(messages)} Messages\n{_SEPARATOR}\n\n"

            data = header + "".join(msg.format(True) for msg in messages)

        self._save_data(data, f"channel-replies_{channel_id}", as_json)

    def export_channel_files(self, channel_id: Optional[str] = None):
        """Download all files, optionally restricted to one conversation.

        Raises:
            ValueError: If the exporter has no output directory.
        """
        self._require_output_dir()
        for entry in self.api.get_files(channel_id):
            file = SlackFile.from_dict(entry)
            if not file.url_private:
                # Nothing to fetch; retrying an empty URL would only fail.
                print(f"Skipping file {file.id}: no download URL available.")
                continue
            filename = f"{file.id}-{sanitize_filename(file.name)}"
            self.download_file(filename, file.url_private)

    def download_file(self, filename: str, url: str, attempts: int = 10) -> bool:
        """Download ``url`` into the output directory.

        Args:
            filename: Target file name inside the output directory.
            url: Private file URL; the request is authenticated with the
                same Bearer token as the API calls.
            attempts: Maximum number of download attempts.

        Returns:
            ``True`` if the file already existed or was downloaded, ``False``
            if every attempt failed.

        Raises:
            ValueError: If the exporter has no output directory.
        """
        target = os.path.join(self._require_output_dir(), filename)
        if os.path.exists(target):
            return True

        # Stream into a temporary name so an interrupted transfer is never
        # mistaken for a finished download on the next attempt or run.
        partial = f"{target}.part"
        remaining = attempts
        while remaining > 0:
            try:
                with requests.get(
                    url,
                    headers=self.api.headers,
                    stream=True,
                    timeout=self.api.config.request_timeout,
                ) as response:
                    response.raise_for_status()
                    with open(partial, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial, target)
                return True
            except requests.exceptions.RequestException as e:
                remaining -= 1
                if os.path.exists(partial):
                    os.remove(partial)
                print(f"Error downloading file {filename}: {e}. {remaining} attempts left.")
        return False


def main():
    """Parse the command line and run the requested exports."""
    parser = argparse.ArgumentParser(description="Eksporter danych ze Slacka")
    parser.add_argument("-o", help="Katalog wyjściowy (jeśli pusty, wyświetla na stdout)")
    parser.add_argument("--lc", action="store_true", help="Lista wszystkich konwersacji")
    parser.add_argument("--lu", action="store_true", help="Lista wszystkich użytkowników")
    parser.add_argument("--json", action="store_true", help="Wynik w formacie JSON")
    parser.add_argument("-c", action="store_true",
                        help="Historia wszystkich dostępnych konwersacji")
    parser.add_argument("--ch", help="Z -c, ogranicza eksport do podanego ID kanału")
    parser.add_argument("--fr", help="Z -c, timestamp początku zakresu (Unix)")
    parser.add_argument("--to", help="Z -c, timestamp końca zakresu (Unix)")
    parser.add_argument("-r", action="store_true",
                        help="Pobierz wątki ze wszystkich konwersacji")
    parser.add_argument("--files", action="store_true", help="Pobierz wszystkie pliki")

    args = parser.parse_args()

    if args.files and not args.o:
        print("Opcja --files wymaga określenia katalogu wyjściowego (-o)")
        sys.exit(1)

    try:
        config = SlackConfig.from_env()
        # The constructor sets up the output directory and loads users and
        # conversations, so no further initialisation calls are needed.
        exporter = SlackExporter(config, args.o)

        if args.lc:
            exporter.export_channel_list(args.json)

        if args.lu:
            exporter.export_user_list(args.json)

        if args.c or args.r:
            channel_ids = [args.ch] if args.ch else [ch.id for ch in exporter.channels]
            for channel_id in channel_ids:
                if args.c:
                    exporter.export_channel_history(channel_id, args.fr, args.to, args.json)
                if args.r:
                    exporter.export_channel_replies(channel_id, args.fr, args.to, args.json)

        if args.files:
            exporter.export_channel_files(args.ch)

    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"Błąd: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()