mirror of
https://github.com/Icinga/icingadb.git
synced 2026-05-26 03:12:00 -04:00
Merge 427be4b3a0 into 9d990f9d1b
This commit is contained in:
commit
e65ae2c06b
3 changed files with 106 additions and 63 deletions
|
|
@ -8,14 +8,19 @@ import (
|
|||
"github.com/icinga/icinga-go-library/database"
|
||||
"github.com/icinga/icinga-go-library/retry"
|
||||
"github.com/icinga/icinga-go-library/types"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CleanupStmt defines information needed to compose cleanup statements.
|
||||
//
|
||||
// When multiple Columns are given, COALESCE is being applied, returning the
|
||||
// first non NULL column. Thus, start by supplying the timestamp column expected
|
||||
// to appear later, e.g., remove_time before entry_time.
|
||||
type CleanupStmt struct {
|
||||
Table string
|
||||
PK string
|
||||
Column string
|
||||
Table string
|
||||
PK string
|
||||
Columns []string
|
||||
}
|
||||
|
||||
// CleanupOlderThan deletes all rows with the specified statement that are older than the given time.
|
||||
|
|
@ -78,15 +83,20 @@ func (stmt *CleanupStmt) CleanupOlderThan(
|
|||
|
||||
// build assembles the cleanup statement for the specified database driver with the given limit.
|
||||
func (stmt *CleanupStmt) build(driverName string, limit uint64) string {
|
||||
if len(stmt.Columns) == 0 {
|
||||
panic("CleanupStmt.Columns must not be empty")
|
||||
}
|
||||
timeCol := "COALESCE(" + strings.Join(stmt.Columns, ",") + ")"
|
||||
|
||||
switch driverName {
|
||||
case database.MySQL:
|
||||
return fmt.Sprintf(`DELETE FROM %[1]s WHERE environment_id = :environment_id AND %[2]s < :time LIMIT %[3]d`,
|
||||
stmt.Table, stmt.Column, limit)
|
||||
stmt.Table, timeCol, limit)
|
||||
case database.PostgreSQL:
|
||||
return fmt.Sprintf(`WITH rows AS (
|
||||
SELECT %[1]s FROM %[2]s WHERE environment_id = :environment_id AND %[3]s < :time LIMIT %[4]d
|
||||
)
|
||||
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, stmt.Column, limit)
|
||||
DELETE FROM %[2]s WHERE %[1]s IN (SELECT %[1]s FROM rows)`, stmt.PK, stmt.Table, timeCol, limit)
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid database type %s", driverName))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,69 +30,76 @@ type retentionStatement struct {
|
|||
}
|
||||
|
||||
// RetentionStatements maps history categories with corresponding cleanup statements.
|
||||
//
|
||||
// For history retention, both the initial timestamp and the closing timestamp
|
||||
// are taken into account. The latter takes precedence, but might be NULL for
|
||||
// some forgotten objects, e.g., for comments.
|
||||
//
|
||||
// However, SLA retention only cares about the closing timestamp to ensure that a short retention
|
||||
// period will not mess up SLA calculations.
|
||||
var RetentionStatements = []retentionStatement{{
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "acknowledgement",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "acknowledgement_history",
|
||||
PK: "id",
|
||||
Column: "clear_time",
|
||||
Table: "acknowledgement_history",
|
||||
PK: "id",
|
||||
Columns: []string{"clear_time", "set_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "comment",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "comment_history",
|
||||
PK: "comment_id",
|
||||
Column: "remove_time",
|
||||
Table: "comment_history",
|
||||
PK: "comment_id",
|
||||
Columns: []string{"remove_time", "entry_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "downtime",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "downtime_history",
|
||||
PK: "downtime_id",
|
||||
Column: "end_time",
|
||||
Table: "downtime_history",
|
||||
PK: "downtime_id",
|
||||
Columns: []string{"end_time", "entry_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "flapping",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "flapping_history",
|
||||
PK: "id",
|
||||
Column: "end_time",
|
||||
Table: "flapping_history",
|
||||
PK: "id",
|
||||
Columns: []string{"end_time", "start_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "notification",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "notification_history",
|
||||
PK: "id",
|
||||
Column: "send_time",
|
||||
Table: "notification_history",
|
||||
PK: "id",
|
||||
Columns: []string{"send_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionHistory,
|
||||
Category: "state",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "state_history",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
Table: "state_history",
|
||||
PK: "id",
|
||||
Columns: []string{"event_time"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionSla,
|
||||
Category: "sla_downtime",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "sla_history_downtime",
|
||||
PK: "downtime_id",
|
||||
Column: "downtime_end",
|
||||
Table: "sla_history_downtime",
|
||||
PK: "downtime_id",
|
||||
Columns: []string{"downtime_end"},
|
||||
},
|
||||
}, {
|
||||
RetentionType: RetentionSla,
|
||||
Category: "sla_state",
|
||||
CleanupStmt: icingadb.CleanupStmt{
|
||||
Table: "sla_history_state",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
Table: "sla_history_state",
|
||||
PK: "id",
|
||||
Columns: []string{"event_time"},
|
||||
},
|
||||
}}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/jmoiron/sqlx"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -88,8 +89,15 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
values = append(values, row{otherEnvId, getId(), startMilli - int64(j)})
|
||||
}
|
||||
|
||||
_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, :time)`,
|
||||
stmt.Table, stmt.PK, stmt.Column), values)
|
||||
timeColumns := strings.Join(
|
||||
append(stmt.ExtraTimeColumns, stmt.TimeColumn),
|
||||
", ")
|
||||
timeColumnsValuePlaceholder := strings.Join(
|
||||
slices.Repeat([]string{":time"}, 1+len(stmt.ExtraTimeColumns)),
|
||||
", ")
|
||||
|
||||
_, err = db.NamedExec(fmt.Sprintf(`INSERT INTO %s (environment_id, %s, %s) VALUES (:env, :id, %s)`,
|
||||
stmt.Table, stmt.PK, timeColumns, timeColumnsValuePlaceholder), values)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
@ -108,7 +116,7 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
|
||||
var rowsLeft int
|
||||
err := db.QueryRow(
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.Column)),
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s < ?`, stmt.Table, stmt.TimeColumn)),
|
||||
envId,
|
||||
thresholdMilli,
|
||||
).Scan(&rowsLeft)
|
||||
|
|
@ -116,7 +124,7 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
|
||||
var rowsSpared int
|
||||
err = db.QueryRow(
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.Column)),
|
||||
db.Rebind(fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE environment_id = ? AND %s >= ?`, stmt.Table, stmt.TimeColumn)),
|
||||
envId,
|
||||
thresholdMilli,
|
||||
).Scan(&rowsSpared)
|
||||
|
|
@ -143,9 +151,10 @@ func TestCleanupAndRetention(t *testing.T) {
|
|||
}
|
||||
|
||||
type cleanupStmt struct {
|
||||
Table string
|
||||
PK string
|
||||
Column string
|
||||
Table string
|
||||
PK string
|
||||
TimeColumn string
|
||||
ExtraTimeColumns []string
|
||||
}
|
||||
|
||||
type retention struct {
|
||||
|
|
@ -156,44 +165,48 @@ type retention struct {
|
|||
|
||||
var retentionStatements = map[string]cleanupStmt{
|
||||
"acknowledgement": {
|
||||
Table: "acknowledgement_history",
|
||||
PK: "id",
|
||||
Column: "clear_time",
|
||||
Table: "acknowledgement_history",
|
||||
PK: "id",
|
||||
TimeColumn: "clear_time",
|
||||
ExtraTimeColumns: []string{"set_time"},
|
||||
},
|
||||
"comment": {
|
||||
Table: "comment_history",
|
||||
PK: "comment_id",
|
||||
Column: "remove_time",
|
||||
Table: "comment_history",
|
||||
PK: "comment_id",
|
||||
TimeColumn: "remove_time",
|
||||
ExtraTimeColumns: []string{"entry_time"},
|
||||
},
|
||||
"downtime": {
|
||||
Table: "downtime_history",
|
||||
PK: "downtime_id",
|
||||
Column: "end_time",
|
||||
Table: "downtime_history",
|
||||
PK: "downtime_id",
|
||||
TimeColumn: "end_time",
|
||||
ExtraTimeColumns: []string{"entry_time"},
|
||||
},
|
||||
"flapping": {
|
||||
Table: "flapping_history",
|
||||
PK: "id",
|
||||
Column: "end_time",
|
||||
Table: "flapping_history",
|
||||
PK: "id",
|
||||
TimeColumn: "end_time",
|
||||
ExtraTimeColumns: []string{"start_time"},
|
||||
},
|
||||
"notification": {
|
||||
Table: "notification_history",
|
||||
PK: "id",
|
||||
Column: "send_time",
|
||||
Table: "notification_history",
|
||||
PK: "id",
|
||||
TimeColumn: "send_time",
|
||||
},
|
||||
"state": {
|
||||
Table: "state_history",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
Table: "state_history",
|
||||
PK: "id",
|
||||
TimeColumn: "event_time",
|
||||
},
|
||||
"sla_downtime": {
|
||||
Table: "sla_history_downtime",
|
||||
PK: "downtime_id",
|
||||
Column: "downtime_end",
|
||||
Table: "sla_history_downtime",
|
||||
PK: "downtime_id",
|
||||
TimeColumn: "downtime_end",
|
||||
},
|
||||
"sla_state": {
|
||||
Table: "sla_history_state",
|
||||
PK: "id",
|
||||
Column: "event_time",
|
||||
Table: "sla_history_state",
|
||||
PK: "id",
|
||||
TimeColumn: "event_time",
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -208,13 +221,26 @@ func dropNotNullColumns(db *sqlx.DB, stmt cleanupStmt) error {
|
|||
schema = `CURRENT_SCHEMA()`
|
||||
}
|
||||
|
||||
columnNames := append(
|
||||
[]string{"environment_id", stmt.PK, stmt.TimeColumn},
|
||||
stmt.ExtraTimeColumns...)
|
||||
columnNamesPlaceholder := strings.Join(
|
||||
slices.Repeat([]string{"?"}, len(columnNames)),
|
||||
", ")
|
||||
|
||||
colsArgs := []any{stmt.Table}
|
||||
for _, columnName := range columnNames {
|
||||
colsArgs = append(colsArgs, columnName)
|
||||
}
|
||||
colsArgs = append(colsArgs, "NO")
|
||||
|
||||
var cols []string
|
||||
err := db.Select(&cols, db.Rebind(fmt.Sprintf(`
|
||||
SELECT column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (?, ?, ?) AND is_nullable = ?`,
|
||||
schema)),
|
||||
stmt.Table, "environment_id", stmt.PK, stmt.Column, "NO")
|
||||
WHERE table_schema = %s AND table_name = ? AND column_name NOT IN (%s) AND is_nullable = ?`,
|
||||
schema, columnNamesPlaceholder)),
|
||||
colsArgs...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue