diff --git a/server/common/mime.go b/server/common/mime.go index a17a2f1b..ea8b9c4d 100644 --- a/server/common/mime.go +++ b/server/common/mime.go @@ -8,9 +8,9 @@ import ( "strings" ) -var MimeTypes map[string]string +var MimeTypes map[string]string = map[string]string{ "txt": "text/plain" } -func init(){ +func init() { path := filepath.Join(GetCurrentDir(), CONFIG_PATH + "mime.json") if f, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm); err == nil { j, _ := ioutil.ReadAll(f) diff --git a/server/ctrl/files.go b/server/ctrl/files.go index 69fbd9c6..b419455f 100644 --- a/server/ctrl/files.go +++ b/server/ctrl/files.go @@ -46,13 +46,13 @@ func FileLs(ctx App, res http.ResponseWriter, req *http.Request) { SendErrorResult(res, err) return } - model.SProc.Append(&ctx, path) // ping the search indexer entries, err := ctx.Backend.Ls(path) if err != nil { SendErrorResult(res, err) return } + go model.SProc.HintLs(&ctx, path) files := make([]FileInfo, len(entries)) etagger := fnv.New32() @@ -154,6 +154,7 @@ func FileCat(ctx App, res http.ResponseWriter, req *http.Request) { if req.Header.Get("range") != "" { needToCreateCache = true } + go model.SProc.HintLs(&ctx, filepath.Dir(path) + "/") } // plugin hooks @@ -296,6 +297,8 @@ func FileSave(ctx App, res http.ResponseWriter, req *http.Request) { SendErrorResult(res, NewError(err.Error(), 403)) return } + go model.SProc.HintLs(&ctx, filepath.Dir(path) + "/") + go model.SProc.HintFile(&ctx, path) SendSuccessResult(res, nil) } @@ -325,6 +328,9 @@ func FileMv(ctx App, res http.ResponseWriter, req *http.Request) { SendErrorResult(res, err) return } + + go model.SProc.HintRm(&ctx, filepath.Dir(from) + "/") + go model.SProc.HintLs(&ctx, filepath.Dir(to) + "/") SendSuccessResult(res, nil) } @@ -344,6 +350,7 @@ func FileRm(ctx App, res http.ResponseWriter, req *http.Request) { SendErrorResult(res, err) return } + model.SProc.HintRm(&ctx, path) SendSuccessResult(res, nil) } @@ -364,6 +371,7 @@ func FileMkdir(ctx App, res 
http.ResponseWriter, req *http.Request) { SendErrorResult(res, err) return } + go model.SProc.HintLs(&ctx, filepath.Dir(path) + "/") SendSuccessResult(res, nil) } @@ -384,6 +392,7 @@ func FileTouch(ctx App, res http.ResponseWriter, req *http.Request) { SendErrorResult(res, err) return } + go model.SProc.HintLs(&ctx, filepath.Dir(path) + "/") SendSuccessResult(res, nil) } diff --git a/server/model/search.go b/server/model/search.go index c8a3b4f9..825cd1f7 100644 --- a/server/model/search.go +++ b/server/model/search.go @@ -8,8 +8,8 @@ import ( . "github.com/mickael-kerjean/filestash/server/common" "github.com/mickael-kerjean/filestash/server/model/formater" "hash/fnv" - "io" - "math/rand" + "io/ioutil" + "os" "path/filepath" "regexp" "strconv" @@ -22,6 +22,8 @@ const ( PHASE_EXPLORE = "PHASE_EXPLORE" PHASE_INDEXING = "PHASE_INDEXING" PHASE_MAINTAIN = "PHASE_MAINTAIN" + PHASE_PAUSE = "PHASE_PAUSE" + MAX_HEAP_SIZE = 100000 ) var ( SEARCH_ENABLE func() bool @@ -29,8 +31,9 @@ var ( SEARCH_PROCESS_PAR func() int SEARCH_REINDEX func() int CYCLE_TIME func() int - MAX_INDEXING_FSIZE func() int INDEXING_EXT func() string + MAX_INDEXING_FSIZE func() int + INDEXING_EXCLUSION = []string{"/node_modules/", "/bower_components/", "/.cache/", "/.npm/", "/.git/"} ) var SProc SearchProcess = SearchProcess{ @@ -70,7 +73,6 @@ func init(){ } SEARCH_PROCESS_MAX() SEARCH_PROCESS_PAR = func() int { - return 1 return Config.Get("features.search.process_par").Schema(func(f *FormElement) *FormElement { if f == nil { f = &FormElement{} @@ -139,13 +141,14 @@ func init(){ f.Name = "indexer_ext" f.Type = "string" f.Description = "File extension we want to see indexed" - f.Placeholder = "Default: org,txt,docx,pdf,md" - f.Default = "/" + f.Placeholder = "Default: org,txt,docx,pdf,md,form" + f.Default = "org,txt,docx,pdf,md,form" return f }).String() } INDEXING_EXT() + runner := func() { for { if SEARCH_ENABLE() == false { @@ -156,9 +159,6 @@ func init(){ if sidx == nil { time.Sleep(5 * time.Second) 
continue - } else if sidx.FoldersUnknown.Len() == 0 { - time.Sleep(5 * time.Second) - continue } sidx.mu.Lock() sidx.Execute() @@ -174,7 +174,7 @@ func Search(app *App, path string, keyword string) []File { var files []File = make([]File, 0) // extract our search indexer - s := SProc.Append(app, path) + s := SProc.HintLs(app, path) if s == nil { return files } @@ -183,7 +183,7 @@ func Search(app *App, path string, keyword string) []File { path = "/" } - rows, err := s.db.Query( + rows, err := s.DB.Query( "SELECT type, path, size, modTime FROM file WHERE path IN (" + " SELECT path FROM file_index WHERE file_index MATCH ? AND path > ? AND path < ?" + " ORDER BY rank LIMIT 2000" + @@ -194,6 +194,7 @@ func Search(app *App, path string, keyword string) []File { if err != nil { return files } + defer rows.Close() for rows.Next() { f := File{} var t string @@ -216,7 +217,7 @@ type SearchProcess struct { mu sync.Mutex } -func(this *SearchProcess) Append(app *App, path string) *SearchIndexer { +func(this *SearchProcess) HintLs(app *App, path string) *SearchIndexer { id := GenerateID(app) this.mu.Lock() defer this.mu.Unlock() @@ -244,15 +245,14 @@ func(this *SearchProcess) Append(app *App, path string) *SearchIndexer { } // Having all indexers running in memory could be expensive => instead we're cycling a pool - search_process_max := 2//SEARCH_PROCESS_MAX() + search_process_max := SEARCH_PROCESS_MAX() if len(this.idx) > ( search_process_max - 1) { toDel := this.idx[0 : len(this.idx) - ( search_process_max - 1)] for i := range toDel { - toDel[i].db.Close() + toDel[i].DB.Close() } this.idx = this.idx[len(this.idx) - ( search_process_max - 1) :] } - // instantiate the new indexer s := NewSearchIndexer(id, app.Backend) heap.Push(&s.FoldersUnknown, &Document{ @@ -265,6 +265,27 @@ func(this *SearchProcess) Append(app *App, path string) *SearchIndexer { return &s } +func(this *SearchProcess) HintRm(app *App, path string) { + id := GenerateID(app) + for i:=len(this.idx)-1; i>=0; 
i-- { + if id == this.idx[i].Id { + this.idx[i].DB.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path + "~") + break + } + } +} + +func(this *SearchProcess) HintFile(app *App, path string) { + id := GenerateID(app) + for i:=len(this.idx)-1; i>=0; i-- { + if id == this.idx[i].Id { + this.idx[i].DB.Exec("UPDATE file set indexTime = NULL WHERE path = ?", path) + break + } + } +} + + func(this *SearchProcess) Peek() *SearchIndexer { if len(this.idx) == 0 { return nil @@ -280,27 +301,35 @@ func(this *SearchProcess) Peek() *SearchIndexer { return s } +func(this *SearchProcess) Reset() { + for i := range this.idx { + this.idx[i].DB.Close() + } + this.idx = make([]SearchIndexer, 0) + this.n = -1 +} type SearchIndexer struct { Id string FoldersUnknown HeapDoc - FilesUnknown HeapDoc + CurrentPhase string Backend IBackend - db *sql.DB + DBPath string + DB *sql.DB mu sync.Mutex + lastHash string } func NewSearchIndexer(id string, b IBackend) SearchIndexer { s := SearchIndexer { + DBPath: filepath.Join(GetCurrentDir(), FTS_PATH, "fts_" + id + ".sql"), Id: id, Backend: b, FoldersUnknown: make(HeapDoc, 0, 1), - FilesUnknown: make(HeapDoc, 0, 1), } heap.Init(&s.FoldersUnknown) - heap.Init(&s.FilesUnknown) - db, err := sql.Open("sqlite3", filepath.Join(GetCurrentDir(), FTS_PATH, "fts_" + id + ".sql")) + db, err := sql.Open("sqlite3", s.DBPath + "?_journal_mode=wal") if err != nil { Log.Warning("search::init can't open database (%v)", err) return s @@ -311,6 +340,7 @@ func NewSearchIndexer(id string, b IBackend) SearchIndexer { Log.Warning("search::initschema prepare schema error(%v)", err) return err } + defer stmt.Close() _, err = stmt.Exec() if err != nil { Log.Warning("search::initschema execute error(%v)", err) @@ -318,10 +348,16 @@ func NewSearchIndexer(id string, b IBackend) SearchIndexer { } return err } - if queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), size INTEGER, 
modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil { + if queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), parent VARCHAR(1024), size INTEGER, modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil { return s } - if queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content);"); err != nil { + if queryDB("CREATE INDEX IF NOT EXISTS idx_file_index_time ON file(indexTime) WHERE indexTime IS NOT NULL;"); err != nil { + return s + } + if queryDB("CREATE INDEX IF NOT EXISTS idx_file_parent ON file(parent);"); err != nil { + return s + } + if queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content, tokenize = 'porter');"); err != nil { return s } if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_insert AFTER INSERT ON file BEGIN INSERT INTO file_index (path, filename, filetype) VALUES(new.path, new.filename, new.filetype); END;"); err != nil { return s } @@ -333,59 +369,65 @@ func NewSearchIndexer(id string, b IBackend) SearchIndexer { if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_update_path UPDATE OF path ON file BEGIN UPDATE file_index SET path = new.path, filepath = new.filepath, filetype = new.filetype WHERE path = old.path; END;"); err != nil { return s } - s.db = db + s.DB = db return s } func(this *SearchIndexer) Execute(){ - currentPhase := func() string { - if len(this.FoldersUnknown) != 0 { - return PHASE_EXPLORE - } - if len(this.FilesUnknown) != 0 { - return PHASE_INDEXING - } - return PHASE_MAINTAIN - }() - cycleExecute := func(fn func() bool) { + if this.CurrentPhase == "" { + time.Sleep(1 * time.Second) + this.CurrentPhase = PHASE_EXPLORE + } + + cycleExecute := func(fn func(*sql.Tx) bool) { stopTime := time.Now().Add(time.Duration(CYCLE_TIME()) * time.Second) + tx, err := this.DB.Begin() + if err != nil { + Log.Warning("search::index cycle_begin (%+v)", err) + 
time.Sleep(5 * time.Second) + } for { - if fn() == false { + if fn(tx) == false { break } if stopTime.After(time.Now()) == false { break } } + if err = tx.Commit(); err != nil { + Log.Warning("search::index cycle_commit (%+v)", err) + } } - if currentPhase == PHASE_EXPLORE { + if this.CurrentPhase == PHASE_EXPLORE { cycleExecute(this.Discover) return - } else if currentPhase == PHASE_INDEXING { - r := rand.Intn(100) - if r < 30 { - cycleExecute(this.Bookkeeping) - return - } + } else if this.CurrentPhase == PHASE_INDEXING { cycleExecute(this.Indexing) return - } else if currentPhase == PHASE_MAINTAIN { - cycleExecute(this.Bookkeeping) + } else if this.CurrentPhase == PHASE_MAINTAIN { + cycleExecute(this.Consolidate) return + } else if this.CurrentPhase == PHASE_PAUSE { + time.Sleep(5 * time.Second) + this.CurrentPhase = "" } return } -func(this *SearchIndexer) Discover() bool { +func(this *SearchIndexer) Discover(tx *sql.Tx) bool { if this.FoldersUnknown.Len() == 0 { + this.CurrentPhase = PHASE_INDEXING return false } - doc := heap.Pop(&this.FoldersUnknown).(*Document) + var doc *Document + doc = heap.Pop(&this.FoldersUnknown).(*Document) if doc == nil { + this.CurrentPhase = PHASE_INDEXING return false } files, err := this.Backend.Ls(doc.Path) if err != nil { + this.CurrentPhase = "" return true } if len(files) == 0 { @@ -406,32 +448,26 @@ func(this *SearchIndexer) Discover() bool { } return base64.StdEncoding.EncodeToString(hasher.Sum(nil)) }() + if hashFiles == this.lastHash { + return true + } + this.lastHash = "" for i:=0; i= ? 
AND path < ?", path, path + "~") + return err + } + + // Fetch FS as appear in our search cache + rows, err := tx.Query("SELECT filename, type, size FROM file WHERE parent = ?", path) + if err != nil { + return err + } + defer rows.Close() + previousFiles := make([]File, 0) + for rows.Next() { + var f File + rows.Scan(&f.FName, &f.FType, &f.FSize) + previousFiles = append(previousFiles, f) + } + + // Perform the DB operation to ensure previousFiles and currFiles are in sync + // 1. Find the content that have been created and did not exist before + for i:=0; i= ? AND path < ?", + path, path + "~", + ) + return err } type Document struct { @@ -542,12 +773,16 @@ type Document struct { ModTime time.Time `json:"time"` Size int64 `json:"size"` Content []byte `json:"content"` + Priority int `json:"-"` } // https://golang.org/pkg/container/heap/ type HeapDoc []*Document func(h HeapDoc) Len() int { return len(h) } func(h HeapDoc) Less(i, j int) bool { + if h[i].Priority != 0 || h[j].Priority != 0 { + return h[i].Priority < h[j].Priority + } scoreA := len(strings.Split(h[i].Path, "/")) / len(strings.Split(h[i].InitialPath, "/")) scoreB := len(strings.Split(h[j].Path, "/")) / len(strings.Split(h[j].InitialPath, "/")) return scoreA < scoreB @@ -557,7 +792,11 @@ func(h HeapDoc) Swap(i, j int) { h[i] = h[j] h[j] = a } -func (h *HeapDoc) Push(x interface{}) { *h = append(*h, x.(*Document)) } +func (h *HeapDoc) Push(x interface{}) { + if h.Len() < MAX_HEAP_SIZE { + *h = append(*h, x.(*Document)) + } +} func (h *HeapDoc) Pop() interface{} { old := *h n := len(old)