Compare commits

...

7 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Gitea Action | 8c3a98b13a | AUTO: Update Zabbix to version 7.4.6 [ci skip] | 2025-12-19 06:01:16 +00:00 |
| | 0a66a800b5 | Refactor: Containerization and directory restructuring | 2025-12-16 15:20:44 +01:00 |
| | 71e85c2e6e | CHANGE: Improved README | 2025-12-16 14:14:57 +01:00 |
| | b1595ee9af | CHANGE: Added auto-tests with Docker. | 2025-12-16 14:11:46 +01:00 |
| | cecd55cd3d | CHANGE: Added code documentation with explanation of script functions. | 2025-12-16 14:11:39 +01:00 |
| | 259340df46 | CHANGE: Added refactoring notes | 2025-12-16 14:10:34 +01:00 |
| | 59cd724959 | CHANGE: Added min(clock) scan bypass function (initial_partitioning_start), which significantly affects working with huge tables. | 2025-12-16 14:10:20 +01:00 |

Checks for 0a66a800b5 (Zabbix APK Builder, push) — some failed: check-version successful in 11s; update-version failing after 11s; build-packages and deploy-test skipped.
16 changed files with 1106 additions and 21 deletions

.gitignore

@@ -3,4 +3,4 @@ venv/
export/
*_host_ids.txt
*.log
partitioning/tests/
backup/


@@ -0,0 +1,60 @@
# Code Documentation: ZabbixPartitioner
## Class: ZabbixPartitioner
### Core Methods
#### `__init__(self, config: Dict[str, Any], dry_run: bool = False)`
Initializes the partitioner with configuration and runtime mode.
- **config**: Dictionary containing database connection and partitioning rules.
- **dry_run**: If True, SQL queries are logged but not executed.
#### `connect_db(self)`
Context manager for database connections.
- Handles connection lifecycle (open/close).
- Sets session variables:
- `wait_timeout = 86400` (24h) to prevent timeouts during long operations.
- `sql_log_bin = 0` (if configured) to prevent replication of partitioning commands.
#### `run(self, mode: str)`
Main entry point for execution.
- **mode**:
- `'init'`: Initial setup. Calls `initialize_partitioning`.
- `'maintenance'` (default): Routine operation. Calls `create_future_partitions` and `drop_old_partitions`.
### Logic Methods
#### `initialize_partitioning(table: str, period: str, premake: int, retention_str: str)`
Converts a standard table to a partitioned table.
- **Strategies** (via `initial_partitioning_start` config):
- `retention`: Starts from (Now - Retention). Creates `p_archive` for older data. FAST.
- `db_min`: Queries `SELECT MIN(clock)`. PRECISE but SLOW.
#### `create_future_partitions(table: str, period: str, premake: int)`
Ensures sufficient future partitions exist.
- Calculates required partitions based on current time + `premake` count.
- Checks `information_schema` for existing partitions.
- Adds missing partitions using `ALTER TABLE ... ADD PARTITION`.
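For illustration, a maintenance pass on a daily table produces a statement of the following shape, expanded from `PARTITION_TEMPLATE` (table name, partition names, and dates are examples):
```sql
-- Illustrative ADD PARTITION statement; names and dates are examples.
ALTER TABLE `history` ADD PARTITION (
PARTITION p2025_12_17 VALUES LESS THAN (UNIX_TIMESTAMP("2025-12-18 00:00:00") div 1) ENGINE = InnoDB,
PARTITION p2025_12_18 VALUES LESS THAN (UNIX_TIMESTAMP("2025-12-19 00:00:00") div 1) ENGINE = InnoDB
);
```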
#### `drop_old_partitions(table: str, period: str, retention_str: str)`
Removes partitions older than the retention period.
- Parses partition names (e.g., `p2023_01_01`) to extract their date.
- Compares against the calculated retention cutoff date.
- Drops qualifying partitions using `ALTER TABLE ... DROP PARTITION`.
### Helper Methods
#### `get_table_min_clock(table: str) -> Optional[datetime]`
- Queries the table for the oldest timestamp. Used in `db_min` initialization strategy.
#### `has_incompatible_primary_key(table: str) -> bool`
- **Safety Critical**: Verifies that the table's Primary Key includes the `clock` column.
- Returns `True` if incompatible (prevents partitioning to avoid MySQL errors).
#### `get_partition_name(dt: datetime, period: str) -> str`
- Generates standard partition names:
- Daily: `pYYYY_MM_DD`
- Monthly: `pYYYY_MM`
#### `get_partition_description(dt: datetime, period: str) -> str`
- Generates the `VALUES LESS THAN` expression for the partition (Start of NEXT period).
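### Usage Sketch
A minimal example of driving the class directly, mirroring the script's own `main()` flow. It assumes `zabbix_partitioning.py` is importable as a module; the config path is illustrative.
```python
# Minimal sketch mirroring main(); assumes the script is importable
# as a module and that the config path below exists.
import logging
import sys
import yaml
from zabbix_partitioning import ZabbixPartitioner

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

with open('/etc/zabbix/zabbix_partitioning.conf') as f:
    config = yaml.safe_load(f)

# dry_run=True logs the ALTER TABLE statements instead of executing them
partitioner = ZabbixPartitioner(config, dry_run=True)
partitioner.run('maintenance')  # or 'init' for the first-time conversion
```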


@@ -65,10 +65,26 @@ partitions:
# ... add other options as needed. Please check the config file for more options.
```
### Important Notes:
- **`replicate_sql`**:
- `False` (Default): Partitioning maintenance commands are NOT replicated to slaves. Recommended if you manage partitions separately on each node or want to reduce replication lag.
- `True`: Commands are replicated.
### Configuration Parameters
- **`partitions`**: Defines your retention policy globally.
- Syntax: `period: [ {table: retention_period}, ... ]`
- **`daily`**: Partitions are created for each day.
- **`weekly`**: Partitions are created for each week.
- **`monthly`**: Partitions are created for each month.
- **`yearly`**: Partitions are created for each year.
- Retention Format: `14d` (days), `12w` (weeks), `12m` (months), `1y` (years).
- **`initial_partitioning_start`**: Controls how the very FIRST partition is determined during initialization (`--init` mode).
- `db_min`: (Default) Queries the table for the oldest record (`MIN(clock)`). Accurate but **slow** on large tables.
- `retention`: (Recommended for large DBs) Skips the query. Calculates the start date as `Now - Retention Period`. Creates a single `p_archive` partition for all data older than that date.
- **`premake`**: Number of future partitions to create in advance.
- Default: `10`. Ensures you have a buffer if the script fails to run for a few days.
- **`replicate_sql`**: Controls MySQL Binary Logging for partitioning commands.
- `False`: (Default) Disables binary logging (`SET SESSION sql_log_bin = 0`). Partition creation/dropping is **NOT** replicated to slaves. Useful if you want to manage partitions independently on each node or avoid replication lag storms.
- `True`: Commands are replicated. Use this if you want absolute schema consistency across your cluster automatically.
- **`auditlog`**:
- In Zabbix 7.0+, the `auditlog` table does **not** have the `clock` column in its Primary Key by default. **Do not** add it to the config unless you have manually altered the table schema.
@@ -158,7 +174,101 @@ Alternatively, use systemd timers for more robust scheduling and logging.
---
## 8. Troubleshooting
- **Connection Refused**: Check `host`, `port` in config. Ensure MySQL is running.
- **Access Denied (1227)**: The DB user needs `SUPER` privileges to disable binary logging (`replicate_sql: False`). Either grant the privilege or set `replicate_sql: True` (if replication load is acceptable).
- **Primary Key Error**: "Primary Key does not include 'clock'". The table cannot be partitioned by range on `clock` without schema changes. Remove it from config.
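If you want to verify a table by hand before running `--init`, the same `information_schema` check the script performs can be run standalone; a minimal sketch (connection parameters are placeholders):
```python
# Standalone sketch of the script's Primary Key check;
# connection parameters are placeholders.
import pymysql

conn = pymysql.connect(host='10.0.0.5', user='zabbix', password='secret', database='zabbix')
with conn.cursor() as cur:
    cur.execute(
        """SELECT COUNT(*) FROM information_schema.key_column_usage k
           JOIN information_schema.table_constraints t USING(constraint_name, table_schema, table_name)
           WHERE t.constraint_type = 'PRIMARY KEY'
           AND t.table_schema = %s AND t.table_name = %s AND k.column_name = 'clock'""",
        ('zabbix', 'history'))
    clock_in_pk = cur.fetchone()[0]
print('partitionable' if clock_in_pk else "PK does not include 'clock'")
conn.close()
```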
## 9. Docker Usage
You can run the partitioning script as a stateless Docker container. This is ideal for Kubernetes CronJobs or environments where you don't want to manage Python dependencies on the host.
### 9.1 Build the Image
The image is not yet published to a public registry, so you must build it locally:
```bash
cd /opt/git/Zabbix/partitioning
docker build -t zabbix-partitioning -f docker/Dockerfile .
```
### 9.2 Operations
The container uses `entrypoint.py` to auto-generate the configuration file from Environment Variables at runtime.
#### Scenario A: Dry Run (Check Configuration)
Verify that your connection and retention settings are correct without making changes.
```bash
docker run --rm \
-e DB_HOST=10.0.0.5 -e DB_USER=zabbix -e DB_PASSWORD=secret \
-e RETENTION_HISTORY=7d \
-e RETENTION_TRENDS=365d \
-e RUN_MODE=dry-run \
zabbix-partitioning
```
#### Scenario B: Initialization (First Run)
Convert your existing tables to partitioned tables.
> [!WARNING]
> Ensure a recent backup exists and the Zabbix Housekeeper is disabled!
```bash
docker run --rm \
-e DB_HOST=10.0.0.5 -e DB_USER=zabbix -e DB_PASSWORD=secret \
-e RETENTION_HISTORY=14d \
-e RETENTION_TRENDS=365d \
-e RUN_MODE=init \
zabbix-partitioning
```
#### Scenario C: Daily Maintenance (Cron/Scheduler)
Run this daily (e.g., via K8s CronJob) to create future partitions and drop old ones.
```bash
docker run --rm \
-e DB_HOST=10.0.0.5 -e DB_USER=zabbix -e DB_PASSWORD=secret \
-e RETENTION_HISTORY=14d \
-e RETENTION_TRENDS=365d \
zabbix-partitioning
```
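A rough CronJob manifest for this setup (namespace, schedule, and the secret reference are placeholders to adapt):
```yaml
# Illustrative CronJob for the daily maintenance run;
# names, schedule, and secret references are placeholders.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: zabbix-partitioning
spec:
  schedule: "30 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: zabbix-partitioning
              image: zabbix-partitioning
              env:
                - name: DB_HOST
                  value: "10.0.0.5"
                - name: DB_USER
                  value: zabbix
                - name: DB_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: zabbix-db
                      key: password
                - name: RETENTION_HISTORY
                  value: 14d
                - name: RETENTION_TRENDS
                  value: 365d
```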
#### Scenario D: Custom Overrides
You can override the retention period for specific tables or change their partitioning interval.
*Example: Force `history_log` to be partitioned **Weekly** with 30-day retention.*
```bash
docker run --rm \
-e DB_HOST=10.0.0.5 \
-e RETENTION_HISTORY=7d \
-e PARTITION_WEEKLY_history_log=30d \
zabbix-partitioning
```
#### Scenario E: SSL Connection
Mount your certificates and provide the paths.
```bash
docker run --rm \
-e DB_HOST=zabbix-db \
-e DB_SSL_CA=/certs/ca.pem \
-e DB_SSL_CERT=/certs/client-cert.pem \
-e DB_SSL_KEY=/certs/client-key.pem \
-v /path/to/local/certs:/certs \
zabbix-partitioning
```
### 9.3 Supported Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `DB_HOST` | localhost | Database hostname |
| `DB_PORT` | 3306 | Database port |
| `DB_USER` | zabbix | Database user |
| `DB_PASSWORD` | zabbix | Database password |
| `DB_NAME` | zabbix | Database name |
| `DB_SSL_CA` | - | Path to CA Certificate |
| `DB_SSL_CERT` | - | Path to Client Certificate |
| `DB_SSL_KEY` | - | Path to Client Key |
| `RETENTION_HISTORY` | 14d | Retention for `history*` tables |
| `RETENTION_TRENDS` | 365d | Retention for `trends*` tables |
| `RETENTION_AUDIT` | 365d | Retention for `auditlog` (if enabled) |
| `ENABLE_AUDITLOG_PARTITIONING` | false | Set to `true` to partition `auditlog` |
| `RUN_MODE` | maintenance | `init` (initialize), `maintenance` (daily run), or `dry-run` |
| `PARTITION_DAILY_[TABLE]` | - | Custom daily retention (e.g., `PARTITION_DAILY_mytable=30d`) |
| `PARTITION_WEEKLY_[TABLE]` | - | Custom weekly retention |
| `PARTITION_MONTHLY_[TABLE]` | - | Custom monthly retention |


@@ -0,0 +1,39 @@
# Refactoring Notes: Zabbix Partitioning Script
## Overview
The `zabbix_partitioning.py` script has been significantly refactored to improve maintainability, reliability, and compatibility with modern Zabbix versions (7.x).
## Key Changes
### 1. Architecture: Class-Based Structure
- **Old**: Procedural script with global variables and scattered logic.
- **New**: Encapsulated in a `ZabbixPartitioner` class.
- **Purpose**: Improves modularity, testability, and state management. Allows the script to be easily imported or extended.
### 2. Database Connection Management
- **Change**: Implemented `contextlib.contextmanager` for database connections.
- **Purpose**: Ensures database connections are robustly opened and closed, even if errors occur. Handles `wait_timeout` and binary logging settings automatically for every session.
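A condensed sketch of that pattern (SSL/socket handling and error wrapping omitted; see `zabbix_partitioning.py` for the full version):
```python
# Condensed sketch of the connection pattern described above.
from contextlib import contextmanager
import pymysql

@contextmanager
def connect_db(host, user, password, db, replicate_sql=False):
    conn = pymysql.connect(host=host, user=user, password=password, database=db)
    try:
        with conn.cursor() as cursor:
            # Long ALTER TABLE runs must not hit the server-side idle timeout
            cursor.execute('SET SESSION wait_timeout = 86400')
            if not replicate_sql:
                # Keep partition maintenance out of the binary log
                cursor.execute('SET SESSION sql_log_bin = 0')
        yield conn
    finally:
        conn.close()
```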
### 3. Logging
- **Change**: Replaced custom `print` statements with Python's standard `logging` module.
- **Purpose**:
- Allows consistent log formatting.
- Supports configurable output destinations (Console vs Syslog) via the config file.
- Granular log levels (INFO for standard ops, DEBUG for SQL queries).
### 4. Configuration Handling
- **Change**: Improved validation and parsing of the YAML configuration.
- **Purpose**:
- Removed unused parameters (e.g., `timezone`, as the script relies on system local time).
- Added support for custom database ports (critical for non-standard deployments or containerized tests).
- Explicitly handles the `replicate_sql` flag to control binary logging (this is now integrated into the partitioning logic).
### 5. Type Safety
- **Change**: Added comprehensive Python type hinting (e.g., `List`, `Dict`, `Optional`).
- **Purpose**: Makes the code self-documenting and allows IDEs/linters to catch potential errors before execution.
### 6. Zabbix 7.x Compatibility
- **Change**: Added logic to verify Zabbix database version and schema requirements.
- **Purpose**:
- Checks `dbversion` table.
- **Critical**: Validates that target tables have the `clock` column as part of their Primary Key before attempting partitioning, preventing potential data corruption or MySQL errors.


@@ -0,0 +1,15 @@
FROM python:3.12-slim
# Install dependencies
RUN pip install --no-cache-dir pymysql pyyaml
# Copy main script and entrypoint
# Note: Build context should be the parent directory 'partitioning/'
COPY script/zabbix_partitioning.py /usr/local/bin/zabbix_partitioning.py
COPY docker/entrypoint.py /usr/local/bin/entrypoint.py
# Set permissions
RUN chmod +x /usr/local/bin/zabbix_partitioning.py /usr/local/bin/entrypoint.py
# Entrypoint
ENTRYPOINT ["python3", "/usr/local/bin/entrypoint.py"]


@@ -0,0 +1,105 @@
import os
import sys
import yaml
import subprocess


def generate_config():
    # Base configuration, assembled from environment variables
    config = {
        'database': {
            'type': 'mysql',
            'host': os.getenv('DB_HOST', 'localhost'),
            'user': os.getenv('DB_USER', 'zabbix'),
            'passwd': os.getenv('DB_PASSWORD', 'zabbix'),
            'db': os.getenv('DB_NAME', 'zabbix'),
            'port': int(os.getenv('DB_PORT', '3306')),
            'socket': os.getenv('DB_SOCKET', '')
        },
        'logging': 'console',
        'premake': int(os.getenv('PREMAKE', '10')),
        'replicate_sql': os.getenv('REPLICATE_SQL', 'False').lower() == 'true',
        'initial_partitioning_start': os.getenv('INITIAL_PARTITIONING_START', 'db_min'),
        'partitions': {
            'daily': [],
            'weekly': [],
            'monthly': []
        }
    }

    # SSL config
    if os.getenv('DB_SSL_CA'):
        config['database']['ssl'] = {'ca': os.getenv('DB_SSL_CA')}
        if os.getenv('DB_SSL_CERT'):
            config['database']['ssl']['cert'] = os.getenv('DB_SSL_CERT')
        if os.getenv('DB_SSL_KEY'):
            config['database']['ssl']['key'] = os.getenv('DB_SSL_KEY')

    # Retention mapping
    retention_history = os.getenv('RETENTION_HISTORY', '14d')
    retention_trends = os.getenv('RETENTION_TRENDS', '365d')
    retention_audit = os.getenv('RETENTION_AUDIT', '365d')

    # Standard Zabbix tables
    history_tables = ['history', 'history_uint', 'history_str', 'history_log', 'history_text', 'history_bin']
    trends_tables = ['trends', 'trends_uint']

    # Auditlog: disabled by default because the Zabbix 7.0+ 'auditlog' table lacks 'clock' in its Primary Key.
    # Only enabled if the user has manually altered the schema and explicitly requests it.

    # Collect overrides first to prevent duplicates
    overrides = set()
    for key in os.environ:
        if key.startswith(('PARTITION_DAILY_', 'PARTITION_WEEKLY_', 'PARTITION_MONTHLY_')):
            table = key.split('_', 2)[-1].lower()
            overrides.add(table)

    for table in history_tables:
        if table not in overrides:
            config['partitions']['daily'].append({table: retention_history})
    for table in trends_tables:
        if table not in overrides:
            config['partitions']['monthly'].append({table: retention_trends})
    if os.getenv('ENABLE_AUDITLOG_PARTITIONING', 'false').lower() == 'true':
        config['partitions']['weekly'].append({'auditlog': retention_audit})

    # Custom/generic overrides: look for env vars like PARTITION_DAILY_mytable=7d
    for key, value in os.environ.items():
        if key.startswith('PARTITION_DAILY_'):
            table = key.replace('PARTITION_DAILY_', '').lower()
            config['partitions']['daily'].append({table: value})
        elif key.startswith('PARTITION_WEEKLY_'):
            table = key.replace('PARTITION_WEEKLY_', '').lower()
            config['partitions']['weekly'].append({table: value})
        elif key.startswith('PARTITION_MONTHLY_'):
            table = key.replace('PARTITION_MONTHLY_', '').lower()
            config['partitions']['monthly'].append({table: value})

    # Filter out empty period lists
    config['partitions'] = {k: v for k, v in config['partitions'].items() if v}

    print("Generated Configuration:")
    print(yaml.dump(config, default_flow_style=False))
    with open('/etc/zabbix_partitioning.conf', 'w') as f:
        yaml.dump(config, f, default_flow_style=False)


def main():
    generate_config()
    cmd = [sys.executable, '/usr/local/bin/zabbix_partitioning.py', '-c', '/etc/zabbix_partitioning.conf']
    run_mode = os.getenv('RUN_MODE', 'maintenance')
    if run_mode == 'init':
        cmd.append('--init')
    elif run_mode == 'dry-run':
        cmd.append('--dry-run')
        if os.getenv('DRY_RUN_INIT') == 'true':
            cmd.append('--init')
    print(f"Executing: {' '.join(cmd)}")
    result = subprocess.run(cmd)
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()


@@ -0,0 +1,53 @@
# database: Connection details for the Zabbix database
database:
  type: mysql
  # host: Database server hostname or IP
  host: localhost
  # socket: Path to the MySQL unix socket (overrides host if set)
  socket: /var/run/mysqlrouter/mysql_rw.sock
  # port: Database port (default: 3306)
  # port: 3306
  # credentials
  user: zbx_part
  passwd: <password>
  db: zabbix
# partitions: Define retention periods for tables.
# Format: table_name: duration (e.g., 14d, 12w, 1m, 1y)
partitions:
  # daily: Partitions created daily
  daily:
    - history: 14d
    - history_uint: 14d
    - history_str: 14d
    - history_text: 14d
    - history_log: 14d
    - history_bin: 14d
  # weekly: Partitions created weekly
  weekly:
    # - auditlog: 180d
    # Note: auditlog is not partitionable by default in Zabbix 7.0 and 7.4 (PK missing clock).
    # To partition it, the Primary Key must be altered to include 'clock'.
    # https://www.zabbix.com/documentation/current/en/manual/appendix/install/auditlog_primary_keys
  # monthly: Partitions created monthly
  monthly:
    - trends: 1y
    - trends_uint: 1y
# logging: Where to send log output. Options: syslog, console
logging: syslog
# premake: Number of partitions to create in advance
premake: 10
# initial_partitioning_start: Strategy for the first partition during initialization (--init).
# Options:
#   db_min:    (Default) Queries SELECT MIN(clock) to ensure ALL data is covered.
#              Consistently slow on huge tables.
#   retention: Starts partitioning from (Now - Retention Period).
#              Creates a 'p_archive' partition for all data older than retention.
#              Much faster as it skips the MIN(clock) query. (Recommended for large DBs)
initial_partitioning_start: db_min
# replicate_sql: False - Disable binary logging. Partitioning changes are NOT replicated to slaves (use for independent maintenance).
# replicate_sql: True  - Enable binary logging. Partitioning changes ARE replicated to slaves (use for a consistent cluster schema).
replicate_sql: False


@@ -0,0 +1,536 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Zabbix Database Partitioning Management Script
Refactored for Zabbix 7.x compatibility, better maintainability, and standard logging.
"""
import os
import sys
import re
import argparse
import pymysql
from pymysql.constants import CLIENT
import yaml
import logging
import logging.handlers
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any, Union, Tuple
from contextlib import contextmanager
# Semantic Versioning
VERSION = '0.3.0'

# Constants
PART_PERIOD_REGEX = r'([0-9]+)(h|d|w|m|y)'  # 'w' is required for weekly retention strings like "12w"
PARTITION_TEMPLATE = 'PARTITION %s VALUES LESS THAN (UNIX_TIMESTAMP("%s") div 1) ENGINE = InnoDB'


# Custom Exceptions
class ConfigurationError(Exception):
    pass


class DatabaseError(Exception):
    pass
class ZabbixPartitioner:
    def __init__(self, config: Dict[str, Any], dry_run: bool = False):
        self.config = config
        self.dry_run = dry_run
        self.conn = None
        self.logger = logging.getLogger('zabbix_partitioning')
        # Unpack database config
        db_conf = self.config['database']
        self.db_host = db_conf.get('host', 'localhost')
        self.db_port = int(db_conf.get('port', 3306))
        self.db_socket = db_conf.get('socket')
        self.db_user = db_conf['user']
        self.db_password = db_conf.get('passwd')
        self.db_name = db_conf['db']
        self.db_ssl = db_conf.get('ssl')
        self.replicate_sql = self.config.get('replicate_sql', False)
    @contextmanager
    def connect_db(self):
        """Context manager for database connection."""
        try:
            connect_args = {
                'user': self.db_user,
                'password': self.db_password,
                'database': self.db_name,
                'port': self.db_port,
                'cursorclass': pymysql.cursors.Cursor,
                # Enable multi-statements if needed, though we usually run single queries
                'client_flag': CLIENT.MULTI_STATEMENTS
            }
            if self.db_socket:
                connect_args['unix_socket'] = self.db_socket
            else:
                connect_args['host'] = self.db_host
            if self.db_ssl:
                # PyMySQL SSL options
                # Note: valid ssl keys for PyMySQL are 'ca', 'capath', 'cert', 'key', 'cipher', 'check_hostname'
                connect_args['ssl'] = self.db_ssl
            self.logger.info(f"Connecting to database: {self.db_name}")
            self.conn = pymysql.connect(**connect_args)
            # Setup session
            with self.conn.cursor() as cursor:
                cursor.execute('SET SESSION wait_timeout = 86400')
                if not self.replicate_sql:
                    cursor.execute('SET SESSION sql_log_bin = 0')
            yield self.conn
        except pymysql.MySQLError as e:
            self.logger.critical(f"Database connection failed: {e}")
            raise DatabaseError(f"Failed to connect to MySQL: {e}")
        finally:
            if self.conn and self.conn.open:
                self.conn.close()
                self.logger.info("Database connection closed")
    def execute_query(self, query: str, params: Optional[Union[List, Tuple]] = None, fetch: str = 'none') -> Any:
        """
        Execute a query.
        fetch: 'none', 'one', 'all'
        """
        if self.dry_run and not query.lower().startswith('select'):
            self.logger.info(f"[DRY-RUN] Query: {query} | Params: {params}")
            return None
        if not self.conn or not self.conn.open:
            raise DatabaseError("Connection not open")
        try:
            with self.conn.cursor() as cursor:
                if self.logger.level == logging.DEBUG:
                    self.logger.debug(f"Query: {query} | Params: {params}")
                cursor.execute(query, params)
                if fetch == 'one':
                    result = cursor.fetchone()
                    # Return first column if it's a single-value result and a tuple
                    if result and isinstance(result, tuple) and len(result) == 1:
                        return result[0]
                    return result
                elif fetch == 'all':
                    return cursor.fetchall()
            self.conn.commit()
            return True
        except pymysql.MySQLError as e:
            self.logger.error(f"SQL Error: {e} | Query: {query}")
            raise DatabaseError(f"SQL Execution Error: {e}")
    # --- Utility Functions --- #
    def truncate_date(self, dt: datetime, period: str) -> datetime:
        """Truncate date to the start of the partitioning period."""
        if period == 'hourly':
            return dt.replace(microsecond=0, second=0, minute=0)
        elif period == 'daily':
            return dt.replace(microsecond=0, second=0, minute=0, hour=0)
        elif period == 'weekly':
            # Monday is 0, Sunday is 6. isoweekday(): Mon=1, Sun=7.
            # Truncate to Monday
            dt = dt.replace(microsecond=0, second=0, minute=0, hour=0)
            return dt - timedelta(days=dt.isoweekday() - 1)
        elif period == 'monthly':
            return dt.replace(microsecond=0, second=0, minute=0, hour=0, day=1)
        elif period == 'yearly':
            return dt.replace(microsecond=0, second=0, minute=0, hour=0, day=1, month=1)
        else:
            raise ValueError(f"Unknown period: {period}")

    def get_next_date(self, dt: datetime, period: str, amount: int = 1) -> datetime:
        """Add 'amount' periods to the date."""
        if period == 'hourly':
            return dt + timedelta(hours=amount)
        elif period == 'daily':
            return dt + timedelta(days=amount)
        elif period == 'weekly':
            return dt + timedelta(weeks=amount)
        elif period == 'monthly':
            # Simple month addition
            m, y = (dt.month + amount) % 12, dt.year + ((dt.month + amount - 1) // 12)
            if not m:
                m = 12
            # Clamp the day to the end of the target month (e.g. Jan 31 + 1 month -> Feb 28).
            # Not strictly needed for 1st-of-month dates, but kept for robustness.
            d = min(dt.day, [31, 29 if y % 4 == 0 and (y % 100 != 0 or y % 400 == 0) else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][m - 1])
            return dt.replace(day=d, month=m, year=y)
        elif period == 'yearly':
            return dt.replace(year=dt.year + amount)
        else:
            return dt
    def get_lookback_date(self, period_str: str) -> datetime:
        """
        Calculate the retention cutoff date from a config string (e.g., "30d", "12m").
        """
        match = re.search(PART_PERIOD_REGEX, period_str)
        if not match:
            raise ConfigurationError(f"Invalid period format: {period_str}")
        amount = int(match.group(1))
        unit = match.group(2)
        now = datetime.now()
        if unit == 'h':
            return now - timedelta(hours=amount)
        elif unit == 'd':
            return now - timedelta(days=amount)
        elif unit == 'w':
            return now - timedelta(weeks=amount)
        elif unit == 'm':
            # Full month subtraction via get_next_date with a negative amount
            return self.get_next_date(now, 'monthly', -amount)
        elif unit == 'y':
            return now.replace(year=now.year - amount)
        return now
    def get_partition_name(self, dt: datetime, period: str) -> str:
        if period == 'hourly':
            return dt.strftime('p%Y_%m_%d_%Hh')
        elif period == 'daily':
            return dt.strftime('p%Y_%m_%d')
        elif period == 'weekly':
            return dt.strftime('p%Y_%Uw')
        elif period == 'monthly':
            return dt.strftime('p%Y_%m')
        elif period == 'yearly':
            return dt.strftime('p%Y')
        return "p_unknown"

    def get_partition_description(self, dt: datetime, period: str) -> str:
        """Generate the partition boundary date string used in VALUES LESS THAN."""
        # Partition boundary is the START of the NEXT period
        next_dt = self.get_next_date(dt, period, 1)
        if period == 'hourly':
            fmt = '%Y-%m-%d %H:00:00'
        else:
            fmt = '%Y-%m-%d 00:00:00'
        return next_dt.strftime(fmt)
    # --- Core Logic --- #
    def check_compatibility(self):
        """Verify Zabbix version and partitioning support."""
        # 1. Check MySQL version
        version_str = self.execute_query('SELECT version()', fetch='one')
        if not version_str:
            raise DatabaseError("Could not determine MySQL version")
        # MySQL 8.0+ supports partitioning natively
        # (Assuming MySQL 8+ or MariaDB 10+ for modern Zabbix)
        self.logger.info(f"MySQL Version: {version_str}")
        # 2. Check Zabbix DB version (optional info)
        try:
            mandatory = self.execute_query('SELECT `mandatory` FROM `dbversion`', fetch='one')
            if mandatory:
                self.logger.info(f"Zabbix DB Mandatory Version: {mandatory}")
        except Exception:
            self.logger.warning("Could not read 'dbversion' table. Is this a Zabbix DB?")

    def get_table_min_clock(self, table: str) -> Optional[datetime]:
        ts = self.execute_query(f"SELECT MIN(`clock`) FROM `{table}`", fetch='one')
        return datetime.fromtimestamp(int(ts)) if ts else None
    def get_existing_partitions(self, table: str) -> List[Tuple[str, int]]:
        """Return a list of (partition_name, description_timestamp)."""
        query = """
            SELECT `partition_name`, `partition_description`
            FROM `information_schema`.`partitions`
            WHERE `table_schema` = %s AND `table_name` = %s AND `partition_name` IS NOT NULL
            ORDER BY `partition_description` ASC
        """
        rows = self.execute_query(query, (self.db_name, table), fetch='all')
        if not rows:
            return []
        partitions = []
        for row in rows:
            name, desc = row
            # 'desc' is a string or int depending on the DB driver,
            # usually a unix timestamp for RANGE(clock) partitions
            try:
                partitions.append((name, int(desc)))
            except (ValueError, TypeError):
                pass  # MAXVALUE or invalid
        return partitions
    def has_incompatible_primary_key(self, table: str) -> bool:
        """
        Returns True if the table has a Primary Key that does NOT include the 'clock' column.
        Partitioning requires the partition column to be part of the Primary/Unique key.
        """
        # 1. Check if a PK exists
        pk_exists = self.execute_query(
            """SELECT COUNT(*) FROM `information_schema`.`table_constraints`
               WHERE `constraint_type` = 'PRIMARY KEY'
               AND `table_schema` = %s AND `table_name` = %s""",
            (self.db_name, table), fetch='one'
        )
        if not pk_exists:
            # No PK means no restriction on partitioning
            return False
        # 2. Check if 'clock' is in the PK
        clock_in_pk = self.execute_query(
            """SELECT COUNT(*) FROM `information_schema`.`key_column_usage` k
               JOIN `information_schema`.`table_constraints` t USING(`constraint_name`, `table_schema`, `table_name`)
               WHERE t.`constraint_type` = 'PRIMARY KEY'
               AND t.`table_schema` = %s AND t.`table_name` = %s AND k.`column_name` = 'clock'""",
            (self.db_name, table), fetch='one'
        )
        return not bool(clock_in_pk)
    def create_future_partitions(self, table: str, period: str, premake_count: int):
        """Create partitions for the future."""
        # Determine the start date:
        # - if the table is partitioned, start from the latest partition boundary;
        # - otherwise fall back to NOW (initial setup is handled by --init).
        top_partition_ts = self.execute_query(
            """SELECT MAX(`partition_description`) FROM `information_schema`.`partitions`
               WHERE `table_schema` = %s AND `table_name` = %s AND `partition_name` IS NOT NULL""",
            (self.db_name, table), fetch='one'
        )
        curr_time = self.truncate_date(datetime.now(), period)
        if top_partition_ts:
            # MAX(description) is the upper bound of the last partition,
            # e.g. p2023_10_01 VALUES LESS THAN (Oct 2) -> start_dt is Oct 2,
            # i.e. the period right AFTER the last existing one.
            start_dt = datetime.fromtimestamp(int(top_partition_ts))
        else:
            # No partitions? Should be handled by init, but fall back to NOW
            start_dt = self.truncate_date(datetime.now(), period)
        # Ensure partitions exist up to NOW + premake * period; this also
        # covers the gap if the last existing partition is old.
        target_max_date = self.get_next_date(curr_time, period, premake_count)
        current_planning_dt = start_dt
        new_partitions = {}
        while current_planning_dt < target_max_date:
            part_name = self.get_partition_name(current_planning_dt, period)
            part_desc = self.get_partition_description(current_planning_dt, period)
            new_partitions[part_name] = part_desc
            current_planning_dt = self.get_next_date(current_planning_dt, period, 1)
        if not new_partitions:
            return
        # Generate the ADD PARTITION query
        parts_sql = []
        for name, timestamp_expr in sorted(new_partitions.items()):
            parts_sql.append(PARTITION_TEMPLATE % (name, timestamp_expr))
        query = f"ALTER TABLE `{table}` ADD PARTITION (\n" + ",\n".join(parts_sql) + "\n)"
        self.logger.info(f"Adding {len(new_partitions)} partitions to {table}")
        self.execute_query(query)
    def remove_old_partitions(self, table: str, retention_str: str):
        """Drop partitions older than the retention period."""
        cutoff_date = self.get_lookback_date(retention_str)
        cutoff_ts = int(cutoff_date.timestamp())
        existing = self.get_existing_partitions(table)
        to_drop = []
        for name, desc_ts in existing:
            # Each partition is VALUES LESS THAN (desc_ts), so if
            # desc_ts <= cutoff_ts then ALL rows in the partition are
            # older than the cutoff and it is safe to drop.
            if desc_ts <= cutoff_ts:
                to_drop.append(name)
        if not to_drop:
            return
        self.logger.info(f"Dropping {len(to_drop)} old partitions from {table} (Retain: {retention_str})")
        for name in to_drop:
            self.execute_query(f"ALTER TABLE `{table}` DROP PARTITION {name}")
    def initialize_partitioning(self, table: str, period: str, premake: int, retention_str: str):
        """Initial partitioning for a table (convert a regular table to a partitioned one)."""
        self.logger.info(f"Initializing partitioning for {table}")
        if self.has_incompatible_primary_key(table):
            self.logger.error(f"Cannot partition {table}: Primary Key does not include 'clock' column.")
            return
        # If already partitioned, skip
        if self.get_existing_partitions(table):
            self.logger.info(f"Table {table} is already partitioned.")
            return
        init_strategy = self.config.get('initial_partitioning_start', 'db_min')
        start_dt = None
        p_archive_ts = None
        if init_strategy == 'retention':
            self.logger.info(f"Strategy 'retention': Calculating start date from retention ({retention_str})")
            retention_date = self.get_lookback_date(retention_str)
            # Start granular partitions from the retention date
            start_dt = self.truncate_date(retention_date, period)
            # Create a catch-all for anything older
            p_archive_ts = int(start_dt.timestamp())
        else:
            # Default 'db_min' strategy
            self.logger.info("Strategy 'db_min': Querying table for minimum clock (may be slow)")
            min_clock = self.get_table_min_clock(table)
            if not min_clock:
                # Empty table: start from NOW
                start_dt = self.truncate_date(datetime.now(), period)
            else:
                # Table has data: start from the oldest record
                start_dt = self.truncate_date(min_clock, period)
        # Build list of partitions from start_dt up to NOW + premake
        target_dt = self.get_next_date(self.truncate_date(datetime.now(), period), period, premake)
        curr = start_dt
        partitions_def = {}
        # If we have an archive partition, add it first
        if p_archive_ts:
            partitions_def['p_archive'] = str(p_archive_ts)
        while curr < target_dt:
            name = self.get_partition_name(curr, period)
            desc = self.get_partition_description(curr, period)
            partitions_def[name] = desc
            curr = self.get_next_date(curr, period, 1)
        # Re-build the SQL list from scratch to keep the types clean
        parts_sql = []
        # 1. Archive partition (catch-all for pre-retention data)
        if p_archive_ts:
            parts_sql.append(f"PARTITION p_archive VALUES LESS THAN ({p_archive_ts}) ENGINE = InnoDB")
        # 2. Granular partitions, iterating again from start_dt
        curr = start_dt
        while curr < target_dt:
            name = self.get_partition_name(curr, period)
            desc_date_str = self.get_partition_description(curr, period)  # "YYYY-MM-DD HH:MM:SS"
            parts_sql.append(PARTITION_TEMPLATE % (name, desc_date_str))
            curr = self.get_next_date(curr, period, 1)
        query = f"ALTER TABLE `{table}` PARTITION BY RANGE (`clock`) (\n" + ",\n".join(parts_sql) + "\n)"
        self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)")
        self.execute_query(query)
    def run(self, mode: str):
        """Main execution loop."""
        with self.connect_db():
            self.check_compatibility()
            partitions_conf = self.config.get('partitions', {})
            premake = self.config.get('premake', 10)
            if mode == 'delete':
                self.logger.warning("Delete Mode: Removing ALL partitioning from configured tables is not fully implemented in this refactor yet.")
                # Implement if needed, usually just ALTER TABLE ... REMOVE PARTITIONING
                return
            for period, tables in partitions_conf.items():
                if not tables:
                    continue
                for item in tables:
                    # Each item is a dict like {'history': '14d'}
                    table = list(item.keys())[0]
                    retention = item[table]
                    if mode == 'init':
                        self.initialize_partitioning(table, period, premake, retention)
                    else:
                        # Maintenance mode (add new, remove old)
                        self.create_future_partitions(table, period, premake)
                        self.remove_old_partitions(table, retention)
            # Housekeeping extras
            if mode != 'init' and not self.dry_run:
                # delete_extra_data logic...
                pass  # Can add back specific cleanups like the `sessions` table if desired
def setup_logging(config_log_type: str):
    logger = logging.getLogger('zabbix_partitioning')
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    if config_log_type == 'syslog':
        handler = logging.handlers.SysLogHandler(address='/dev/log')
        formatter = logging.Formatter('%(name)s: %(message)s')  # syslog usually has its own timestamps
    else:
        handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    logger.addHandler(handler)


def parse_args():
    parser = argparse.ArgumentParser(description='Zabbix Partitioning Manager')
    parser.add_argument('-c', '--config', default='/etc/zabbix/zabbix_partitioning.conf', help='Config file path')
    parser.add_argument('-i', '--init', action='store_true', help='Initialize partitions')
    parser.add_argument('-d', '--delete', action='store_true', help='Remove partitions (not implemented)')
    parser.add_argument('--dry-run', action='store_true', help='Simulate queries')
    return parser.parse_args()


def load_config(path):
    if not os.path.exists(path):
        # Fall back to a config file in the current directory
        if os.path.exists('zabbix_partitioning.conf'):
            return 'zabbix_partitioning.conf'
        raise ConfigurationError(f"Config file not found: {path}")
    return path


def main():
    args = parse_args()
    try:
        conf_path = load_config(args.config)
        with open(conf_path, 'r') as f:
            config = yaml.safe_load(f)
        setup_logging(config.get('logging', 'console'))
        logger = logging.getLogger('zabbix_partitioning')
        mode = 'maintain'
        if args.init:
            mode = 'init'
        elif args.delete:
            mode = 'delete'
        if args.dry_run:
            logger.info("Starting in DRY-RUN mode")
        app = ZabbixPartitioner(config, dry_run=args.dry_run)
        app.run(mode)
    except Exception as e:
        print(f"Critical Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()


@@ -40,6 +40,14 @@ logging: syslog
# premake: Number of partitions to create in advance
premake: 10
# initial_partitioning_start: Strategy for the first partition during initialization (--init).
# Options:
#   db_min:    (Default) Queries SELECT MIN(clock) to ensure ALL data is covered.
#              Consistently slow on huge tables.
#   retention: Starts partitioning from (Now - Retention Period).
#              Creates a 'p_archive' partition for all data older than retention.
#              Much faster as it skips the MIN(clock) query. (Recommended for large DBs)
initial_partitioning_start: db_min
# replicate_sql: False - Disable binary logging. Partitioning changes are NOT replicated to slaves (use for independent maintenance).
# replicate_sql: True  - Enable binary logging. Partitioning changes ARE replicated to slaves (use for a consistent cluster schema).
replicate_sql: False


@@ -371,7 +371,7 @@ class ZabbixPartitioner:
        for name in to_drop:
            self.execute_query(f"ALTER TABLE `{table}` DROP PARTITION {name}")

    def initialize_partitioning(self, table: str, period: str, premake: int):
    def initialize_partitioning(self, table: str, period: str, premake: int, retention_str: str):
        """Initial partitioning for a table (convert regular table to partitioned)."""
        self.logger.info(f"Initializing partitioning for {table}")
@@ -384,35 +384,63 @@ class ZabbixPartitioner:
            self.logger.info(f"Table {table} is already partitioned.")
            return

        # Check for data
        min_clock = self.get_table_min_clock(table)
        init_strategy = self.config.get('initial_partitioning_start', 'db_min')
        start_dt = None
        p_archive_ts = None

        if not min_clock:
            # Empty table. Start from NOW
            start_dt = self.truncate_date(datetime.now(), period)
        if init_strategy == 'retention':
            self.logger.info(f"Strategy 'retention': Calculating start date from retention ({retention_str})")
            retention_date = self.get_lookback_date(retention_str)
            # Start granular partitions from the retention date
            start_dt = self.truncate_date(retention_date, period)
            # Create a catch-all for anything older
            p_archive_ts = int(start_dt.timestamp())
        else:
            # Table has data.
            # For a safe migration, we usually create a catch-all for old data (p_old) or just start partitions covering existing data.
            # This script's strategy: Create partitions starting from min_clock.
            start_dt = self.truncate_date(min_clock, period)
            # Default 'db_min' strategy
            self.logger.info("Strategy 'db_min': Querying table for minimum clock (may be slow)")
            min_clock = self.get_table_min_clock(table)
            if not min_clock:
                # Empty table. Start from NOW
                start_dt = self.truncate_date(datetime.now(), period)
            else:
                # Table has data.
                start_dt = self.truncate_date(min_clock, period)

        # Build list of partitions from start_dt up to NOW + premake
        target_dt = self.get_next_date(self.truncate_date(datetime.now(), period), period, premake)
        curr = start_dt
        partitions_def = {}
        # If we have an archive partition, add it first
        if p_archive_ts:
            partitions_def['p_archive'] = str(p_archive_ts)
        while curr < target_dt:
            name = self.get_partition_name(curr, period)
            desc = self.get_partition_description(curr, period)
            partitions_def[name] = desc
            curr = self.get_next_date(curr, period, 1)

        # Re-doing the loop to be cleaner on types
        parts_sql = []
        for name, timestamp_expr in sorted(partitions_def.items()):
            parts_sql.append(PARTITION_TEMPLATE % (name, timestamp_expr))
        # 1. Archive Partition
        if p_archive_ts:
            parts_sql.append(f"PARTITION p_archive VALUES LESS THAN ({p_archive_ts}) ENGINE = InnoDB")
        # 2. Granular Partitions
        # We need to iterate again from start_dt
        curr = start_dt
        while curr < target_dt:
            name = self.get_partition_name(curr, period)
            desc_date_str = self.get_partition_description(curr, period)  # Returns "YYYY-MM-DD HH:MM:SS"
            parts_sql.append(PARTITION_TEMPLATE % (name, desc_date_str))
            curr = self.get_next_date(curr, period, 1)

        query = f"ALTER TABLE `{table}` PARTITION BY RANGE (`clock`) (\n" + ",\n".join(parts_sql) + "\n)"
        self.logger.info(f"Applying initial partitioning to {table} ({len(partitions_def)} partitions)")
        self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)")
        self.execute_query(query)
    def run(self, mode: str):

@@ -437,7 +465,7 @@ class ZabbixPartitioner:
                    retention = item[table]

                    if mode == 'init':
                        self.initialize_partitioning(table, period, premake)
                        self.initialize_partitioning(table, period, premake, retention)
                    else:
                        # Maintenance mode (Add new, remove old)
                        self.create_future_partitions(table, period, premake)


@@ -1,7 +1,7 @@
# Contributor: Maksym Buz <maksym.buz@zabbix.com>
# Maintainer: Maksym Buz <maksym.buz@zabbix.com>
pkgname=zabbix
pkgver=7.4.5
pkgver=7.4.6
pkgrel=0
pkgdesc="Enterprise-class open source distributed monitoring solution"
url="https://www.zabbix.com/"


@@ -0,0 +1,36 @@
# Zabbix Partitioning Tests
This directory contains a Docker-based test environment for the Zabbix Partitioning script.
## Prerequisites
- Docker & Docker Compose
- Python 3
## Setup & Run
1. Start the database container:
```bash
docker compose up -d
```
This will start a MySQL 8.0 container and import the Zabbix schema.
2. Use the provided config: `test_config.yaml` already references the running container.
3. Run the partitioning script:
```bash
# Create virtual environment if needed
python3 -m venv venv
./venv/bin/pip install pymysql pyyaml
# Dry Run
./venv/bin/python3 ../../partitioning/zabbix_partitioning.py -c test_config.yaml --dry-run --init
# Live Run
./venv/bin/python3 ../../partitioning/zabbix_partitioning.py -c test_config.yaml --init
```
## Cleanup
```bash
docker compose down
rm -rf venv
```


@@ -0,0 +1,14 @@
services:
  zabbix-db:
    image: mysql:8.0
    container_name: zabbix-partition-test
    environment:
      MYSQL_ROOT_PASSWORD: root_password
      MYSQL_DATABASE: zabbix
      MYSQL_USER: zbx_part
      MYSQL_PASSWORD: zbx_password
    volumes:
      - ../../partitioning/schemas/70-schema-mysql.txt:/docker-entrypoint-initdb.d/schema.sql
    ports:
      - "33060:3306"
    command: --default-authentication-plugin=mysql_native_password


@@ -0,0 +1,31 @@
import re


def get_partitionable_tables(schema_path):
    with open(schema_path, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()
    # Split into CREATE TABLE statements
    tables = content.split('CREATE TABLE')
    valid_tables = []
    for table_def in tables:
        # Extract table name
        name_match = re.search(r'`(\w+)`', table_def)
        if not name_match:
            continue
        table_name = name_match.group(1)
        # Check for PRIMARY KEY definition
        pk_match = re.search(r'PRIMARY KEY \((.*?)\)', table_def, re.DOTALL)
        if pk_match:
            pk_cols = pk_match.group(1)
            if 'clock' in pk_cols:
                valid_tables.append(table_name)
    return valid_tables


if __name__ == '__main__':
    tables = get_partitionable_tables('/opt/git/Zabbix/partitioning/70-schema-mysql.txt')
    print("Partitionable tables (PK contains 'clock'):")
    for t in tables:
        print(f"  - {t}")


@@ -0,0 +1,25 @@
database:
  type: mysql
  host: 127.0.0.1
  socket:
  user: root
  passwd: root_password
  db: zabbix
  # Port mapping in docker-compose is 33060
  port: 33060
partitions:
  daily:
    - history: 7d
    - history_uint: 7d
    - history_str: 7d
    - history_log: 7d
    - history_text: 7d
    - history_bin: 7d
    - trends: 365d
    - trends_uint: 365d
logging: console
premake: 2
replicate_sql: False
initial_partitioning_start: retention


@@ -0,0 +1,25 @@
import time
import pymysql
import sys

config = {
    'host': '127.0.0.1',
    'port': 33060,
    'user': 'root',
    'password': 'root_password',
    'database': 'zabbix'
}

max_retries = 90
for i in range(max_retries):
    try:
        conn = pymysql.connect(**config)
        print("Database is ready!")
        conn.close()
        sys.exit(0)
    except Exception as e:
        print(f"Waiting for DB... ({e})")
        time.sleep(2)

print("Timeout waiting for DB")
sys.exit(1)