You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dbs-maintenance/script/backup_database.py

104 lines
3.7 KiB
Python

# coding=UTF-8
############################################################################################
#
# Author Wenguan Ding
# Date: 2025/03/07
# Description: This script is developed to run logical hot backup for mysql database(s)
#
############################################################################################
import os
import subprocess
import time
import gzip
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description="MySQL热物理备份脚本")
parser.add_argument("-d", "--database", help="要备份的数据库名称(输入'all'备份所有数据库)", required=True)
parser.add_argument("-p", "--path", help="备份文件存储路径", required=True)
args = parser.parse_args()
# 配置信息
MYSQL_USER = "root" # MySQL用户名
MYSQL_PASSWORD = "Lysxbk713!" # MySQL密码
MYSQL_HOST = "localhost" # MySQL主机
MYSQL_PORT = "12138" # MySQL端口
# 创建备份目录(如果不存在)
if not os.path.exists(args.path):
os.makedirs(args.path)
def backup_database(db_name, backup_dir):
"""备份单个数据库"""
timestamp = time.strftime("%Y%m%d_%H%M%S")
backup_file = os.path.join(backup_dir, "{0}_backup_{1}.sql".format(db_name, timestamp))
compressed_backup_file = backup_file + ".gz"
try:
print("开始备份数据库: {0}".format(db_name))
with open(backup_file, "w") as f:
subprocess.check_call(
[
"mysqldump",
"--user={0}".format(MYSQL_USER),
"--password={0}".format(MYSQL_PASSWORD),
"--host={0}".format(MYSQL_HOST),
"--port={0}".format(MYSQL_PORT),
"--single-transaction", # 确保热备份
"--routines", # 备份存储过程和函数
"--triggers", # 备份触发器
"--events", # 备份事件
db_name,
],
stdout=f,
)
print("数据库备份成功,文件保存到: {0}".format(backup_file))
# 压缩备份文件
with open(backup_file, "rb") as f_in:
with gzip.open(compressed_backup_file, "wb") as f_out:
f_out.writelines(f_in)
print("备份文件已压缩: {0}".format(compressed_backup_file))
# 删除未压缩的备份文件
os.remove(backup_file)
print("已删除未压缩的备份文件: {0}".format(backup_file))
except subprocess.CalledProcessError as e:
print("备份失败: {0}".format(e))
except Exception as e:
print("发生错误: {0}".format(e))
def get_all_databases():
"""获取所有数据库名称"""
try:
output = subprocess.check_output(
[
"mysql",
"--user={0}".format(MYSQL_USER),
"--password={0}".format(MYSQL_PASSWORD),
"--host={0}".format(MYSQL_HOST),
"--port={0}".format(MYSQL_PORT),
"-e",
"SHOW DATABASES;",
]
)
databases = output.splitlines()[1:] # 跳过第一行标题
return databases
except subprocess.CalledProcessError as e:
print("获取数据库列表失败: {0}".format(e))
return []
# 执行备份
if args.database.lower() == "all":
databases = get_all_databases()
if databases:
print("开始备份所有数据库: {0}".format(databases))
for db in databases:
if db not in ["information_schema", "performance_schema", "mysql", "sys"]: # 跳过系统数据库
backup_database(db, args.path)
else:
print("未找到可备份的数据库")
else:
backup_database(args.database, args.path)