Compare commits

...

3 Commits

9 changed files with 285 additions and 613 deletions

28
partitioning/CHANGELOG.md Normal file
View File

@@ -0,0 +1,28 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.4.0] - 2025-12-16
### Added
- **Monitoring**: Added `--discovery` argument for Zabbix Low-Level Discovery (LLD) of partitioned tables.
- **Monitoring**: Added `--check-days` argument to calculate days remaining until partition buffer exhaustion.
- **CLI**: Added `--version` / `-V` flag to display script version.
- **Docker**: Added `RUN_MODE=discovery` and `RUN_MODE=check` support to `entrypoint.py`.
- **Templates**: Added Zabbix 7.0 compatible template `zabbix_partitioning_template.yaml`.
### Removed
- **CLI**: Removed unimplemented `--delete` / `-d` argument.
## [0.3.0] - 2025-12-14
### Changed
- **Refactor**: Complete rewrite of `zabbix_partitioning.py` using Class-based structure (`ZabbixPartitioner`).
- **Configuration**: Switched to YAML configuration file (`zabbix_partitioning.conf`).
- **Safety**: Added checks to prevent partitioning of tables incompatible with Zabbix 7.0 schema (e.g., `auditlog` without `clock` in PK).
- **Docker**: Introduced Docker container support (`Dockerfile`, `entrypoint.py`).
### Added
- **Optimization**: Added `initial_partitioning_start` option (`db_min` vs `retention`) to speed up initialization on large DBs.
- **Reliability**: Use `pymysql` with robust connection handling and SSL support.

View File

@@ -267,8 +267,64 @@ docker run --rm \
| `RETENTION_TRENDS` | 365d | Retention for `trends*` tables | | `RETENTION_TRENDS` | 365d | Retention for `trends*` tables |
| `RETENTION_AUDIT` | 365d | Retention for `auditlog` (if enabled) | | `RETENTION_AUDIT` | 365d | Retention for `auditlog` (if enabled) |
| `ENABLE_AUDITLOG_PARTITIONING` | false | Set to `true` to partition `auditlog` | | `ENABLE_AUDITLOG_PARTITIONING` | false | Set to `true` to partition `auditlog` |
| `RUN_MODE` | maintenance | `init` (initialize), `maintenance` (daily run), or `dry-run` | | `RUN_MODE` | maintenance | `init`, `maintenance`, `dry-run`, `discovery`, `check` |
| `CHECK_TARGET` | - | Required if `RUN_MODE=check`. Table name to check (e.g. `history`). |
| `PARTITION_DAILY_[TABLE]` | - | Custom daily retention (e.g., `PARTITION_DAILY_mytable=30d`) | | `PARTITION_DAILY_[TABLE]` | - | Custom daily retention (e.g., `PARTITION_DAILY_mytable=30d`) |
| `PARTITION_WEEKLY_[TABLE]` | - | Custom weekly retention | | `PARTITION_WEEKLY_[TABLE]` | - | Custom weekly retention |
| `PARTITION_MONTHLY_[TABLE]` | - | Custom monthly retention | | `PARTITION_MONTHLY_[TABLE]` | - | Custom monthly retention |
#### Scenario F: Monitoring (Discovery)
Output Zabbix LLD JSON for table discovery.
```bash
docker run --rm \
-e DB_HOST=zabbix-db \
-e RUN_MODE=discovery \
zabbix-partitioning
```
#### Scenario G: Monitoring (Health Check)
Check days remaining for a specific table (e.g., `history`). Returns integer days.
```bash
docker run --rm \
-e DB_HOST=zabbix-db \
-e RUN_MODE=check \
-e CHECK_TARGET=history \
zabbix-partitioning
```
---
## 10. Monitoring
The script includes built-in features for monitoring the health of your partitions via Zabbix.
### 10.1 CLI Usage
- **Discovery (LLD)**:
```bash
./zabbix_partitioning.py --discovery
# Output: [{"{#TABLE}": "history", "{#PERIOD}": "daily"}, ...]
```
- **Check Days**:
```bash
./zabbix_partitioning.py --check-days history
# Output: 30 (integer days remaining)
```
- **Version**:
```bash
./zabbix_partitioning.py --version
# Output: zabbix_partitioning.py 0.3.1-test
```
### 10.2 Zabbix Template
A Zabbix 7.0 template is provided: `zabbix_partitioning_template.yaml`.
**Setup**:
1. Import the YAML template into Zabbix.
2. Install the script on the Zabbix Server or Proxy.
3. Add the `UserParameter` commands to your Zabbix Agent config (see Template description).
4. Link the template to the host running the script.
**Features**:
- **Discovery**: Automatically finds all partitioned tables.
- **Triggers**: Alerts if a table has less than 3 days of future partitions pre-created.
- **Log Monitoring**: Alerts on script execution failures.

View File

@@ -5,7 +5,8 @@ RUN pip install --no-cache-dir pymysql pyyaml
# Copy main script and entrypoint # Copy main script and entrypoint
# Note: Build context should be the parent directory 'partitioning/' # Note: Build context should be the parent directory 'partitioning/'
COPY script/zabbix_partitioning.py /usr/local/bin/zabbix_partitioning.py COPY script/zabbix_partitioning.py /usr/local/bin/
RUN mkdir -p /etc/zabbix
COPY docker/entrypoint.py /usr/local/bin/entrypoint.py COPY docker/entrypoint.py /usr/local/bin/entrypoint.py
# Set permissions # Set permissions

View File

@@ -81,13 +81,13 @@ def generate_config():
print("Generated Configuration:") print("Generated Configuration:")
print(yaml.dump(config, default_flow_style=False)) print(yaml.dump(config, default_flow_style=False))
with open('/etc/zabbix_partitioning.conf', 'w') as f: with open('/etc/zabbix/zabbix_partitioning.conf', 'w') as f:
yaml.dump(config, f, default_flow_style=False) yaml.dump(config, f, default_flow_style=False)
def main(): def main():
generate_config() generate_config()
cmd = [sys.executable, '/usr/local/bin/zabbix_partitioning.py', '-c', '/etc/zabbix_partitioning.conf'] cmd = [sys.executable, '/usr/local/bin/zabbix_partitioning.py', '-c', '/etc/zabbix/zabbix_partitioning.conf']
run_mode = os.getenv('RUN_MODE', 'maintenance') run_mode = os.getenv('RUN_MODE', 'maintenance')
if run_mode == 'init': if run_mode == 'init':
@@ -96,6 +96,15 @@ def main():
cmd.append('--dry-run') cmd.append('--dry-run')
if os.getenv('DRY_RUN_INIT') == 'true': if os.getenv('DRY_RUN_INIT') == 'true':
cmd.append('--init') cmd.append('--init')
elif run_mode == 'discovery':
cmd.append('--discovery')
elif run_mode == 'check':
target = os.getenv('CHECK_TARGET')
if not target:
print("Error: CHECK_TARGET env var required for check mode")
sys.exit(1)
cmd.append('--check-days')
cmd.append(target)
print(f"Executing: {' '.join(cmd)}") print(f"Executing: {' '.join(cmd)}")
result = subprocess.run(cmd) result = subprocess.run(cmd)

134
partitioning/script/zabbix_partitioning.py Normal file → Executable file
View File

@@ -13,6 +13,7 @@ import argparse
import pymysql import pymysql
from pymysql.constants import CLIENT from pymysql.constants import CLIENT
import yaml import yaml
import json
import logging import logging
import logging.handlers import logging.handlers
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -20,7 +21,7 @@ from typing import Optional, Dict, List, Any, Union, Tuple
from contextlib import contextmanager from contextlib import contextmanager
# Semantic Versioning # Semantic Versioning
VERSION = '0.3.0' VERSION = '0.4.0'
# Constants # Constants
PART_PERIOD_REGEX = r'([0-9]+)(h|d|m|y)' PART_PERIOD_REGEX = r'([0-9]+)(h|d|m|y)'
@@ -443,19 +444,83 @@ class ZabbixPartitioner:
self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)") self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)")
self.execute_query(query) self.execute_query(query)
def run(self, mode: str): def discovery(self):
"""Output Zabbix Low-Level Discovery logic JSON."""
partitions_conf = self.config.get('partitions', {})
discovery_data = []
for period, tables in partitions_conf.items():
if not tables:
continue
for item in tables:
table = list(item.keys())[0]
discovery_data.append({"{#TABLE}": table, "{#PERIOD}": period})
print(json.dumps(discovery_data))
def check_partitions_coverage(self, table: str, period: str) -> int:
"""
Check how many days of future partitions exist for a table.
Returns: Number of days from NOW until the end of the last partition.
"""
top_partition_ts = self.execute_query(
"""SELECT MAX(`partition_description`) FROM `information_schema`.`partitions`
WHERE `table_schema` = %s AND `table_name` = %s AND `partition_name` IS NOT NULL""",
(self.db_name, table), fetch='one'
)
if not top_partition_ts:
return 0
# partition_description is "VALUES LESS THAN (TS)"
# So it represents the END of the partition (start of next)
end_ts = int(top_partition_ts)
end_dt = datetime.fromtimestamp(end_ts)
now = datetime.now()
diff = end_dt - now
return max(0, diff.days)
def run(self, mode: str, target_table: str = None):
"""Main execution loop.""" """Main execution loop."""
with self.connect_db(): with self.connect_db():
self.check_compatibility()
partitions_conf = self.config.get('partitions', {}) partitions_conf = self.config.get('partitions', {})
premake = self.config.get('premake', 10)
if mode == 'delete': # --- Discovery Mode ---
self.logger.warning("Delete Mode: Removing ALL partitioning from configured tables is not fully implemented in refactor yet.") if mode == 'discovery':
# Implement if needed, usually just ALTER TABLE REMOVE PARTITIONING self.discovery()
return return
# --- Check Mode ---
if mode == 'check':
if not target_table:
# Check all and print simple status? Or error?
# Zabbix usually queries one by one.
# Implementing simple check which returns days for specific table
raise ConfigurationError("Target table required for check mode")
# Find period for table
found_period = None
for period, tables in partitions_conf.items():
for item in tables:
if list(item.keys())[0] == target_table:
found_period = period
break
if found_period: break
if not found_period:
# Table not in config?
print("-1") # Error code
return
days_left = self.check_partitions_coverage(target_table, found_period)
print(days_left)
return
# --- Normal Mode (Init/Maintain) ---
self.check_compatibility()
premake = self.config.get('premake', 10)
for period, tables in partitions_conf.items(): for period, tables in partitions_conf.items():
if not tables: if not tables:
continue continue
@@ -473,8 +538,10 @@ class ZabbixPartitioner:
# Housekeeping extras # Housekeeping extras
if mode != 'init' and not self.dry_run: if mode != 'init' and not self.dry_run:
# delete_extra_data logic... self.logger.info("Partitioning completed successfully")
pass # Can add back specific cleanups like `sessions` table if desired
if mode != 'init' and not self.dry_run:
pass
def setup_logging(config_log_type: str): def setup_logging(config_log_type: str):
logger = logging.getLogger('zabbix_partitioning') logger = logging.getLogger('zabbix_partitioning')
@@ -484,7 +551,7 @@ def setup_logging(config_log_type: str):
if config_log_type == 'syslog': if config_log_type == 'syslog':
handler = logging.handlers.SysLogHandler(address='/dev/log') handler = logging.handlers.SysLogHandler(address='/dev/log')
formatter = logging.Formatter('%(name)s: %(message)s') # Syslog has its own timestamps usually formatter = logging.Formatter('%(name)s: %(message)s')
else: else:
handler = logging.StreamHandler(sys.stdout) handler = logging.StreamHandler(sys.stdout)
@@ -495,8 +562,13 @@ def parse_args():
parser = argparse.ArgumentParser(description='Zabbix Partitioning Manager') parser = argparse.ArgumentParser(description='Zabbix Partitioning Manager')
parser.add_argument('-c', '--config', default='/etc/zabbix/zabbix_partitioning.conf', help='Config file path') parser.add_argument('-c', '--config', default='/etc/zabbix/zabbix_partitioning.conf', help='Config file path')
parser.add_argument('-i', '--init', action='store_true', help='Initialize partitions') parser.add_argument('-i', '--init', action='store_true', help='Initialize partitions')
parser.add_argument('-d', '--delete', action='store_true', help='Remove partitions (Not implemented)')
parser.add_argument('--dry-run', action='store_true', help='Simulate queries') parser.add_argument('--dry-run', action='store_true', help='Simulate queries')
# Monitoring args
parser.add_argument('--discovery', action='store_true', help='Output Zabbix LLD JSON')
parser.add_argument('--check-days', type=str, help='Check days of future partitions left for table', metavar='TABLE')
parser.add_argument('-V', '--version', action='version', version=f'%(prog)s {VERSION}', help='Show version and exit')
return parser.parse_args() return parser.parse_args()
def load_config(path): def load_config(path):
@@ -515,20 +587,46 @@ def main():
with open(conf_path, 'r') as f: with open(conf_path, 'r') as f:
config = yaml.safe_load(f) config = yaml.safe_load(f)
setup_logging(config.get('logging', 'console')) # For discovery/check, we might want minimal logging or specific output, so we handle that in run()
logger = logging.getLogger('zabbix_partitioning') # But we still need basic logging setup for db errors
mode = 'maintain' mode = 'maintain'
if args.init: mode = 'init' target = None
elif args.delete: mode = 'delete'
if args.discovery:
mode = 'discovery'
config['logging'] = 'console' # Force console for discovery? Or suppress?
# actually we don't want logs mixing with JSON output
# so checking mode before setup logging
elif args.check_days:
mode = 'check'
target = args.check_days
elif args.init: mode = 'init'
# Setup logging
# If discovery or check, we mute info logs to stdout to keep output clean,
# unless errors happen.
if mode in ['discovery', 'check']:
logging.basicConfig(level=logging.ERROR) # Only show critical errors
else:
setup_logging(config.get('logging', 'console'))
logger = logging.getLogger('zabbix_partitioning')
if args.dry_run: if args.dry_run:
logger.info("Starting in DRY-RUN mode") logger.info("Starting in DRY-RUN mode")
# ZabbixPartitioner expects dict config
app = ZabbixPartitioner(config, dry_run=args.dry_run) app = ZabbixPartitioner(config, dry_run=args.dry_run)
app.run(mode) app.run(mode, target)
except Exception as e: except Exception as e:
# Important: Zabbix log monitoring needs to see "Failed"
# We print to stderr for script failure, logging handles log file
try:
logging.getLogger('zabbix_partitioning').critical(f"Partitioning failed: {e}")
except:
pass
print(f"Critical Error: {e}", file=sys.stderr) print(f"Critical Error: {e}", file=sys.stderr)
sys.exit(1) sys.exit(1)

View File

@@ -1,53 +0,0 @@
# database: Connection details for the Zabbix database
database:
type: mysql
# host: Database server hostname or IP
host: localhost
# socket: Path to the MySQL unix socket (overrides host if set)
socket: /var/run/mysqlrouter/mysql_rw.sock
# port: Database port (default: 3306)
# port: 3306
# credentials
user: zbx_part
passwd: <password>
db: zabbix
# partitions: Define retention periods for tables.
# Format: table_name: duration (e.g., 14d, 12w, 1m, 1y)
partitions:
# daily: Partitions created daily
daily:
- history: 14d
- history_uint: 14d
- history_str: 14d
- history_text: 14d
- history_log: 14d
- history_bin: 14d
# weekly: Partitions created weekly
weekly:
# - auditlog: 180d
# Note: auditlog is not partitionable by default in Zabbix 7.0 and 7.4 (PK missing clock).
# To partition, the Primary Key must be altered to include 'clock'.
# https://www.zabbix.com/documentation/current/en/manual/appendix/install/auditlog_primary_keys
# monthly: Partitions created monthly
monthly:
- trends: 1y
- trends_uint: 1y
# logging: Where to send log output. Options: syslog, console
logging: syslog
# premake: Number of partitions to create in advance
premake: 10
# initial_partitioning_start: Strategy for the first partition during initialization (--init).
# Options:
# db_min: (Default) Queries SELECT MIN(clock) to ensure ALL data is covered. Slow on huge tables consistently.
# retention: Starts partitioning from (Now - Retention Period).
# Creates a 'p_archive' partition for all data older than retention.
# Much faster as it skips the MIN(clock) query. (Recommended for large DBs)
initial_partitioning_start: db_min
# replicate_sql: False - Disable binary logging. Partitioning changes are NOT replicated to slaves (use for independent maintenance).
# replicate_sql: True - Enable binary logging. Partitioning changes ARE replicated to slaves (use for consistent cluster schema).
replicate_sql: False

View File

@@ -1,536 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Zabbix Database Partitioning Management Script
Refactored for Zabbix 7.x compatibility, better maintainability, and standard logging.
"""
import os
import sys
import re
import argparse
import pymysql
from pymysql.constants import CLIENT
import yaml
import logging
import logging.handlers
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Union, Tuple
from contextlib import contextmanager
# Semantic Versioning
VERSION = '0.3.0'
# Constants
PART_PERIOD_REGEX = r'([0-9]+)(h|d|m|y)'
PARTITION_TEMPLATE = 'PARTITION %s VALUES LESS THAN (UNIX_TIMESTAMP("%s") div 1) ENGINE = InnoDB'
# Custom Exceptions
class ConfigurationError(Exception):
pass
class DatabaseError(Exception):
pass
class ZabbixPartitioner:
def __init__(self, config: Dict[str, Any], dry_run: bool = False):
self.config = config
self.dry_run = dry_run
self.conn = None
self.logger = logging.getLogger('zabbix_partitioning')
# Unpack database config
db_conf = self.config['database']
self.db_host = db_conf.get('host', 'localhost')
self.db_port = int(db_conf.get('port', 3306))
self.db_socket = db_conf.get('socket')
self.db_user = db_conf['user']
self.db_password = db_conf.get('passwd')
self.db_name = db_conf['db']
self.db_ssl = db_conf.get('ssl')
self.replicate_sql = self.config.get('replicate_sql', False)
@contextmanager
def connect_db(self):
"""Context manager for database connection."""
try:
connect_args = {
'user': self.db_user,
'password': self.db_password,
'database': self.db_name,
'port': self.db_port,
'cursorclass': pymysql.cursors.Cursor,
# Enable multi-statements if needed, though we usually run single queries
'client_flag': CLIENT.MULTI_STATEMENTS
}
if self.db_socket:
connect_args['unix_socket'] = self.db_socket
else:
connect_args['host'] = self.db_host
if self.db_ssl:
connect_args['ssl'] = self.db_ssl
# PyMySQL SSL options
# Note: valid ssl keys for PyMySQL are 'ca', 'capath', 'cert', 'key', 'cipher', 'check_hostname'
self.logger.info(f"Connecting to database: {self.db_name}")
self.conn = pymysql.connect(**connect_args)
# Setup session
with self.conn.cursor() as cursor:
cursor.execute('SET SESSION wait_timeout = 86400')
if not self.replicate_sql:
cursor.execute('SET SESSION sql_log_bin = 0')
yield self.conn
except pymysql.MySQLError as e:
self.logger.critical(f"Database connection failed: {e}")
raise DatabaseError(f"Failed to connect to MySQL: {e}")
finally:
if self.conn and self.conn.open:
self.conn.close()
self.logger.info("Database connection closed")
def execute_query(self, query: str, params: Optional[Union[List, Tuple]] = None, fetch: str = 'none') -> Any:
"""
Execute a query.
fetch: 'none', 'one', 'all'
"""
if self.dry_run and not query.lower().startswith('select'):
self.logger.info(f"[DRY-RUN] Query: {query} | Params: {params}")
return None
if not self.conn or not self.conn.open:
raise DatabaseError("Connection not open")
try:
with self.conn.cursor() as cursor:
if self.logger.level == logging.DEBUG:
self.logger.debug(f"Query: {query} | Params: {params}")
cursor.execute(query, params)
if fetch == 'one':
result = cursor.fetchone()
# Return first column if it's a single value result and a tuple
if result and isinstance(result, tuple) and len(result) == 1:
return result[0]
return result
elif fetch == 'all':
return cursor.fetchall()
self.conn.commit()
return True
except pymysql.MySQLError as e:
self.logger.error(f"SQL Error: {e} | Query: {query}")
raise DatabaseError(f"SQL Execution Error: {e}")
# --- Utility Functions --- #
def truncate_date(self, dt: datetime, period: str) -> datetime:
"""Truncate date to the start of the partitioning period."""
if period == 'hourly':
return dt.replace(microsecond=0, second=0, minute=0)
elif period == 'daily':
return dt.replace(microsecond=0, second=0, minute=0, hour=0)
elif period == 'weekly':
# Monday is 0, Sunday is 6. isoweekday() Mon=1, Sun=7.
# Truncate to Monday
dt = dt.replace(microsecond=0, second=0, minute=0, hour=0)
return dt - timedelta(days=dt.isoweekday() - 1)
elif period == 'monthly':
return dt.replace(microsecond=0, second=0, minute=0, hour=0, day=1)
elif period == 'yearly':
return dt.replace(microsecond=0, second=0, minute=0, hour=0, day=1, month=1)
else:
raise ValueError(f"Unknown period: {period}")
def get_next_date(self, dt: datetime, period: str, amount: int = 1) -> datetime:
"""Add 'amount' periods to the date."""
if period == 'hourly':
return dt + timedelta(hours=amount)
elif period == 'daily':
return dt + timedelta(days=amount)
elif period == 'weekly':
return dt + timedelta(weeks=amount)
elif period == 'monthly':
# Simple month addition
m, y = (dt.month + amount) % 12, dt.year + ((dt.month + amount - 1) // 12)
if not m: m = 12
# Handle end of month days (e.g. Jan 31 + 1 month -> Feb 28) logic not strictly needed for 1st of month
# but keeping robust
d = min(dt.day, [31, 29 if y%4==0 and (y%100!=0 or y%400==0) else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][m-1])
return dt.replace(day=d, month=m, year=y)
elif period == 'yearly':
return dt.replace(year=dt.year + amount)
else:
return dt
def get_lookback_date(self, period_str: str) -> datetime:
"""
Calculate the retention date based on config string (e.g., "30d", "12m").
"""
match = re.search(PART_PERIOD_REGEX, period_str)
if not match:
raise ConfigurationError(f"Invalid period format: {period_str}")
amount = int(match.group(1))
unit = match.group(2)
now = datetime.now()
if unit in ['h', 'hourly']:
return now - timedelta(hours=amount)
elif unit in ['d', 'daily']:
return now - timedelta(days=amount)
elif unit in ['w', 'weekly']:
return now - timedelta(weeks=amount)
elif unit in ['m', 'monthly']:
# approximate 30 days per month for simple calculation or full month subtraction
# using get_next_date with negative amount
return self.get_next_date(now, 'monthly', -amount)
elif unit in ['y', 'yearly']:
return now.replace(year=now.year - amount)
return now
def get_partition_name(self, dt: datetime, period: str) -> str:
if period == 'hourly':
return dt.strftime('p%Y_%m_%d_%Hh')
elif period == 'daily':
return dt.strftime('p%Y_%m_%d')
elif period == 'weekly':
return dt.strftime('p%Y_%Uw')
elif period == 'monthly':
return dt.strftime('p%Y_%m')
return "p_unknown"
def get_partition_description(self, dt: datetime, period: str) -> str:
"""Generate the partition description (Unix Timestamp) for VALUES LESS THAN."""
# Partition boundary is the START of the NEXT period
next_dt = self.get_next_date(dt, period, 1)
if period == 'hourly':
fmt = '%Y-%m-%d %H:00:00'
else:
fmt = '%Y-%m-%d 00:00:00'
return next_dt.strftime(fmt)
# --- Core Logic --- #
def check_compatibility(self):
"""Verify Zabbix version and partitioning support."""
# 1. Check MySQL Version
version_str = self.execute_query('SELECT version()', fetch='one')
if not version_str:
raise DatabaseError("Could not determine MySQL version")
# MySQL 8.0+ supports partitioning natively
# (Assuming MySQL 8+ or MariaDB 10+ for modern Zabbix)
self.logger.info(f"MySQL Version: {version_str}")
# 2. Check Zabbix DB Version (optional info)
try:
mandatory = self.execute_query('SELECT `mandatory` FROM `dbversion`', fetch='one')
if mandatory:
self.logger.info(f"Zabbix DB Mandatory Version: {mandatory}")
except Exception:
self.logger.warning("Could not read 'dbversion' table. Is this a Zabbix DB?")
def get_table_min_clock(self, table: str) -> Optional[datetime]:
ts = self.execute_query(f"SELECT MIN(`clock`) FROM `{table}`", fetch='one')
return datetime.fromtimestamp(int(ts)) if ts else None
def get_existing_partitions(self, table: str) -> List[Tuple[str, int]]:
"""Return list of (partition_name, description_timestamp)."""
query = """
SELECT `partition_name`, `partition_description`
FROM `information_schema`.`partitions`
WHERE `table_schema` = %s AND `table_name` = %s AND `partition_name` IS NOT NULL
ORDER BY `partition_description` ASC
"""
rows = self.execute_query(query, (self.db_name, table), fetch='all')
if not rows:
return []
partitions = []
for row in rows:
name, desc = row
# 'desc' is a string or int depending on DB driver, usually unix timestamp for TIMESTAMP partitions
try:
partitions.append((name, int(desc)))
except (ValueError, TypeError):
pass # MAXVALUE or invalid
return partitions
def has_incompatible_primary_key(self, table: str) -> bool:
"""
Returns True if the table has a Primary Key that DOES NOT include the 'clock' column.
Partitioning requires the partition column to be part of the Primary/Unique key.
"""
# 1. Check if PK exists
pk_exists = self.execute_query(
"""SELECT COUNT(*) FROM `information_schema`.`table_constraints`
WHERE `constraint_type` = 'PRIMARY KEY'
AND `table_schema` = %s AND `table_name` = %s""",
(self.db_name, table), fetch='one'
)
if not pk_exists:
# No PK means no restriction on partitioning
return False
# 2. Check if 'clock' is in the PK
clock_in_pk = self.execute_query(
"""SELECT COUNT(*) FROM `information_schema`.`key_column_usage` k
JOIN `information_schema`.`table_constraints` t USING(`constraint_name`, `table_schema`, `table_name`)
WHERE t.`constraint_type` = 'PRIMARY KEY'
AND t.`table_schema` = %s AND t.`table_name` = %s AND k.`column_name` = 'clock'""",
(self.db_name, table), fetch='one'
)
return not bool(clock_in_pk)
def create_future_partitions(self, table: str, period: str, premake_count: int):
"""Create partitions for the future."""
# Determine start date
# If table is partitioned, start from the latest partition
# If not, start from NOW (or min clock if we were doing initial load, but usually NOW for future)
top_partition_ts = self.execute_query(
"""SELECT MAX(`partition_description`) FROM `information_schema`.`partitions`
WHERE `table_schema` = %s AND `table_name` = %s AND `partition_name` IS NOT NULL""",
(self.db_name, table), fetch='one'
)
curr_time = self.truncate_date(datetime.now(), period)
if top_partition_ts:
start_dt = datetime.fromtimestamp(int(top_partition_ts))
# Start from the period AFTER the last existing one
# Actually, MAX(description) is the *end* of the last partition.
# e.g. p2023_10_01 VALUES LESS THAN (Oct 2)
# So start_dt is Oct 2.
else:
# No partitions? Should be handled by init, but fallback to NOW
start_dt = self.truncate_date(datetime.now(), period)
# Create 'premake_count' partitions ahead of NOW
# But we must ensure we cover the gap if the last partition is old
# So we ensure we have partitions up to NOW + premake * period
target_max_date = self.get_next_date(curr_time, period, premake_count)
current_planning_dt = start_dt
new_partitions = {}
while current_planning_dt < target_max_date:
part_name = self.get_partition_name(current_planning_dt, period)
part_desc = self.get_partition_description(current_planning_dt, period)
new_partitions[part_name] = part_desc
current_planning_dt = self.get_next_date(current_planning_dt, period, 1)
if not new_partitions:
return
# Generate ADD PARTITION query
parts_sql = []
for name, timestamp_expr in sorted(new_partitions.items()):
parts_sql.append(PARTITION_TEMPLATE % (name, timestamp_expr))
query = f"ALTER TABLE `{table}` ADD PARTITION (\n" + ",\n".join(parts_sql) + "\n)"
self.logger.info(f"Adding {len(new_partitions)} partitions to {table}")
self.execute_query(query)
def remove_old_partitions(self, table: str, retention_str: str):
"""Drop partitions older than retention period."""
cutoff_date = self.get_lookback_date(retention_str)
cutoff_ts = int(cutoff_date.timestamp())
existing = self.get_existing_partitions(table)
to_drop = []
for name, desc_ts in existing:
# Drop if the *upper bound* of the partition is still older than cutoff?
# Or if it contains ONLY data older than cutoff?
# VALUES LESS THAN (desc_ts).
# If desc_ts <= cutoff_ts, then ALL data in partition is < cutoff. Safe to drop.
if desc_ts <= cutoff_ts:
to_drop.append(name)
if not to_drop:
return
self.logger.info(f"Dropping {len(to_drop)} old partitions from {table} (Retain: {retention_str})")
for name in to_drop:
self.execute_query(f"ALTER TABLE `{table}` DROP PARTITION {name}")
def initialize_partitioning(self, table: str, period: str, premake: int, retention_str: str):
"""Initial partitioning for a table (convert regular table to partitioned)."""
self.logger.info(f"Initializing partitioning for {table}")
if self.has_incompatible_primary_key(table):
self.logger.error(f"Cannot partition {table}: Primary Key does not include 'clock' column.")
return
# If already partitioned, skip
if self.get_existing_partitions(table):
self.logger.info(f"Table {table} is already partitioned.")
return
init_strategy = self.config.get('initial_partitioning_start', 'db_min')
start_dt = None
p_archive_ts = None
if init_strategy == 'retention':
self.logger.info(f"Strategy 'retention': Calculating start date from retention ({retention_str})")
retention_date = self.get_lookback_date(retention_str)
# Start granular partitions from the retention date
start_dt = self.truncate_date(retention_date, period)
# Create a catch-all for anything older
p_archive_ts = int(start_dt.timestamp())
else:
# Default 'db_min' strategy
self.logger.info("Strategy 'db_min': Querying table for minimum clock (may be slow)")
min_clock = self.get_table_min_clock(table)
if not min_clock:
# Empty table. Start from NOW
start_dt = self.truncate_date(datetime.now(), period)
else:
# Table has data.
start_dt = self.truncate_date(min_clock, period)
# Build list of partitions from start_dt up to NOW + premake
target_dt = self.get_next_date(self.truncate_date(datetime.now(), period), period, premake)
curr = start_dt
partitions_def = {}
# If we have an archive partition, add it first
if p_archive_ts:
partitions_def['p_archive'] = str(p_archive_ts)
while curr < target_dt:
name = self.get_partition_name(curr, period)
desc = self.get_partition_description(curr, period)
partitions_def[name] = desc
curr = self.get_next_date(curr, period, 1)
# Re-doing the loop to be cleaner on types
parts_sql = []
# 1. Archive Partition
if p_archive_ts:
parts_sql.append(f"PARTITION p_archive VALUES LESS THAN ({p_archive_ts}) ENGINE = InnoDB")
# 2. Granular Partitions
# We need to iterate again from start_dt
curr = start_dt
while curr < target_dt:
name = self.get_partition_name(curr, period)
desc_date_str = self.get_partition_description(curr, period) # Returns "YYYY-MM-DD HH:MM:SS"
parts_sql.append(PARTITION_TEMPLATE % (name, desc_date_str))
curr = self.get_next_date(curr, period, 1)
query = f"ALTER TABLE `{table}` PARTITION BY RANGE (`clock`) (\n" + ",\n".join(parts_sql) + "\n)"
self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)")
self.execute_query(query)
def run(self, mode: str):
"""Main execution loop."""
with self.connect_db():
self.check_compatibility()
partitions_conf = self.config.get('partitions', {})
premake = self.config.get('premake', 10)
if mode == 'delete':
self.logger.warning("Delete Mode: Removing ALL partitioning from configured tables is not fully implemented in refactor yet.")
# Implement if needed, usually just ALTER TABLE REMOVE PARTITIONING
return
for period, tables in partitions_conf.items():
if not tables:
continue
for item in tables:
# Item is dict like {'history': '14d'}
table = list(item.keys())[0]
retention = item[table]
if mode == 'init':
self.initialize_partitioning(table, period, premake, retention)
else:
# Maintenance mode (Add new, remove old)
self.create_future_partitions(table, period, premake)
self.remove_old_partitions(table, retention)
# Housekeeping extras
if mode != 'init' and not self.dry_run:
# delete_extra_data logic...
pass # Can add back specific cleanups like `sessions` table if desired
def setup_logging(config_log_type: str):
logger = logging.getLogger('zabbix_partitioning')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
if config_log_type == 'syslog':
handler = logging.handlers.SysLogHandler(address='/dev/log')
formatter = logging.Formatter('%(name)s: %(message)s') # Syslog has its own timestamps usually
else:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)
def parse_args():
parser = argparse.ArgumentParser(description='Zabbix Partitioning Manager')
parser.add_argument('-c', '--config', default='/etc/zabbix/zabbix_partitioning.conf', help='Config file path')
parser.add_argument('-i', '--init', action='store_true', help='Initialize partitions')
parser.add_argument('-d', '--delete', action='store_true', help='Remove partitions (Not implemented)')
parser.add_argument('--dry-run', action='store_true', help='Simulate queries')
return parser.parse_args()
def load_config(path):
if not os.path.exists(path):
# Fallback to local
if os.path.exists('zabbix_partitioning.conf'):
return 'zabbix_partitioning.conf'
raise ConfigurationError(f"Config file not found: {path}")
return path
def main():
args = parse_args()
try:
conf_path = load_config(args.config)
with open(conf_path, 'r') as f:
config = yaml.safe_load(f)
setup_logging(config.get('logging', 'console'))
logger = logging.getLogger('zabbix_partitioning')
mode = 'maintain'
if args.init: mode = 'init'
elif args.delete: mode = 'delete'
if args.dry_run:
logger.info("Starting in DRY-RUN mode")
app = ZabbixPartitioner(config, dry_run=args.dry_run)
app.run(mode)
except Exception as e:
print(f"Critical Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,65 @@
zabbix_export:
version: '7.0'
template_groups:
- uuid: e29f7cbf75cf41cb81078cb4c10d584a
name: 'Templates/Databases'
templates:
- uuid: 69899eb3126b4c62b70351f305b69dd9
template: 'Zabbix Partitioning Monitor'
name: 'Zabbix Partitioning Monitor'
description: |
Monitor Zabbix Database Partitioning.
Prerequisites:
1. Install zabbix_partitioning.py on the Zabbix Server/Proxy.
2. Configure userparameter for automatic discovery:
UserParameter=zabbix.partitioning.discovery[*], /usr/local/bin/zabbix_partitioning.py -c $1 --discovery
UserParameter=zabbix.partitioning.check[*], /usr/local/bin/zabbix_partitioning.py -c $1 --check-days $2
Or use Docker wrapper scripts.
groups:
- name: 'Templates/Databases'
items:
- uuid: bc753e750cc2485f917ba1f023c87d05
name: 'Partitioning Last Run Status'
type: TRAP
key: partitioning.run.status
delay: 0
history: 7d
trends: '0'
value_type: TEXT
description: 'Send "Success" or "Failed" via zabbix_sender or check log file'
triggers:
- uuid: 25497978dbb943e49dac8f3b9db91c29
expression: 'find(/Zabbix Partitioning Monitor/partitioning.run.status,,"like","Failed")=1'
name: 'Zabbix Partitioning Failed'
priority: HIGH
description: 'The partitioning script reported a failure.'
tags:
- tag: services
value: database
discovery_rules:
- uuid: 097c96467035468a80ce5c519b0297bb
name: 'Partitioning Discovery'
key: 'zabbix.partitioning.discovery[/etc/zabbix/zabbix_partitioning.conf]'
delay: 1h
description: 'Discover partitioned tables'
item_prototypes:
- uuid: 1fbff85191c244dca956be7a94bf08a3
name: 'Partitions remaining: {#TABLE}'
key: 'zabbix.partitioning.check[/etc/zabbix/zabbix_partitioning.conf, {#TABLE}]'
delay: 12h
history: 7d
description: 'Days until the last partition runs out for {#TABLE}'
tags:
- tag: component
value: partitioning
- tag: table
value: '{#TABLE}'
trigger_prototypes:
- uuid: da23fae76a41455c86c58267d6d9f86d
expression: 'last(/Zabbix Partitioning Monitor/zabbix.partitioning.check[/etc/zabbix/zabbix_partitioning.conf, {#TABLE}])<=3'
name: 'Partitioning critical: {#TABLE} has less than 3 days of partitions'
priority: HIGH
description: 'New partitions are not being created. Check the script logs.'

View File

@@ -7,15 +7,19 @@ database:
db: zabbix db: zabbix
# Port mapping in docker-compose is 33060 # Port mapping in docker-compose is 33060
port: 33060 port: 33060
partitions: partitions:
# daily: Partitions created daily
daily: daily:
- history: 7d - history: 7d
- history_uint: 7d - history_uint: 7d
- history_str: 7d - history_str: 7d
- history_log: 7d
- history_text: 7d - history_text: 7d
- history_bin: 7d - history_bin: 7d
# weekly: Partitions created weekly
weekly:
- history_log: 7d
# monthly: Partitions created monthly
monthly:
- trends: 365d - trends: 365d
- trends_uint: 365d - trends_uint: 365d