使用tg-bot管理vps

on 2026-03-06

依赖的包

python3 -m venv venv
source venv/bin/activate
pip3 install python-telegram-bot python-dotenv
# bot.py
import subprocess
import logging
import os
from telegram import Update, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, CallbackQueryHandler
from dotenv import load_dotenv

# python-dotenv: read a .env file (if one is found) into the process environment.
# This runs before the configuration below looks up BOT_TOKEN / ALLOWED_USERS.
load_dotenv()

# ===== Configuration =====
# Prefer passing these through environment variables so the token is never
# hard-coded in the source:
# export BOT_TOKEN="your_token"
# export ALLOWED_USERS="123456789,1111111"
BOT_TOKEN = os.environ.get("BOT_TOKEN", "在这里填入Token或通过环境变量传入")


def parse_user_ids(raw: str) -> set[int]:
    """Parse a comma-separated list of Telegram user ids into a set of ints.

    Blank entries (an empty value, a trailing comma, doubled commas) are
    skipped so they no longer crash the bot at import time with
    ``int("")``. Non-numeric entries still raise ``ValueError`` so that a
    typo in the whitelist is not silently ignored.

    Args:
        raw: Text such as ``"123456789,1111111"``.

    Returns:
        The set of user ids; empty when ``raw`` contains no ids.
    """
    return {int(uid) for uid in raw.split(",") if uid.strip()}


# Whitelist of Telegram user ids allowed to use the bot.
ALLOWED_USERS = parse_user_ids(os.environ.get("ALLOWED_USERS", "5469383389"))

# Command name -> (script path, description shown in the Telegram menu)
COMMANDS: dict[str, tuple[str, str]] = {
    "update_blog": (
        "/opt/app/tg-bot/scripts/update_blog.sh",
        "更新博客",
    ),
    "docker": (
        "/root/scripts/docker_manage.sh",
        "Docker 管理",
    ),
    # To add more: "cmd_name": ("/path/to/script.sh", "description"),
}
# ==================

# Root logging: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)

# Module-level logger shared by the handlers in this file.
logger = logging.getLogger(__name__)


def is_allowed(user_id: int) -> bool:
    """Return True when ``user_id`` is listed in ``ALLOWED_USERS``."""
    authorized = user_id in ALLOWED_USERS
    return authorized

# ===== 工具函数 =====

def get_containers(status: str = "running") -> list[str]:
    """Return container names reported by ``docker ps``.

    Args:
        status: ``"running"`` lists only running containers; ``"all"`` adds
            ``-a`` so stopped containers are included as well.

    Returns:
        Container names in the order docker printed them. The list is empty
        when docker prints no names, when the docker executable cannot be
        started, or when it does not finish within 30 seconds.
    """
    args = ["docker", "ps", "--format", "{{.Names}}"]
    if status == "all":
        args.append("-a")
    try:
        # Same 30 s budget as run_docker_cmd, so an unresponsive daemon
        # cannot stall the bot indefinitely.
        result = subprocess.run(args, capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.TimeoutExpired) as exc:
        logging.getLogger(__name__).warning("docker ps failed: %s", exc)
        return []
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]


def run_docker_cmd(args: list[str], timeout: float = 30) -> tuple[bool, str]:
    """Run ``docker <args>`` and report whether it succeeded.

    Args:
        args: Arguments placed after ``docker`` (passed as a list, no shell).
        timeout: Seconds to wait before giving up on the command.

    Returns:
        ``(ok, output)``. ``ok`` is True only for exit code 0. ``output`` is
        the stripped stdout, or the stripped stderr when stdout is empty.
        When the command times out or cannot be started, ``ok`` is False and
        ``output`` is a short message describing the failure, so callers can
        show it to the user instead of dying on an unhandled exception.
    """
    try:
        result = subprocess.run(
            ["docker", *args],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, f"命令执行超时(超过 {timeout:g} 秒)"
    except OSError as exc:
        # e.g. the docker executable is not installed or not on PATH
        return False, f"无法执行 docker: {exc}"
    output = result.stdout.strip() or result.stderr.strip()
    return result.returncode == 0, output


def make_keyboard(buttons: list[tuple[str, str]], cols: int = 1) -> InlineKeyboardMarkup:
    """
    Build an inline keyboard and append a "back to main menu" row.

    buttons: [(label, callback_data), ...]
    cols: number of buttons per row
    """
    widgets = [
        InlineKeyboardButton(text, callback_data=payload)
        for text, payload in buttons
    ]
    if cols >= 1:
        rows = [widgets[start:start + cols] for start in range(0, len(widgets), cols)]
    else:
        # A non-positive width can never fill a row, so every button ends up
        # in a single row.
        rows = [widgets] if widgets else []
    back_row = [InlineKeyboardButton("🔙 返回主菜单", callback_data="docker:main")]
    return InlineKeyboardMarkup(rows + [back_row])


# ===== 一级菜单:Docker 主菜单 =====

async def docker_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the ``/docker`` command by replying with the Docker main menu.

    Users not accepted by ``is_allowed`` get a "no permission" reply and the
    attempt is logged.
    """
    user = update.effective_user
    # update.message is None when the command arrives as an edited message;
    # effective_message resolves to whichever message the update carries.
    message = update.effective_message
    if not is_allowed(user.id):
        await message.reply_text("⛔ 无权限")
        logger.warning("未授权访问 user_id=%s username=%s", user.id, user.username)
        return
    await message.reply_text(
        "🐳 Docker 管理",
        reply_markup=docker_main_menu(),
    )


def docker_main_menu() -> InlineKeyboardMarkup:
    """Build the top-level Docker menu: one action per row, then a cancel row."""
    entries = (
        ("⬆️  升级容器", "docker:pull"),
        ("🔄 重启容器", "docker:restart"),
        ("⏹ 停止容器", "docker:stop"),
        ("▶️ 启动容器", "docker:start"),
        ("📋 查看日志", "docker:logs"),
        ("📊 容器状态", "docker:status"),
        ("❌ 取消", "docker:cancel"),
    )
    rows = [
        [InlineKeyboardButton(text, callback_data=payload)]
        for text, payload in entries
    ]
    return InlineKeyboardMarkup(rows)


# ===== 二级菜单:选择容器 =====

# Per-action settings: the label and emoji used in messages, and which
# containers the selection menu lists ("running" only, or "all").
ACTION_CONFIG = {
    action: {"label": label, "emoji": emoji, "status": scope}
    for action, label, emoji, scope in (
        ("pull", "升级", "⬆️", "running"),
        ("restart", "重启", "🔄", "running"),
        ("stop", "停止", "⏹", "running"),
        ("start", "启动", "▶️", "all"),  # starting needs stopped containers too
        ("logs", "查看日志", "📋", "running"),
        ("status", "查看状态", "📊", "all"),
    )
}


async def show_container_list(query, action: str):
    """Show the containers available for ``action`` as a second-level menu.

    Args:
        query: Callback query whose message is edited in place.
        action: A key of ``ACTION_CONFIG``; its ``status`` entry decides
            whether only running or all containers are listed.

    Each button carries ``docker:<action>:<name>`` as callback data. Telegram
    limits callback data to 64 bytes, and a single oversized button makes it
    reject the whole keyboard, so containers whose data would not fit are
    left out, logged, and counted in the message.
    """
    cfg = ACTION_CONFIG[action]
    max_callback_bytes = 64

    buttons = []
    skipped = []
    for name in get_containers(cfg["status"]):
        data = f"docker:{action}:{name}"
        if len(data.encode("utf-8")) > max_callback_bytes:
            skipped.append(name)
        else:
            buttons.append((name, data))

    if buttons:
        text = f"{cfg['emoji']} 选择要{cfg['label']}的容器:"
    else:
        text = "没有可用的容器"
    if skipped:
        logger.warning("容器名过长,无法生成按钮: %s", ", ".join(skipped))
        text += f"\n(已跳过 {len(skipped)} 个名称过长的容器)"

    # make_keyboard always appends the "back to main menu" row, so the user
    # is not left on a dead-end message even when no container is listed.
    await query.edit_message_text(text, reply_markup=make_keyboard(buttons))


# ===== 三级操作:执行具体命令 =====

def _code_block(text: str) -> str:
    """Wrap ``text`` in a Markdown code fence.

    A triple backtick inside the text would close the fence early and make
    Telegram reject the message, so such runs are replaced with ``'''``.
    """
    return "```\n" + text.replace("```", "'''") + "\n```"


async def execute_action(query, action: str, container: str):
    """Run one Docker action against ``container`` and report the result.

    Args:
        query: Callback query whose message is edited in place with progress
            and the final result.
        action: A key of ``ACTION_CONFIG`` (pull, restart, stop, start,
            logs or status).
        container: Name of the target container.

    In messages sent with ``parse_mode="Markdown"`` the container name is
    placed in inline code: names often contain underscores, which Markdown
    would otherwise treat as italic markers.
    """
    cfg = ACTION_CONFIG[action]
    await query.edit_message_text(f"⏳ 正在{cfg['label']} {container}...")

    if action == "pull":
        # Look up which image the container was created from.
        ok, image = run_docker_cmd(
            ["inspect", "--format", "{{.Config.Image}}", container]
        )
        if not ok:
            await query.edit_message_text(
                f"❌ 获取镜像信息失败\n{_code_block(image)}",
                parse_mode="Markdown",
            )
            return

        image = image.strip()
        await query.edit_message_text(f"⏳ 正在拉取 {image}...")

        ok, output = run_docker_cmd(["pull", image])
        if not ok:
            await query.edit_message_text(
                f"❌ 拉取失败\n{_code_block(output)}",
                parse_mode="Markdown",
            )
            return

        # NOTE(review): `docker restart` restarts the existing container; it
        # does not recreate it from the freshly pulled image. Confirm that
        # the restart offered here is the intended follow-up.
        await query.edit_message_text(
            f"✅ 镜像 {image} 已更新\n请手动重启容器以生效。",
            reply_markup=make_keyboard([
                (f"🔄 立即重启 {container}", f"docker:restart:{container}")
            ]),
        )
        return

    if action == "logs":
        _, output = run_docker_cmd(["logs", "--tail", "50", container])
        # Keep only the tail so the newest lines survive and the message
        # stays below Telegram's 4096-character limit.
        output = output[-3000:]
        await query.edit_message_text(
            f"📋 最近 50 行日志 `{container}`\n{_code_block(output or '(无日志)')}",
            parse_mode="Markdown",
        )
        return

    if action == "status":
        _, output = run_docker_cmd([
            "inspect", "--format",
            "状态: {{.State.Status}}\nIP: {{.NetworkSettings.IPAddress}}\n启动时间: {{.State.StartedAt}}",
            container,
        ])
        await query.edit_message_text(
            f"📊 `{container}`\n{_code_block(output)}",
            parse_mode="Markdown",
        )
        return

    # Only restart / stop / start reach this point; pull, logs and status
    # returned above.
    docker_args = {
        "restart": ["restart", container],
        "stop":    ["stop",    container],
        "start":   ["start",   container],
    }
    ok, output = run_docker_cmd(docker_args[action])

    if ok:
        await query.edit_message_text(
            f"✅ {container} {cfg['label']}成功",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回主菜单", callback_data="docker:main"),
                InlineKeyboardButton("❌ 取消", callback_data="docker:cancel"),
            ]]),
        )
    else:
        await query.edit_message_text(
            f"❌ {cfg['label']}失败\n{_code_block(output)}",
            parse_mode="Markdown",
        )


# ===== 统一回调路由 =====

async def docker_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    query = update.callback_query
    await query.answer()

    if not is_allowed(query.from_user.id):
        await query.edit_message_text("⛔ 无权限")
        return

    parts = query.data.split(":")  # docker:action 或 docker:action:container

    match parts:
        case ["docker", "main"]:
            await query.edit_message_text("🐳 Docker 管理", reply_markup=docker_main_menu())

        case ["docker", "cancel"]:
            await query.edit_message_text("已取消")

        case ["docker", action] if action in ACTION_CONFIG:
            # 二级菜单:展示容器列表
            await show_container_list(query, action)

        case ["docker", action, container] if action in ACTION_CONFIG:
            # 三级操作:执行命令
            await execute_action(query, action, container)

        case _:
            await query.edit_message_text("❓ 未知操作")



async def set_commands(app):
    """Register every entry of ``COMMANDS`` as the bot's command menu.

    Assigned as the application's ``post_init`` hook in ``main``.
    """
    menu = [
        BotCommand(name, description)
        for name, (_script, description) in COMMANDS.items()
    ]
    await app.bot.set_my_commands(menu)
    logger.info("命令菜单注册成功,共 %d 条", len(menu))


async def run_script(update: Update, context: ContextTypes.DEFAULT_TYPE, script_path: str):
    """Run ``script_path`` with bash and reply with its output.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
        script_path: Path of the shell script to execute.

    Users not accepted by ``is_allowed`` are rejected and logged. The script
    gets 60 seconds; on exit code 0 its stdout is reported, otherwise stderr
    (falling back to stdout). Output longer than 3800 characters is cut.
    """
    user = update.effective_user
    # update.message is None when the command arrives as an edited message;
    # effective_message resolves to whichever message the update carries.
    message = update.effective_message
    if not is_allowed(user.id):
        await message.reply_text("⛔ 无权限")
        logger.warning("未授权访问 user_id=%s username=%s", user.id, user.username)
        return

    # Make sure the script exists before trying to run it
    if not os.path.isfile(script_path):
        await message.reply_text(f"❌ 脚本不存在:`{script_path}`", parse_mode="Markdown")
        logger.error("脚本不存在: %s", script_path)
        return

    logger.info("执行脚本 user_id=%s script=%s", user.id, script_path)
    await message.reply_text("⏳ 执行中...")

    try:
        result = subprocess.run(
            ["bash", script_path],
            capture_output=True,
            text=True,
            timeout=60,
        )
        stdout = result.stdout.strip()
        stderr = result.stderr.strip()

        if result.returncode == 0:
            output = stdout or "(无输出)"
            status = "✅ 执行成功"
        else:
            output = stderr or stdout or "(无输出)"
            status = f"⚠️ 退出码 {result.returncode}"

        # Telegram messages are capped at 4096 characters and the code fence
        # itself takes 8, so leave some headroom.
        if len(output) > 3800:
            output = output[:3800] + "\n...(输出已截断)"

        await message.reply_text(
            f"{status}\n```\n{output}\n```",
            parse_mode="Markdown",
        )

    except subprocess.TimeoutExpired:
        await message.reply_text("⚠️ 执行超时(超过 60 秒)")
        logger.warning("脚本超时: %s", script_path)
    except Exception as e:
        # Top-level boundary for this command: report the failure to the
        # user and keep the traceback in the log.
        await message.reply_text(f"❌ 异常: {e}")
        logger.exception("脚本执行异常: %s", script_path)




def main():
    """Build the Telegram application, register all handlers and start polling."""
    app = ApplicationBuilder().token(BOT_TOKEN).build()

    # Commands served by a dedicated handler instead of a script.
    dedicated = {"docker": docker_cmd}
    for name, callback in dedicated.items():
        app.add_handler(CommandHandler(name, callback))
    app.add_handler(CallbackQueryHandler(docker_callback, pattern="^docker:"))

    def make_handler(script: str):
        """Return a command callback bound to one script path."""
        async def handler(update, context):
            await run_script(update, context, script)
        return handler

    # Register script commands from COMMANDS so handlers stay in sync with
    # the menu built by set_commands.
    for cmd, (script, _) in COMMANDS.items():
        if cmd in dedicated:
            # Within one handler group only the first matching handler runs,
            # so a script handler added after the dedicated one would never
            # be called. The COMMANDS entry still provides the menu text.
            continue
        app.add_handler(CommandHandler(cmd, make_handler(script)))

    # Register the command menu once the bot has been initialized
    app.post_init = set_commands

    logger.info("Bot 启动中...")
    app.run_polling()


# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()