import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from pathvalidate import sanitize_filename


class SlackAPIError(Exception):
    """Raised when the Slack API answers with an HTTP error or ``ok: false``."""


@dataclass
class SlackConfig:
    """Configuration of the Slack API client."""

    user_token: str
    # Extra seconds added on top of the server-provided Retry-After delay.
    additional_sleep_time: int = 2

    @classmethod
    def from_env(cls, env_path: str = ".env") -> 'SlackConfig':
        """Build a config from the ``SLACK_USER_TOKEN`` environment variable.

        If a dotenv file named *env_path* exists next to this module it is
        loaded first.

        Raises:
            ValueError: if ``SLACK_USER_TOKEN`` is missing or empty.
        """
        env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), env_path)
        if os.path.isfile(env_file):
            load_dotenv(env_file)

        token = os.environ.get("SLACK_USER_TOKEN")
        if not token:
            # An empty token is as unusable as a missing one, so reject both.
            raise ValueError("Brak SLACK_USER_TOKEN w zmiennych środowiskowych")
        return cls(token)


class SlackAPI:
    """Thin client for the Slack Web API."""

    # Delay (seconds) used when a 429 response carries no usable Retry-After.
    DEFAULT_RETRY_AFTER = 1

    def __init__(self, config: SlackConfig):
        self.config = config
        self.headers = {"Authorization": f"Bearer {config.user_token}"}
        self.base_url = "https://slack.com/api"

    def get_data(self, endpoint: str, params: dict) -> requests.Response:
        """GET *endpoint*, sleeping and retrying for as long as Slack answers 429.

        Returns the first response whose status code is not 429 (it may still
        be an error response; callers check the status themselves).
        """
        url = f"{self.base_url}/{endpoint}"
        attempt = 0
        while True:
            response = requests.get(url, headers=self.headers, params=params)
            attempt += 1
            if response.status_code != 429:
                return response

            # A missing or non-numeric Retry-After header must not crash the
            # retry loop; fall back to a short default delay instead.
            try:
                retry_after = int(
                    response.headers.get("Retry-After", self.DEFAULT_RETRY_AFTER)
                )
            except ValueError:
                retry_after = self.DEFAULT_RETRY_AFTER
            sleep_time = retry_after + self.config.additional_sleep_time
            print(f"Limit zapytań przekroczony. Ponowna próba za {sleep_time} sekund (próba {attempt}).")
            sleep(sleep_time)

    def _get_json(self, endpoint: str, params: dict) -> Dict:
        """Fetch one page and return its decoded JSON body.

        Raises:
            SlackAPIError: on a non-200 HTTP status or an ``ok: false`` body.
        """
        response = self.get_data(endpoint, params)
        if response.status_code != 200:
            raise SlackAPIError(f"Błąd API: {response.status_code} {response.reason}")
        data = response.json()
        if not data.get("ok", False):
            raise SlackAPIError(f"Błąd Slack API: {data}")
        return data

    def paginated_get(self, endpoint: str, params: dict,
                      combine_key: Optional[str] = None) -> List[Dict]:
        """Fetch every cursor-paginated page of *endpoint*.

        With *combine_key* the lists stored under that key are concatenated;
        without it the raw page bodies are collected, one entry per page.

        Raises:
            SlackAPIError: on a non-200 HTTP status or an ``ok: false`` body.
        """
        # Work on a copy so the caller's dict never receives a stale cursor.
        query = dict(params)
        result: List[Dict] = []
        while True:
            data = self._get_json(endpoint, query)
            if combine_key:
                result.extend(data[combine_key])
            else:
                # extend() on a dict would add its keys, not the payload.
                result.append(data)

            metadata = data.get("response_metadata") or {}
            next_cursor = (metadata.get("next_cursor") or "").strip()
            if not next_cursor:
                return result
            query["cursor"] = next_cursor

    def get_users(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all members reported by ``users.list``."""
        params = {"limit": 200}
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("users.list", params, "members")

    def get_channels(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all conversations (public, private, mpim, im)."""
        params = {
            "types": "public_channel,private_channel,mpim,im",
            "limit": 200
        }
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("conversations.list", params, "channels")

    def get_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                            latest: Optional[str] = None) -> List[Dict]:
        """Return the messages of *channel_id*, optionally bounded in time."""
        params = {
            "channel": channel_id,
            "limit": 200
        }
        if oldest:
            params["oldest"] = oldest
        if latest:
            params["latest"] = latest
        return self.paginated_get("conversations.history", params, "messages")

    def get_replies(self, channel_id: str, thread_ts: str) -> List[Dict]:
        """Return the messages of the thread rooted at *thread_ts*."""
        params = {
            "channel": channel_id,
            "ts": thread_ts,
            "limit": 200
        }
        return self.paginated_get("conversations.replies", params, "messages")

    def get_files(self, channel: Optional[str] = None) -> List[Dict]:
        """Return the files listed by ``files.list``, optionally for one channel.

        NOTE(review): ``files.list`` appears to paginate with ``page`` /
        ``paging.pages`` rather than cursors, so pages are walked explicitly.
        If the response carries no ``paging`` block only the first page is
        read, which matches the previous behaviour.

        Raises:
            SlackAPIError: on a non-200 HTTP status or an ``ok: false`` body.
        """
        params: Dict[str, Any] = {}
        if channel:
            params["channel"] = channel

        files: List[Dict] = []
        page = 1
        while True:
            params["page"] = page
            data = self._get_json("files.list", params)
            files.extend(data["files"])
            paging = data.get("paging") or {}
            if page >= int(paging.get("pages", 1) or 1):
                return files
            page += 1

    def headers_for_download(self, url: str) -> Dict[str, str]:
        """Return the auth headers to use when downloading *url*.

        The bearer token is attached only for HTTPS URLs on slack.com hosts so
        it is never sent to an externally hosted file.
        """
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
        is_slack_host = host == "slack.com" or host.endswith(".slack.com")
        if parsed.scheme == "https" and is_slack_host:
            return dict(self.headers)
        return {}


@dataclass
class SlackUser:
    """A Slack workspace member."""

    id: str
    name: str
    real_name: Optional[str] = None
    display_name: Optional[str] = None
    is_admin: bool = False
    is_owner: bool = False
    is_primary_owner: bool = False
    is_restricted: bool = False
    is_ultra_restricted: bool = False
    is_bot: bool = False
    is_app_user: bool = False
    tz: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackUser':
        """Build a user from a flat dict or from a raw ``users.list`` member.

        ``real_name`` / ``display_name`` are read from the top level first and,
        when empty there, from the nested ``profile`` dict if one is present.
        """
        profile = data.get("profile") or {}

        def pick(key: str) -> Optional[str]:
            value = data.get(key)
            return value if value else profile.get(key, value)

        return cls(
            id=data["id"],
            name=data.get("name", ""),
            real_name=pick("real_name"),
            display_name=pick("display_name"),
            is_admin=data.get("is_admin", False),
            is_owner=data.get("is_owner", False),
            is_primary_owner=data.get("is_primary_owner", False),
            is_restricted=data.get("is_restricted", False),
            is_ultra_restricted=data.get("is_ultra_restricted", False),
            is_bot=data.get("is_bot", False),
            is_app_user=data.get("is_app_user", False),
            tz=data.get("tz")
        )

    def get_display_name(self) -> str:
        """Return the first non-empty of real name, display name, name."""
        return self.real_name or self.display_name or self.name or "[brak nazwy]"

    def format(self) -> str:
        """Return a one-line, comma-separated summary of the user."""
        parts = [f"[{self.id}] {self.name}"]
        if self.real_name:
            parts.append(f"({self.real_name})")
        if self.tz:
            parts.append(self.tz)

        flags = ['is_admin', 'is_owner', 'is_primary_owner', 'is_restricted',
                 'is_ultra_restricted', 'is_bot', 'is_app_user']
        # attr[3:] drops the "is_" prefix, e.g. "is_admin" -> "admin".
        user_types = [attr[3:] for attr in flags if getattr(self, attr)]
        if user_types:
            parts.append("|".join(user_types))
        return ", ".join(parts)


class SlackChannel:
    """A Slack conversation (channel, group, DM or multi-party DM)."""

    def __init__(self, data: Dict, users: Dict[str, SlackUser]):
        self.id = data["id"]
        self.name = data.get("name", "")
        self.is_private = data.get("is_private", False)
        self.is_im = data.get("is_im", False)
        self.is_mpim = data.get("is_mpim", False)
        self.is_group = data.get("is_group", False)
        # Accept both the "*_id" keys this tool originally read and the
        # "creator" / "user" keys (presumably what raw conversations.list
        # objects carry — confirm against the Slack conversation object docs).
        self._creator_id = data.get("creator_id") or data.get("creator")
        self._user_id = data.get("user_id") or data.get("user")
        if self._user_id and self._user_id in users:
            self.user = users[self._user_id].get_display_name()
        else:
            self.user = "(nieznany)"
        if self.is_mpim:
            by_name = {u.name: u for u in users.values()}
            self._mpim_users = self._list_mpim(by_name)
        else:
            self._mpim_users = []
        self.label = self._create_label(users)

    @property
    def type(self) -> str:
        """Kind of conversation as a short string."""
        if self.is_im:
            return "direct_message"
        if self.is_mpim:
            return "multiparty-direct_message"
        if self.is_group:
            return "group"
        return "channel"

    @property
    def short_label(self) -> str:
        """Compact label: DM partner, MPDM participants, or the channel name."""
        if self.is_im:
            return f"(DM) {self.user}"
        if self.is_mpim:
            return f"(MPDM) {', '.join(self._mpim_users)}"
        return self.name

    def _list_mpim(self, users: Dict[str, SlackUser]) -> List[str]:
        """Return display names of users whose name is a '-'-separated part of
        the channel name; *users* is keyed by user name."""
        result = []
        for part in self.name.split("-"):
            member = users.get(part)
            if member:
                result.append(member.get_display_name())
        return result

    def _create_label(self, users: Dict[str, SlackUser]) -> str:
        """Return the descriptive label of the channel; *users* is keyed by id."""
        parts = [f"[{self.id}]"]
        if self.name:
            parts.append(f" {self.name}:")
        if self.is_private:
            parts.append("private")
        parts.append(self.type)
        if self._creator_id and self._creator_id in users:
            parts.append(f"created by {users[self._creator_id].get_display_name()}")
        elif self._user_id and self._user_id in users:
            parts.append(f"with {users[self._user_id].get_display_name()}")
        return " ".join(parts)


@dataclass
class SlackFile:
    """A file reference as returned by the Slack file listing."""

    id: str
    name: str
    url_private: str

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackFile':
        """Build a file from a dict; ``name`` and ``url_private`` default to ""."""
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            url_private=data.get("url_private", "")
        )


class SlackMessage:
    """A single Slack message together with the user directory used to render it."""

    def __init__(self, data: Dict, users: Dict[str, SlackUser]):
        self.timestamp = float(data["ts"])
        self.text = data.get("text", "[no message content]")
        self.user_id = data.get("user")
        self.reactions = data.get("reactions", [])
        self.files = data.get("files", [])
        self.users = users
        self.has_thread = "reply_count" in data
        self.parent_user_id = data.get("parent_user_id")

    def format(self, indent: bool = False) -> str:
        """Render the message as text, followed by a separator line of asterisks.

        Args:
            indent: when true every line of the message is prefixed with a tab.
        """
        timestamp = datetime.fromtimestamp(round(self.timestamp)).strftime("%Y-%m-%d %H:%M:%S")
        user = self.users.get(self.user_id)
        user_info = f"{user.name} ({user.get_display_name()})" if user else "none"

        # Annotate user mentions with the readable user name.
        text = self.text
        for uid, u in self.users.items():
            text = text.replace(f"<@{uid}>", f"<@{uid}> ({u.name})")

        lines = [
            f"Message at {timestamp}",
            f"User: {user_info}",
            text
        ]

        if self.reactions:
            reaction_parts = []
            for reaction in self.reactions:
                names = [self.users[uid].name for uid in reaction["users"] if uid in self.users]
                reaction_parts.append(f"{reaction['name']} ({', '.join(names)})")
            lines.append("Reactions: " + ", ".join(reaction_parts))

        if self.files:
            lines.append("Files:")
            for file in self.files:
                if "name" in file and "url_private_download" in file:
                    lines.append(f" - [{file['id']}] {file['name']}, {file['url_private_download']}")
                else:
                    lines.append(f" - [{file['id']}] [deleted, oversize, or unavailable file]")

        message = "\n".join(lines)
        if indent:
            message = "\n".join("\t" + line for line in message.split("\n"))
        return message + "\n\n" + "*" * 24 + "\n\n"


class SlackExporter:
    """Main exporter: writes users, channels, messages and files to disk."""

    # Local JSON snapshots that take precedence over live API calls when present.
    USERS_SNAPSHOT = "out/user_list.json"
    CHANNELS_SNAPSHOT = "out/channel_list.json"

    def __init__(self, config: SlackConfig, output_dir: str = 'out'):
        self.api = SlackAPI(config)
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        self.output_dir = self._set_output_dir(output_dir)
        self.users: Dict[str, SlackUser] = self._load_users()
        self.channels: List[SlackChannel] = self._load_channels()

    def _set_output_dir(self, path: str) -> str:
        """Create and return the timestamped export directory under *path*."""
        parent_dir = os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
        output_dir = os.path.join(parent_dir, f"slack_export_{self.timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    @staticmethod
    def _read_json_if_present(path: str) -> Optional[Any]:
        """Return the parsed JSON at *path*, or None when the file is absent."""
        if not os.path.isfile(path):
            return None
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _find_channel(self, channel_id: str) -> Optional[SlackChannel]:
        """Return the loaded channel with *channel_id*, or None if unknown."""
        return next((ch for ch in self.channels if ch.id == channel_id), None)

    def _load_users(self) -> Dict[str, SlackUser]:
        """Load users from the local snapshot, or from the API if it is absent."""
        users_data = self._read_json_if_present(self.USERS_SNAPSHOT)
        if users_data is None:
            users_data = self.api.get_users()
        return {u["id"]: SlackUser.from_dict(u) for u in users_data}

    def _load_channels(self) -> List[SlackChannel]:
        """Load channels from the local snapshot, or from the API if it is absent."""
        channels_data = self._read_json_if_present(self.CHANNELS_SNAPSHOT)
        if channels_data is None:
            channels_data = self.api.get_channels()
        return [SlackChannel(ch, self.users) for ch in channels_data]

    def _save_data(self, data: Any, filename: str, as_json: bool = False):
        """Write *data* to ``<output_dir>/<filename>.json|txt``.

        When no output directory is set the data is dumped as JSON to stdout.
        """
        if not self.output_dir:
            json.dump(data, sys.stdout, indent=4)
            return

        ext = "json" if as_json else "txt"
        filepath = os.path.join(self.output_dir, f"{filename}.{ext}")
        print(f"Zapisywanie do {filepath}")
        with open(filepath, mode="w", encoding="utf-8") as f:
            if as_json:
                json.dump(data, f, indent=4)
            else:
                f.write(data)

    def export_channels(self, channels: List[str]):
        """Export user/channel lists, then history, threads and files for each id."""
        channels_map = {ch.id: ch for ch in self.channels}
        self.export_channel_list(True)
        self.export_user_list(True)
        for channel_id in channels:
            known = channels_map.get(channel_id)
            # An id missing from the loaded channel list must not abort the
            # export; fall back to the raw id in the progress message.
            label = known.label if known else channel_id
            print(f"Eksport {label}...")
            self.export_channel_history(channel_id)
            self.export_channel_history(channel_id, as_json=True)
            self.export_channel_replies(channel_id)
            self.export_channel_replies(channel_id, as_json=True)
            self.export_channel_files(channel_id)

    def export_channel_list(self, as_json: bool = False):
        """Export the channel list as JSON (instance attributes) or text labels."""
        if as_json:
            data = [vars(ch) for ch in self.channels]
        else:
            data = "\n".join(ch.label for ch in self.channels)
        self._save_data(data, "channel_list", as_json)

    def export_user_list(self, as_json: bool = False):
        """Export the user list as JSON (instance attributes) or formatted text."""
        if as_json:
            data = [vars(u) for u in self.users.values()]
        else:
            data = "\n".join(u.format() for u in self.users.values())
        self._save_data(data, "user_list", as_json)

    def export_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export the message history of *channel_id* as raw JSON or text."""
        history = self.api.get_channel_history(channel_id, oldest, latest)
        if as_json:
            data = history
        else:
            messages = [SlackMessage(msg, self.users) for msg in history]
            channel = self._find_channel(channel_id)
            header = f"Channel ID: {channel_id}\n"
            if channel:
                header += f"{channel.type.title()} Name: {channel.name}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"
            data = header + "".join(msg.format() for msg in messages)
        self._save_data(data, f"channel_{channel_id}", as_json)

    def export_channel_replies(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export every thread of *channel_id* as raw JSON or text."""
        history = self.api.get_channel_history(channel_id, oldest, latest)
        # Only messages carrying "reply_count" are treated as thread roots.
        thread_messages = [msg for msg in history if "reply_count" in msg]

        all_replies = []
        for msg in thread_messages:
            all_replies.extend(self.api.get_replies(channel_id, msg["ts"]))

        if as_json:
            data = all_replies
        else:
            messages = [SlackMessage(msg, self.users) for msg in all_replies]
            channel = self._find_channel(channel_id)
            header = f"Threads in {channel.type if channel else 'channel'}: "
            header += f"{channel.name if channel else channel_id}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"
            data = header + "".join(msg.format(True) for msg in messages)
        self._save_data(data, f"channel-replies_{channel_id}", as_json)

    def export_channel_files(self, channel_id: Optional[str] = None):
        """Download every file listed for *channel_id* into the output directory."""
        files = [SlackFile.from_dict(f) for f in self.api.get_files(channel_id)]
        for file in files:
            filename = f"{file.id}-{sanitize_filename(file.name)}"
            self.download_file(filename, file.url_private)

    def download_file(self, filename: str, url: str, attempts: int = 10) -> bool:
        """Download *url* to ``<output_dir>/<filename>``.

        The body is streamed to a ``.part`` file and renamed only after it has
        been fully written, so an interrupted download is never mistaken for a
        finished one on a later call or retry.

        Args:
            filename: target file name inside the output directory.
            url: download URL; the Slack token is sent only to slack.com hosts.
            attempts: maximum number of download attempts.

        Returns:
            True if the file already existed or was downloaded; False if
            *attempts* is not positive, the URL is empty, or every attempt
            failed.
        """
        if attempts <= 0:
            return False
        target = os.path.join(self.output_dir, filename)
        if os.path.exists(target):
            return True
        if not url:
            # Retrying cannot help when there is nothing to download.
            print(f"Error downloading file {filename}: no download URL.")
            return False

        partial = f"{target}.part"
        # Per Slack's file object docs, url_private requires a bearer token.
        headers = self.api.headers_for_download(url)
        for remaining in range(attempts - 1, -1, -1):
            try:
                with requests.get(url, headers=headers, stream=True) as response:
                    response.raise_for_status()
                    with open(partial, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial, target)
                return True
            except requests.exceptions.RequestException as e:
                if os.path.exists(partial):
                    os.remove(partial)
                print(f"Error downloading file {filename}: {e}. {remaining} attempts left.")
        return False


if __name__ == "__main__":
    config = SlackConfig.from_env()
    exporter = SlackExporter(config)