# coding=UTF-8 ############################################################################################ # # Author: Wenguan Ding # Date: 2025/03/07 # Description: This script is developed to run logical hot backup for mysql database(s) # ############################################################################################ import os import subprocess import time import gzip import argparse # 解析命令行参数 parser = argparse.ArgumentParser(description="MySQL热物理备份脚本") parser.add_argument("-d", "--database", help="要备份的数据库名称(输入'all'备份所有数据库)", required=True) parser.add_argument("-p", "--path", help="备份文件存储路径", required=True) args = parser.parse_args() # 配置信息 MYSQL_USER = "root" # MySQL用户名 MYSQL_PASSWORD = "xxxxxxxxxxxx" # MySQL密码 MYSQL_HOST = "localhost" # MySQL主机 MYSQL_PORT = "12138" # MySQL端口 # 创建备份目录(如果不存在) if not os.path.exists(args.path): os.makedirs(args.path) def backup_database(db_name, backup_dir): """备份单个数据库""" timestamp = time.strftime("%Y%m%d_%H%M%S") backup_file = os.path.join(backup_dir, "{0}_backup_{1}.sql".format(db_name, timestamp)) compressed_backup_file = backup_file + ".gz" try: print("开始备份数据库: {0}".format(db_name)) with open(backup_file, "w") as f: subprocess.check_call( [ "mysqldump", "--user={0}".format(MYSQL_USER), "--password={0}".format(MYSQL_PASSWORD), "--host={0}".format(MYSQL_HOST), "--port={0}".format(MYSQL_PORT), "--single-transaction", # 确保热备份 "--routines", # 备份存储过程和函数 "--triggers", # 备份触发器 "--events", # 备份事件 db_name, ], stdout=f, ) print("数据库备份成功,文件保存到: {0}".format(backup_file)) # 压缩备份文件 with open(backup_file, "rb") as f_in: with gzip.open(compressed_backup_file, "wb") as f_out: f_out.writelines(f_in) print("备份文件已压缩: {0}".format(compressed_backup_file)) # 删除未压缩的备份文件 os.remove(backup_file) print("已删除未压缩的备份文件: {0}".format(backup_file)) except subprocess.CalledProcessError as e: print("备份失败: {0}".format(e)) except Exception as e: print("发生错误: {0}".format(e)) def get_all_databases(): """获取所有数据库名称""" try: output = subprocess.check_output( [ "mysql", "--user={0}".format(MYSQL_USER), "--password={0}".format(MYSQL_PASSWORD), "--host={0}".format(MYSQL_HOST), "--port={0}".format(MYSQL_PORT), "-e", "SHOW DATABASES;", ] ) databases = output.splitlines()[1:] # 跳过第一行标题 return databases except subprocess.CalledProcessError as e: print("获取数据库列表失败: {0}".format(e)) return [] # 执行备份 if args.database.lower() == "all": databases = get_all_databases() if databases: print("开始备份所有数据库: {0}".format(databases)) for db in databases: if db not in ["information_schema", "performance_schema", "mysql", "sys"]: # 跳过系统数据库 backup_database(db, args.path) else: print("未找到可备份的数据库") else: backup_database(args.database, args.path)