icingadb/cmd/icingadb-migrate/cache.go
2022-10-26 11:18:52 +02:00

298 lines
11 KiB
Go

package main
import (
"database/sql"
_ "embed"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"math"
"strings"
"time"
)
//go:embed embed/event_time_cache_schema.sql
var eventTimeCacheSchema string
//go:embed embed/previous_hard_state_cache_schema.sql
var previousHardStateCacheSchema string
// buildEventTimeCache rationale:
//
// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
// and cache it into an SQLite database. Then steam from that database and the IDO.
//
// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
func buildEventTimeCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
EventTime int64
EventTimeUsec uint32
EventIsStart uint8
ObjectId uint64
}
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var checkpoint struct {
Cnt int64
MaxId sql.NullInt64
}
cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
ht.bar.SetCurrent(checkpoint.Cnt * 2)
// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
if idoRow.EventIsStart == 0 {
// Ack/flapping end event. Get the start event time:
var lst []struct {
EventTime int64
EventTimeUsec uint32
}
cacheSelect(
*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
idoRow.ObjectId,
)
// If we have that, ...
if len(lst) > 0 {
// ... save the start event time for the actual migration:
cacheExec(
*tx,
"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
)
// This previously queried info isn't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
}
} else {
// Ack/flapping start event directly after another start event (per checkable).
// The old one won't have (but the new one will) an end event (which will need its time).
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
// An ack/flapping start event. The following end event (per checkable) will need its time.
cacheExec(
*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
)
}
commitPeriodically()
checkpoint = idoRow.Id
}
ht.bar.IncrBy(len(idoRows))
return
},
)
// This never queried info isn't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time")
})
ht.bar.SetTotal(ht.bar.Current(), true)
}
// buildPreviousHardStateCache rationale:
//
// Icinga DB's state_history#previous_hard_state would need a subquery.
// That make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
// and cache it into an SQLite database. Then steam from that database and the IDO.
//
// Similar for notifications. (On non-recoverable errors the whole program exits.)
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
ObjectId uint64
LastHardState uint8
}
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var nextIds struct {
Cnt int64
MinId sql.NullInt64
}
cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
var previousHardStateCnt int64
cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
var checkpoint int64
if nextIds.MinId.Valid { // there are next_ids
checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
} else { // there aren't any next_ids
// next_ids contains the most recently processed IDs and is only empty if...
if previousHardStateCnt == 0 {
// ... we didn't actually start yet...
checkpoint = math.MaxInt64 // start from the largest (possible) ID
} else {
// ... or we've already finished.
checkpoint = 0 // make following query no-op
}
}
ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
// We continue where we finished before. As we build the cache in reverse chronological order:
// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
// a. Start migration after Icinga DB is up and running.
// b. Remove the cache before the next migration trial.
// 2. If the history gets cleaned up between two migration trials,
// the difference either just doesn't appear in the cache or - if already there - will be ignored later.
// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
nil, checkpoint, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
var nhs []struct{ NextHardState uint8 }
cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
if len(nhs) < 1 { // we just started (per checkable)
// At the moment (we're "travelling back in time") that's the checkable's hard state:
cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)
// But for the current time point the previous hard state isn't known, yet:
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else if idoRow.LastHardState == nhs[0].NextHardState {
// The hard state didn't change yet (per checkable),
// so this time point also awaits the previous hard state.
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else { // the hard state changed (per checkable)
// That past hard state is now available for the processed future time points:
cacheExec(
*tx,
"INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
"SELECT history_id, ? FROM next_ids WHERE object_id=?",
idoRow.LastHardState, idoRow.ObjectId,
)
// Now they have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)
// That's done.
// Now do the same thing as in the "we just started" case above, for the same reason:
cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
}
commitPeriodically()
checkpoint = idoRow.Id
}
ht.bar.IncrBy(len(idoRows))
return
},
)
// No past hard state is available for the processed future time points, assuming pending:
cacheExec(
*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
)
// Now they should have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state")
cacheExec(*tx, "DELETE FROM next_ids")
})
ht.bar.SetTotal(ht.bar.Current(), true)
}
// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
// (On non-recoverable errors the whole program exits.)
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
logger := log.With("backend", "cache")
tx, err := cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}
const commitInterval = 5 * time.Minute
nextCommit := time.Now().Add(commitInterval)
do(&tx, func() { // commitPeriodically
if now := time.Now(); now.After(nextCommit) {
if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
var err error
tx, err = cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}
nextCommit = nextCommit.Add(commitInterval)
}
})
if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
}
// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheGet(cache interface {
Get(dest interface{}, query string, args ...interface{}) error
}, dest interface{}, query string, args ...interface{}) {
if err := cache.Get(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
if err := cacheTx.Select(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
if _, err := cacheTx.Exec(dml, args...); err != nil {
log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}