mirror of
https://github.com/Icinga/icingadb.git
synced 2026-02-03 20:40:34 -05:00
298 lines
11 KiB
Go
298 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
_ "embed"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/pkg/errors"
|
|
"math"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
//go:embed embed/event_time_cache_schema.sql
|
|
var eventTimeCacheSchema string
|
|
|
|
//go:embed embed/previous_hard_state_cache_schema.sql
|
|
var previousHardStateCacheSchema string
|
|
|
|
// buildEventTimeCache rationale:
|
|
//
|
|
// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
|
|
// That would make the IDO reading even slower than the Icinga DB writing.
|
|
// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
|
|
// and cache it into an SQLite database. Then steam from that database and the IDO.
|
|
//
|
|
// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
|
|
func buildEventTimeCache(ht *historyType, idoColumns []string) {
|
|
type row = struct {
|
|
Id uint64
|
|
EventTime int64
|
|
EventTimeUsec uint32
|
|
EventIsStart uint8
|
|
ObjectId uint64
|
|
}
|
|
|
|
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
|
|
var checkpoint struct {
|
|
Cnt int64
|
|
MaxId sql.NullInt64
|
|
}
|
|
cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
|
|
|
|
ht.bar.SetCurrent(checkpoint.Cnt * 2)
|
|
|
|
// Stream source data...
|
|
sliceIdoHistory(
|
|
ht,
|
|
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
|
|
// For actual migration icinga_objects will be joined anyway,
|
|
// so it makes no sense to take vanished objects into account.
|
|
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
|
|
ht.idoIdColumn+" <= :toid AND xh."+
|
|
ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
|
|
nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
|
|
func(idoRows []row) (checkpoint interface{}) {
|
|
for _, idoRow := range idoRows {
|
|
if idoRow.EventIsStart == 0 {
|
|
// Ack/flapping end event. Get the start event time:
|
|
var lst []struct {
|
|
EventTime int64
|
|
EventTimeUsec uint32
|
|
}
|
|
cacheSelect(
|
|
*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
|
|
idoRow.ObjectId,
|
|
)
|
|
|
|
// If we have that, ...
|
|
if len(lst) > 0 {
|
|
// ... save the start event time for the actual migration:
|
|
cacheExec(
|
|
*tx,
|
|
"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
|
|
idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
|
|
)
|
|
|
|
// This previously queried info isn't needed anymore.
|
|
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
|
|
}
|
|
} else {
|
|
// Ack/flapping start event directly after another start event (per checkable).
|
|
// The old one won't have (but the new one will) an end event (which will need its time).
|
|
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
|
|
|
|
// An ack/flapping start event. The following end event (per checkable) will need its time.
|
|
cacheExec(
|
|
*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
|
|
idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
|
|
)
|
|
}
|
|
|
|
commitPeriodically()
|
|
checkpoint = idoRow.Id
|
|
}
|
|
|
|
ht.bar.IncrBy(len(idoRows))
|
|
return
|
|
},
|
|
)
|
|
|
|
// This never queried info isn't needed anymore.
|
|
cacheExec(*tx, "DELETE FROM last_start_time")
|
|
})
|
|
|
|
ht.bar.SetTotal(ht.bar.Current(), true)
|
|
}
|
|
|
|
// buildPreviousHardStateCache rationale:
|
|
//
|
|
// Icinga DB's state_history#previous_hard_state would need a subquery.
|
|
// That make the IDO reading even slower than the Icinga DB writing.
|
|
// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
|
|
// and cache it into an SQLite database. Then steam from that database and the IDO.
|
|
//
|
|
// Similar for notifications. (On non-recoverable errors the whole program exits.)
|
|
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
|
|
type row = struct {
|
|
Id uint64
|
|
ObjectId uint64
|
|
LastHardState uint8
|
|
}
|
|
|
|
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
|
|
var nextIds struct {
|
|
Cnt int64
|
|
MinId sql.NullInt64
|
|
}
|
|
cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
|
|
|
|
var previousHardStateCnt int64
|
|
cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
|
|
|
|
var checkpoint int64
|
|
if nextIds.MinId.Valid { // there are next_ids
|
|
checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
|
|
} else { // there aren't any next_ids
|
|
// next_ids contains the most recently processed IDs and is only empty if...
|
|
if previousHardStateCnt == 0 {
|
|
// ... we didn't actually start yet...
|
|
checkpoint = math.MaxInt64 // start from the largest (possible) ID
|
|
} else {
|
|
// ... or we've already finished.
|
|
checkpoint = 0 // make following query no-op
|
|
}
|
|
}
|
|
|
|
ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
|
|
|
|
// We continue where we finished before. As we build the cache in reverse chronological order:
|
|
// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
|
|
// a. Start migration after Icinga DB is up and running.
|
|
// b. Remove the cache before the next migration trial.
|
|
// 2. If the history gets cleaned up between two migration trials,
|
|
// the difference either just doesn't appear in the cache or - if already there - will be ignored later.
|
|
|
|
// Stream source data...
|
|
sliceIdoHistory(
|
|
ht,
|
|
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
|
|
// For actual migration icinga_objects will be joined anyway,
|
|
// so it makes no sense to take vanished objects into account.
|
|
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
|
|
ht.idoIdColumn+" <= :toid AND xh."+
|
|
ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
|
|
nil, checkpoint, // ... since we were interrupted:
|
|
func(idoRows []row) (checkpoint interface{}) {
|
|
for _, idoRow := range idoRows {
|
|
var nhs []struct{ NextHardState uint8 }
|
|
cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
|
|
|
|
if len(nhs) < 1 { // we just started (per checkable)
|
|
// At the moment (we're "travelling back in time") that's the checkable's hard state:
|
|
cacheExec(
|
|
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
|
|
idoRow.ObjectId, idoRow.LastHardState,
|
|
)
|
|
|
|
// But for the current time point the previous hard state isn't known, yet:
|
|
cacheExec(
|
|
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
|
|
idoRow.Id, idoRow.ObjectId,
|
|
)
|
|
} else if idoRow.LastHardState == nhs[0].NextHardState {
|
|
// The hard state didn't change yet (per checkable),
|
|
// so this time point also awaits the previous hard state.
|
|
cacheExec(
|
|
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
|
|
idoRow.Id, idoRow.ObjectId,
|
|
)
|
|
} else { // the hard state changed (per checkable)
|
|
// That past hard state is now available for the processed future time points:
|
|
cacheExec(
|
|
*tx,
|
|
"INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
|
|
"SELECT history_id, ? FROM next_ids WHERE object_id=?",
|
|
idoRow.LastHardState, idoRow.ObjectId,
|
|
)
|
|
|
|
// Now they have what they wanted:
|
|
cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
|
|
cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)
|
|
|
|
// That's done.
|
|
// Now do the same thing as in the "we just started" case above, for the same reason:
|
|
|
|
cacheExec(
|
|
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
|
|
idoRow.ObjectId, idoRow.LastHardState,
|
|
)
|
|
|
|
cacheExec(
|
|
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
|
|
idoRow.Id, idoRow.ObjectId,
|
|
)
|
|
}
|
|
|
|
commitPeriodically()
|
|
checkpoint = idoRow.Id
|
|
}
|
|
|
|
ht.bar.IncrBy(len(idoRows))
|
|
return
|
|
},
|
|
)
|
|
|
|
// No past hard state is available for the processed future time points, assuming pending:
|
|
cacheExec(
|
|
*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
|
|
)
|
|
|
|
// Now they should have what they wanted:
|
|
cacheExec(*tx, "DELETE FROM next_hard_state")
|
|
cacheExec(*tx, "DELETE FROM next_ids")
|
|
})
|
|
|
|
ht.bar.SetTotal(ht.bar.Current(), true)
|
|
}
|
|
|
|
// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
|
|
// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
|
|
// (On non-recoverable errors the whole program exits.)
|
|
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
|
|
logger := log.With("backend", "cache")
|
|
|
|
tx, err := cache.Beginx()
|
|
if err != nil {
|
|
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
|
|
}
|
|
|
|
const commitInterval = 5 * time.Minute
|
|
nextCommit := time.Now().Add(commitInterval)
|
|
|
|
do(&tx, func() { // commitPeriodically
|
|
if now := time.Now(); now.After(nextCommit) {
|
|
if err := tx.Commit(); err != nil {
|
|
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
|
|
}
|
|
|
|
var err error
|
|
|
|
tx, err = cache.Beginx()
|
|
if err != nil {
|
|
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
|
|
}
|
|
|
|
nextCommit = nextCommit.Add(commitInterval)
|
|
}
|
|
})
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
|
|
}
|
|
}
|
|
|
|
// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
|
|
func cacheGet(cache interface {
|
|
Get(dest interface{}, query string, args ...interface{}) error
|
|
}, dest interface{}, query string, args ...interface{}) {
|
|
if err := cache.Get(dest, query, args...); err != nil {
|
|
log.With("backend", "cache", "query", query, "args", args).
|
|
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
|
|
}
|
|
}
|
|
|
|
// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
|
|
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
|
|
if err := cacheTx.Select(dest, query, args...); err != nil {
|
|
log.With("backend", "cache", "query", query, "args", args).
|
|
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
|
|
}
|
|
}
|
|
|
|
// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
|
|
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
|
|
if _, err := cacheTx.Exec(dml, args...); err != nil {
|
|
log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
|
|
}
|
|
}
|