Refactor code indexer (#9313)

* Refactor code indexer

* fix test

* fix test

* refactor code indexer

* fix import

* improve code

* fix typo

* fix test and make code clean

* fix lint
This commit is contained in:
Lunny Xiao 2019-12-23 20:31:16 +08:00 committed by GitHub
parent 2f9564f993
commit 89b4e0477b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 670 additions and 659 deletions

View File

@ -9,182 +9,90 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"time"
"code.gitea.io/gitea/models" "code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/charset" "code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture" "github.com/ethantkoenig/rupture"
) )
type repoIndexerOperation struct { const unicodeNormalizeName = "unicodeNormalize"
repoID int64 const maxBatchSize = 16
deleted bool
watchers []chan<- error // indexerID a bleve-compatible unique identifier for an integer id
func indexerID(id int64) string {
return strconv.FormatInt(id, 36)
} }
var repoIndexerOperationQueue chan repoIndexerOperation // numericEqualityQuery a numeric equality query for the given value and field
func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery {
// InitRepoIndexer initialize the repo indexer f := float64(value)
func InitRepoIndexer() { tru := true
if !setting.Indexer.RepoIndexerEnabled { q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru)
return q.SetField(field)
} return q
waitChannel := make(chan time.Duration)
// FIXME: graceful: This should use a persistable queue
repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
go func() {
start := time.Now()
log.Info("PID: %d: Initializing Repository Indexer", os.Getpid())
initRepoIndexer(populateRepoIndexerAsynchronously)
go processRepoIndexerOperationQueue()
waitChannel <- time.Since(start)
}()
if setting.Indexer.StartupTimeout > 0 {
go func() {
timeout := setting.Indexer.StartupTimeout
if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
case duration := <-waitChannel:
log.Info("Repository Indexer Initialization took %v", duration)
case <-time.After(timeout):
log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
}
}()
}
} }
// populateRepoIndexerAsynchronously asynchronously populates the repo indexer func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
// with pre-existing data. This should only be run when the indexer is created return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
// for the first time. "type": unicodenorm.Name,
func populateRepoIndexerAsynchronously() error { "form": unicodenorm.NFC,
exist, err := models.IsTableNotEmpty("repository") })
if err != nil {
return err
} else if !exist {
return nil
}
var maxRepoID int64
if maxRepoID, err = models.GetMaxID("repository"); err != nil {
return err
}
go populateRepoIndexer(maxRepoID)
return nil
} }
// populateRepoIndexer populate the repo indexer with pre-existing data. This // openIndexer open the index at the specified path, checking for metadata
// should only be run when the indexer is created for the first time. // updates and bleve version updates. If index needs to be created (or
// FIXME: graceful: This should use a persistable queue // re-created), returns (nil, nil)
func populateRepoIndexer(maxRepoID int64) { func openIndexer(path string, latestVersion int) (bleve.Index, error) {
log.Info("Populating the repo indexer with existing repositories") _, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
isShutdown := graceful.GetManager().IsShutdown() return nil, nil
} else if err != nil {
// start with the maximum existing repo ID and work backwards, so that we
// don't include repos that are created after gitea starts; such repos will
// already be added to the indexer, and we don't need to add them again.
for maxRepoID > 0 {
select {
case <-isShutdown:
log.Info("Repository Indexer population shutdown before completion")
return
default:
}
ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50)
if err != nil {
log.Error("populateRepoIndexer: %v", err)
return
} else if len(ids) == 0 {
break
}
for _, id := range ids {
select {
case <-isShutdown:
log.Info("Repository Indexer population shutdown before completion")
return
default:
}
repoIndexerOperationQueue <- repoIndexerOperation{
repoID: id,
deleted: false,
}
maxRepoID = id - 1
}
}
log.Info("Done (re)populating the repo indexer with existing repositories")
}
func updateRepoIndexer(repoID int64) error {
repo, err := models.GetRepositoryByID(repoID)
if err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepositoryByID: %d, Error: %v", repoID, err)
}
sha, err := getDefaultBranchSha(repo)
if err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to GetDefaultBranchSha for: %s/%s, Error: %v", repo.MustOwnerName(), repo.Name, err)
}
changes, err := getRepoChanges(repo, sha)
if err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to GetRepoChanges for: %s/%s Sha: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, err)
} else if changes == nil {
return nil
}
batch := RepoIndexerBatch()
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to addUpdate to: %s/%s Sha: %s, update: %s(%s) Error: %v", repo.MustOwnerName(), repo.Name, sha, update.Filename, update.BlobSha, err)
}
}
for _, filename := range changes.RemovedFilenames {
if err := addDelete(filename, repo, batch); err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to addDelete to: %s/%s Sha: %s, filename: %s Error: %v", repo.MustOwnerName(), repo.Name, sha, filename, err)
}
}
if err = batch.Flush(); err != nil {
return fmt.Errorf("UpdateRepoIndexer: Unable to flush batch to indexer for repo: %s/%s Error: %v", repo.MustOwnerName(), repo.Name, err)
}
return repo.UpdateIndexerStatus(sha)
}
// repoChanges changes (file additions/updates/removals) to a repo
type repoChanges struct {
Updates []fileUpdate
RemovedFilenames []string
}
type fileUpdate struct {
Filename string
BlobSha string
}
func getDefaultBranchSha(repo *models.Repository) (string, error) {
stdout, err := git.NewCommand("show-ref", "-s", git.BranchPrefix+repo.DefaultBranch).RunInDir(repo.RepoPath())
if err != nil {
return "", err
}
return strings.TrimSpace(stdout), nil
}
// getRepoChanges returns changes to repo since last indexer update
func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) {
if err := repo.GetIndexerStatus(); err != nil {
return nil, err return nil, err
} }
if len(repo.IndexerStatus.CommitSha) == 0 { metadata, err := rupture.ReadIndexMetadata(path)
return genesisChanges(repo, revision) if err != nil {
return nil, err
} }
return nonGenesisChanges(repo, revision) if metadata.Version < latestVersion {
// the indexer is using a previous version, so we should delete it and
// re-populate
return nil, os.RemoveAll(path)
}
index, err := bleve.Open(path)
if err != nil && err == upsidedown.IncompatibleVersion {
// the indexer was built with a previous version of bleve, so we should
// delete it and re-populate
return nil, os.RemoveAll(path)
} else if err != nil {
return nil, err
}
return index, nil
}
// RepoIndexerData data stored in the repo indexer
type RepoIndexerData struct {
RepoID int64
Content string
}
// Type returns the document type, for bleve's mapping.Classifier interface.
func (d *RepoIndexerData) Type() string {
return repoIndexerDocType
} }
func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error { func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
@ -207,174 +115,231 @@ func addUpdate(update fileUpdate, repo *models.Repository, batch rupture.Flushin
// FIXME: UTF-16 files will probably fail here // FIXME: UTF-16 files will probably fail here
return nil return nil
} }
indexerUpdate := RepoIndexerUpdate{
Filepath: update.Filename, id := filenameIndexerID(repo.ID, update.Filename)
Op: RepoIndexerOpUpdate, return batch.Index(id, &RepoIndexerData{
Data: &RepoIndexerData{ RepoID: repo.ID,
RepoID: repo.ID, Content: string(charset.ToUTF8DropErrors(fileContents)),
Content: string(charset.ToUTF8DropErrors(fileContents)), })
},
}
return indexerUpdate.AddToFlushingBatch(batch)
} }
func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error { func addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
indexerUpdate := RepoIndexerUpdate{ id := filenameIndexerID(repo.ID, filename)
Filepath: filename, return batch.Delete(id)
Op: RepoIndexerOpDelete,
Data: &RepoIndexerData{
RepoID: repo.ID,
},
}
return indexerUpdate.AddToFlushingBatch(batch)
} }
func isIndexable(entry *git.TreeEntry) bool { const (
if !entry.IsRegular() && !entry.IsExecutable() { repoIndexerAnalyzer = "repoIndexerAnalyzer"
return false repoIndexerDocType = "repoIndexerDocType"
} repoIndexerLatestVersion = 4
name := strings.ToLower(entry.Name()) )
for _, g := range setting.Indexer.ExcludePatterns {
if g.Match(name) {
return false
}
}
for _, g := range setting.Indexer.IncludePatterns {
if g.Match(name) {
return true
}
}
return len(setting.Indexer.IncludePatterns) == 0
}
// parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command // createRepoIndexer create a repo indexer if one does not already exist
func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) { func createRepoIndexer(path string, latestVersion int) (bleve.Index, error) {
entries, err := git.ParseTreeEntries(stdout) docMapping := bleve.NewDocumentMapping()
numericFieldMapping := bleve.NewNumericFieldMapping()
numericFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
textFieldMapping := bleve.NewTextFieldMapping()
textFieldMapping.IncludeInAll = false
docMapping.AddFieldMappingsAt("Content", textFieldMapping)
mapping := bleve.NewIndexMapping()
if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
return nil, err
} else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
"type": custom.Name,
"char_filters": []string{},
"tokenizer": unicode.Name,
"token_filters": []string{unicodeNormalizeName, lowercase.Name},
}); err != nil {
return nil, err
}
mapping.DefaultAnalyzer = repoIndexerAnalyzer
mapping.AddDocumentMapping(repoIndexerDocType, docMapping)
mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
indexer, err := bleve.New(path, mapping)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var idxCount = 0
updates := make([]fileUpdate, len(entries))
for _, entry := range entries {
if isIndexable(entry) {
updates[idxCount] = fileUpdate{
Filename: entry.Name(),
BlobSha: entry.ID.String(),
}
idxCount++
}
}
return updates[:idxCount], nil
}
// genesisChanges get changes to add repo to the indexer for the first time if err = rupture.WriteIndexMetadata(path, &rupture.IndexMetadata{
func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { Version: latestVersion,
var changes repoChanges }); err != nil {
stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision).
RunInDirBytes(repo.RepoPath())
if err != nil {
return nil, err return nil, err
} }
changes.Updates, err = parseGitLsTreeOutput(stdout) return indexer, nil
return &changes, err
} }
// nonGenesisChanges get changes since the previous indexer update func filenameIndexerID(repoID int64, filename string) string {
func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) { return indexerID(repoID) + "_" + filename
diffCmd := git.NewCommand("diff", "--name-status", }
repo.IndexerStatus.CommitSha, revision)
stdout, err := diffCmd.RunInDir(repo.RepoPath()) func filenameOfIndexerID(indexerID string) string {
index := strings.IndexByte(indexerID, '_')
if index == -1 {
log.Error("Unexpected ID in repo indexer: %s", indexerID)
}
return indexerID[index+1:]
}
// Compile-time check that *BleveIndexer satisfies Indexer.
var _ Indexer = (*BleveIndexer)(nil)

// BleveIndexer is the bleve-backed implementation of the code indexer.
type BleveIndexer struct {
	// indexDir is the index location passed to NewBleveIndexer.
	indexDir string
	// indexer is the underlying bleve index handle.
	indexer bleve.Index
}
// NewBleveIndexer builds a BleveIndexer for indexDir and runs its init step.
//
// The returned pointer is never nil, even when err is non-nil. The bool and
// error results are passed through unchanged from init.
func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
	b := &BleveIndexer{indexDir: indexDir}
	created, err := b.init()
	return b, created, err
}
// init init the indexer
func (b *BleveIndexer) init() (bool, error) {
var err error
b.indexer, err = openIndexer(b.indexDir, repoIndexerLatestVersion)
if err != nil { if err != nil {
// previous commit sha may have been removed by a force push, so return false, err
// try rebuilding from scratch
log.Warn("git diff: %v", err)
if err = deleteRepoFromIndexer(repo.ID); err != nil {
return nil, err
}
return genesisChanges(repo, revision)
} }
var changes repoChanges if b.indexer != nil {
updatedFilenames := make([]string, 0, 10) return false, nil
for _, line := range strings.Split(stdout, "\n") {
line = strings.TrimSpace(line)
if len(line) == 0 {
continue
}
filename := strings.TrimSpace(line[1:])
if len(filename) == 0 {
continue
} else if filename[0] == '"' {
filename, err = strconv.Unquote(filename)
if err != nil {
return nil, err
}
}
switch status := line[0]; status {
case 'M', 'A':
updatedFilenames = append(updatedFilenames, filename)
case 'D':
changes.RemovedFilenames = append(changes.RemovedFilenames, filename)
default:
log.Warn("Unrecognized status: %c (line=%s)", status, line)
}
} }
cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--") b.indexer, err = createRepoIndexer(b.indexDir, repoIndexerLatestVersion)
cmd.AddArguments(updatedFilenames...)
lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath())
if err != nil { if err != nil {
return nil, err return false, err
} }
changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout)
return &changes, err return true, nil
} }
func processRepoIndexerOperationQueue() { // Close close the indexer
for { func (b *BleveIndexer) Close() {
select { log.Debug("Closing repo indexer")
case op := <-repoIndexerOperationQueue: if b.indexer != nil {
var err error err := b.indexer.Close()
if op.deleted { if err != nil {
if err = deleteRepoFromIndexer(op.repoID); err != nil { log.Error("Error whilst closing the repository indexer: %v", err)
log.Error("DeleteRepoFromIndexer: %v", err) }
} }
} else { log.Info("PID: %d Repository Indexer closed", os.Getpid())
if err = updateRepoIndexer(op.repoID); err != nil { }
log.Error("updateRepoIndexer: %v", err)
} // Index indexes the data
} func (b *BleveIndexer) Index(repoID int64) error {
for _, watcher := range op.watchers { repo, err := models.GetRepositoryByID(repoID)
watcher <- err if err != nil {
} return err
case <-graceful.GetManager().IsShutdown(): }
log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
return sha, err := getDefaultBranchSha(repo)
if err != nil {
return err
}
changes, err := getRepoChanges(repo, sha)
if err != nil {
return err
} else if changes == nil {
return nil
}
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, update := range changes.Updates {
if err := addUpdate(update, repo, batch); err != nil {
return err
}
}
for _, filename := range changes.RemovedFilenames {
if err := addDelete(filename, repo, batch); err != nil {
return err
}
}
if err = batch.Flush(); err != nil {
return err
}
return repo.UpdateIndexerStatus(sha)
}
// Delete deletes indexes by ids
func (b *BleveIndexer) Delete(repoID int64) error {
query := numericEqualityQuery(repoID, "RepoID")
searchRequest := bleve.NewSearchRequestOptions(query, 2147483647, 0, false)
result, err := b.indexer.Search(searchRequest)
if err != nil {
return err
}
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
for _, hit := range result.Hits {
if err = batch.Delete(hit.ID); err != nil {
return err
}
}
return batch.Flush()
}
// Search searches for files in the specified repo.
// Returns the matching file-paths
func (b *BleveIndexer) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error) {
phraseQuery := bleve.NewMatchPhraseQuery(keyword)
phraseQuery.FieldVal = "Content"
phraseQuery.Analyzer = repoIndexerAnalyzer
var indexerQuery query.Query
if len(repoIDs) > 0 {
var repoQueries = make([]query.Query, 0, len(repoIDs))
for _, repoID := range repoIDs {
repoQueries = append(repoQueries, numericEqualityQuery(repoID, "RepoID"))
} }
indexerQuery = bleve.NewConjunctionQuery(
bleve.NewDisjunctionQuery(repoQueries...),
phraseQuery,
)
} else {
indexerQuery = phraseQuery
} }
}
// DeleteRepoFromIndexer remove all of a repository's entries from the indexer from := (page - 1) * pageSize
func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { searchRequest := bleve.NewSearchRequestOptions(indexerQuery, pageSize, from, false)
addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) searchRequest.Fields = []string{"Content", "RepoID"}
} searchRequest.IncludeLocations = true
// UpdateRepoIndexer update a repository's entries in the indexer result, err := b.indexer.Search(searchRequest)
func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { if err != nil {
addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) return 0, nil, err
}
func addOperationToQueue(op repoIndexerOperation) {
if !setting.Indexer.RepoIndexerEnabled {
return
} }
select {
case repoIndexerOperationQueue <- op: searchResults := make([]*SearchResult, len(result.Hits))
break for i, hit := range result.Hits {
default: var startIndex, endIndex int = -1, -1
go func() { for _, locations := range hit.Locations["Content"] {
repoIndexerOperationQueue <- op location := locations[0]
}() locationStart := int(location.Start)
locationEnd := int(location.End)
if startIndex < 0 || locationStart < startIndex {
startIndex = locationStart
}
if endIndex < 0 || locationEnd > endIndex {
endIndex = locationEnd
}
}
searchResults[i] = &SearchResult{
RepoID: int64(hit.Fields["RepoID"].(float64)),
StartIndex: startIndex,
EndIndex: endIndex,
Filename: filenameOfIndexerID(hit.ID),
Content: hit.Fields["Content"].(string),
}
} }
return int64(result.Total), searchResults, nil
} }

View File

@ -5,12 +5,66 @@
package code package code
import ( import (
"os"
"path/filepath" "path/filepath"
"testing" "testing"
"code.gitea.io/gitea/models" "code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"github.com/stretchr/testify/assert"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
models.MainTest(m, filepath.Join("..", "..", "..")) models.MainTest(m, filepath.Join("..", "..", ".."))
} }
// TestIndexAndSearch indexes repository 1 into a throw-away bleve index and
// checks, for a handful of keywords, which repository IDs a search reports.
func TestIndexAndSearch(t *testing.T) {
	models.PrepareTestEnv(t)

	dir := "./bleve.index"
	// Start from a clean slate and leave nothing behind for later runs.
	os.RemoveAll(dir)
	defer os.RemoveAll(dir)

	setting.Indexer.RepoIndexerEnabled = true
	idx, _, err := NewBleveIndexer(dir)
	if err != nil {
		idx.Close()
		log.Fatal("indexer.Init: %v", err)
	}
	// Release the index before the directory is removed (defers run LIFO).
	defer idx.Close()

	err = idx.Index(1)
	assert.NoError(t, err)

	var (
		keywords = []struct {
			Keyword string
			IDs     []int64
		}{
			{
				Keyword: "Description",
				IDs:     []int64{1},
			},
			{
				Keyword: "repo1",
				IDs:     []int64{1},
			},
			{
				Keyword: "non-exist",
				IDs:     []int64{},
			},
		}
	)

	for _, kw := range keywords {
		// nil repoIDs: search without restricting to particular repositories.
		total, res, err := idx.Search(nil, kw.Keyword, 1, 10)
		assert.NoError(t, err)
		assert.EqualValues(t, len(kw.IDs), total)

		var ids = make([]int64, 0, len(res))
		for _, hit := range res {
			ids = append(ids, hit.RepoID)
		}
		assert.EqualValues(t, kw.IDs, ids)
	}
}

147
modules/indexer/code/git.go Normal file
View File

@ -0,0 +1,147 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package code
import (
"strconv"
"strings"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)
// fileUpdate names one file whose contents should be (re)indexed.
type fileUpdate struct {
	// Filename is the tree entry name as reported by git.
	Filename string
	// BlobSha is the ID of that tree entry, in string form.
	BlobSha string
}
// repoChanges collects what has to be applied to the index for one
// repository: files to add or refresh, and files to drop.
type repoChanges struct {
	// Updates lists files whose contents must be indexed (new or modified).
	Updates []fileUpdate
	// RemovedFilenames lists paths whose index entries must be deleted.
	RemovedFilenames []string
}
// getDefaultBranchSha runs `git show-ref -s` for the repository's default
// branch inside the repository directory and returns the trimmed output.
func getDefaultBranchSha(repo *models.Repository) (string, error) {
	ref := git.BranchPrefix + repo.DefaultBranch
	out, err := git.NewCommand("show-ref", "-s", ref).RunInDir(repo.RepoPath())
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(out), nil
}
// getRepoChanges works out what must be applied to the index to bring repo up
// to revision. It loads the repository's indexer status first; when a commit
// SHA is already recorded the changes are computed relative to it, otherwise
// the repository is treated as never indexed.
func getRepoChanges(repo *models.Repository, revision string) (*repoChanges, error) {
	if err := repo.GetIndexerStatus(); err != nil {
		return nil, err
	}

	if len(repo.IndexerStatus.CommitSha) > 0 {
		return nonGenesisChanges(repo, revision)
	}
	return genesisChanges(repo, revision)
}
// isIndexable reports whether a tree entry should be fed to the indexer.
//
// Only regular or executable entries qualify. The lower-cased entry name is
// then checked against the configured exclude patterns (any match rejects)
// and include patterns (any match accepts). With no include patterns
// configured, everything not excluded is accepted.
func isIndexable(entry *git.TreeEntry) bool {
	if !(entry.IsRegular() || entry.IsExecutable()) {
		return false
	}

	lowered := strings.ToLower(entry.Name())
	for _, pattern := range setting.Indexer.ExcludePatterns {
		if pattern.Match(lowered) {
			return false
		}
	}

	includes := setting.Indexer.IncludePatterns
	if len(includes) == 0 {
		return true
	}
	for _, pattern := range includes {
		if pattern.Match(lowered) {
			return true
		}
	}
	return false
}
// parseGitLsTreeOutput turns raw `git ls-tree` output into the list of files
// to index, keeping only the entries accepted by isIndexable.
func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) {
	entries, err := git.ParseTreeEntries(stdout)
	if err != nil {
		return nil, err
	}

	// Capacity for the worst case: every entry is indexable.
	updates := make([]fileUpdate, 0, len(entries))
	for _, entry := range entries {
		if !isIndexable(entry) {
			continue
		}
		updates = append(updates, fileUpdate{
			Filename: entry.Name(),
			BlobSha:  entry.ID.String(),
		})
	}
	return updates, nil
}
// genesisChanges builds the change set for a repository that has not been
// indexed before: every indexable file in the full recursive tree of revision
// becomes an update, and nothing is removed.
func genesisChanges(repo *models.Repository, revision string) (*repoChanges, error) {
	lsTree := git.NewCommand("ls-tree", "--full-tree", "-r", revision)
	stdout, err := lsTree.RunInDirBytes(repo.RepoPath())
	if err != nil {
		return nil, err
	}

	updates, err := parseGitLsTreeOutput(stdout)
	return &repoChanges{Updates: updates}, err
}
// unquoteGitPath trims surrounding whitespace from a path taken from git
// output and, when git wrapped it in double quotes, undoes the quoting.
func unquoteGitPath(path string) (string, error) {
	path = strings.TrimSpace(path)
	if len(path) > 0 && path[0] == '"' {
		return strconv.Unquote(path)
	}
	return path, nil
}

// nonGenesisChanges get changes since the previous indexer update
//
// It diffs the last indexed commit against revision with
// `git diff --name-status` and classifies every reported path:
//
//   - M, A: the file is (re)indexed
//   - D: the file is removed from the index
//   - R: the old path is removed and the new path is indexed
//   - C: the new path is indexed
//
// Any other status is logged and skipped. If the diff itself fails, the
// repository's index entries are deleted and the change set is rebuilt from
// scratch via genesisChanges.
func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges, error) {
	diffCmd := git.NewCommand("diff", "--name-status",
		repo.IndexerStatus.CommitSha, revision)
	stdout, err := diffCmd.RunInDir(repo.RepoPath())
	if err != nil {
		// previous commit sha may have been removed by a force push, so
		// try rebuilding from scratch
		log.Warn("git diff: %v", err)
		if err = indexer.Delete(repo.ID); err != nil {
			return nil, err
		}
		return genesisChanges(repo, revision)
	}

	var changes repoChanges
	updatedFilenames := make([]string, 0, 10)
	for _, line := range strings.Split(stdout, "\n") {
		line = strings.TrimSpace(line)
		if len(line) == 0 {
			continue
		}
		status := line[0]

		// Renames and copies carry a similarity score and two paths:
		// "R<score>\t<old>\t<new>". Handle them before the single-path
		// statuses so that the pair is not mistaken for one filename.
		if status == 'R' || status == 'C' {
			fields := strings.Split(line, "\t")
			if len(fields) != 3 {
				log.Warn("Unrecognized status: %c (line=%s)", status, line)
				continue
			}
			oldName, err := unquoteGitPath(fields[1])
			if err != nil {
				return nil, err
			}
			newName, err := unquoteGitPath(fields[2])
			if err != nil {
				return nil, err
			}
			if len(newName) == 0 {
				continue
			}
			// Only a rename makes the source path disappear.
			if status == 'R' && len(oldName) > 0 {
				changes.RemovedFilenames = append(changes.RemovedFilenames, oldName)
			}
			updatedFilenames = append(updatedFilenames, newName)
			continue
		}

		filename, err := unquoteGitPath(line[1:])
		if err != nil {
			return nil, err
		}
		if len(filename) == 0 {
			continue
		}

		switch status {
		case 'M', 'A':
			updatedFilenames = append(updatedFilenames, filename)
		case 'D':
			changes.RemovedFilenames = append(changes.RemovedFilenames, filename)
		default:
			log.Warn("Unrecognized status: %c (line=%s)", status, line)
		}
	}

	if len(updatedFilenames) == 0 {
		// Without path arguments `git ls-tree <rev> --` lists every top-level
		// entry, which would re-index files that did not change.
		return &changes, nil
	}

	cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--")
	cmd.AddArguments(updatedFilenames...)
	lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath())
	if err != nil {
		return nil, err
	}
	changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout)
	return &changes, err
}

View File

@ -5,72 +5,73 @@
package code package code
import ( import (
"os" "time"
"strconv"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
) )
// indexerID a bleve-compatible unique identifier for an integer id var (
func indexerID(id int64) string { indexer Indexer
return strconv.FormatInt(id, 36) )
// SearchResult result of performing a search in a repo
type SearchResult struct {
RepoID int64
StartIndex int
EndIndex int
Filename string
Content string
} }
// numericEqualityQuery a numeric equality query for the given value and field // Indexer defines an interface to indexer issues contents
func numericEqualityQuery(value int64, field string) *query.NumericRangeQuery { type Indexer interface {
f := float64(value) Index(repoID int64) error
tru := true Delete(repoID int64) error
q := bleve.NewNumericRangeInclusiveQuery(&f, &f, &tru, &tru) Search(repoIDs []int64, keyword string, page, pageSize int) (int64, []*SearchResult, error)
q.SetField(field) Close()
return q
} }
const unicodeNormalizeName = "unicodeNormalize" // Init initialize the repo indexer
func Init() {
if !setting.Indexer.RepoIndexerEnabled {
return
}
func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { waitChannel := make(chan time.Duration)
return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ go func() {
"type": unicodenorm.Name, start := time.Now()
"form": unicodenorm.NFC, log.Info("Initializing Repository Indexer")
}) var created bool
} var err error
indexer, created, err = NewBleveIndexer(setting.Indexer.RepoPath)
const maxBatchSize = 16 if err != nil {
indexer.Close()
// openIndexer open the index at the specified path, checking for metadata log.Fatal("indexer.Init: %v", err)
// updates and bleve version updates. If index needs to be created (or }
// re-created), returns (nil, nil)
func openIndexer(path string, latestVersion int) (bleve.Index, error) { go processRepoIndexerOperationQueue(indexer)
_, err := os.Stat(setting.Indexer.IssuePath)
if err != nil && os.IsNotExist(err) { if created {
return nil, nil go populateRepoIndexer()
} else if err != nil { }
return nil, err
} waitChannel <- time.Since(start)
}()
metadata, err := rupture.ReadIndexMetadata(path)
if err != nil { if setting.Indexer.StartupTimeout > 0 {
return nil, err go func() {
} timeout := setting.Indexer.StartupTimeout
if metadata.Version < latestVersion { if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
// the indexer is using a previous version, so we should delete it and timeout += setting.GracefulHammerTime
// re-populate }
return nil, os.RemoveAll(path) select {
} case duration := <-waitChannel:
log.Info("Repository Indexer Initialization took %v", duration)
index, err := bleve.Open(path) case <-time.After(timeout):
if err != nil && err == upsidedown.IncompatibleVersion { log.Fatal("Repository Indexer Initialization Timed-Out after: %v", timeout)
// the indexer was built with a previous version of bleve, so we should }
// delete it and re-populate }()
return nil, os.RemoveAll(path) }
} else if err != nil {
return nil, err
}
return index, nil
} }

View File

@ -0,0 +1,133 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package code
import (
"os"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)
// repoIndexerOperation is one unit of work for the repo indexer queue.
type repoIndexerOperation struct {
	// repoID identifies the repository the operation applies to.
	repoID int64
	// deleted selects the action: true removes the repository from the
	// index, false (re)indexes it.
	deleted bool
	// watchers each receive the operation's resulting error (nil on success)
	// once it has been processed.
	watchers []chan<- error
}

// repoIndexerOperationQueue carries pending operations to the queue
// processor. It is nil until the processor allocates it.
var repoIndexerOperationQueue chan repoIndexerOperation
// processRepoIndexerOperationQueue allocates the operation queue and then
// serves it until the graceful manager signals shutdown. Each operation is
// dispatched to indexer.Delete or indexer.Index, failures are logged, and the
// resulting error (possibly nil) is sent to every watcher of the operation.
// The indexer is closed when the function returns.
//
// NOTE(review): the queue channel is only created here, after this goroutine
// has started. Producers that run earlier would see a nil channel; consider
// allocating the queue before any producer can run.
func processRepoIndexerOperationQueue(indexer Indexer) {
	defer indexer.Close()

	repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
	for {
		select {
		case op := <-repoIndexerOperationQueue:
			var err error
			switch {
			case op.deleted:
				err = indexer.Delete(op.repoID)
				if err != nil {
					log.Error("indexer.Delete: %v", err)
				}
			default:
				err = indexer.Index(op.repoID)
				if err != nil {
					log.Error("indexer.Index: %v", err)
				}
			}
			for _, watcher := range op.watchers {
				watcher <- err
			}
		case <-graceful.GetManager().IsShutdown():
			log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
			return
		}
	}
}
// DeleteRepoFromIndexer queues the removal of all of a repository's entries
// from the indexer. Any watchers are attached to the queued operation.
func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
	op := repoIndexerOperation{
		repoID:   repo.ID,
		deleted:  true,
		watchers: watchers,
	}
	addOperationToQueue(op)
}
// UpdateRepoIndexer queues a refresh of a repository's entries in the
// indexer. Any watchers are attached to the queued operation.
func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
	op := repoIndexerOperation{
		repoID:   repo.ID,
		deleted:  false,
		watchers: watchers,
	}
	addOperationToQueue(op)
}
// addOperationToQueue hands op to the queue processor without blocking the
// caller. It does nothing when the repo indexer is disabled.
func addOperationToQueue(op repoIndexerOperation) {
	if !setting.Indexer.RepoIndexerEnabled {
		return
	}

	select {
	case repoIndexerOperationQueue <- op:
		// Accepted immediately.
	default:
		// The send cannot proceed right now: finish it from a goroutine so
		// the caller is not held up.
		go func() {
			repoIndexerOperationQueue <- op
		}()
	}
}
// populateRepoIndexer populate the repo indexer with pre-existing data. This
// should only be run when the indexer is created for the first time.
//
// It clears all recorded indexer status, then walks repository IDs downwards
// from the current maximum, queueing an index operation for each ID returned
// by models.GetUnindexedRepos. The walk stops early, without error, as soon
// as the graceful manager signals shutdown. Failures of the initial database
// calls are fatal.
func populateRepoIndexer() {
	log.Info("Populating the repo indexer with existing repositories")

	isShutdown := graceful.GetManager().IsShutdown()

	exist, err := models.IsTableNotEmpty("repository")
	if err != nil {
		log.Fatal("System error: %v", err)
	} else if !exist {
		return
	}

	// if there is any existing repo indexer metadata in the DB, delete it
	// since we are starting afresh.
	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
		log.Fatal("System error: %v", err)
	}

	var maxRepoID int64
	if maxRepoID, err = models.GetMaxID("repository"); err != nil {
		log.Fatal("System error: %v", err)
	}

	// start with the maximum existing repo ID and work backwards, so that we
	// don't include repos that are created after gitea starts; such repos will
	// already be added to the indexer, and we don't need to add them again.
	for maxRepoID > 0 {
		select {
		case <-isShutdown:
			log.Info("Repository Indexer population shutdown before completion")
			return
		default:
		}
		ids, err := models.GetUnindexedRepos(maxRepoID, 0, 50)
		if err != nil {
			log.Error("populateRepoIndexer: %v", err)
			return
		} else if len(ids) == 0 {
			break
		}
		for _, id := range ids {
			// Give a pending shutdown priority over queueing more work.
			select {
			case <-isShutdown:
				log.Info("Repository Indexer population shutdown before completion")
				return
			default:
			}
			// The queue processor stops receiving on shutdown, so a plain
			// send could block forever; stay responsive to shutdown while
			// waiting for room in the queue.
			select {
			case repoIndexerOperationQueue <- repoIndexerOperation{
				repoID:  id,
				deleted: false,
			}:
			case <-isShutdown:
				log.Info("Repository Indexer population shutdown before completion")
				return
			}
			maxRepoID = id - 1
		}
	}
	log.Info("Done (re)populating the repo indexer with existing repositories")
}

View File

@ -1,290 +0,0 @@
// Copyright 2017 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package code
import (
"context"
"os"
"strings"
"sync"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/search/query"
"github.com/ethantkoenig/rupture"
)
const (
	// repoIndexerAnalyzer is the name of the analyzer used for repository
	// contents.
	repoIndexerAnalyzer = "repoIndexerAnalyzer"
	// repoIndexerDocType is the document type reported by indexed data.
	repoIndexerDocType = "repoIndexerDocType"
	// repoIndexerLatestVersion is the current version of the repo index
	// format.
	repoIndexerLatestVersion = 4
)
// bleveIndexerHolder holds a bleve index that may not be available yet and
// lets readers block until one has been stored.
type bleveIndexerHolder struct {
// index is the held bleve index; it stays nil until set is called.
index bleve.Index
// mutex guards index: set takes the write lock, get takes the read lock.
mutex sync.RWMutex
// cond is created on mutex.RLocker() and is broadcast by set to wake
// readers waiting in get.
cond *sync.Cond
}
// newBleveIndexerHolder returns an empty holder whose condition variable is
// bound to the read-lock side of the holder's own mutex.
func newBleveIndexerHolder() *bleveIndexerHolder {
	holder := new(bleveIndexerHolder)
	holder.cond = sync.NewCond(holder.mutex.RLocker())
	return holder
}
// set stores index in the holder and wakes every goroutine waiting on the
// holder's condition variable. The write lock is held for the whole call,
// including the broadcast, and released by the deferred Unlock.
func (r *bleveIndexerHolder) set(index bleve.Index) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.index = index
r.cond.Broadcast()
}
// get returns the held bleve index, blocking until one has been stored with
// set. The read lock is held while the index is inspected; cond.Wait releases
// it while sleeping and re-acquires it before returning.
func (r *bleveIndexerHolder) get() bleve.Index {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	// Wait in a loop: the lock is not held while waiting, so the condition
	// must be re-checked after every wake-up rather than assumed true.
	for r.index == nil {
		r.cond.Wait()
	}
	return r.index
}
// indexerHolder (thread-safe) holder of the bleve index for repository contents
var indexerHolder = newBleveIndexerHolder()
// RepoIndexerOp type of operation to perform on repo indexer
type RepoIndexerOp int

// The constants carry the RepoIndexerOp type explicitly so that they are
// typed values of the enumeration rather than untyped integer constants.
const (
	// RepoIndexerOpUpdate add/update a file's contents
	RepoIndexerOpUpdate RepoIndexerOp = iota
	// RepoIndexerOpDelete delete a file
	RepoIndexerOpDelete
)
// RepoIndexerData data stored in the repo indexer
type RepoIndexerData struct {
// RepoID is indexed through the numeric "RepoID" field mapping and is the
// field that per-repository queries match on.
RepoID int64
// Content is indexed through the text "Content" field mapping and is the
// field that keyword phrase queries are run against.
Content string
}
// Type returns the document type, for bleve's mapping.Classifier interface.
// It always returns repoIndexerDocType, the key the document mapping is
// registered under.
func (d *RepoIndexerData) Type() string {
return repoIndexerDocType
}
// RepoIndexerUpdate an update to the repo indexer
type RepoIndexerUpdate struct {
// Filepath is combined with Data.RepoID to form the document ID.
Filepath string
// Op selects whether the document is indexed or deleted.
Op RepoIndexerOp
// Data is the document to index; its RepoID is read for every operation
// when the document ID is built, so it is dereferenced for deletes too.
Data *RepoIndexerData
}
// AddToFlushingBatch queues this update on the given flushing batch. An
// update operation indexes update.Data under the file's document ID and a
// delete operation removes that ID; any other operation is logged as an
// error and nil is returned.
func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
	docID := filenameIndexerID(update.Data.RepoID, update.Filepath)
	if update.Op == RepoIndexerOpUpdate {
		return batch.Index(docID, update.Data)
	}
	if update.Op == RepoIndexerOpDelete {
		return batch.Delete(docID)
	}
	log.Error("Unrecognized repo indexer op: %d", update.Op)
	return nil
}
// initRepoIndexer opens the repo indexer at setting.Indexer.RepoPath, or
// creates it when openIndexer returns no index, registers the close hook and
// then runs populateIndexer. Every failure is reported through log.Fatal.
func initRepoIndexer(populateIndexer func() error) {
	existing, err := openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
	if err != nil {
		log.Fatal("InitRepoIndexer %s: %v", setting.Indexer.RepoPath, err)
	}

	if existing == nil {
		// No index was opened: create a fresh one.
		if err = createRepoIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion); err != nil {
			log.Fatal("CreateRepoIndexer: %v", err)
		}
		closeAtTerminate()

		// The index is starting afresh, so drop any repo indexer status
		// records left in the database.
		if delErr := models.DeleteAllRecords("repo_indexer_status"); delErr != nil {
			log.Fatal("DeleteAllRepoIndexerStatus: %v", delErr)
		}
	} else {
		// An index was opened: publish it and continue population from
		// where it left off.
		indexerHolder.set(existing)
		closeAtTerminate()
	}

	if err = populateIndexer(); err != nil {
		log.Fatal("PopulateRepoIndex: %v", err)
	}
}
func closeAtTerminate() {
graceful.GetManager().RunAtTerminate(context.Background(), func() {
log.Debug("Closing repo indexer")
indexer := indexerHolder.get()
if indexer != nil {
err := indexer.Close()
if err != nil {
log.Error("Error whilst closing the repository indexer: %v", err)
}
}
log.Info("PID: %d Repository Indexer closed", os.Getpid())
})
}
// createRepoIndexer builds the index mapping for repository contents, creates
// a new bleve index at path, stores it in indexerHolder and writes index
// metadata recording latestVersion. It returns the first error encountered.
func createRepoIndexer(path string, latestVersion int) error {
	// Per-document mapping: a numeric RepoID and a text Content field, both
	// kept out of the composite "_all" field.
	repoIDField := bleve.NewNumericFieldMapping()
	repoIDField.IncludeInAll = false
	contentField := bleve.NewTextFieldMapping()
	contentField.IncludeInAll = false

	docMapping := bleve.NewDocumentMapping()
	docMapping.AddFieldMappingsAt("RepoID", repoIDField)
	docMapping.AddFieldMappingsAt("Content", contentField)

	indexMapping := bleve.NewIndexMapping()
	if err := addUnicodeNormalizeTokenFilter(indexMapping); err != nil {
		return err
	}

	analyzerConfig := map[string]interface{}{
		"type":          custom.Name,
		"char_filters":  []string{},
		"tokenizer":     unicode.Name,
		"token_filters": []string{unicodeNormalizeName, lowercase.Name},
	}
	if err := indexMapping.AddCustomAnalyzer(repoIndexerAnalyzer, analyzerConfig); err != nil {
		return err
	}

	indexMapping.DefaultAnalyzer = repoIndexerAnalyzer
	indexMapping.AddDocumentMapping(repoIndexerDocType, docMapping)
	indexMapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())

	created, err := bleve.New(path, indexMapping)
	if err != nil {
		return err
	}
	indexerHolder.set(created)

	metadata := &rupture.IndexMetadata{Version: latestVersion}
	return rupture.WriteIndexMetadata(path, metadata)
}
// filenameIndexerID builds the document ID for a file: the repository's
// indexer ID, an underscore, then the filename.
func filenameIndexerID(repoID int64, filename string) string {
	repoPart := indexerID(repoID)
	return repoPart + "_" + filename
}
// filenameOfIndexerID returns the part of a document ID after its first
// underscore. When the ID contains no underscore an error is logged and the
// whole ID is returned.
func filenameOfIndexerID(indexerID string) string {
	sep := strings.IndexByte(indexerID, '_')
	if sep < 0 {
		log.Error("Unexpected ID in repo indexer: %s", indexerID)
	}
	filename := indexerID[sep+1:]
	return filename
}
// RepoIndexerBatch returns a new flushing batch over the held repo index,
// created with maxBatchSize.
func RepoIndexerBatch() rupture.FlushingBatch {
	held := indexerHolder.get()
	return rupture.NewFlushingBatch(held, maxBatchSize)
}
// deleteRepoFromIndexer removes every document whose RepoID field equals
// repoID: it searches for them, queues a delete for each hit on a flushing
// batch and flushes the batch.
func deleteRepoFromIndexer(repoID int64) error {
	repoQuery := numericEqualityQuery(repoID, "RepoID")
	// Request up to 2147483647 hits starting at offset 0.
	request := bleve.NewSearchRequestOptions(repoQuery, 2147483647, 0, false)
	found, err := indexerHolder.get().Search(request)
	if err != nil {
		return err
	}

	deletions := RepoIndexerBatch()
	for i := range found.Hits {
		if delErr := deletions.Delete(found.Hits[i].ID); delErr != nil {
			return delErr
		}
	}
	return deletions.Flush()
}
// RepoSearchResult result of performing a search in a repo
type RepoSearchResult struct {
// RepoID is the hit's stored "RepoID" field.
RepoID int64
// StartIndex is the smallest start offset among the hit's "Content"
// match locations, or -1 when the hit has none.
StartIndex int
// EndIndex is the largest end offset among the hit's "Content" match
// locations, or -1 when the hit has none.
EndIndex int
// Filename is derived from the hit's document ID.
Filename string
// Content is the hit's stored "Content" field.
Content string
}
// SearchRepoByKeyword runs a phrase search for keyword against indexed file
// contents. When repoIDs is non-empty the search is restricted to those
// repositories; otherwise it is unrestricted. It returns the total
// number of hits, the requested page of results, and any search error.
func SearchRepoByKeyword(repoIDs []int64, keyword string, page, pageSize int) (int64, []*RepoSearchResult, error) {
	contentQuery := bleve.NewMatchPhraseQuery(keyword)
	contentQuery.FieldVal = "Content"
	contentQuery.Analyzer = repoIndexerAnalyzer

	// Default to the bare phrase query; narrow it to the given repositories
	// only when some were supplied.
	var fullQuery query.Query = contentQuery
	if len(repoIDs) > 0 {
		byRepo := make([]query.Query, len(repoIDs))
		for i, repoID := range repoIDs {
			byRepo[i] = numericEqualityQuery(repoID, "RepoID")
		}
		fullQuery = bleve.NewConjunctionQuery(
			bleve.NewDisjunctionQuery(byRepo...),
			contentQuery,
		)
	}

	offset := (page - 1) * pageSize
	request := bleve.NewSearchRequestOptions(fullQuery, pageSize, offset, false)
	request.Fields = []string{"Content", "RepoID"}
	request.IncludeLocations = true

	found, err := indexerHolder.get().Search(request)
	if err != nil {
		return 0, nil, err
	}

	results := make([]*RepoSearchResult, 0, len(found.Hits))
	for _, hit := range found.Hits {
		// Span from the earliest start to the latest end, using the first
		// location recorded for each matched term.
		spanStart, spanEnd := -1, -1
		for _, termLocations := range hit.Locations["Content"] {
			first := termLocations[0]
			if s := int(first.Start); spanStart < 0 || s < spanStart {
				spanStart = s
			}
			if e := int(first.End); spanEnd < 0 || e > spanEnd {
				spanEnd = e
			}
		}
		results = append(results, &RepoSearchResult{
			RepoID:     int64(hit.Fields["RepoID"].(float64)),
			StartIndex: spanStart,
			EndIndex:   spanEnd,
			Filename:   filenameOfIndexerID(hit.ID),
			Content:    hit.Fields["Content"].(string),
		})
	}
	return int64(found.Total), results, nil
}

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT-style // Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package search package code
import ( import (
"bytes" "bytes"
@ -11,7 +11,6 @@ import (
"strings" "strings"
"code.gitea.io/gitea/modules/highlight" "code.gitea.io/gitea/modules/highlight"
code_indexer "code.gitea.io/gitea/modules/indexer/code"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
) )
@ -60,7 +59,7 @@ func writeStrings(buf *bytes.Buffer, strs ...string) error {
return nil return nil
} }
func searchResult(result *code_indexer.RepoSearchResult, startIndex, endIndex int) (*Result, error) { func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, error) {
startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n") startLineNum := 1 + strings.Count(result.Content[:startIndex], "\n")
var formattedLinesBuffer bytes.Buffer var formattedLinesBuffer bytes.Buffer
@ -113,7 +112,7 @@ func PerformSearch(repoIDs []int64, keyword string, page, pageSize int) (int, []
return 0, nil, nil return 0, nil, nil
} }
total, results, err := code_indexer.SearchRepoByKeyword(repoIDs, keyword, page, pageSize) total, results, err := indexer.Search(repoIDs, keyword, page, pageSize)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }

View File

@ -6,7 +6,7 @@ package issues
import "code.gitea.io/gitea/models" import "code.gitea.io/gitea/models"
// DBIndexer implements Indexer inteface to use database's like search // DBIndexer implements Indexer interface to use database's like search
type DBIndexer struct { type DBIndexer struct {
} }

View File

@ -38,7 +38,7 @@ type SearchResult struct {
Hits []Match Hits []Match
} }
// Indexer defines an inteface to indexer issues contents // Indexer defines an interface to indexer issues contents
type Indexer interface { type Indexer interface {
Init() (bool, error) Init() (bool, error)
Index(issue []*IndexerData) error Index(issue []*IndexerData) error

View File

@ -45,6 +45,8 @@ var (
IssueQueueDir: "indexers/issues.queue", IssueQueueDir: "indexers/issues.queue",
IssueQueueConnStr: "", IssueQueueConnStr: "",
IssueQueueBatchNumber: 20, IssueQueueBatchNumber: 20,
MaxIndexerFileSize: 1024 * 1024,
} }
) )

View File

@ -12,8 +12,8 @@ import (
"code.gitea.io/gitea/models" "code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/context"
code_indexer "code.gitea.io/gitea/modules/indexer/code"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/search"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/routers/user" "code.gitea.io/gitea/routers/user"
@ -312,7 +312,7 @@ func ExploreCode(ctx *context.Context) {
var ( var (
total int total int
searchResults []*search.Result searchResults []*code_indexer.Result
) )
// if non-admin login user, we need check UnitTypeCode at first // if non-admin login user, we need check UnitTypeCode at first
@ -334,14 +334,14 @@ func ExploreCode(ctx *context.Context) {
ctx.Data["RepoMaps"] = rightRepoMap ctx.Data["RepoMaps"] = rightRepoMap
total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) total, searchResults, err = code_indexer.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum)
if err != nil { if err != nil {
ctx.ServerError("SearchResults", err) ctx.ServerError("SearchResults", err)
return return
} }
// if non-login user or isAdmin, no need to check UnitTypeCode // if non-login user or isAdmin, no need to check UnitTypeCode
} else if (ctx.User == nil && len(repoIDs) > 0) || isAdmin { } else if (ctx.User == nil && len(repoIDs) > 0) || isAdmin {
total, searchResults, err = search.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum) total, searchResults, err = code_indexer.PerformSearch(repoIDs, keyword, page, setting.UI.RepoSearchPagingNum)
if err != nil { if err != nil {
ctx.ServerError("SearchResults", err) ctx.ServerError("SearchResults", err)
return return

View File

@ -110,7 +110,7 @@ func GlobalInit(ctx context.Context) {
// Booting long running goroutines. // Booting long running goroutines.
cron.NewContext() cron.NewContext()
issue_indexer.InitIssueIndexer(false) issue_indexer.InitIssueIndexer(false)
code_indexer.InitRepoIndexer() code_indexer.Init()
mirror_service.InitSyncMirrors() mirror_service.InitSyncMirrors()
webhook.InitDeliverHooks() webhook.InitDeliverHooks()
pull_service.Init() pull_service.Init()

View File

@ -10,7 +10,7 @@ import (
"code.gitea.io/gitea/modules/base" "code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/search" code_indexer "code.gitea.io/gitea/modules/indexer/code"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
) )
@ -27,7 +27,7 @@ func Search(ctx *context.Context) {
if page <= 0 { if page <= 0 {
page = 1 page = 1
} }
total, searchResults, err := search.PerformSearch([]int64{ctx.Repo.Repository.ID}, total, searchResults, err := code_indexer.PerformSearch([]int64{ctx.Repo.Repository.ID},
keyword, page, setting.UI.RepoSearchPagingNum) keyword, page, setting.UI.RepoSearchPagingNum)
if err != nil { if err != nil {
ctx.ServerError("SearchResults", err) ctx.ServerError("SearchResults", err)