diff --git a/partitioning/zabbix_partitioning.conf b/partitioning/zabbix_partitioning.conf index 02899c8..2d211da 100644 --- a/partitioning/zabbix_partitioning.conf +++ b/partitioning/zabbix_partitioning.conf @@ -40,6 +40,14 @@ logging: syslog # premake: Number of partitions to create in advance premake: 10 +# initial_partitioning_start: Strategy for the first partition during initialization (--init). +# Options: +# db_min: (Default) Queries SELECT MIN(clock) to ensure ALL data is covered. Slow on huge tables consistently. +# retention: Starts partitioning from (Now - Retention Period). +# Creates a 'p_archive' partition for all data older than retention. +# Much faster as it skips the MIN(clock) query. (Recommended for large DBs) +initial_partitioning_start: db_min + # replicate_sql: False - Disable binary logging. Partitioning changes are NOT replicated to slaves (use for independent maintenance). # replicate_sql: True - Enable binary logging. Partitioning changes ARE replicated to slaves (use for consistent cluster schema). replicate_sql: False \ No newline at end of file diff --git a/partitioning/zabbix_partitioning.py b/partitioning/zabbix_partitioning.py index e4da393..a1a8f76 100644 --- a/partitioning/zabbix_partitioning.py +++ b/partitioning/zabbix_partitioning.py @@ -371,7 +371,7 @@ class ZabbixPartitioner: for name in to_drop: self.execute_query(f"ALTER TABLE `{table}` DROP PARTITION {name}") - def initialize_partitioning(self, table: str, period: str, premake: int): + def initialize_partitioning(self, table: str, period: str, premake: int, retention_str: str): """Initial partitioning for a table (convert regular table to partitioned).""" self.logger.info(f"Initializing partitioning for {table}") @@ -384,35 +384,63 @@ class ZabbixPartitioner: self.logger.info(f"Table {table} is already partitioned.") return - # Check for data - min_clock = self.get_table_min_clock(table) - - if not min_clock: - # Empty table. Start from NOW - start_dt = self.truncate_date(datetime.now(), period) + init_strategy = self.config.get('initial_partitioning_start', 'db_min') + start_dt = None + p_archive_ts = None + + if init_strategy == 'retention': + self.logger.info(f"Strategy 'retention': Calculating start date from retention ({retention_str})") + retention_date = self.get_lookback_date(retention_str) + # Start granular partitions from the retention date + start_dt = self.truncate_date(retention_date, period) + # Create a catch-all for anything older + p_archive_ts = int(start_dt.timestamp()) else: - # Table has data. - # For a safe migration, we usually create a catch-all for old data (p_old) or just start partitions covering existing data. - # This script's strategy: Create partitions starting from min_clock. - start_dt = self.truncate_date(min_clock, period) + # Default 'db_min' strategy + self.logger.info("Strategy 'db_min': Querying table for minimum clock (may be slow)") + min_clock = self.get_table_min_clock(table) + + if not min_clock: + # Empty table. Start from NOW + start_dt = self.truncate_date(datetime.now(), period) + else: + # Table has data. + start_dt = self.truncate_date(min_clock, period) # Build list of partitions from start_dt up to NOW + premake target_dt = self.get_next_date(self.truncate_date(datetime.now(), period), period, premake) curr = start_dt partitions_def = {} + + # If we have an archive partition, add it first + if p_archive_ts: + partitions_def['p_archive'] = str(p_archive_ts) + while curr < target_dt: name = self.get_partition_name(curr, period) desc = self.get_partition_description(curr, period) partitions_def[name] = desc curr = self.get_next_date(curr, period, 1) + # Re-doing the loop to be cleaner on types parts_sql = [] - for name, timestamp_expr in sorted(partitions_def.items()): - parts_sql.append(PARTITION_TEMPLATE % (name, timestamp_expr)) + + # 1. Archive Partition + if p_archive_ts: + parts_sql.append(f"PARTITION p_archive VALUES LESS THAN ({p_archive_ts}) ENGINE = InnoDB") + + # 2. Granular Partitions + # We need to iterate again from start_dt + curr = start_dt + while curr < target_dt: + name = self.get_partition_name(curr, period) + desc_date_str = self.get_partition_description(curr, period) # Returns "YYYY-MM-DD HH:MM:SS" + parts_sql.append(PARTITION_TEMPLATE % (name, desc_date_str)) + curr = self.get_next_date(curr, period, 1) query = f"ALTER TABLE `{table}` PARTITION BY RANGE (`clock`) (\n" + ",\n".join(parts_sql) + "\n)" - self.logger.info(f"Applying initial partitioning to {table} ({len(partitions_def)} partitions)") + self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)") self.execute_query(query) def run(self, mode: str): @@ -437,7 +465,7 @@ class ZabbixPartitioner: retention = item[table] if mode == 'init': - self.initialize_partitioning(table, period, premake) + self.initialize_partitioning(table, period, premake, retention) else: # Maintenance mode (Add new, remove old) self.create_future_partitions(table, period, premake)