exporter-ng: dictionary fixes

This commit is contained in:
Piotr Dec 2025-04-30 12:26:21 +02:00
parent bafe448aa2
commit c0bb7d5aa0
No known key found for this signature in database
GPG key ID: D3B5A5D0150D147A

464
exporter_ng.py Normal file
View file

@ -0,0 +1,464 @@
import json
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from pathvalidate import sanitize_filename
@dataclass
class SlackConfig:
    """Configuration for the Slack API client."""

    # User token sent as the bearer credential on every API request.
    user_token: str
    # Extra seconds added on top of the server-provided Retry-After delay.
    additional_sleep_time: int = 2

    @classmethod
    def from_env(cls, env_path: str = ".env") -> 'SlackConfig':
        """Build a configuration from the ``SLACK_USER_TOKEN`` environment variable.

        If a dotenv file named *env_path* exists next to this module it is
        loaded first, so values defined there become visible in ``os.environ``.

        Args:
            env_path: Dotenv file name/path, resolved relative to this module's directory.

        Returns:
            A ``SlackConfig`` carrying the token and the default sleep padding.

        Raises:
            ValueError: If ``SLACK_USER_TOKEN`` is missing, empty, or only whitespace.
        """
        env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), env_path)
        if os.path.isfile(env_file):
            load_dotenv(env_file)
        token = os.environ.get("SLACK_USER_TOKEN", "")
        # A blank token would only fail later as an opaque API auth error,
        # so treat it the same as a missing variable.
        if not token.strip():
            raise ValueError("Brak SLACK_USER_TOKEN w zmiennych środowiskowych")
        return cls(token)
class SlackAPI:
    """Client for the Slack Web API.

    Sends the bearer token with every request, waits out HTTP 429 rate
    limits, and follows pagination until all pages have been collected.
    """

    def __init__(self, config: "SlackConfig"):
        """Store the configuration and derive the auth header and base URL."""
        self.config = config
        self.headers = {"Authorization": f"Bearer {config.user_token}"}
        self.base_url = "https://slack.com/api"

    @staticmethod
    def _retry_delay(response) -> int:
        """Return the Retry-After delay in seconds, or 1 if it is absent or malformed."""
        try:
            return int(response.headers["Retry-After"])
        except (KeyError, TypeError, ValueError):
            return 1

    def get_data(self, endpoint: str, params: dict) -> "requests.Response":
        """Perform a GET request, retrying for as long as the API answers 429.

        Args:
            endpoint: API method name appended to the base URL (e.g. ``users.list``).
            params: Query parameters for the request.

        Returns:
            The first response whose status code is not 429 (it may still be an error).
        """
        url = f"{self.base_url}/{endpoint}"
        attempt = 0
        while True:
            response = requests.get(url, headers=self.headers, params=params)
            attempt += 1
            if response.status_code != 429:
                return response
            sleep_time = self._retry_delay(response) + self.config.additional_sleep_time
            print(f"Limit zapytań przekroczony. Ponowna próba za {sleep_time} sekund (próba {attempt}).")
            sleep(sleep_time)

    def paginated_get(self, endpoint: str, params: dict, combine_key: Optional[str] = None) -> List[Dict]:
        """Fetch every page of a paginated endpoint and combine the results.

        Args:
            endpoint: API method name.
            params: Base query parameters; the dict is copied and never mutated.
            combine_key: Key of the list to collect from each page. When omitted,
                each whole page payload is appended to the result instead.

        Returns:
            The concatenated items (or page payloads) from all pages.

        Raises:
            Exception: On a non-200 HTTP status or a payload whose ``ok`` is falsy.
            KeyError: If *combine_key* is given but missing from a page.
        """
        result: List[Dict] = []
        # Work on a copy so pagination state does not leak into the caller's dict.
        query = dict(params)
        while True:
            response = self.get_data(endpoint, query)
            if response.status_code != 200:
                raise Exception(f"Błąd API: {response.status_code} {response.reason}")
            data = response.json()
            if not data.get("ok", False):
                raise Exception(f"Błąd Slack API: {data}")
            if combine_key:
                result.extend(data[combine_key])
            else:
                result.append(data)

            metadata = data.get("response_metadata") or {}
            next_cursor = (metadata.get("next_cursor") or "").strip()
            if next_cursor:
                query["cursor"] = next_cursor
                continue

            # Page-numbered endpoints (files.list) report a `paging` object
            # instead of a cursor; advance the page number until the last page.
            paging = data.get("paging") or {}
            page, pages = paging.get("page"), paging.get("pages")
            if isinstance(page, int) and isinstance(pages, int) and page < pages:
                query["page"] = page + 1
                continue
            return result

    def get_users(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all user records from ``users.list``, optionally for one team."""
        params = {"limit": 200}
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("users.list", params, "members")

    def get_channels(self, team_id: Optional[str] = None) -> List[Dict]:
        """Return all conversations (public, private, group DMs and DMs)."""
        params = {
            "types": "public_channel,private_channel,mpim,im",
            "limit": 200,
        }
        if team_id:
            params["team_id"] = team_id
        return self.paginated_get("conversations.list", params, "channels")

    def get_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                            latest: Optional[str] = None) -> List[Dict]:
        """Return the messages of a conversation, optionally bounded by timestamps."""
        params = {
            "channel": channel_id,
            "limit": 200,
        }
        if oldest:
            params["oldest"] = oldest
        if latest:
            params["latest"] = latest
        return self.paginated_get("conversations.history", params, "messages")

    def get_replies(self, channel_id: str, thread_ts: str) -> List[Dict]:
        """Return the messages of the thread rooted at *thread_ts*."""
        params = {
            "channel": channel_id,
            "ts": thread_ts,
            "limit": 200,
        }
        return self.paginated_get("conversations.replies", params, "messages")

    def get_files(self, channel: Optional[str] = None) -> List[Dict]:
        """Return file records from ``files.list``, optionally for one channel."""
        params = {}
        if channel:
            params["channel"] = channel
        return self.paginated_get("files.list", params, "files")
@dataclass
class SlackUser:
    """A Slack user, reduced to the fields the exporter prints."""

    id: str
    name: str
    real_name: Optional[str] = None
    display_name: Optional[str] = None
    is_admin: bool = False
    is_owner: bool = False
    is_primary_owner: bool = False
    is_restricted: bool = False
    is_ultra_restricted: bool = False
    is_bot: bool = False
    is_app_user: bool = False
    tz: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackUser':
        """Build a user from either a raw API record or a flat exported record.

        ``real_name`` and ``display_name`` are read from the top level first
        and, when empty there, from the nested ``profile`` object, which is
        where Slack's user objects carry ``display_name``.

        Args:
            data: User record; must contain ``id``.

        Raises:
            KeyError: If ``id`` is missing.
        """
        profile = data.get("profile") or {}

        def pick(key: str) -> Optional[str]:
            # Prefer the top-level value; fall back to the profile, but keep
            # the original top-level value (None or "") when neither is set.
            value = data.get(key)
            if value:
                return value
            return profile.get(key) or value

        return cls(
            id=data["id"],
            name=data.get("name", ""),
            real_name=pick("real_name"),
            display_name=pick("display_name"),
            is_admin=data.get("is_admin", False),
            is_owner=data.get("is_owner", False),
            is_primary_owner=data.get("is_primary_owner", False),
            is_restricted=data.get("is_restricted", False),
            is_ultra_restricted=data.get("is_ultra_restricted", False),
            is_bot=data.get("is_bot", False),
            is_app_user=data.get("is_app_user", False),
            tz=data.get("tz"),
        )

    def get_display_name(self) -> str:
        """Return the first non-empty of real name, display name, handle, or a placeholder."""
        return self.real_name or self.display_name or self.name or "[brak nazwy]"

    def format(self) -> str:
        """Return a one-line summary: id, handle, real name, time zone and role flags."""
        parts = [f"[{self.id}] {self.name}"]
        if self.real_name:
            parts.append(f"({self.real_name})")
        if self.tz:
            parts.append(self.tz)
        flag_names = ('is_admin', 'is_owner', 'is_primary_owner', 'is_restricted',
                      'is_ultra_restricted', 'is_bot', 'is_app_user')
        # Drop the "is_" prefix so the flags read as plain role names.
        user_types = [flag[len("is_"):] for flag in flag_names if getattr(self, flag)]
        if user_types:
            parts.append("|".join(user_types))
        return ", ".join(parts)
class SlackChannel:
    """A Slack conversation (channel, group, DM or group DM) with display labels."""

    def __init__(self, data: Dict, users: Dict[str, "SlackUser"]):
        """Populate the channel from a conversation record.

        Args:
            data: Conversation record; must contain ``id``.
            users: Known users keyed by user id, used to resolve names.

        Raises:
            KeyError: If ``id`` is missing.
        """
        self.id = data["id"]
        self.name = data.get("name", "")
        self.is_private = data.get("is_private", False)
        self.is_im = data.get("is_im", False)
        self.is_mpim = data.get("is_mpim", False)
        self.is_group = data.get("is_group", False)
        # Slack's conversation objects name these fields `creator` and `user`;
        # the `*_id` spellings are kept first for already-converted records.
        self._creator_id = data.get("creator_id") or data.get("creator")
        self._user_id = data.get("user_id") or data.get("user")
        if self._user_id and self._user_id in users:
            self.user = users[self._user_id].get_display_name()
        else:
            self.user = "(nieznany)"
        if self.is_mpim:
            # Group-DM names are built from user handles, so index users by handle.
            self._mpim_users = self._list_mpim({u.name: u for u in users.values()})
        else:
            self._mpim_users = []
        self.label = self._create_label(users)

    @property
    def type(self) -> str:
        """Conversation kind; DM wins over group DM, which wins over group."""
        if self.is_im:
            return "direct_message"
        if self.is_mpim:
            return "multiparty-direct_message"
        if self.is_group:
            return "group"
        return "channel"

    @property
    def short_label(self) -> str:
        """Compact label: the counterpart for DMs, members for group DMs, else the name."""
        if self.is_im:
            return f"(DM) {self.user}"
        if self.is_mpim:
            return f"(MPDM) {', '.join(self._mpim_users)}"
        return self.name

    def _list_mpim(self, users: Dict[str, "SlackUser"]) -> List[str]:
        """Resolve the members of a group DM from its name.

        Slack names group DMs ``mpdm-<handle>--<handle>--...-<n>``. Handles may
        themselves contain single hyphens, so the name is split on the double
        hyphen after removing the prefix and the numeric suffix.

        Args:
            users: Known users keyed by handle (``name``).

        Returns:
            Display names of the members that could be resolved.
        """
        def resolve(handles: List[str]) -> List[str]:
            return [users[h].get_display_name() for h in handles if h in users]

        body = self.name
        if body.startswith("mpdm-"):
            body = body[len("mpdm-"):]
            head, sep, counter = body.rpartition("-")
            if sep and counter.isdigit():
                body = head
        members = resolve(body.split("--"))
        if members:
            return members
        # Fallback for names that do not follow the `--` convention.
        return resolve(self.name.split("-"))

    def _create_label(self, users: Dict[str, "SlackUser"]) -> str:
        """Build the descriptive label: id, name, privacy, type and creator/counterpart."""
        parts = [f"[{self.id}]"]
        if self.name:
            parts.append(f" {self.name}:")
        if self.is_private:
            parts.append("private")
        parts.append(self.type)
        if self._creator_id and self._creator_id in users:
            parts.append(f"created by {users[self._creator_id].get_display_name()}")
        elif self._user_id and self._user_id in users:
            parts.append(f"with {users[self._user_id].get_display_name()}")
        return " ".join(parts)
@dataclass
class SlackFile:
    """A file attached in Slack, reduced to what is needed to download it."""

    id: str  # file identifier
    name: str  # file name; empty when the record has none
    url_private: str  # private URL of the file; empty when the record has none

    @classmethod
    def from_dict(cls, data: Dict) -> 'SlackFile':
        """Create a file from a record; ``id`` is required, other fields default to ""."""
        file_id = data["id"]
        file_name = data.get("name", "")
        private_url = data.get("url_private", "")
        return cls(file_id, file_name, private_url)
class SlackMessage:
    """A single Slack message that can render itself as plain text."""

    # Matches a user mention such as "<@U123>" and captures the id between "<@" and ">".
    _MENTION_RE = re.compile(r"<@([^>]+)>")

    def __init__(self, data: Dict, users: Dict[str, "SlackUser"]):
        """Extract the fields used for formatting from a message record.

        Args:
            data: Message record; must contain ``ts``.
            users: Known users keyed by user id.

        Raises:
            KeyError: If ``ts`` is missing.
            ValueError: If ``ts`` cannot be parsed as a float.
        """
        self.timestamp = float(data["ts"])
        self.text = data.get("text", "[no message content]")
        self.user_id = data.get("user")
        self.reactions = data.get("reactions", [])
        self.files = data.get("files", [])
        self.users = users
        # A message that has "reply_count" is treated as the root of a thread.
        self.has_thread = "reply_count" in data
        self.parent_user_id = data.get("parent_user_id")

    def _expand_mention(self, match) -> str:
        """Append the handle of a known user to a mention; leave unknown ids untouched."""
        mentioned = self.users.get(match.group(1))
        if mentioned is None:
            return match.group(0)
        return f"{match.group(0)} ({mentioned.name})"

    def format(self, indent: bool = False) -> str:
        """Render the message as text followed by a separator line.

        The output contains the local-time timestamp, the author, the text with
        user mentions annotated by handle, and optional reaction and file lines.

        Args:
            indent: When true, prefix every message line with a tab (the
                trailing separator is not indented).

        Returns:
            The formatted message ending with a 24-asterisk separator block.
        """
        timestamp = datetime.fromtimestamp(round(self.timestamp)).strftime("%Y-%m-%d %H:%M:%S")
        author = self.users.get(self.user_id)
        user_info = f"{author.name} ({author.get_display_name()})" if author else "none"
        # One regex pass with a dict lookup per mention, instead of one
        # str.replace over the whole text for every known user.
        text = self._MENTION_RE.sub(self._expand_mention, self.text)
        lines = [
            f"Message at {timestamp}",
            f"User: {user_info}",
            text,
        ]
        if self.reactions:
            reaction_parts = []
            for reaction in self.reactions:
                names = [self.users[uid].name for uid in reaction["users"] if uid in self.users]
                reaction_parts.append(f"{reaction['name']} ({', '.join(names)})")
            lines.append("Reactions: " + ", ".join(reaction_parts))
        if self.files:
            lines.append("Files:")
            for file in self.files:
                if "name" in file and "url_private_download" in file:
                    lines.append(f" - [{file['id']}] {file['name']}, {file['url_private_download']}")
                else:
                    lines.append(f" - [{file['id']}] [deleted, oversize, or unavailable file]")
        message = "\n".join(lines)
        if indent:
            message = "\n".join("\t" + line for line in message.split("\n"))
        return message + "\n\n" + "*" * 24 + "\n\n"
class SlackExporter:
    """Main exporter: writes user/channel lists, histories, threads and files to disk."""

    def __init__(self, config: "SlackConfig", output_dir: str = 'out'):
        """Create the API client, the timestamped output directory, and load users/channels.

        Args:
            config: Slack API configuration.
            output_dir: Parent directory under which the export directory is created.
        """
        self.api = SlackAPI(config)
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        self.output_dir = self._set_output_dir(output_dir)
        self.users: Dict[str, "SlackUser"] = self._load_users()
        self.channels: List["SlackChannel"] = self._load_channels()

    def _set_output_dir(self, path: str) -> str:
        """Create and return ``<path>/slack_export_<timestamp>`` (``~`` and env vars expanded)."""
        parent_dir = os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
        output_dir = os.path.join(parent_dir, f"slack_export_{self.timestamp}")
        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    @staticmethod
    def _load_snapshot(path: str, fetch) -> List[Dict]:
        """Return the JSON content of *path* if the file exists, else the result of ``fetch()``."""
        if os.path.isfile(path):
            # Context manager so the handle is closed even if parsing fails.
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        return fetch()

    def _load_users(self) -> Dict[str, "SlackUser"]:
        """Load users keyed by id, from the local snapshot if present, otherwise from the API."""
        users_data = self._load_snapshot("out/user_list.json", self.api.get_users)
        return {u["id"]: SlackUser.from_dict(u) for u in users_data}

    def _load_channels(self) -> List["SlackChannel"]:
        """Load channels, from the local snapshot if present, otherwise from the API."""
        channels_data = self._load_snapshot("out/channel_list.json", self.api.get_channels)
        return [SlackChannel(ch, self.users) for ch in channels_data]

    def _save_data(self, data: Any, filename: str, as_json: bool = False):
        """Write *data* to ``<output_dir>/<filename>.json`` or ``.txt``.

        When no output directory is set, the data is dumped to stdout as JSON.

        Args:
            data: JSON-serialisable object (``as_json``) or a string (text mode).
            filename: File name without extension.
            as_json: Serialise as indented JSON instead of writing raw text.
        """
        if not self.output_dir:
            json.dump(data, sys.stdout, indent=4)
            return
        ext = "json" if as_json else "txt"
        filepath = os.path.join(self.output_dir, f"{filename}.{ext}")
        print(f"Zapisywanie do {filepath}")
        with open(filepath, mode="w", encoding="utf-8") as f:
            if as_json:
                json.dump(data, f, indent=4)
            else:
                f.write(data)

    def export_channels(self, channels: List[str]):
        """Write one entry per line to ``channels.txt`` in the output directory."""
        with open(os.path.join(self.output_dir, "channels.txt"), "w", encoding="utf-8") as f:
            f.writelines(str(channel) + "\n" for channel in channels)

    def export_channel_list(self, as_json: bool = False):
        """Export the channel list as JSON attribute dumps or as one label per line."""
        if as_json:
            data = [vars(ch) for ch in self.channels]
        else:
            # SlackChannel has no format() method; its precomputed label is
            # the human-readable description.
            data = "\n".join(ch.label for ch in self.channels)
        self._save_data(data, "channel_list", as_json)

    def export_user_list(self, as_json: bool = False):
        """Export the user list as JSON attribute dumps or as one summary per line."""
        if as_json:
            data = [vars(u) for u in self.users.values()]
        else:
            data = "\n".join(u.format() for u in self.users.values())
        self._save_data(data, "user_list", as_json)

    def export_channel_history(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export a channel's messages to ``channel_<id>`` as raw JSON or formatted text."""
        history = self.api.get_channel_history(channel_id, oldest, latest)
        if as_json:
            data = history
        else:
            messages = [SlackMessage(msg, self.users) for msg in history]
            channel = next((ch for ch in self.channels if ch.id == channel_id), None)
            header = f"Channel ID: {channel_id}\n"
            if channel:
                header += f"{channel.type.title()} Name: {channel.name}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"
            data = header + "".join(msg.format() for msg in messages)
        self._save_data(data, f"channel_{channel_id}", as_json)

    def export_channel_replies(self, channel_id: str, oldest: Optional[str] = None,
                               latest: Optional[str] = None, as_json: bool = False):
        """Export all thread messages of a channel to ``channel-replies_<id>``.

        Every history message that has ``reply_count`` is treated as a thread
        root, and its replies are fetched with one extra API call per thread.
        """
        history = self.api.get_channel_history(channel_id, oldest, latest)
        all_replies = []
        for msg in history:
            if "reply_count" in msg:
                all_replies.extend(self.api.get_replies(channel_id, msg["ts"]))
        if as_json:
            data = all_replies
        else:
            messages = [SlackMessage(msg, self.users) for msg in all_replies]
            channel = next((ch for ch in self.channels if ch.id == channel_id), None)
            header = f"Threads in {channel.type if channel else 'channel'}: "
            header += f"{channel.name if channel else channel_id}\n"
            header += f"{len(messages)} Messages\n{'*' * 24}\n\n"
            data = header + "".join(msg.format(True) for msg in messages)
        self._save_data(data, f"channel-replies_{channel_id}", as_json)

    def export_channel_files(self, channel_id: Optional[str] = None):
        """Download every file listed for the channel (or all files when omitted)."""
        files = [SlackFile.from_dict(f) for f in self.api.get_files(channel_id)]
        for file in files:
            filename = f"{file.id}-{sanitize_filename(file.name)}"
            self.download_file(filename, file.url_private)

    def _download_headers(self, url: str) -> Dict[str, str]:
        """Return the auth headers for Slack-hosted URLs, and nothing for other hosts.

        Slack's private file URLs need the bearer token; restricting it to
        slack.com hosts avoids sending the token to third-party servers.
        """
        host = urlparse(url).hostname or ""
        if host == "slack.com" or host.endswith(".slack.com"):
            return dict(self.api.headers)
        return {}

    def download_file(self, filename: str, url: str, attempts: int = 10) -> bool:
        """Download *url* into the output directory, retrying on request errors.

        The data is streamed to a temporary ``.part`` file that is renamed only
        after a complete download, so an interrupted transfer is never mistaken
        for a finished file on the next attempt.

        Args:
            filename: Target file name inside the output directory.
            url: Source URL.
            attempts: Maximum number of download attempts.

        Returns:
            True if the file already existed or was downloaded; False if no
            attempts were allowed, the URL is empty, or every attempt failed.
        """
        if attempts <= 0:
            return False
        target = os.path.join(self.output_dir, filename)
        if os.path.exists(target):
            return True
        if not url:
            print(f"Error downloading file {filename}: missing URL.")
            return False

        partial = target + ".part"
        headers = self._download_headers(url)
        remaining = attempts
        while remaining > 0:
            try:
                with requests.get(url, headers=headers, stream=True, timeout=30) as response:
                    response.raise_for_status()
                    with open(partial, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial, target)
                return True
            except requests.exceptions.RequestException as e:
                remaining -= 1
                print(f"Error downloading file {filename}: {e}. {remaining} attempts left.")

        # All attempts failed: remove any leftover partial download.
        try:
            os.remove(partial)
        except OSError:
            pass
        return False
if __name__ == "__main__":
    # Script entry point: read the token from the environment / .env file and
    # construct the exporter, which creates the output directory and loads
    # users and channels. No export method is invoked here.
    config = SlackConfig.from_env(env_path=".env")
    exporter = SlackExporter(config=config, output_dir="out")