mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-03-27 12:43:09 -04:00
Some checks are pending
/ release (push) Waiting to run
testing-integration / test-unit (push) Waiting to run
testing-integration / test-sqlite (push) Waiting to run
testing-integration / test-mariadb (v10.6) (push) Waiting to run
testing-integration / test-mariadb (v11.8) (push) Waiting to run
testing / backend-checks (push) Waiting to run
testing / frontend-checks (push) Waiting to run
testing / test-unit (push) Blocked by required conditions
testing / test-e2e (push) Blocked by required conditions
testing / test-remote-cacher (redis) (push) Blocked by required conditions
testing / test-remote-cacher (valkey) (push) Blocked by required conditions
testing / test-remote-cacher (garnet) (push) Blocked by required conditions
testing / test-remote-cacher (redict) (push) Blocked by required conditions
testing / test-mysql (push) Blocked by required conditions
testing / test-pgsql (push) Blocked by required conditions
testing / test-sqlite (push) Blocked by required conditions
testing / security-check (push) Blocked by required conditions
If, for any reason (e.g. server crash), a task is recorded as done in the database but the logs are still in the database instead of being in storage, they need to be collected. The log_in_storage field is only set to true after the logs have been transferred to storage and can be relied upon to reflect which tasks have lingering logs. A cron job collects lingering logs every day, 3000 at a time, sleeping one second between them. In normal circumstances there will be only a few of them, even on a large instance, and there is no need to collect them as quickly as possible. When there are a lot of them for some reason, garbage collection must happen at a rate that is not too hard on storage I/O. Refs https://codeberg.org/forgejo/forgejo/issues/9999 --- Note on backports: the v11 backport is done manually because of minor conflicts. https://codeberg.org/forgejo/forgejo/pulls/10024 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). 
### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/<pull request number>.md` to be used for the release notes instead of the title. <!--start release-notes-assistant--> ## Release notes <!--URL:https://codeberg.org/forgejo/forgejo--> - Bug fixes - [PR](https://codeberg.org/forgejo/forgejo/pulls/10009): <!--number 10009 --><!--line 0 --><!--description Z2FyYmFnZSBjb2xsZWN0IGxpbmdlcmluZyBhY3Rpb25zIGxvZ3M=-->garbage collect lingering actions logs<!--description--> <!--end release-notes-assistant--> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10009 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Reviewed-by: Gusted <gusted@noreply.codeberg.org> Co-authored-by: Earl Warren <contact@earl-warren.org> Co-committed-by: Earl Warren <contact@earl-warren.org>
241 lines
6.6 KiB
Go
241 lines
6.6 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
	"unicode/utf8"

	"forgejo.org/models/dbfs"
	"forgejo.org/modules/log"
	"forgejo.org/modules/storage"
	"forgejo.org/modules/zstd"

	runnerv1 "code.forgejo.org/forgejo/actions-proto/runner/v1"
	"google.golang.org/protobuf/types/known/timestamppb"
)
|
|
|
|
const (
	// DBFSPrefix is prepended to a log filename to build its DBFS path.
	DBFSPrefix = "actions_log/"

	// MaxLineSize is the largest number of content bytes kept for a single
	// log line; FormatLog truncates anything beyond it.
	MaxLineSize = 64 * 1024

	// defaultBufSize sizes the buffered writer WriteLogs wraps around DBFS.
	defaultBufSize = MaxLineSize

	// timeFormat is the layout of the timestamp that begins every log line.
	timeFormat = "2006-01-02T15:04:05.0000000Z07:00"
)
|
|
|
|
func ExistsLogs(ctx context.Context, filename string) (bool, error) {
|
|
name := DBFSPrefix + filename
|
|
f, err := dbfs.Open(ctx, name)
|
|
if err == nil {
|
|
f.Close()
|
|
return true, nil
|
|
}
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
// WriteLogs appends logs to DBFS file for temporary storage.
|
|
// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
|
|
// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
|
|
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
|
|
flag := os.O_WRONLY
|
|
if offset == 0 {
|
|
// Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
|
|
flag |= os.O_CREATE
|
|
}
|
|
name := DBFSPrefix + filename
|
|
f, err := dbfs.OpenFile(ctx, name, flag)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
|
|
}
|
|
if stat.Size() < offset {
|
|
// If the size is less than offset, refuse to write, or it could result in content holes.
|
|
// However, if the size is greater than offset, we can still write to overwrite the content.
|
|
return nil, fmt.Errorf("size of %q is less than offset", name)
|
|
}
|
|
|
|
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
|
|
}
|
|
|
|
writer := bufio.NewWriterSize(f, defaultBufSize)
|
|
|
|
ns := make([]int, 0, len(rows))
|
|
for _, row := range rows {
|
|
n, err := writer.WriteString(FormatLog(row.Time.AsTime(), row.Content) + "\n")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ns = append(ns, n)
|
|
}
|
|
|
|
if err := writer.Flush(); err != nil {
|
|
return nil, err
|
|
}
|
|
return ns, nil
|
|
}
|
|
|
|
func ReadLogs(ctx context.Context, inStorage bool, filename string, offset, limit int64) ([]*runnerv1.LogRow, error) {
|
|
f, err := OpenLogs(ctx, inStorage, filename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
|
|
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
return nil, fmt.Errorf("file seek: %w", err)
|
|
}
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
maxLineSize := len(timeFormat) + MaxLineSize + 1
|
|
scanner.Buffer(make([]byte, maxLineSize), maxLineSize)
|
|
|
|
var rows []*runnerv1.LogRow
|
|
for scanner.Scan() && (int64(len(rows)) < limit || limit < 0) {
|
|
t, c, err := ParseLog(scanner.Text())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse log %q: %w", scanner.Text(), err)
|
|
}
|
|
rows = append(rows, &runnerv1.LogRow{
|
|
Time: timestamppb.New(t),
|
|
Content: c,
|
|
})
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return nil, fmt.Errorf("ReadLogs scan: %w", err)
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
// logZstdBlockSize is the block size used when compressing logs with seekable
// zstd.
//
// With 128KB blocks the compression ratio stays close to that of regular,
// non-seekable zstd. It also sets the minimum amount fetched from object
// storage per read: 128KB times the compression ratio. Text typically
// compresses to about 30%, so a read is roughly 38KB, which is acceptable.
const logZstdBlockSize = 128 << 10 // 128KB
|
|
|
|
// TransferLogs transfers logs from DBFS to object storage.
|
|
// It happens when the file is complete and no more logs will be appended.
|
|
// It respects the file format in the filename like ".zst", and compresses the content if needed.
|
|
func TransferLogs(ctx context.Context, filename string) (func(), error) {
|
|
name := DBFSPrefix + filename
|
|
remove := func() {
|
|
if err := dbfs.Remove(ctx, name); err != nil {
|
|
log.Warn("dbfs remove %q: %v", name, err)
|
|
}
|
|
}
|
|
f, err := dbfs.Open(ctx, name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
var reader io.Reader = f
|
|
if strings.HasSuffix(filename, ".zst") {
|
|
r, w := io.Pipe()
|
|
reader = r
|
|
zstdWriter, err := zstd.NewSeekableWriter(w, logZstdBlockSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("zstd NewSeekableWriter: %w", err)
|
|
}
|
|
go func() {
|
|
defer func() {
|
|
_ = w.CloseWithError(zstdWriter.Close())
|
|
}()
|
|
if _, err := io.Copy(zstdWriter, f); err != nil {
|
|
_ = w.CloseWithError(err)
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
if _, err := storage.Actions.Save(filename, reader, -1); err != nil {
|
|
return nil, fmt.Errorf("storage save %q: %w", filename, err)
|
|
}
|
|
return remove, nil
|
|
}
|
|
|
|
func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
|
|
if !inStorage {
|
|
name := DBFSPrefix + filename
|
|
err := dbfs.Remove(ctx, name)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("dbfs remove %q: %w", name, err)
|
|
}
|
|
return nil
|
|
}
|
|
err := storage.Actions.Delete(filename)
|
|
if err != nil {
|
|
return fmt.Errorf("storage delete %q: %w", filename, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) {
|
|
if !inStorage {
|
|
name := DBFSPrefix + filename
|
|
f, err := dbfs.Open(ctx, name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
f, err := storage.Actions.Open(filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage open %q: %w", filename, err)
|
|
}
|
|
|
|
var reader io.ReadSeekCloser = f
|
|
if strings.HasSuffix(filename, ".zst") {
|
|
r, err := zstd.NewSeekableReader(f)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("zstd NewSeekableReader: %w", err)
|
|
}
|
|
reader = r
|
|
}
|
|
|
|
return reader, nil
|
|
}
|
|
|
|
func FormatLog(timestamp time.Time, content string) string {
|
|
// Content shouldn't contain new line, it will break log indexes, other control chars are safe.
|
|
content = strings.ReplaceAll(content, "\n", `\n`)
|
|
if len(content) > MaxLineSize {
|
|
content = content[:MaxLineSize]
|
|
}
|
|
return fmt.Sprintf("%s %s", timestamp.UTC().Format(timeFormat), content)
|
|
}
|
|
|
|
func ParseLog(in string) (time.Time, string, error) {
|
|
index := strings.IndexRune(in, ' ')
|
|
if index < 0 {
|
|
return time.Time{}, "", fmt.Errorf("invalid log: %q", in)
|
|
}
|
|
timestamp, err := time.Parse(timeFormat, in[:index])
|
|
if err != nil {
|
|
return time.Time{}, "", err
|
|
}
|
|
return timestamp, in[index+1:], nil
|
|
}
|