// Copyright 2018 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package issues import ( "context" "encoding/json" "os" "sync" "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" ) // IndexerData data stored in the issue indexer type IndexerData struct { ID int64 RepoID int64 Title string Content string Comments []string IsDelete bool IDs []int64 } // Match represents on search result type Match struct { ID int64 `json:"id"` Score float64 `json:"score"` } // SearchResult represents search results type SearchResult struct { Total int64 Hits []Match } // Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) Index(issue []*IndexerData) error Delete(ids ...int64) error Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) Close() error } type indexerHolder struct { indexer Indexer mutex sync.RWMutex cond *sync.Cond } func newIndexerHolder() *indexerHolder { h := &indexerHolder{} h.cond = sync.NewCond(h.mutex.RLocker()) return h } func (h *indexerHolder) set(indexer Indexer) { h.mutex.Lock() defer h.mutex.Unlock() h.indexer = indexer h.cond.Broadcast() } func (h *indexerHolder) get() Indexer { h.mutex.RLock() defer h.mutex.RUnlock() if h.indexer == nil { h.cond.Wait() } return h.indexer } var ( // issueIndexerQueue queue of issue ids to be updated issueIndexerQueue queue.Queue holder = newIndexerHolder() ) // InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until // all issue index done. func InitIssueIndexer(syncReindex bool) { waitChannel := make(chan time.Duration) // Create the Queue switch setting.Indexer.IssueType { case "bleve": handler := func(data ...queue.Data) { iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber) for _, datum := range data { indexerData, ok := datum.(*IndexerData) if !ok { log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) continue } log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { _ = holder.get().Delete(indexerData.IDs...) continue } iData = append(iData, indexerData) } if err := holder.get().Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) } } queueType := queue.PersistableChannelQueueType switch setting.Indexer.IssueQueueType { case setting.LevelQueueType: queueType = queue.LevelQueueType case setting.ChannelQueueType: queueType = queue.PersistableChannelQueueType case setting.RedisQueueType: queueType = queue.RedisQueueType default: log.Fatal("Unsupported indexer queue type: %v", setting.Indexer.IssueQueueType) } name := "issue_indexer_queue" opts := make(map[string]interface{}) opts["QueueLength"] = setting.Indexer.UpdateQueueLength opts["BatchLength"] = setting.Indexer.IssueQueueBatchNumber opts["DataDir"] = setting.Indexer.IssueQueueDir addrs, password, dbIdx, err := setting.ParseQueueConnStr(setting.Indexer.IssueQueueConnStr) if queueType == queue.RedisQueueType && err != nil { log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v", setting.Indexer.IssueQueueConnStr, err) } opts["Addresses"] = addrs opts["Password"] = password opts["DBIndex"] = dbIdx opts["QueueName"] = name opts["Name"] = name opts["Workers"] = 1 opts["BlockTimeout"] = 1 * time.Second opts["BoostTimeout"] = 5 * time.Minute opts["BoostWorkers"] = 5 cfg, err := json.Marshal(opts) if err != nil { log.Error("Unable to marshall generic options: %v Error: %v", opts, err) log.Fatal("Unable to create issue indexer queue with type %s: %v", queueType, err) } log.Debug("Creating issue indexer queue with type %s: configuration: %s", queueType, string(cfg)) issueIndexerQueue, err = queue.CreateQueue(queueType, handler, cfg, &IndexerData{}) if err != nil { issueIndexerQueue, err = queue.CreateQueue(queue.WrappedQueueType, handler, queue.WrappedQueueConfiguration{ Underlying: queueType, Timeout: setting.GracefulHammerTime + 30*time.Second, MaxAttempts: 10, Config: cfg, QueueLength: setting.Indexer.UpdateQueueLength, Name: name, }, &IndexerData{}) } if err != nil { log.Fatal("Unable to create issue indexer queue with type %s: %v : %v", queueType, string(cfg), err) } default: issueIndexerQueue = &queue.DummyQueue{} } // Create the Indexer go func() { start := time.Now() log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType) var populate bool switch setting.Indexer.IssueType { case "bleve": graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath) exist, err := issueIndexer.Init() if err != nil { log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err) } populate = !exist holder.set(issueIndexer) atTerminate(context.Background(), func() { log.Debug("Closing issue indexer") issueIndexer := holder.get() if issueIndexer != nil { err := issueIndexer.Close() if err != nil { log.Error("Error whilst closing the issue indexer: %v", err) } } log.Info("PID: %d Issue Indexer closed", os.Getpid()) }) log.Debug("Created Bleve Indexer") }) case "db": issueIndexer := &DBIndexer{} holder.set(issueIndexer) default: log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } // Start processing the queue go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) // Populate the index if populate { if syncReindex { graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } else { go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer) } } waitChannel <- time.Since(start) }() if syncReindex { <-waitChannel } else if setting.Indexer.StartupTimeout > 0 { go func() { timeout := setting.Indexer.StartupTimeout if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 { timeout += setting.GracefulHammerTime } select { case duration := <-waitChannel: log.Info("Issue Indexer Initialization took %v", duration) case <-time.After(timeout): if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok { shutdownable.Terminate() } log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout) } }() } } // populateIssueIndexer populate the issue indexer with issue data func populateIssueIndexer(ctx context.Context) { for page := 1; ; page++ { select { case <-ctx.Done(): log.Warn("Issue Indexer population shutdown before completion") return default: } repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{ Page: page, PageSize: models.RepositoryListDefaultPageSize, OrderBy: models.SearchOrderByID, Private: true, Collaborate: util.OptionalBoolFalse, }) if err != nil { log.Error("SearchRepositoryByName: %v", err) continue } if len(repos) == 0 { log.Debug("Issue Indexer population complete") return } for _, repo := range repos { select { case <-ctx.Done(): log.Info("Issue Indexer population shutdown before completion") return default: } UpdateRepoIndexer(repo) } } } // UpdateRepoIndexer add/update all issues of the repositories func UpdateRepoIndexer(repo *models.Repository) { is, err := models.Issues(&models.IssuesOptions{ RepoIDs: []int64{repo.ID}, IsClosed: util.OptionalBoolNone, IsPull: util.OptionalBoolNone, }) if err != nil { log.Error("Issues: %v", err) return } if err = models.IssueList(is).LoadDiscussComments(); err != nil { log.Error("LoadComments: %v", err) return } for _, issue := range is { UpdateIssueIndexer(issue) } } // UpdateIssueIndexer add/update an issue to the issue indexer func UpdateIssueIndexer(issue *models.Issue) { var comments []string for _, comment := range issue.Comments { if comment.Type == models.CommentTypeComment { comments = append(comments, comment.Content) } } indexerData := &IndexerData{ ID: issue.ID, RepoID: issue.RepoID, Title: issue.Title, Content: issue.Content, Comments: comments, } log.Debug("Adding to channel: %v", indexerData) if err := issueIndexerQueue.Push(indexerData); err != nil { log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) } } // DeleteRepoIssueIndexer deletes repo's all issues indexes func DeleteRepoIssueIndexer(repo *models.Repository) { var ids []int64 ids, err := models.GetIssueIDsByRepoID(repo.ID) if err != nil { log.Error("getIssueIDsByRepoID failed: %v", err) return } if len(ids) == 0 { return } indexerData := &IndexerData{ IDs: ids, IsDelete: true, } if err := issueIndexerQueue.Push(indexerData); err != nil { log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err) } } // SearchIssuesByKeyword search issue ids by keywords and repo id func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { var issueIDs []int64 res, err := holder.get().Search(keyword, repoIDs, 1000, 0) if err != nil { return nil, err } for _, r := range res.Hits { issueIDs = append(issueIDs, r.ID) } return issueIDs, nil }