icingadb/cmd/icingadb-migrate/misc.go
2024-05-24 09:56:28 +02:00

319 lines
9.3 KiB
Go

package main
import (
"context"
"crypto/sha1"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/objectpacker"
"github.com/icinga/icinga-go-library/types"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v6"
"github.com/vbauerster/mpb/v6/decor"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"strings"
"time"
)
type IdoMigrationProgressUpserter struct {
LastIdoId any `json:"last_ido_id"`
}
// Upsert implements the contracts.Upserter interface.
func (impu *IdoMigrationProgressUpserter) Upsert() interface{} {
return impu
}
type IdoMigrationProgress struct {
IdoMigrationProgressUpserter `json:",inline"`
EnvironmentId string `json:"environment_id"`
HistoryType string `json:"history_type"`
FromTs int32 `json:"from_ts"`
ToTs int32 `json:"to_ts"`
}
// Assert interface compliance.
var (
_ database.Upserter = (*IdoMigrationProgressUpserter)(nil)
_ database.Upserter = (*IdoMigrationProgress)(nil)
)
// log is the root logger.
var log = func() *zap.SugaredLogger {
logger, err := zap.NewDevelopmentConfig().Build()
if err != nil {
panic(err)
}
return logger.Sugar()
}()
// objectTypes maps IDO values to Icinga DB ones.
var objectTypes = map[uint8]string{1: "host", 2: "service"}
// hashAny combines objectpacker.PackAny and SHA1 hashing.
func hashAny(in interface{}) []byte {
hash := sha1.New()
if err := objectpacker.PackAny(in, hash); err != nil {
panic(err)
}
return hash.Sum(nil)
}
// convertTime converts *nix timestamps from the IDO for Icinga DB.
func convertTime(ts int64, tsUs uint32) types.UnixMilli {
if ts == 0 && tsUs == 0 {
return types.UnixMilli{}
}
return types.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond)))
}
// calcObjectId calculates the ID of the config object named name1 for Icinga DB.
func calcObjectId(env, name1 string) []byte {
if name1 == "" {
return nil
}
return hashAny([2]string{env, name1})
}
// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB.
func calcServiceId(env, name1, name2 string) []byte {
if name2 == "" {
return nil
}
return hashAny([2]string{env, name1 + "!" + name2})
}
// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot
// and passes the results to onRows until either an empty result set or onRows() returns nil.
// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT,
// both with :named placeholders (:checkpoint, :bulk).
// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones.
// (On non-recoverable errors the whole program exits.)
func sliceIdoHistory[Row any](
ht *historyType, query string, args map[string]any,
checkpoint interface{}, onRows func([]Row) (checkpoint interface{}),
) {
if args == nil {
args = map[string]interface{}{}
}
args["fromid"] = ht.fromId
args["toid"] = ht.toId
args["checkpoint"] = checkpoint
args["bulk"] = 20000
if ht.snapshot.DriverName() != database.MySQL {
query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "")
}
for {
// TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779)
stmt, err := ht.snapshot.PrepareNamed(query)
if err != nil {
log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
}
var rows []Row
if err := stmt.Select(&rows, args); err != nil {
log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
_ = stmt.Close()
if len(rows) < 1 {
break
}
if checkpoint = onRows(rows); checkpoint == nil {
break
}
args["checkpoint"] = checkpoint
}
}
type progressBar struct {
*mpb.Bar
lastUpdate time.Time
}
// IncrBy does pb.Bar.DecoratorEwmaUpdate() automatically.
func (pb *progressBar) IncrBy(n int) {
pb.Bar.IncrBy(n)
now := time.Now()
if !pb.lastUpdate.IsZero() {
pb.Bar.DecoratorEwmaUpdate(now.Sub(pb.lastUpdate))
}
pb.lastUpdate = now
}
// historyType specifies a history data type.
type historyType struct {
// name is a human-readable common name.
name string
// idoTable specifies the source table.
idoTable string
// idoIdColumn specifies idoTable's primary key.
idoIdColumn string
// idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.)
idoStartColumns []string
// idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.)
idoEndColumns []string
// cacheSchema specifies <name>.sqlite3's structure.
cacheSchema string
// cacheFiller fills cache from snapshot.
cacheFiller func(*historyType)
// cacheLimitQuery rationale: see migrate().
cacheLimitQuery string
// migrationQuery SELECTs source data for actual migration.
migrationQuery string
// migrate does the actual migration.
migrate func(c *Config, idb *database.DB, envId []byte, ht *historyType)
// cacheFile locates <name>.sqlite3.
cacheFile string
// cache represents <cacheFile>.
cache *sqlx.DB
// snapshot represents the data source.
snapshot *sqlx.Tx
// fromId is the first IDO row ID to migrate.
fromId uint64
// toId is the last IDO row ID to migrate.
toId uint64
// total summarizes the source data.
total int64
// cacheTotal summarizes the cache source data.
cacheTotal int64
// done summarizes the migrated data.
done int64
// bar represents the current progress bar.
bar *progressBar
// lastId is the last already migrated ID.
lastId uint64
}
// setupBar (re-)initializes ht.bar.
func (ht *historyType) setupBar(progress *mpb.Progress, total int64) {
ht.bar = &progressBar{Bar: progress.AddBar(
total,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.Name(ht.name, decor.WC{W: len(ht.name) + 1, C: decor.DidentRight}),
decor.Percentage(decor.WC{W: 5}),
),
mpb.AppendDecorators(
decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4}),
decor.Name(" "),
decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4}),
),
)}
}
type historyTypes []*historyType
// forEach performs f per hts in parallel.
func (hts historyTypes) forEach(f func(*historyType)) {
eg, _ := errgroup.WithContext(context.Background())
for _, ht := range hts {
ht := ht
eg.Go(func() error {
f(ht)
return nil
})
}
_ = eg.Wait()
}
type icingaDbOutputStage struct {
insert, upsert []database.Entity
}
var typesToMigrate = historyTypes{
{
name: "ack & comment",
idoTable: "icinga_commenthistory",
idoIdColumn: "commenthistory_id",
idoStartColumns: []string{"entry_time"},
// Manual deletion time wins vs. time of expiration which never happens due to manual deletion.
idoEndColumns: []string{"deletion_time", "expiration_time"},
migrationQuery: commentMigrationQuery,
migrate: func(c *Config, idb *database.DB, envId []byte, ht *historyType) {
migrateOneType(c, idb, envId, ht, convertCommentRows)
},
},
{
name: "downtime",
idoTable: "icinga_downtimehistory",
idoIdColumn: "downtimehistory_id",
// Fall back to scheduled time if actual time is missing.
idoStartColumns: []string{"actual_start_time", "scheduled_start_time"},
idoEndColumns: []string{"actual_end_time", "scheduled_end_time"},
migrationQuery: downtimeMigrationQuery,
migrate: func(c *Config, idb *database.DB, envId []byte, ht *historyType) {
migrateOneType(c, idb, envId, ht, convertDowntimeRows)
},
},
{
name: "flapping",
idoTable: "icinga_flappinghistory",
idoIdColumn: "flappinghistory_id",
idoStartColumns: []string{"event_time"},
idoEndColumns: []string{"event_time"},
cacheSchema: eventTimeCacheSchema,
cacheFiller: func(ht *historyType) {
buildEventTimeCache(ht, []string{
"xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time",
"xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id",
})
},
migrationQuery: flappingMigrationQuery,
migrate: func(c *Config, idb *database.DB, envId []byte, ht *historyType) {
migrateOneType(c, idb, envId, ht, convertFlappingRows)
},
},
{
name: "notification",
idoTable: "icinga_notifications",
idoIdColumn: "notification_id",
idoStartColumns: []string{"start_time"},
idoEndColumns: []string{"end_time"},
cacheSchema: previousHardStateCacheSchema,
cacheFiller: func(ht *historyType) {
buildPreviousHardStateCache(ht, []string{
"xh.notification_id id", "xh.object_id", "xh.state last_hard_state",
})
},
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
migrationQuery: notificationMigrationQuery,
migrate: func(c *Config, idb *database.DB, envId []byte, ht *historyType) {
migrateOneType(c, idb, envId, ht, convertNotificationRows)
},
},
{
name: "state",
idoTable: "icinga_statehistory",
idoIdColumn: "statehistory_id",
idoStartColumns: []string{"state_time"},
idoEndColumns: []string{"state_time"},
cacheSchema: previousHardStateCacheSchema,
cacheFiller: func(ht *historyType) {
buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"})
},
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
migrationQuery: stateMigrationQuery,
migrate: func(c *Config, idb *database.DB, envId []byte, ht *historyType) {
migrateOneType(c, idb, envId, ht, convertStateRows)
},
},
}