fix(search): BuildIndex concurrency error (#7035)

This commit is contained in:
Rammiah 2024-08-22 00:44:55 +08:00 committed by GitHub
parent 74887922b4
commit 48f50a2ceb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 55 additions and 22 deletions

View File

@ -3,5 +3,6 @@ package errs
import "fmt" import "fmt"
var ( var (
SearchNotAvailable = fmt.Errorf("search not available") SearchNotAvailable = fmt.Errorf("search not available")
BuildIndexIsRunning = fmt.Errorf("build index is running, please try later")
) )

View File

@ -136,9 +136,7 @@ func List(ctx context.Context, storage driver.Driver, path string, args model.Li
model.WrapObjsName(files) model.WrapObjsName(files)
// call hooks // call hooks
go func(reqPath string, files []model.Obj) { go func(reqPath string, files []model.Obj) {
for _, hook := range objsUpdateHooks { HandleObjsUpdateHook(reqPath, files)
hook(reqPath, files)
}
}(utils.GetFullPath(storage.GetStorage().MountPath, path), files) }(utils.GetFullPath(storage.GetStorage().MountPath, path), files)
// sort objs // sort objs

View File

@ -5,10 +5,12 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/op"
@ -21,10 +23,13 @@ import (
) )
var ( var (
Running = atomic.Bool{} Quit = atomic.Pointer[chan struct{}]{}
Quit chan struct{}
) )
func Running() bool {
return Quit.Load() != nil
}
func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth int, count bool) error { func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth int, count bool) error {
var ( var (
err error err error
@ -33,11 +38,27 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
) )
log.Infof("build index for: %+v", indexPaths) log.Infof("build index for: %+v", indexPaths)
log.Infof("ignore paths: %+v", ignorePaths) log.Infof("ignore paths: %+v", ignorePaths)
Running.Store(true) quit := make(chan struct{}, 1)
Quit = make(chan struct{}, 1) if !Quit.CompareAndSwap(nil, &quit) {
indexMQ := mq.NewInMemoryMQ[ObjWithParent]() // other goroutine is running
return errs.BuildIndexIsRunning
}
var (
indexMQ = mq.NewInMemoryMQ[ObjWithParent]()
running = atomic.Bool{} // current goroutine running
wg = &sync.WaitGroup{}
)
running.Store(true)
wg.Add(1)
go func() { go func() {
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
defer func() {
Quit.Store(nil)
wg.Done()
// notify walk to exit when StopIndex api called
running.Store(false)
ticker.Stop()
}()
tickCount := 0 tickCount := 0
for { for {
select { select {
@ -70,9 +91,8 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
} }
}) })
case <-Quit: case <-quit:
Running.Store(false) log.Debugf("build index for %+v received quit", indexPaths)
ticker.Stop()
eMsg := "" eMsg := ""
now := time.Now() now := time.Now()
originErr := err originErr := err
@ -100,14 +120,22 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
}) })
} }
}) })
log.Debugf("build index for %+v quit success", indexPaths)
return return
} }
} }
}() }()
defer func() { defer func() {
if Running.Load() { if !running.Load() || Quit.Load() != &quit {
Quit <- struct{}{} log.Debugf("build index for %+v stopped by StopIndex", indexPaths)
return
} }
select {
// avoid goroutine leak
case quit <- struct{}{}:
default:
}
wg.Wait()
}() }()
admin, err := op.GetAdmin() admin, err := op.GetAdmin()
if err != nil { if err != nil {
@ -121,7 +149,7 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth
} }
for _, indexPath := range indexPaths { for _, indexPath := range indexPaths {
walkFn := func(indexPath string, info model.Obj) error { walkFn := func(indexPath string, info model.Obj) error {
if !Running.Load() { if !running.Load() {
return filepath.SkipDir return filepath.SkipDir
} }
for _, avoidPath := range ignorePaths { for _, avoidPath := range ignorePaths {
@ -167,7 +195,7 @@ func Config(ctx context.Context) searcher.Config {
} }
func Update(parent string, objs []model.Obj) { func Update(parent string, objs []model.Obj) {
if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running.Load() { if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running() {
return return
} }
if isIgnorePath(parent) { if isIgnorePath(parent) {

View File

@ -27,7 +27,7 @@ func Init(mode string) error {
} }
instance = nil instance = nil
} }
if Running.Load() { if Running() {
return fmt.Errorf("index is running") return fmt.Errorf("index is running")
} }
if mode == "none" { if mode == "none" {

View File

@ -57,5 +57,7 @@ func (mq *inMemoryMQ[T]) Clear() {
} }
func (mq *inMemoryMQ[T]) Len() int { func (mq *inMemoryMQ[T]) Len() int {
mq.Lock()
defer mq.Unlock()
return mq.queue.Len() return mq.queue.Len()
} }

View File

@ -19,7 +19,7 @@ type UpdateIndexReq struct {
} }
func BuildIndex(c *gin.Context) { func BuildIndex(c *gin.Context) {
if search.Running.Load() { if search.Running() {
common.ErrorStrResp(c, "index is running", 400) common.ErrorStrResp(c, "index is running", 400)
return return
} }
@ -45,7 +45,7 @@ func UpdateIndex(c *gin.Context) {
common.ErrorResp(c, err, 400) common.ErrorResp(c, err, 400)
return return
} }
if search.Running.Load() { if search.Running() {
common.ErrorStrResp(c, "index is running", 400) common.ErrorStrResp(c, "index is running", 400)
return return
} }
@ -72,16 +72,20 @@ func UpdateIndex(c *gin.Context) {
} }
func StopIndex(c *gin.Context) { func StopIndex(c *gin.Context) {
if !search.Running.Load() { quit := search.Quit.Load()
if quit == nil {
common.ErrorStrResp(c, "index is not running", 400) common.ErrorStrResp(c, "index is not running", 400)
return return
} }
search.Quit <- struct{}{} select {
case *quit <- struct{}{}:
default:
}
common.SuccessResp(c) common.SuccessResp(c)
} }
func ClearIndex(c *gin.Context) { func ClearIndex(c *gin.Context) {
if search.Running.Load() { if search.Running() {
common.ErrorStrResp(c, "index is running", 400) common.ErrorStrResp(c, "index is running", 400)
return return
} }