#!/usr/bin/env python3
"""
Zabbix Agent Management Tool
============================

This script provides functionality to:
1. Back up Zabbix agent configuration files
2. Restore Zabbix agent configuration files
3. Upgrade the Zabbix agent while preserving custom configurations

Author: GitHub Copilot
Date: September 2025
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import subprocess
|
|
import shutil
|
|
import glob
|
|
import difflib
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import re
|
|
|
|
# Paths used by the tool. The Zabbix configuration directory is fixed; every
# other path lives next to this script.
ZABBIX_CONFIG_DIR = "/etc/zabbix"
SCRIPT_DIR = Path(__file__).parent.absolute()
DEFAULT_CONFIG_FILE = SCRIPT_DIR.joinpath("zabbix_agentd.conf")
BACKUP_DIR = SCRIPT_DIR.joinpath("backups")
LOG_FILE = SCRIPT_DIR.joinpath("agent_tool.log")

# Logging: every record is written both to LOG_FILE and to stdout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ZabbixAgentTool:
    """Back up, restore and upgrade Zabbix agent configuration files.

    Attributes:
        distro_family: ``'debian'`` or ``'rhel'``, detected at construction.
        backup_dir: Directory under which timestamped backups are created.
    """

    def __init__(self):
        """Detect the distribution family and ensure the backup root exists."""
        self.distro_family = self._detect_distro_family()
        self.backup_dir = BACKUP_DIR
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def _detect_distro_family(self):
        """Return ``'debian'`` or ``'rhel'`` for the running system.

        Distribution marker files are checked first, in a fixed order; the
        mere existence of one decides the result. Otherwise the content of
        ``/etc/os-release`` is searched for known distribution names. When
        nothing matches, a warning is logged and ``'debian'`` is returned.
        """
        marker_files = (
            ('/etc/debian_version', 'debian'),
            ('/etc/redhat-release', 'rhel'),
            ('/etc/centos-release', 'rhel'),
        )
        for marker, family in marker_files:
            if os.path.exists(marker):
                return family

        os_release = '/etc/os-release'
        if os.path.exists(os_release):
            try:
                with open(os_release, 'r') as f:
                    content = f.read().lower()
            except (OSError, UnicodeError) as e:
                logger.debug(f"Error reading {os_release}: {e}")
            else:
                if any(keyword in content for keyword in ('debian', 'ubuntu')):
                    return 'debian'
                if any(keyword in content for keyword in ('centos', 'rhel', 'fedora')):
                    return 'rhel'

        logger.warning("Unknown distribution family, defaulting to debian")
        return 'debian'

    def _get_config_files(self):
        """Return the Zabbix agent configuration files in ZABBIX_CONFIG_DIR.

        The result is sorted so that backups, merges and logs are processed
        in a deterministic order (``glob`` itself gives no ordering guarantee).
        """
        return sorted(glob.glob(f"{ZABBIX_CONFIG_DIR}/zabbix_agent*.conf"))

    def _run_command(self, command, check=True, log_output=False):
        """Run a shell command and return its ``CompletedProcess``.

        Args:
            command: Command line, executed through the shell.
            check: When true, a non-zero exit status raises.
            log_output: When true, stdout is logged at debug level.

        Raises:
            subprocess.CalledProcessError: If ``check`` is true and the
                command exits with a non-zero status. The failure (and
                stderr, if any) is logged before re-raising.
        """
        logger.info(f"Running: {command}")
        try:
            # shell=True is only acceptable because every caller in this class
            # passes a hard-coded command line; never pass external input here.
            result = subprocess.run(
                command, shell=True, capture_output=True, text=True, check=check
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"Command failed: {command}")
            if e.stderr:
                logger.error(f"Error: {e.stderr.strip()}")
            raise

        if log_output and result.stdout:
            logger.debug(f"Output: {result.stdout.strip()}")
        return result

    def _parse_config_file(self, config_path):
        """Return the active (non-blank, non-comment) lines of a config file.

        Each returned line is stripped of surrounding whitespace. Any error
        while reading the file is logged and re-raised.
        """
        try:
            with open(config_path, 'r') as f:
                stripped_lines = (line.strip() for line in f)
                return [
                    line for line in stripped_lines
                    if line and not line.startswith('#')
                ]
        except Exception as e:
            logger.error(f"Error parsing config file {config_path}: {e}")
            raise

    def backup_configs(self):
        """Copy the current configuration files into a timestamped backup.

        Returns:
            The backup directory path as a string, or ``None`` when there are
            no configuration files to back up.

        Raises:
            RuntimeError: If configuration files exist but none of them could
                be copied. Reporting such a run as a successful backup would
                let an upgrade proceed without any safety net.
        """
        config_files = self._get_config_files()
        if not config_files:
            logger.warning("No Zabbix agent configuration files found to backup")
            return None

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_subdir = self.backup_dir / f"backup_{timestamp}"
        backup_subdir.mkdir(parents=True, exist_ok=True)

        # Individual failures are logged and skipped so that one unreadable
        # file does not prevent the others from being saved.
        backed_up_files = []
        for config_file in config_files:
            backup_file = backup_subdir / Path(config_file).name
            try:
                shutil.copy2(config_file, backup_file)
            except OSError as e:
                logger.error(f"Failed to backup {config_file}: {e}")
                continue
            logger.info(f"Backed up {config_file}")
            backed_up_files.append((config_file, str(backup_file)))

        if not backed_up_files:
            # rmdir only succeeds on an empty directory, so an earlier backup
            # that happens to share this timestamp is never deleted.
            try:
                backup_subdir.rmdir()
            except OSError as e:
                logger.debug(f"Could not remove empty backup directory {backup_subdir}: {e}")
            raise RuntimeError("Failed to back up any Zabbix agent configuration file")

        self._create_backup_manifest(backup_subdir, backed_up_files)
        logger.info(f"Backup completed: {backup_subdir}")
        return str(backup_subdir)

    def _create_backup_manifest(self, backup_dir, backed_up_files):
        """Write ``backup_manifest.txt`` describing a backup.

        Args:
            backup_dir: Directory (``Path``) the manifest is written into.
            backed_up_files: Iterable of ``(original, backup)`` path pairs.
        """
        manifest_file = backup_dir / "backup_manifest.txt"
        with open(manifest_file, 'w') as f:
            f.write(f"Backup created: {datetime.now()}\n")
            f.write(f"Distribution family: {self.distro_family}\n")
            f.write("Backed up files:\n")
            f.writelines(
                f"  {original} -> {backup}\n" for original, backup in backed_up_files
            )

    def restore_configs(self, backup_path):
        """Restore configuration files from a backup and restart the agent.

        An existing target file is first copied to ``<name>.pre-restore`` in
        the same directory. Files that fail to restore are logged and skipped.

        Args:
            backup_path: Path of a backup directory.

        Returns:
            List of restored target file paths (strings).

        Raises:
            FileNotFoundError: If ``backup_path`` is not a directory or holds
                no ``zabbix_agent*.conf`` files.
            RuntimeError: If none of the backed-up files could be restored;
                the agent is not restarted in that case.
        """
        backup_dir = Path(backup_path)
        if not backup_dir.is_dir():
            raise FileNotFoundError(f"Backup directory not found: {backup_path}")

        manifest_file = backup_dir / "backup_manifest.txt"
        if manifest_file.exists():
            logger.info(f"Restoring from backup: {backup_path}")
            with open(manifest_file, 'r') as f:
                logger.info(f"Manifest:\n{f.read()}")

        backup_configs = sorted(backup_dir.glob("zabbix_agent*.conf"))
        if not backup_configs:
            raise FileNotFoundError("No configuration files found in backup directory")

        restored_files = []
        for backup_file in backup_configs:
            target_file = Path(ZABBIX_CONFIG_DIR) / backup_file.name
            try:
                if target_file.exists():
                    # Keep the file being replaced as "<name>.conf.pre-restore".
                    pre_restore = target_file.with_name(target_file.name + ".pre-restore")
                    shutil.copy2(target_file, pre_restore)
                shutil.copy2(backup_file, target_file)
            except OSError as e:
                logger.error(f"Failed to restore {backup_file}: {e}")
                continue
            logger.info(f"Restored {backup_file.name}")
            restored_files.append(str(target_file))

        if not restored_files:
            # Nothing changed on disk: restarting the agent and reporting
            # success would be misleading.
            raise RuntimeError(f"Failed to restore any configuration file from {backup_path}")

        self._restart_zabbix_agent()
        logger.info(f"Restore completed. Files: {[Path(f).name for f in restored_files]}")
        return restored_files

    def _restart_zabbix_agent(self):
        """Restart and enable the first Zabbix agent service that works.

        Candidate service names are tried in order; a failure for one
        candidate is logged at debug level and the next one is tried. A
        warning is logged when no candidate could be restarted.
        """
        for service in ('zabbix-agent2', 'zabbix-agent', 'zabbix-agentd'):
            try:
                probe = self._run_command(
                    f"systemctl list-unit-files {service}.service", check=False
                )
                if probe.returncode != 0:
                    continue
                self._run_command(f"sudo systemctl restart {service}")
                self._run_command(f"sudo systemctl enable {service}")
            except Exception as e:
                logger.debug(f"Could not restart {service}: {e}")
                continue
            logger.info(f"Successfully restarted {service}")
            return

        logger.warning("Could not restart any Zabbix agent service")

    def upgrade_agent(self):
        """Upgrade the agent package while preserving custom settings.

        Steps: back up the current configuration, remember its active
        settings, upgrade the package, merge the remembered settings into the
        configuration files present after the upgrade, restart the agent.

        Returns:
            Path of the backup created before the upgrade.

        Raises:
            RuntimeError: If no backup could be created.
        """
        logger.info("Starting Zabbix agent upgrade process")

        backup_path = self.backup_configs()
        if not backup_path:
            raise RuntimeError("Failed to create backup before upgrade")

        custom_settings = {
            Path(f).name: self._parse_config_file(f) for f in self._get_config_files()
        }

        self._upgrade_zabbix_package()

        for config_file in self._get_config_files():
            config_name = Path(config_file).name
            if config_name in custom_settings:
                self._merge_custom_settings(
                    config_file, custom_settings[config_name], backup_path
                )

        self._restart_zabbix_agent()
        logger.info("Zabbix agent upgrade completed successfully")
        return backup_path

    def _upgrade_zabbix_package(self):
        """Run the package manager upgrade for the detected distribution.

        Raises:
            subprocess.CalledProcessError: If an apt command fails.
            RuntimeError: If the distribution family is not supported.
        """
        logger.info(f"Upgrading Zabbix agent on {self.distro_family}-based system")

        # NOTE(review): these commands upgrade every installed package, not
        # only the Zabbix agent. Kept as-is because narrowing the scope would
        # change what the tool does on existing hosts; confirm the intent.
        if self.distro_family == 'debian':
            self._run_command("sudo apt update")
            self._run_command("sudo apt upgrade -y")
        elif self.distro_family == 'rhel':
            # Try yum first and fall back to dnf. ``except Exception`` (not a
            # bare ``except``) so Ctrl-C and SystemExit still propagate.
            try:
                self._run_command("sudo yum update -y")
            except Exception:
                try:
                    self._run_command("sudo dnf update -y")
                except Exception as e:
                    logger.warning(f"Could not upgrade packages: {e}")
        else:
            raise RuntimeError(f"Unsupported distribution family: {self.distro_family}")

    @staticmethod
    def _merge_config_lines(lines, custom_settings):
        """Return ``lines`` with the custom settings merged in.

        Args:
            lines: Lines of the new configuration file (with line endings).
            custom_settings: Active ``key=value`` lines of the old
                configuration; entries without ``=`` are ignored.

        Returns:
            New list of lines. Comments, blank lines and lines without ``=``
            are kept untouched. The first active line whose key has custom
            values is replaced by ALL custom values of that key, in their
            original order, so parameters that may legitimately repeat
            (several values for one key) are not reduced to a single entry.
            Later lines of the same key are kept unless they repeat a value
            already written. Custom keys absent from ``lines`` are appended
            under a marker comment.
        """
        # key -> ordered list of distinct custom values
        custom_params = {}
        for setting in custom_settings:
            if '=' not in setting:
                continue
            key, value = (part.strip() for part in setting.split('=', 1))
            values = custom_params.setdefault(key, [])
            if value not in values:
                values.append(value)

        merged = []
        written = {}  # key -> custom values already emitted for that key
        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith('#') or '=' not in stripped:
                merged.append(line)
                continue

            key, value = (part.strip() for part in stripped.split('=', 1))
            if key in custom_params:
                values = custom_params.pop(key)
                written[key] = set(values)
                merged.extend(f"{key}={v}\n" for v in values)
            elif value in written.get(key, ()):
                # Same key/value was already emitted from the custom settings.
                continue
            else:
                merged.append(line)

        if custom_params:
            merged.append("\n# Custom parameters added during upgrade\n")
            for key, values in custom_params.items():
                merged.extend(f"{key}={v}\n" for v in values)
        return merged

    def _merge_custom_settings(self, new_config_file, custom_settings, backup_path):
        """Merge custom settings into a configuration file in place.

        The file is rewritten with the merged content and a unified diff of
        the change is stored in ``backup_path``.

        Args:
            new_config_file: Path of the configuration file to update.
            custom_settings: Active ``key=value`` lines to preserve.
            backup_path: Backup directory that receives the diff file.
        """
        logger.info(f"Merging custom settings into {new_config_file}")

        with open(new_config_file, 'r') as f:
            original_lines = f.readlines()

        updated_lines = self._merge_config_lines(original_lines, custom_settings)

        with open(new_config_file, 'w') as f:
            f.writelines(updated_lines)

        self._save_config_diff(new_config_file, original_lines, updated_lines, backup_path)
        logger.info(f"Custom settings merged into {new_config_file}")

    @staticmethod
    def _render_config_diff(config_name, original_lines, updated_lines):
        """Return a unified diff between two versions as a list of lines.

        Every returned line ends with a newline, so the list can be written
        with ``writelines`` without header and hunk lines running together.
        """
        def terminated(lines):
            # A final line without a newline would otherwise glue itself to
            # the next diff line.
            return [line if line.endswith('\n') else line + '\n' for line in lines]

        return list(difflib.unified_diff(
            terminated(original_lines),
            terminated(updated_lines),
            fromfile=f"{config_name}.original",
            tofile=f"{config_name}.updated",
        ))

    def _save_config_diff(self, config_file, original_lines, updated_lines, backup_path):
        """Write ``<config name>.diff`` into the backup directory.

        Args:
            config_file: Path of the configuration file that was changed.
            original_lines: File content before the merge, as lines.
            updated_lines: File content after the merge, as lines.
            backup_path: Backup directory that receives the diff file.
        """
        config_name = Path(config_file).name
        diff_file = Path(backup_path) / f"{config_name}.diff"

        with open(diff_file, 'w') as f:
            f.writelines(
                self._render_config_diff(config_name, original_lines, updated_lines)
            )

        logger.info(f"Configuration differences saved to {diff_file}")

    def list_backups(self):
        """Return the available backups, most recent first.

        Returns:
            List of dicts with ``path`` and ``timestamp`` keys, plus
            ``manifest`` (the manifest text, or an error note) when the
            backup contains a manifest file. Empty when the backup root is
            missing.
        """
        if not self.backup_dir.exists():
            logger.info("No backups directory found")
            return []

        # Directory names embed a sortable timestamp, so a reverse name sort
        # puts the newest backup first.
        backup_dirs = sorted(
            (d for d in self.backup_dir.iterdir()
             if d.is_dir() and d.name.startswith('backup_')),
            key=lambda d: d.name,
            reverse=True,
        )

        backups = []
        for backup_dir in backup_dirs:
            info = {
                "path": str(backup_dir),
                "timestamp": backup_dir.name.replace('backup_', ''),
            }
            manifest_file = backup_dir / "backup_manifest.txt"
            if manifest_file.exists():
                try:
                    with open(manifest_file, 'r') as f:
                        info["manifest"] = f.read()
                except Exception as e:
                    info["manifest"] = f"Error reading manifest: {e}"
            backups.append(info)

        return backups
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the requested action (``backup``, ``restore``, ``upgrade`` or
    ``list-backups``), runs it on a ``ZabbixAgentTool`` and prints the
    outcome. Any exception raised by the tool is logged and turned into
    exit status 1; ``restore`` without ``--backup-path`` also exits with 1.
    """
    cli = argparse.ArgumentParser(description="Zabbix Agent Management Tool")
    cli.add_argument(
        'action',
        help='Action to perform',
        choices=['backup', 'restore', 'upgrade', 'list-backups'],
    )
    cli.add_argument(
        '--backup-path',
        help='Path to backup directory (required for restore action)',
    )
    cli.add_argument(
        '--verbose',
        help='Enable verbose logging',
        action='store_true',
    )
    options = cli.parse_args()

    if options.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        tool = ZabbixAgentTool()
        action = options.action

        if action == 'backup':
            created = tool.backup_configs()
            if not created:
                print("No configuration files found to backup")
            else:
                print(f"Backup created successfully: {created}")

        elif action == 'restore':
            if not options.backup_path:
                print("Error: --backup-path is required for restore action")
                sys.exit(1)
            restored = tool.restore_configs(options.backup_path)
            print(f"Restore completed successfully. Restored files: {restored}")

        elif action == 'upgrade':
            created = tool.upgrade_agent()
            print(f"Upgrade completed successfully. Backup created: {created}")

        elif action == 'list-backups':
            entries = tool.list_backups()
            if entries:
                print("Available backups:")
                for entry in entries:
                    print(f"\nBackup: {entry['path']}")
                    print(f"Timestamp: {entry['timestamp']}")
                    if 'manifest' in entry:
                        print("Manifest:")
                        print(entry['manifest'])
            else:
                print("No backups found")

    except Exception as exc:
        logger.error(f"Error: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()