From 97341ec698f6316bb5a57ac4bd233e0d9c0bba51 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Tue, 27 Aug 2024 09:10:58 +0700 Subject: [PATCH] sqlite: added pragma support for wal --- go/.gitignore | 2 ++ go/cmd/bluemage/main.go | 3 ++ go/cmd/bluemage/serve/serve.go | 20 ++++++++++++ go/config/default.go | 1 + go/pkg/log/sql.go | 2 +- go/server/log_handlers.go | 21 +++++++++++++ schemas/proto/buf.lock | 3 ++ schemas/proto/buf.yaml | 1 + schemas/proto/logs/v1/logs.proto | 54 ++++++++++++++++++++++++++++++++ 9 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 go/server/log_handlers.go create mode 100644 schemas/proto/logs/v1/logs.proto diff --git a/go/.gitignore b/go/.gitignore index d2396d3..005b4fa 100644 --- a/go/.gitignore +++ b/go/.gitignore @@ -1,4 +1,6 @@ models/ *.db +*.db-shm +*.db-wal gen/ .env diff --git a/go/cmd/bluemage/main.go b/go/cmd/bluemage/main.go index cd1b233..174c182 100644 --- a/go/cmd/bluemage/main.go +++ b/go/cmd/bluemage/main.go @@ -36,6 +36,8 @@ func init() { flags.Float32(key, v, desc) case float64: flags.Float64(key, v, desc) + case []string: + flags.StringSlice(key, v, desc) default: flags.String(key, fmt.Sprintf("%v", v), desc) } @@ -70,6 +72,7 @@ func main() { defer cancel() ctx = config.WithContext(ctx, cfg) if err := Cmd.ExecuteContext(ctx); err != nil { + slog.ErrorContext(ctx, err.Error(), "error", err) os.Exit(1) } } diff --git a/go/cmd/bluemage/serve/serve.go b/go/cmd/bluemage/serve/serve.go index eb29607..22ad4c2 100644 --- a/go/cmd/bluemage/serve/serve.go +++ b/go/cmd/bluemage/serve/serve.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path/filepath" + "strings" "time" "connectrpc.com/connect" @@ -76,6 +77,25 @@ var Cmd = &cobra.Command{ "dsn", dsn, ) } + for i, pragma := range cfg.Strings("db.pragma") { + split := strings.SplitN(pragma, "=", 2) + if len(split) < 2 { + return errs.Failw("failed to parse db pragma. expected key=value format", + "index", i+1, + "pragma", pragma, + ) + } + key, value := split[0], split[1] + query := fmt.Sprintf("pragma %s = %s", key, value) + _, err = sqldb.Exec(query) + if err != nil { + return errs.Wrapw(err, "failed to execute pragma", + "index", i+1, + "pragma", pragma, + "query", query, + ) + } + } cleanups = append(cleanups, sqldb.Close) if err := otelsql.RegisterDBStatsMetrics(sqldb, otelsql.WithAttributes(semconv.DBSystemSqlite)); err != nil { return errs.Wrapw( diff --git a/go/config/default.go b/go/config/default.go index 7f8bf4b..bf7f4e3 100644 --- a/go/config/default.go +++ b/go/config/default.go @@ -35,6 +35,7 @@ var DefaultConfig = Entries{ {"db.driver", "sqlite3", "Database driver", false}, {"db.path", path.Join(xdg.Home, ".local", "share", "bluemage", "data.db"), "Database path", false}, + {"db.pragma", []string{"journal_mode=wal", "synchronous=normal", "temp_store=memory", "mmap_size=30000000000"}, `Database pragma options key=value comma separated values (or repeat). Example: '--db.pragma "journal_mode=wal,synchronous=normal"'`, false}, {"pubsub.db.path", path.Join(xdg.Home, ".local", "share", "bluemage", "pubsub.db"), "PubSub database path", false}, {"pubsub.db.timeout", "5s", "PubSub database timeout", false}, diff --git a/go/pkg/log/sql.go b/go/pkg/log/sql.go index d4f283c..a6daeac 100644 --- a/go/pkg/log/sql.go +++ b/go/pkg/log/sql.go @@ -44,5 +44,5 @@ func (SQLLogger) Log(ctx context.Context, level sqldblogger.Level, msg string, d qc.Args = data["args"] return } - slog.ErrorContext(ctx, strings.ReplaceAll(msg, "\n", " "), "lvl", level.String(), "data", data) + slog.DebugContext(ctx, strings.ReplaceAll(msg, "\n", " "), "lvl", level.String(), "data", data) } diff --git a/go/server/log_handlers.go b/go/server/log_handlers.go new file mode 100644 index 0000000..be98985 --- /dev/null +++ b/go/server/log_handlers.go @@ -0,0 +1,21 @@ +package server + +import ( + "context" + + "connectrpc.com/connect" + logsv1 "github.com/tigorlazuardi/bluemage/go/gen/proto/logs/v1" + "github.com/tigorlazuardi/bluemage/go/gen/proto/logs/v1/logsv1connect" +) + +type LogHandlers struct { + logsv1connect.UnimplementedLogServiceHandler +} + +func (lo *LogHandlers) Log(ctx context.Context, request *connect.Request[logsv1.LogRequest]) (*connect.Response[logsv1.LogResponse], error) { + panic("not implemented") // TODO: Implement +} + +func (lo *LogHandlers) Tail(ctx context.Context, request *connect.Request[logsv1.TailRequest], response *connect.ServerStream[logsv1.TailResponse]) error { + panic("not implemented") // TODO: Implement +} diff --git a/schemas/proto/buf.lock b/schemas/proto/buf.lock index 186cc36..2b4a48a 100644 --- a/schemas/proto/buf.lock +++ b/schemas/proto/buf.lock @@ -4,3 +4,6 @@ deps: - name: buf.build/bufbuild/protovalidate commit: a6c49f84cc0f4e038680d390392e2ab0 digest: b5:e968392e88ff7915adcbd1635d670b45bff8836ec2415d81fc559ca5470a695dbdc30030bad8bc5764647c731079e9e7bba0023ea25c4e4a1672a7d2561d4a19 + - name: buf.build/protocolbuffers/wellknowntypes + commit: f17e05fe4a764a3482b8e033daec742e + digest: b5:405a06d8a554f01c830c643879f54feffe2087a6927643dc9418403653ff4033227a2ce9b2b19b8ad350a611ef6576b4df109c96e5dfb6164191eb73d779fb21 diff --git a/schemas/proto/buf.yaml b/schemas/proto/buf.yaml index 291ef16..13da433 100644 --- a/schemas/proto/buf.yaml +++ b/schemas/proto/buf.yaml @@ -1,3 +1,4 @@ version: v2 deps: - buf.build/bufbuild/protovalidate + - buf.build/protocolbuffers/wellknowntypes:v21.12 diff --git a/schemas/proto/logs/v1/logs.proto b/schemas/proto/logs/v1/logs.proto new file mode 100644 index 0000000..8db4b73 --- /dev/null +++ b/schemas/proto/logs/v1/logs.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package logs.v1; + +service LogService { + rpc Log(LogRequest) returns (LogResponse) {} + rpc Tail(TailRequest) returns (stream TailResponse) {} +} + +message LogRequest { + Level level = 1; + string search = 2; + string trace_id = 3; + int64 from = 4; + int64 to = 5; + int64 limit = 6; +} + +message LogResponse { + repeated Log logs = 1; +} + +message Log { + int64 timestamp = 1; + Level level = 2; + string message = 3; + Origin origin = 4; + string trace_id = 5; + string span_id = 6; + optional bytes details = 7; + optional bytes error = 8; +} + +enum Level { + LEVEL_UNSPECIFIED = 0; + LEVEL_DEBUG = 1; + LEVEL_INFO = 2; + LEVEL_WARN = 3; + LEVEL_ERROR = 4; +} + +message Origin { + string file = 1; + int64 line = 2; + string function = 3; +} + +message TailRequest { + Level level = 1; +} + +message TailResponse { + Log log = 1; +}