Files
tg_to_vk/tg_to_vk.py
2025-08-27 00:11:30 +03:00

355 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import requests
import logging
import time
from pathlib import Path
from dotenv import load_dotenv
# Инициализация путей
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / "data"
LOG_DIR = BASE_DIR / "logs"
# Создание директорий
DATA_DIR.mkdir(exist_ok=True, parents=True)
LOG_DIR.mkdir(exist_ok=True, parents=True)
# Загрузка переменных окружения
load_dotenv()
# Конфигурация
TELEGRAM_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
TELEGRAM_CHANNEL = os.getenv('TELEGRAM_CHANNEL')
VK_TOKEN = os.getenv('VK_ACCESS_TOKEN')
VK_GROUP_ID = int(os.getenv('VK_GROUP_ID', '-14409962'))
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '180'))
# Логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / 'vk_crosspost.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('VK_CrossPost')
class TelegramToVK:
def __init__(self):
self.MAX_ATTACHMENTS = 10
self.LAST_ID_FILE = DATA_DIR / "last_id.txt"
self.last_id = self._load_last_id()
logger.info(f"Bot initialized. Last processed ID: {self.last_id}")
def _load_last_id(self):
"""Загрузка последнего ID с созданием файла при необходимости"""
try:
if self.LAST_ID_FILE.exists():
with open(self.LAST_ID_FILE, 'r') as f:
content = f.read().strip()
if content.isdigit():
return int(content)
# Если файла нет или он пуст/поврежден
last_id = self._fetch_last_post_id()
self._save_last_id(last_id)
return last_id
except Exception as e:
logger.error(f"Error loading last_id: {e}")
return 0
def _fetch_last_post_id(self):
"""Получение последнего ID поста из Telegram"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getUpdates',
params={'limit': 1},
timeout=10
).json()
if response.get('result'):
return response['result'][-1]['update_id']
return 0
except Exception as e:
logger.error(f"Error fetching last post ID: {e}")
return 0
def _save_last_id(self, last_id):
"""Атомарное сохранение ID последнего поста"""
try:
temp_file = self.LAST_ID_FILE.with_suffix('.tmp')
with open(temp_file, 'w') as f:
f.write(str(last_id))
temp_file.replace(self.LAST_ID_FILE)
except Exception as e:
logger.error(f"Failed to save last_id: {e}")
def _get_telegram_updates(self):
"""Получение новых постов"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getUpdates',
params={
'offset': self.last_id + 1,
'allowed_updates': ['channel_post'],
'timeout': 30
},
timeout=35
).json()
if not response.get('ok'):
logger.error(f"Telegram API error: {response.get('description')}")
return []
return response.get('result', [])
except Exception as e:
logger.error(f"Error getting updates: {e}")
return []
def _process_media(self, post):
"""Обработка медиа без дублирования"""
attachments = []
# Обработка видео
if 'video' in post:
video = post['video']
if attachment := self._upload_media(video['file_id'], 'video'):
attachments.append(attachment)
logger.debug("Added video attachment")
# Обработка фото (только самое качественное)
if 'photo' in post and len(post['photo']) > 0:
best_photo = post['photo'][-1]
if attachment := self._upload_media(best_photo['file_id'], 'photo'):
attachments.append(attachment)
logger.debug(f"Added photo (best of {len(post['photo'])} versions)")
return attachments[:self.MAX_ATTACHMENTS]
def _upload_media(self, file_id, media_type):
"""Загрузка медиа в VK"""
try:
file_url = self._get_telegram_file_url(file_id)
if not file_url:
return None
if media_type == 'photo':
# Получение URL для загрузки фото
upload_server = requests.get(
'https://api.vk.com/method/photos.getWallUploadServer',
params={
'group_id': abs(VK_GROUP_ID),
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in upload_server:
logger.error(f"VK upload server error: {upload_server['error']}")
return None
# Загрузка фото
file_data = requests.get(file_url, timeout=10).content
upload = requests.post(
upload_server['response']['upload_url'],
files={'photo': ('image.jpg', file_data)},
timeout=15
).json()
# Сохранение фото
save_response = requests.get(
'https://api.vk.com/method/photos.saveWallPhoto',
params={
'group_id': abs(VK_GROUP_ID),
'photo': upload['photo'],
'server': upload['server'],
'hash': upload['hash'],
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in save_response:
logger.error(f"VK save photo error: {save_response['error']}")
return None
photo = save_response['response'][0]
return f"photo{photo['owner_id']}_{photo['id']}"
elif media_type == 'video':
# Загрузка видео
upload_server = requests.get(
'https://api.vk.com/method/video.save',
params={
'name': 'Video from Telegram',
'group_id': abs(VK_GROUP_ID),
'access_token': VK_TOKEN,
'v': '5.131'
},
timeout=10
).json()
if 'error' in upload_server:
logger.error(f"VK video save error: {upload_server['error']}")
return None
file_data = requests.get(file_url, timeout=15).content
upload = requests.post(
upload_server['response']['upload_url'],
files={'video_file': ('video.mp4', file_data)},
timeout=20
).json()
return f"video{upload['owner_id']}_{upload['video_id']}"
except Exception as e:
logger.error(f"Error uploading {media_type}: {e}")
return None
def _get_telegram_file_url(self, file_id):
"""Получение URL файла из Telegram"""
try:
response = requests.get(
f'https://api.telegram.org/bot{TELEGRAM_TOKEN}/getFile',
params={'file_id': file_id},
timeout=10
).json()
if response.get('ok'):
return f"https://api.telegram.org/file/bot{TELEGRAM_TOKEN}/{response['result']['file_path']}"
logger.error(f"Telegram file error: {response.get('description')}")
except Exception as e:
logger.error(f"Error getting file URL: {e}")
return None
def _post_to_vk(self, text, attachments):
"""Публикация поста в VK"""
if not text and not attachments:
logger.warning("Skipping empty post")
return False
params = {
'owner_id': VK_GROUP_ID,
'from_group': 1,
'access_token': VK_TOKEN,
'v': '5.131'
}
if text:
params['message'] = text[:1000] # Лимит VK
logger.debug(f"Post text: {text[:50]}...")
if attachments:
params['attachments'] = ','.join(attachments[:10]) # Лимит VK
logger.debug(f"Attachments: {len(attachments)} items")
try:
response = requests.post(
'https://api.vk.com/method/wall.post',
params=params,
timeout=15
).json()
if 'error' in response:
logger.error(f"VK API error: {response['error']}")
return False
post_id = response['response']['post_id']
logger.info(f"Posted: https://vk.com/wall{VK_GROUP_ID}_{post_id}")
return True
except Exception as e:
logger.error(f"Error posting to VK: {e}")
return False
def run(self):
"""Основной цикл работы"""
logger.info("Starting Telegram to VK crossposting bot...")
try:
while True:
self._process_new_posts()
logger.debug(f"Waiting {CHECK_INTERVAL} seconds...")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
logger.info("Bot stopped by user")
except Exception as e:
logger.critical(f"Fatal error: {e}", exc_info=True)
raise
def _process_new_posts(self):
updates = self._get_telegram_updates()
if not updates:
logger.debug("No new updates")
return
logger.info(f"Found {len(updates)} new updates")
# Группировка по media_group_id (альбом) или update_id (одиночный пост)
grouped_posts = {}
for update in sorted(updates, key=lambda x: x['update_id']):
current_id = update['update_id']
if current_id <= self.last_id:
continue
post = update.get('channel_post')
if not post:
continue
group_id = post.get('media_group_id', current_id) # альбом или одиночка
grouped_posts.setdefault(group_id, []).append((current_id, post))
# Обработка каждой группы постов
for group_id, posts in grouped_posts.items():
posts.sort(key=lambda x: x[0]) # сортировка внутри группы по update_id
# Текст берём из первого поста группы
first_post = posts[0][1]
text = first_post.get('text') or first_post.get('caption') or ""
attachments = []
for _, post in posts:
# Фото (берём самое качественное из набора размеров)
photos = post.get('photo') or []
if photos:
best_photo = photos[-1]
att = self._upload_media(best_photo['file_id'], 'photo')
if att:
attachments.append(att)
# Видео
if 'video' in post:
att = self._upload_media(post['video']['file_id'], 'video')
if att:
attachments.append(att)
# Ограничим количество вложений (и так в _post_to_vk есть [:10], но пусть будет явно)
if attachments:
attachments = attachments[:self.MAX_ATTACHMENTS]
# Публикуем, если есть текст ИЛИ вложения.
if text or attachments:
if self._post_to_vk(text, attachments):
self.last_id = posts[-1][0] # помечаем обработанным последний update из группы
self._save_last_id(self.last_id)
logger.info(f"Processed group {group_id} (text={'yes' if text else 'no'}, attachments={len(attachments)})")
else:
logger.error(f"Failed to post group {group_id}")
else:
# Совсем пустая группа (редко, но бывает) — помечаем как обработанную, чтобы не зациклиться
self.last_id = posts[-1][0]
self._save_last_id(self.last_id)
logger.info(f"Skipped empty group {group_id}: no text and no media")
if __name__ == '__main__':
bot = TelegramToVK()
bot.run()