Files
tg_to_vk/tg_to_vk.py

355 lines
13 KiB
Python
Raw Permalink Normal View History

2025-08-27 00:11:30 +03:00
import os
import requests
import logging
import time
from pathlib import Path
from dotenv import load_dotenv
# Инициализация путей
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / "data"
LOG_DIR = BASE_DIR / "logs"
# Создание директорий
DATA_DIR.mkdir(exist_ok=True, parents=True)
LOG_DIR.mkdir(exist_ok=True, parents=True)
# Загрузка переменных окружения
load_dotenv()
# Конфигурация
TELEGRAM_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_CHANNEL = os.getenv('TELEGRAM_CHANNEL')
VK_TOKEN = os.getenv('VK_ACCESS_TOKEN')
VK_GROUP_ID = int(os.getenv('VK_GROUP_ID', '-14409962'))
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '180'))
# Логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'vk_crosspost.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('VK_CrossPost')
class TelegramToVK:
def __init__(self):
self.MAX_ATTACHMENTS = 10
self.LAST_ID_FILE = DATA_DIR / "last_id.txt"
self.last_id = self._load_last_id()
logger.info(f"Bot initialized. Last processed ID: {self.last_id}")
def _load_last_id(self):
"""Загрузка последнего ID с созданием файла при необходимости"""
try:
if self.LAST_ID_FILE.exists():
with open(self.LAST_ID_FILE, 'r') as f:
content = f.read().strip()
if content.isdigit():
return int(content)
# Если файла нет или он пуст/поврежден
last_id = self._fetch_last_post_id()
self._save_last_id(last_id)
return last_id
except Exception as e:
logger.error(f"Error loading last_id: {e}")
return 0
def _fetch_last_post_id(self):
"""Получение последнего ID поста из Telegram"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getUpdates',
params={'limit': 1},
timeout=10
).json()
if response.get('result'):
return response['result'][-1]['update_id']
return 0
except Exception as e:
logger.error(f"Error fetching last post ID: {e}")
return 0
def _save_last_id(self, last_id):
"""Атомарное сохранение ID последнего поста"""
try:
temp_file = self.LAST_ID_FILE.with_suffix('.tmp')
with open(temp_file, 'w') as f:
f.write(str(last_id))
temp_file.replace(self.LAST_ID_FILE)
except Exception as e:
logger.error(f"Failed to save last_id: {e}")
def _get_telegram_updates(self):
"""Получение новых постов"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getUpdates',
params={
'offset': self.last_id + 1,
'allowed_updates': ['channel_post'],
'timeout': 30
},
timeout=35
).json()
if not response.get('ok'):
logger.error(f"Telegram API error: {response.get('description')}")
return []
return response.get('result', [])
except Exception as e:
logger.error(f"Error getting updates: {e}")
return []
def _process_media(self, post):
"""Обработка медиа без дублирования"""
attachments = []
# Обработка видео
if 'video' in post:
video = post['video']
if attachment := self._upload_media(video['file_id'], 'video'):
attachments.append(attachment)
logger.debug("Added video attachment")
# Обработка фото (только самое качественное)
if 'photo' in post and len(post['photo']) > 0:
best_photo = post['photo'][-1]
if attachment := self._upload_media(best_photo['file_id'], 'photo'):
attachments.append(attachment)
logger.debug(f"Added photo (best of {len(post['photo'])} versions)")
return attachments[:self.MAX_ATTACHMENTS]
def _upload_media(self, file_id, media_type):
"""Загрузка медиа в VK"""
try:
file_url = self._get_telegram_file_url(file_id)
if not file_url:
return None
if media_type == 'photo':
# Получение URL для загрузки фото
upload_server = requests.get(
'https://api.vk.com/method/photos.getWallUploadServer',
params={
'group_id': abs(VK_GROUP_ID),
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in upload_server:
logger.error(f"VK upload server error: {upload_server['error']}")
return None
# Загрузка фото
file_data = requests.get(file_url, timeout=10).content
upload = requests.post(
upload_server['response']['upload_url'],
files={'photo': ('image.jpg', file_data)},
timeout=15
).json()
# Сохранение фото
save_response = requests.get(
'https://api.vk.com/method/photos.saveWallPhoto',
params={
'group_id': abs(VK_GROUP_ID),
'photo': upload['photo'],
'server': upload['server'],
'hash': upload['hash'],
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in save_response:
logger.error(f"VK save photo error: {save_response['error']}")
return None
photo = save_response['response'][0]
return f"photo{photo['owner_id']}_{photo['id']}"
elif media_type == 'video':
# Загрузка видео
upload_server = requests.get(
'https://api.vk.com/method/video.save',
params={
'name': 'Video from Telegram',
'group_id': abs(VK_GROUP_ID),
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in upload_server:
logger.error(f"VK video save error: {upload_server['error']}")
return None
file_data = requests.get(file_url, timeout=15).content
upload = requests.post(
upload_server['response']['upload_url'],
files={'video_file': ('video.mp4', file_data)},
timeout=20
).json()
return f"video{upload['owner_id']}_{upload['video_id']}"
except Exception as e:
logger.error(f"Error uploading {media_type}: {e}")
return None
def _get_telegram_file_url(self, file_id):
"""Получение URL файла из Telegram"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getFile',
params={'file_id': file_id},
timeout=10
).json()
if response.get('ok'):
return f"https://api.telegram.org/file/bot{TELEGRAM_TOKEN}/{response['result']['file_path']}"
logger.error(f"Telegram file error: {response.get('description')}")
except Exception as e:
logger.error(f"Error getting file URL: {e}")
return None
def _post_to_vk(self, text, attachments):
"""Публикация поста в VK"""
if not text and not attachments:
logger.warning("Skipping empty post")
return False
params = {
'owner_id': VK_GROUP_ID,
'from_group': 1,
'access_token': VK_TOKEN,
'v': '5.131'
}
if text:
params['message'] = text[:1000] # Лимит VK
logger.debug(f"Post text: {text[:50]}...")
if attachments:
params['attachments'] = ','.join(attachments[:10]) # Лимит VK
logger.debug(f"Attachments: {len(attachments)} items")
try:
response = requests.post(
'https://api.vk.com/method/wall.post',
params=params,
timeout=15
).json()
if 'error' in response:
logger.error(f"VK API error: {response['error']}")
return False
post_id = response['response']['post_id']
logger.info(f"Posted: https://vk.com/wall{VK_GROUP_ID}_{post_id}")
return True
except Exception as e:
logger.error(f"Error posting to VK: {e}")
return False
def run(self):
"""Основной цикл работы"""
logger.info("Starting Telegram to VK crossposting bot...")
try:
while True:
self._process_new_posts()
logger.debug(f"Waiting {CHECK_INTERVAL} seconds...")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
logger.info("Bot stopped by user")
except Exception as e:
logger.critical(f"Fatal error: {e}", exc_info=True)
raise
def _process_new_posts(self):
updates = self._get_telegram_updates()
if not updates:
logger.debug("No new updates")
return
logger.info(f"Found {len(updates)} new updates")
# Группировка по media_group_id (альбом) или update_id (одиночный пост)
grouped_posts = {}
for update in sorted(updates, key=lambda x: x['update_id']):
current_id = update['update_id']
if current_id <= self.last_id:
continue
post = update.get('channel_post')
if not post:
continue
group_id = post.get('media_group_id', current_id) # альбом или одиночка
grouped_posts.setdefault(group_id, []).append((current_id, post))
# Обработка каждой группы постов
for group_id, posts in grouped_posts.items():
posts.sort(key=lambda x: x[0]) # сортировка внутри группы по update_id
# Текст берём из первого поста группы
first_post = posts[0][1]
text = first_post.get('text') or first_post.get('caption') or ""
attachments = []
for _, post in posts:
# Фото (берём самое качественное из набора размеров)
photos = post.get('photo') or []
if photos:
best_photo = photos[-1]
att = self._upload_media(best_photo['file_id'], 'photo')
if att:
attachments.append(att)
# Видео
if 'video' in post:
att = self._upload_media(post['video']['file_id'], 'video')
if att:
attachments.append(att)
# Ограничим количество вложений (и так в _post_to_vk есть [:10], но пусть будет явно)
if attachments:
attachments = attachments[:self.MAX_ATTACHMENTS]
# Публикуем, если есть текст ИЛИ вложения.
if text or attachments:
if self._post_to_vk(text, attachments):
self.last_id = posts[-1][0] # помечаем обработанным последний update из группы
self._save_last_id(self.last_id)
logger.info(f"Processed group {group_id} (text={'yes' if text else 'no'}, attachments={len(attachments)})")
else:
logger.error(f"Failed to post group {group_id}")
else:
# Совсем пустая группа (редко, но бывает) — помечаем как обработанную, чтобы не зациклиться
self.last_id = posts[-1][0]
self._save_last_id(self.last_id)
logger.info(f"Skipped empty group {group_id}: no text and no media")
if __name__ == '__main__':
bot = TelegramToVK()
bot.run()