api: use app level lock when writing to database
Some checks failed
/ push (push) Has been cancelled
Some checks failed
/ push (push) Has been cancelled
because of database locked error
This commit is contained in:
parent
646010e3e4
commit
1896839664
|
@ -3,6 +3,7 @@ package api
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/stephenafamo/bob"
|
"github.com/stephenafamo/bob"
|
||||||
"github.com/teivah/broadcast"
|
"github.com/teivah/broadcast"
|
||||||
|
@ -32,6 +33,8 @@ type API struct {
|
||||||
|
|
||||||
subscriber message.Subscriber
|
subscriber message.Subscriber
|
||||||
publisher message.Publisher
|
publisher message.Publisher
|
||||||
|
|
||||||
|
mu *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type Dependencies struct {
|
type Dependencies struct {
|
||||||
|
@ -59,6 +62,7 @@ func New(deps Dependencies) *API {
|
||||||
reddit: deps.Reddit,
|
reddit: deps.Reddit,
|
||||||
subscriber: deps.Subscriber,
|
subscriber: deps.Subscriber,
|
||||||
publisher: deps.Publisher,
|
publisher: deps.Publisher,
|
||||||
|
mu: &sync.Mutex{},
|
||||||
}
|
}
|
||||||
|
|
||||||
api.scheduler = scheduler.New(api.scheduleRun)
|
api.scheduler = scheduler.New(api.scheduleRun)
|
||||||
|
|
|
@ -13,12 +13,13 @@ import (
|
||||||
|
|
||||||
type DeviceCreateParams = models.DeviceSetter
|
type DeviceCreateParams = models.DeviceSetter
|
||||||
|
|
||||||
func (api *API) DevicesCreate(ctx context.Context, params *models.Device) (*models.Device, error) {
|
func (api *API) DevicesCreate(ctx context.Context, params *models.Device) (device *models.Device, err error) {
|
||||||
ctx, span := tracer.Start(ctx, "*API.DevicesCreate")
|
ctx, span := tracer.Start(ctx, "*API.DevicesCreate")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
device, err := models.Devices.Insert(ctx, api.db, &models.DeviceSetter{
|
api.lockf(func() {
|
||||||
|
device, err = models.Devices.Insert(ctx, api.db, &models.DeviceSetter{
|
||||||
Slug: omit.From(params.Slug),
|
Slug: omit.From(params.Slug),
|
||||||
Name: omit.From(params.Name),
|
Name: omit.From(params.Name),
|
||||||
ResolutionX: omit.From(params.ResolutionX),
|
ResolutionX: omit.From(params.ResolutionX),
|
||||||
|
@ -34,6 +35,7 @@ func (api *API) DevicesCreate(ctx context.Context, params *models.Device) (*mode
|
||||||
CreatedAt: omit.From(now.Unix()),
|
CreatedAt: omit.From(now.Unix()),
|
||||||
UpdatedAt: omit.From(now.Unix()),
|
UpdatedAt: omit.From(now.Unix()),
|
||||||
})
|
})
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var sqliteErr sqlite3.Error
|
var sqliteErr sqlite3.Error
|
||||||
if errors.As(err, &sqliteErr) {
|
if errors.As(err, &sqliteErr) {
|
||||||
|
|
|
@ -15,7 +15,9 @@ func (api *API) DevicesUpdate(ctx context.Context, slug string, update *models.D
|
||||||
|
|
||||||
device = &models.Device{Slug: slug}
|
device = &models.Device{Slug: slug}
|
||||||
|
|
||||||
|
api.lockf(func() {
|
||||||
err = models.Devices.Update(ctx, api.db, update, device)
|
err = models.Devices.Update(ctx, api.db, update, device)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var sqliteErr sqlite3.Error
|
var sqliteErr sqlite3.Error
|
||||||
if errors.As(err, &sqliteErr) {
|
if errors.As(err, &sqliteErr) {
|
||||||
|
|
|
@ -246,7 +246,9 @@ func (api *API) saveImageToFSAndDatabase(ctx context.Context, image io.ReadClose
|
||||||
}
|
}
|
||||||
|
|
||||||
log.New(ctx).Debug("inserting images to database", "images", many)
|
log.New(ctx).Debug("inserting images to database", "images", many)
|
||||||
|
api.lockf(func() {
|
||||||
_, err = models.Images.InsertMany(ctx, api.db, many...)
|
_, err = models.Images.InsertMany(ctx, api.db, many...)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errs.Wrapw(err, "failed to insert images to database", "params", many)
|
return errs.Wrapw(err, "failed to insert images to database", "params", many)
|
||||||
}
|
}
|
||||||
|
|
13
api/lock.go
Normal file
13
api/lock.go
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
// lockf is a helper function to ensure to
|
||||||
|
// stop other goroutines from accessing the
|
||||||
|
// same resources at the same time.
|
||||||
|
//
|
||||||
|
// e.g. Use this function to wrap any write
|
||||||
|
// database calls to avoid `database locked error`
|
||||||
|
func (api *API) lockf(f func()) {
|
||||||
|
api.mu.Lock()
|
||||||
|
defer api.mu.Unlock()
|
||||||
|
f()
|
||||||
|
}
|
|
@ -23,12 +23,14 @@ func (api *API) scheduleHistoryInsert(ctx context.Context, exec bob.Executor, pa
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
api.lockf(func() {
|
||||||
history, err = models.ScheduleHistories.Insert(ctx, exec, &models.ScheduleHistorySetter{
|
history, err = models.ScheduleHistories.Insert(ctx, exec, &models.ScheduleHistorySetter{
|
||||||
Subreddit: omit.FromCond(params.Subreddit, params.Subreddit != ""),
|
Subreddit: omit.FromCond(params.Subreddit, params.Subreddit != ""),
|
||||||
Status: omit.From(params.Status.Int8()),
|
Status: omit.From(params.Status.Int8()),
|
||||||
ErrorMessage: omit.FromCond(params.ErrorMessage, params.Status == ScheduleStatusError),
|
ErrorMessage: omit.FromCond(params.ErrorMessage, params.Status == ScheduleStatusError),
|
||||||
CreatedAt: omit.From(now.Unix()),
|
CreatedAt: omit.From(now.Unix()),
|
||||||
})
|
})
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return history, errs.Wrapw(err, "failed to insert schedule history", "params", params)
|
return history, errs.Wrapw(err, "failed to insert schedule history", "params", params)
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,9 @@ func (api *API) SubredditsCreate(ctx context.Context, params *models.Subreddit)
|
||||||
UpdatedAt: omit.From(now.Unix()),
|
UpdatedAt: omit.From(now.Unix()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
api.lockf(func() {
|
||||||
subreddit, err = models.Subreddits.Insert(ctx, api.db, set)
|
subreddit, err = models.Subreddits.Insert(ctx, api.db, set)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var sqliteErr sqlite3.Error
|
var sqliteErr sqlite3.Error
|
||||||
if errors.As(err, &sqliteErr) {
|
if errors.As(err, &sqliteErr) {
|
||||||
|
|
3
db/db.go
3
db/db.go
|
@ -38,9 +38,6 @@ func Open(cfg *config.Config) (*sql.DB, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return db, errs.Wrapw(err, "failed to open database", "driver", driver, "db.string", dsn)
|
return db, errs.Wrapw(err, "failed to open database", "driver", driver, "db.string", dsn)
|
||||||
}
|
}
|
||||||
if driver == "sqlite3" {
|
|
||||||
db.SetMaxOpenConns(1) // SQLITE is not thread safe. This is to prevent database is locked error.
|
|
||||||
}
|
|
||||||
if cfg.Bool("db.automigrate") {
|
if cfg.Bool("db.automigrate") {
|
||||||
goose.SetLogger(goose.NopLogger())
|
goose.SetLogger(goose.NopLogger())
|
||||||
goose.SetBaseFS(Migrations)
|
goose.SetBaseFS(Migrations)
|
||||||
|
|
Loading…
Reference in a new issue