import argparse
import json
import os
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from timeit import default_timer
from typing import List, Optional, Dict, Any, Iterator, Tuple
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from pathvalidate import sanitize_filename


@dataclass
class SlackConfig:
    """Configuration of the Slack API client.

    Attributes:
        user_token: Token sent as ``Authorization: Bearer <token>``.
        additional_sleep_time: Seconds added on top of the server-provided
            ``Retry-After`` value before a rate-limited request is retried.
        request_timeout: Timeout in seconds passed to every ``requests.get``
            call so a stalled connection cannot hang the export forever.
    """

    user_token: str
    additional_sleep_time: int = 2
    request_timeout: float = 30.0

    @classmethod
    def from_env(cls, env_path: str = ".env") -> 'SlackConfig':
        """Build a configuration from the ``SLACK_USER_TOKEN`` variable.

        Args:
            env_path: Path of a dotenv file, relative to this module's
                directory. It is loaded only when it exists.

        Raises:
            ValueError: If ``SLACK_USER_TOKEN`` is not set.
        """
        env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), env_path)
        if os.path.isfile(env_file):
            load_dotenv(env_file)
        try:
            token = os.environ["SLACK_USER_TOKEN"]
        except KeyError:
            # The KeyError itself carries no extra information for the user.
            raise ValueError("Brak SLACK_USER_TOKEN w zmiennych środowiskowych") from None
        return cls(token)


class SlackAPI:
    """Thin wrapper around the Slack Web API (GET requests only)."""

    def __init__(self, config: SlackConfig):
        self.config = config
        self.headers = {"Authorization": f"Bearer {config.user_token}"}
        self.base_url = "https://slack.com/api"

    def get_data(self, endpoint: str, params: dict) -> requests.Response:
        """Perform a GET request, retrying for as long as the API answers 429.

        Args:
            endpoint: API method name appended to the base URL.
            params: Query parameters of the request.

        Returns:
            The first response whose status code is not 429.
        """
        url = f"{self.base_url}/{endpoint}"
        attempt = 0
        while True:
            response = requests.get(
                url,
                headers=self.headers,
                params=params,
                timeout=self.config.request_timeout,
            )
            attempt += 1
            if response.status_code != 429:
                return response
            # A missing or malformed Retry-After header must not abort the
            # export; fall back to a one second wait.
            try:
                retry_after = int(response.headers.get("Retry-After", 1))
            except (TypeError, ValueError):
                retry_after = 1
            sleep_time = retry_after + self.config.additional_sleep_time
            # Diagnostics go to stderr so they never mix with data on stdout.
            print(
                f"Limit zapytań przekroczony. Ponowna próba za {sleep_time} sekund (próba {attempt}).",
                file=sys.stderr,
            )
            sleep(sleep_time)

    def _get_json(self, endpoint: str, params: dict) -> Dict:
        """Fetch one response and return its JSON body.

        Raises:
            Exception: If the HTTP status is not 200 or the body's ``ok``
                field is missing or false.
        """
        response = self.get_data(endpoint, params)
        if response.status_code != 200:
            raise Exception(f"Błąd API: {response.status_code} {response.reason}")
        data = response.json()
        if not data.get("ok", False):
            raise Exception(f"Błąd Slack API: {data}")
        return data

    def paginated_get(self, endpoint: str, params: dict,
                      combine_key: Optional[str] = None) -> List[Dict]:
        """Fetch every page of a cursor-paginated endpoint.

        Args:
            endpoint: API method name.
            params: Query parameters; the dict is copied, not modified.
            combine_key: Key of the list to collect from each page. When
                omitted, each whole page body is collected instead.

        Returns:
            The concatenated items of all pages.

        Raises:
            Exception: If any page fails (see ``_get_json``).
        """
        query = dict(params)  # never leak the cursor into the caller's dict
        result: List[Dict] = []
        while True:
            data = self._get_json(endpoint, query)
            if combine_key:
                result.extend(data[combine_key])
            else:
                result.append(data)
            metadata = data.get("response_metadata") or {}
            next_cursor = (metadata.get("next_cursor") or "").strip()
            if not next_cursor:
                break
            query["cursor"] = next_cursor
        return result

    def get_users(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all members reported by ``users.list``."""
        params = {"limit": 200}
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("users.list", params, "members")

    def get_channels(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all conversations (public, private, mpim, im)."""
        params = {
            "types": "public_channel,private_channel,mpim,im",
            "limit": 200
        }
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("conversations.list", params, "channels")

    def get_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                            latest: Optional[str] = None) -> List[Dict]:
        """Return the messages of a conversation, optionally time-bounded.

        Args:
            channel_id: Conversation ID.
            oldest: Passed as the ``oldest`` query parameter when given.
            latest: Passed as the ``latest`` query parameter when given.
        """
        params = {
            "channel": channel_id,
            "limit": 200
        }
        if oldest:
            params["oldest"] = oldest
        if latest:
            params["latest"] = latest
        return self.paginated_get("conversations.history", params, "messages")

    def get_replies(self, channel_id: str, thread_ts: str) -> List[Dict]:
        """Return the messages of the thread identified by ``thread_ts``."""
        params = {
            "channel": channel_id,
            "ts": thread_ts,
            "limit": 200
        }
        return self.paginated_get("conversations.replies", params, "messages")

    def get_files(self, channel: Optional[str] = None) -> List[Dict]:
        """Return the files reported by ``files.list``.

        Args:
            channel: Restrict the listing to this conversation ID.
        """
        params: Dict[str, Any] = {}
        if channel:
            params["channel"] = channel
        # NOTE(review): files.list is documented as page-based ("page" /
        # "paging.pages") rather than cursor-based, so cursor pagination only
        # ever returned the first page. Confirm against the Slack reference.
        files: List[Dict] = []
        page = 1
        while True:
            params["page"] = page
            data = self._get_json("files.list", params)
            files.extend(data["files"])
            paging = data.get("paging") or {}
            if page >= int(paging.get("pages") or 1):
                break
            page += 1
        return files


@dataclass
class SlackUser:
    """A Slack user, reduced to the fields this exporter prints."""

    id: str
    name: str
    real_name: Optional[str] = None
    display_name: Optional[str] = None
    is_admin: bool = False
    is_owner: bool = False
    is_primary_owner: bool = False
    is_restricted: bool = False
    is_ultra_restricted: bool = False
    is_bot: bool = False
    is_app_user: bool = False
    tz: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackUser':
        """Create a user from an API user object.

        ``real_name`` and ``display_name`` are read from the nested
        ``profile`` dict; everything else from the top level.

        Raises:
            KeyError: If ``data`` has no ``id``.
        """
        profile = data.get("profile", {})
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            real_name=profile.get("real_name"),
            display_name=profile.get("display_name"),
            is_admin=data.get("is_admin", False),
            is_owner=data.get("is_owner", False),
            is_primary_owner=data.get("is_primary_owner", False),
            is_restricted=data.get("is_restricted", False),
            is_ultra_restricted=data.get("is_ultra_restricted", False),
            is_bot=data.get("is_bot", False),
            is_app_user=data.get("is_app_user", False),
            tz=data.get("tz")
        )

    def get_display_name(self) -> str:
        """Return the first non-empty of real name, display name and name."""
        return self.real_name or self.display_name or self.name or "[brak nazwy]"

    def format(self) -> str:
        """Return a one-line, comma-separated description of the user."""
        parts = [f"[{self.id}] {self.name}"]
        if self.real_name:
            parts.append(f"({self.real_name})")
        if self.tz:
            parts.append(self.tz)
        flag_names = ['is_admin', 'is_owner', 'is_primary_owner', 'is_restricted',
                      'is_ultra_restricted', 'is_bot', 'is_app_user']
        # attr[3:] drops the "is_" prefix, e.g. "is_admin" -> "admin".
        user_types = [attr[3:] for attr in flag_names if getattr(self, attr)]
        if user_types:
            parts.append("|".join(user_types))
        return ", ".join(parts)


class SlackChannel:
    """A Slack conversation (channel, group, or direct message)."""

    def __init__(self, data: Dict):
        """Read the fields used by the exporter from an API channel object.

        Raises:
            KeyError: If ``data`` has no ``id``.
        """
        self.id = data["id"]
        self.name = data.get("name", "")
        self.is_private = data.get("is_private", False)
        self.is_im = data.get("is_im", False)
        self.is_mpim = data.get("is_mpim", False)
        self.is_group = data.get("is_group", False)
        self.creator_id = data.get("creator")
        self.user_id = data.get("user")

    @property
    def type(self) -> str:
        """Conversation kind; ``is_im`` wins over ``is_mpim`` over ``is_group``."""
        if self.is_im:
            return "direct_message"
        elif self.is_mpim:
            return "multiparty-direct_message"
        elif self.is_group:
            return "group"
        else:
            return "channel"

    def format(self, users: Dict[str, SlackUser]) -> str:
        """Return a one-line description of the conversation.

        Args:
            users: Known users keyed by ID, used to resolve the creator
                (or, failing that, the ``user`` field) to a name.
        """
        parts = [f"[{self.id}]"]
        if self.name:
            parts.append(f" {self.name}:")
        if self.is_private:
            parts.append("private")
        parts.append(self.type)
        if self.creator_id and self.creator_id in users:
            parts.append(f"created by {users[self.creator_id].name}")
        elif self.user_id and self.user_id in users:
            parts.append(f"with {users[self.user_id].name}")
        return " ".join(parts)


@dataclass
class SlackFile:
    """A Slack file: its ID, name and private URL."""

    id: str
    name: str
    url_private: str

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackFile':
        """Create a file from an API file object.

        Missing ``name`` / ``url_private`` default to an empty string.

        Raises:
            KeyError: If ``data`` has no ``id``.
        """
        return cls(
            id=data["id"],
            name=data.get("name", ""),
            url_private=data.get("url_private", "")
        )


class SlackMessage:
    """A single Slack message together with the user table used to render it."""

    def __init__(self, data: Dict, users: Dict[str, SlackUser]):
        """Read the rendered fields from an API message object.

        Args:
            data: Message object; must contain ``ts``.
            users: Known users keyed by ID.

        Raises:
            KeyError: If ``data`` has no ``ts``.
        """
        self.timestamp = float(data["ts"])
        self.text = data.get("text", "[no message content]")
        self.user_id = data.get("user")
        self.reactions = data.get("reactions", [])
        self.files = data.get("files", [])
        self.users = users
        self.has_thread = "reply_count" in data
        self.parent_user_id = data.get("parent_user_id")

    def format(self, indent: bool = False) -> str:
        """Render the message as a text block followed by a separator.

        Args:
            indent: Prefix every line of the message with a tab.

        Returns:
            The message text, a blank line, 24 asterisks and a blank line.
        """
        timestamp = datetime.fromtimestamp(round(self.timestamp)).strftime("%Y-%m-%d %H:%M:%S")
        user = self.users.get(self.user_id)
        user_info = f"{user.name} ({user.get_display_name()})" if user else "none"

        text = self.text
        # Annotate "<@ID>" mentions with the user name. The scan over all
        # users is skipped when the text cannot contain a mention.
        if "<@" in text:
            for uid, u in self.users.items():
                text = text.replace(f"<@{uid}>", f"<@{uid}> ({u.name})")

        lines = [
            f"Message at {timestamp}",
            f"User: {user_info}",
            text
        ]

        if self.reactions:
            reaction_parts = []
            for reaction in self.reactions:
                # Reacting users missing from the table are silently omitted.
                names = [self.users[uid].name for uid in reaction["users"] if uid in self.users]
                reaction_parts.append(f"{reaction['name']} ({', '.join(names)})")
            lines.append("Reactions: " + ", ".join(reaction_parts))

        if self.files:
            lines.append("Files:")
            for file in self.files:
                if "name" in file and "url_private_download" in file:
                    lines.append(f" - [{file['id']}] {file['name']}, {file['url_private_download']}")
                else:
                    lines.append(f" - [{file['id']}] [deleted, oversize, or unavailable file]")

        message = "\n".join(lines)
        if indent:
            message = "\n".join("\t" + line for line in message.split("\n"))
        return message + "\n\n" + "*" * 24 + "\n\n"


class SlackExporter:
    """Main exporter: loads users and channels, then writes the exports."""

    def __init__(self, config: SlackConfig, output_dir: Optional[str] = None):
        """Create the API client and load users and channels.

        Args:
            config: Client configuration.
            output_dir: Parent directory for a new timestamped export
                directory. When empty or ``None`` exports go to stdout.
        """
        self.api = SlackAPI(config)
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        self.output_dir: Optional[str] = (
            self._set_output_dir(output_dir) if output_dir else None
        )
        self.users: Dict[str, SlackUser] = self._load_users()
        self.channels: List[SlackChannel] = self._load_channels()

    def _set_output_dir(self, path: str) -> str:
        """Create ``<path>/slack_export_<timestamp>`` and return its path."""
        parent_dir = os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
        output_dir = os.path.join(parent_dir, f"slack_export_{self.timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    def _load_users(self) -> Dict[str, SlackUser]:
        """Fetch all users and index them by ID."""
        users_data = self.api.get_users()
        return {u["id"]: SlackUser.from_dict(u) for u in users_data}

    def _load_channels(self) -> List[SlackChannel]:
        """Fetch all conversations."""
        channels_data = self.api.get_channels()
        return [SlackChannel(ch) for ch in channels_data]

    def _save_data(self, data: Any, filename: str, as_json: bool = False):
        """Write ``data`` to ``<output_dir>/<filename>.<ext>`` or to stdout.

        Args:
            data: JSON-serialisable object when ``as_json`` is true,
                otherwise a string.
            filename: File name without extension.
            as_json: Serialise as JSON (``.json``) instead of text (``.txt``).
        """
        if not self.output_dir:
            if as_json:
                json.dump(data, sys.stdout, indent=4)
                sys.stdout.write("\n")
            else:
                # Plain text must not be JSON-encoded (quoted and escaped).
                print(data)
            return

        ext = "json" if as_json else "txt"
        filepath = os.path.join(self.output_dir, f"{filename}.{ext}")
        print(f"Zapisywanie do {filepath}")

        with open(filepath, mode="w", encoding="utf-8") as f:
            if as_json:
                json.dump(data, f, indent=4)
            else:
                f.write(data)

    def export_channel_list(self, as_json: bool = False):
        """Export the list of conversations."""
        if as_json:
            data = [vars(ch) for ch in self.channels]
        else:
            data = "\n".join(ch.format(self.users) for ch in self.channels)
        self._save_data(data, "channel_list", as_json)

    def export_user_list(self, as_json: bool = False):
        """Export the list of users."""
        if as_json:
            data = [vars(u) for u in self.users.values()]
        else:
            data = "\n".join(u.format() for u in self.users.values())
        self._save_data(data, "user_list", as_json)

    def export_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export the message history of one conversation.

        Args:
            channel_id: Conversation ID.
            oldest: Lower time bound forwarded to the API.
            latest: Upper time bound forwarded to the API.
            as_json: Save the raw API messages instead of formatted text.
        """
        history = self.api.get_channel_history(channel_id, oldest, latest)

        if as_json:
            data = history
        else:
            messages = [SlackMessage(msg, self.users) for msg in history]
            channel = next((ch for ch in self.channels if ch.id == channel_id), None)

            header = f"Channel ID: {channel_id}\n"
            if channel:
                header += f"{channel.type.title()} Name: {channel.name}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"

            data = header + "".join(msg.format() for msg in messages)

        self._save_data(data, f"channel_{channel_id}", as_json)

    def export_channel_replies(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export the threads of one conversation.

        Every history message carrying ``reply_count`` is treated as a thread
        root and its replies are fetched with a separate request.

        Args:
            channel_id: Conversation ID.
            oldest: Lower time bound forwarded to the history request.
            latest: Upper time bound forwarded to the history request.
            as_json: Save the raw API messages instead of formatted text.
        """
        history = self.api.get_channel_history(channel_id, oldest, latest)
        thread_messages = [msg for msg in history if "reply_count" in msg]

        all_replies = []
        for msg in thread_messages:
            all_replies.extend(self.api.get_replies(channel_id, msg["ts"]))

        if as_json:
            data = all_replies
        else:
            messages = [SlackMessage(msg, self.users) for msg in all_replies]
            channel = next((ch for ch in self.channels if ch.id == channel_id), None)

            header = f"Threads in {channel.type if channel else 'channel'}: "
            header += f"{channel.name if channel else channel_id}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"

            data = header + "".join(msg.format(True) for msg in messages)

        self._save_data(data, f"channel-replies_{channel_id}", as_json)

    def export_channel_files(self, channel_id: Optional[str] = None):
        """Download the files of one conversation, or all files.

        Args:
            channel_id: Restrict the download to this conversation.

        Raises:
            ValueError: If the exporter has no output directory.
        """
        if not self.output_dir:
            raise ValueError("An output directory is required to download files")

        files = [SlackFile.from_dict(f) for f in self.api.get_files(channel_id)]
        for file in files:
            if not file.url_private:
                # Nothing to fetch; retrying an empty URL would only fail.
                print(f"Skipping file {file.id}: no download URL", file=sys.stderr)
                continue
            filename = f"{file.id}-{sanitize_filename(file.name)}"
            self.download_file(filename, file.url_private)

    def _download_headers(self, url: str) -> Dict[str, str]:
        """Return the auth header for Slack hosts, nothing for other hosts."""
        # NOTE(review): private file URLs are assumed to live under slack.com;
        # the token is deliberately not sent anywhere else.
        host = (urlparse(url).hostname or "").lower()
        if host == "slack.com" or host.endswith(".slack.com"):
            return dict(self.api.headers)
        return {}

    def download_file(self, filename: str, url: str, attempts: int = 10) -> bool:
        """Download ``url`` into the output directory.

        The data is written to ``<filename>.part`` and renamed only after the
        download completed, so an interrupted transfer is never mistaken for
        a finished file.

        Args:
            filename: Target file name inside the output directory.
            url: URL to download.
            attempts: Maximum number of tries.

        Returns:
            True if the file exists after the call, False if every attempt
            failed or ``attempts`` was not positive.
        """
        if attempts <= 0:
            return False

        target = os.path.join(self.output_dir, filename)
        if os.path.exists(target):
            return True

        partial = target + ".part"
        headers = self._download_headers(url)
        timeout = self.api.config.request_timeout

        while attempts > 0:
            try:
                with requests.get(url, headers=headers, stream=True, timeout=timeout) as response:
                    response.raise_for_status()
                    with open(partial, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial, target)
                return True
            except requests.exceptions.RequestException as e:
                attempts -= 1
                if os.path.exists(partial):
                    os.remove(partial)
                print(f"Error downloading file {filename}: {e}. {attempts} attempts left.")
        return False


def main():
    """Command-line entry point: parse arguments and run the exports."""
    parser = argparse.ArgumentParser(description="Eksporter danych ze Slacka")
    parser.add_argument("-o", help="Katalog wyjściowy (jeśli pusty, wyświetla na stdout)")
    parser.add_argument("--lc", action="store_true", help="Lista wszystkich konwersacji")
    parser.add_argument("--lu", action="store_true", help="Lista wszystkich użytkowników")
    parser.add_argument("--json", action="store_true", help="Wynik w formacie JSON")
    parser.add_argument("-c", action="store_true", help="Historia wszystkich dostępnych konwersacji")
    parser.add_argument("--ch", help="Z -c, ogranicza eksport do podanego ID kanału")
    parser.add_argument("--fr", help="Z -c, timestamp początku zakresu (Unix)")
    parser.add_argument("--to", help="Z -c, timestamp końca zakresu (Unix)")
    parser.add_argument("-r", action="store_true", help="Pobierz wątki ze wszystkich konwersacji")
    parser.add_argument("--files", action="store_true", help="Pobierz wszystkie pliki")

    args = parser.parse_args()

    if args.files and not args.o:
        print("Opcja --files wymaga określenia katalogu wyjściowego (-o)", file=sys.stderr)
        sys.exit(1)

    # Top-level boundary: report any failure and exit with a non-zero status.
    try:
        config = SlackConfig.from_env()
        # The constructor creates the output directory (if any) and loads
        # users and channels.
        exporter = SlackExporter(config, args.o)

        if args.lc:
            exporter.export_channel_list(args.json)

        if args.lu:
            exporter.export_user_list(args.json)

        if args.c or args.r:
            channel_ids = [args.ch] if args.ch else [ch.id for ch in exporter.channels]
            for channel_id in channel_ids:
                if args.c:
                    exporter.export_channel_history(channel_id, args.fr, args.to, args.json)
                if args.r:
                    exporter.export_channel_replies(channel_id, args.fr, args.to, args.json)

        if args.files:
            exporter.export_channel_files(args.ch)

    except Exception as e:
        print(f"Błąd: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()