CHANGE: Added min(clock) scan bypass function (initial_partitioning_start), which significantly affects working with huge tables.
This commit is contained in:
@@ -40,6 +40,14 @@ logging: syslog
|
||||
# premake: Number of partitions to create in advance
|
||||
premake: 10
|
||||
|
||||
# initial_partitioning_start: Strategy for the first partition during initialization (--init).
|
||||
# Options:
|
||||
# db_min: (Default) Queries SELECT MIN(clock) to ensure ALL data is covered. Slow on huge tables consistently.
|
||||
# retention: Starts partitioning from (Now - Retention Period).
|
||||
# Creates a 'p_archive' partition for all data older than retention.
|
||||
# Much faster as it skips the MIN(clock) query. (Recommended for large DBs)
|
||||
initial_partitioning_start: db_min
|
||||
|
||||
# replicate_sql: False - Disable binary logging. Partitioning changes are NOT replicated to slaves (use for independent maintenance).
|
||||
# replicate_sql: True - Enable binary logging. Partitioning changes ARE replicated to slaves (use for consistent cluster schema).
|
||||
replicate_sql: False
|
||||
@@ -371,7 +371,7 @@ class ZabbixPartitioner:
|
||||
for name in to_drop:
|
||||
self.execute_query(f"ALTER TABLE `{table}` DROP PARTITION {name}")
|
||||
|
||||
def initialize_partitioning(self, table: str, period: str, premake: int):
|
||||
def initialize_partitioning(self, table: str, period: str, premake: int, retention_str: str):
|
||||
"""Initial partitioning for a table (convert regular table to partitioned)."""
|
||||
self.logger.info(f"Initializing partitioning for {table}")
|
||||
|
||||
@@ -384,35 +384,63 @@ class ZabbixPartitioner:
|
||||
self.logger.info(f"Table {table} is already partitioned.")
|
||||
return
|
||||
|
||||
# Check for data
|
||||
min_clock = self.get_table_min_clock(table)
|
||||
|
||||
if not min_clock:
|
||||
# Empty table. Start from NOW
|
||||
start_dt = self.truncate_date(datetime.now(), period)
|
||||
init_strategy = self.config.get('initial_partitioning_start', 'db_min')
|
||||
start_dt = None
|
||||
p_archive_ts = None
|
||||
|
||||
if init_strategy == 'retention':
|
||||
self.logger.info(f"Strategy 'retention': Calculating start date from retention ({retention_str})")
|
||||
retention_date = self.get_lookback_date(retention_str)
|
||||
# Start granular partitions from the retention date
|
||||
start_dt = self.truncate_date(retention_date, period)
|
||||
# Create a catch-all for anything older
|
||||
p_archive_ts = int(start_dt.timestamp())
|
||||
else:
|
||||
# Table has data.
|
||||
# For a safe migration, we usually create a catch-all for old data (p_old) or just start partitions covering existing data.
|
||||
# This script's strategy: Create partitions starting from min_clock.
|
||||
start_dt = self.truncate_date(min_clock, period)
|
||||
# Default 'db_min' strategy
|
||||
self.logger.info("Strategy 'db_min': Querying table for minimum clock (may be slow)")
|
||||
min_clock = self.get_table_min_clock(table)
|
||||
|
||||
if not min_clock:
|
||||
# Empty table. Start from NOW
|
||||
start_dt = self.truncate_date(datetime.now(), period)
|
||||
else:
|
||||
# Table has data.
|
||||
start_dt = self.truncate_date(min_clock, period)
|
||||
|
||||
# Build list of partitions from start_dt up to NOW + premake
|
||||
target_dt = self.get_next_date(self.truncate_date(datetime.now(), period), period, premake)
|
||||
|
||||
curr = start_dt
|
||||
partitions_def = {}
|
||||
|
||||
# If we have an archive partition, add it first
|
||||
if p_archive_ts:
|
||||
partitions_def['p_archive'] = str(p_archive_ts)
|
||||
|
||||
while curr < target_dt:
|
||||
name = self.get_partition_name(curr, period)
|
||||
desc = self.get_partition_description(curr, period)
|
||||
partitions_def[name] = desc
|
||||
curr = self.get_next_date(curr, period, 1)
|
||||
|
||||
# Re-doing the loop to be cleaner on types
|
||||
parts_sql = []
|
||||
for name, timestamp_expr in sorted(partitions_def.items()):
|
||||
parts_sql.append(PARTITION_TEMPLATE % (name, timestamp_expr))
|
||||
|
||||
# 1. Archive Partition
|
||||
if p_archive_ts:
|
||||
parts_sql.append(f"PARTITION p_archive VALUES LESS THAN ({p_archive_ts}) ENGINE = InnoDB")
|
||||
|
||||
# 2. Granular Partitions
|
||||
# We need to iterate again from start_dt
|
||||
curr = start_dt
|
||||
while curr < target_dt:
|
||||
name = self.get_partition_name(curr, period)
|
||||
desc_date_str = self.get_partition_description(curr, period) # Returns "YYYY-MM-DD HH:MM:SS"
|
||||
parts_sql.append(PARTITION_TEMPLATE % (name, desc_date_str))
|
||||
curr = self.get_next_date(curr, period, 1)
|
||||
|
||||
query = f"ALTER TABLE `{table}` PARTITION BY RANGE (`clock`) (\n" + ",\n".join(parts_sql) + "\n)"
|
||||
self.logger.info(f"Applying initial partitioning to {table} ({len(partitions_def)} partitions)")
|
||||
self.logger.info(f"Applying initial partitioning to {table} ({len(parts_sql)} partitions)")
|
||||
self.execute_query(query)
|
||||
|
||||
def run(self, mode: str):
|
||||
@@ -437,7 +465,7 @@ class ZabbixPartitioner:
|
||||
retention = item[table]
|
||||
|
||||
if mode == 'init':
|
||||
self.initialize_partitioning(table, period, premake)
|
||||
self.initialize_partitioning(table, period, premake, retention)
|
||||
else:
|
||||
# Maintenance mode (Add new, remove old)
|
||||
self.create_future_partitions(table, period, premake)
|
||||
|
||||
Reference in New Issue
Block a user