mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-31 19:06:18 +01:00 
			
		
		
		
	Add queue for code indexer (#10332)
* Add queue for code indexer * Fix lint * Fix test * Fix lint * Fix bug * Fix bug * Fix lint * Add noqueue * Fix tests * Rename noqueue to immediate
This commit is contained in:
		| @@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mssql/issues.bleve | ||||
| REPO_INDEXER_ENABLED = true | ||||
| REPO_INDEXER_PATH = integrations/indexers-mssql/repos.bleve | ||||
|  | ||||
| [queue.code_indexer] | ||||
| TYPE = immediate | ||||
|  | ||||
| [repository] | ||||
| ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mssql/gitea-repositories | ||||
|  | ||||
|   | ||||
| @@ -16,6 +16,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql/issues.bleve | ||||
| REPO_INDEXER_ENABLED = true | ||||
| REPO_INDEXER_PATH = integrations/indexers-mysql/repos.bleve | ||||
|  | ||||
| [queue.code_indexer] | ||||
| TYPE = immediate | ||||
|  | ||||
| [repository] | ||||
| ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql/gitea-repositories | ||||
|  | ||||
|   | ||||
| @@ -14,6 +14,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-mysql8/issues.bleve | ||||
| REPO_INDEXER_ENABLED = true | ||||
| REPO_INDEXER_PATH = integrations/indexers-mysql8/repos.bleve | ||||
|  | ||||
| [queue.code_indexer] | ||||
| TYPE = immediate | ||||
|  | ||||
| [repository] | ||||
| ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-mysql8/gitea-repositories | ||||
|  | ||||
|   | ||||
| @@ -15,6 +15,9 @@ ISSUE_INDEXER_PATH = integrations/indexers-pgsql/issues.bleve | ||||
| REPO_INDEXER_ENABLED = true | ||||
| REPO_INDEXER_PATH = integrations/indexers-pgsql/repos.bleve | ||||
|  | ||||
| [queue.code_indexer] | ||||
| TYPE = immediate | ||||
|  | ||||
| [repository] | ||||
| ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-pgsql/gitea-repositories | ||||
|  | ||||
|   | ||||
| @@ -7,7 +7,6 @@ package integrations | ||||
| import ( | ||||
| 	"net/http" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"code.gitea.io/gitea/models" | ||||
| 	code_indexer "code.gitea.io/gitea/modules/indexer/code" | ||||
| @@ -62,14 +61,6 @@ func testSearch(t *testing.T, url string, expected []string) { | ||||
| 	assert.EqualValues(t, expected, filenames) | ||||
| } | ||||
|  | ||||
| func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository, ...chan<- error)) { | ||||
| 	waiter := make(chan error, 1) | ||||
| 	op(repo, waiter) | ||||
|  | ||||
| 	select { | ||||
| 	case err := <-waiter: | ||||
| 		assert.NoError(t, err) | ||||
| 	case <-time.After(1 * time.Minute): | ||||
| 		assert.Fail(t, "Repository indexer took too long") | ||||
| 	} | ||||
| func executeIndexer(t *testing.T, repo *models.Repository, op func(*models.Repository)) { | ||||
| 	op(repo) | ||||
| } | ||||
|   | ||||
| @@ -10,6 +10,9 @@ ISSUE_INDEXER_PATH   = integrations/indexers-sqlite/issues.bleve | ||||
| REPO_INDEXER_ENABLED = true | ||||
| REPO_INDEXER_PATH    = integrations/indexers-sqlite/repos.bleve | ||||
|  | ||||
| [queue.code_indexer] | ||||
| TYPE = immediate | ||||
|  | ||||
| [repository] | ||||
| ROOT = {{REPO_TEST_DIR}}integrations/gitea-integration-sqlite/gitea-repositories | ||||
|  | ||||
|   | ||||
| @@ -168,6 +168,11 @@ func (b *ElasticSearchIndexer) init() (bool, error) { | ||||
| } | ||||
|  | ||||
| func (b *ElasticSearchIndexer) addUpdate(sha string, update fileUpdate, repo *models.Repository) ([]elastic.BulkableRequest, error) { | ||||
| 	// Ignore vendored files in code search | ||||
| 	if setting.Indexer.ExcludeVendored && enry.IsVendor(update.Filename) { | ||||
| 		return nil, nil | ||||
| 	} | ||||
|  | ||||
| 	stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha). | ||||
| 		RunInDir(repo.RepoPath()) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -14,6 +14,7 @@ import ( | ||||
| 	"code.gitea.io/gitea/models" | ||||
| 	"code.gitea.io/gitea/modules/graceful" | ||||
| 	"code.gitea.io/gitea/modules/log" | ||||
| 	"code.gitea.io/gitea/modules/queue" | ||||
| 	"code.gitea.io/gitea/modules/setting" | ||||
| 	"code.gitea.io/gitea/modules/timeutil" | ||||
| ) | ||||
| @@ -38,7 +39,7 @@ type SearchResultLanguages struct { | ||||
| 	Count    int | ||||
| } | ||||
|  | ||||
| // Indexer defines an interface to indexer issues contents | ||||
| // Indexer defines an interface to index and search code contents | ||||
| type Indexer interface { | ||||
| 	Index(repo *models.Repository, sha string, changes *repoChanges) error | ||||
| 	Delete(repoID int64) error | ||||
| @@ -67,6 +68,40 @@ func filenameOfIndexerID(indexerID string) string { | ||||
| 	return indexerID[index+1:] | ||||
| } | ||||
|  | ||||
| // IndexerData is the item pushed onto the code indexer queue: the repository to process and whether to delete it from the index rather than (re)index it | ||||
| type IndexerData struct { | ||||
| 	RepoID   int64 | ||||
| 	IsDelete bool | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	indexerQueue queue.Queue | ||||
| ) | ||||
|  | ||||
| func index(indexer Indexer, repoID int64) error { | ||||
| 	repo, err := models.GetRepositoryByID(repoID) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	sha, err := getDefaultBranchSha(repo) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	changes, err := getRepoChanges(repo, sha) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else if changes == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if err := indexer.Index(repo, sha, changes); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) | ||||
| } | ||||
|  | ||||
| // Init initializes the repo indexer | ||||
| func Init() { | ||||
| 	if !setting.Indexer.RepoIndexerEnabled { | ||||
| @@ -74,8 +109,6 @@ func Init() { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	initQueue(setting.Indexer.UpdateQueueLength) | ||||
|  | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
|  | ||||
| 	graceful.GetManager().RunAtTerminate(ctx, func() { | ||||
| @@ -85,6 +118,46 @@ func Init() { | ||||
| 	}) | ||||
|  | ||||
| 	waitChannel := make(chan time.Duration) | ||||
|  | ||||
| 	// Create the Queue | ||||
| 	switch setting.Indexer.RepoType { | ||||
| 	case "bleve", "elasticsearch": | ||||
| 		handler := func(data ...queue.Data) { | ||||
| 			idx, err := indexer.get() | ||||
| 			if idx == nil || err != nil { | ||||
| 				log.Error("Codes indexer handler: unable to get indexer!") | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			for _, datum := range data { | ||||
| 				indexerData, ok := datum.(*IndexerData) | ||||
| 				if !ok { | ||||
| 					log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum) | ||||
| 					continue | ||||
| 				} | ||||
| 				log.Trace("IndexerData Process: %v %t", indexerData.RepoID, indexerData.IsDelete) | ||||
|  | ||||
| 				if indexerData.IsDelete { | ||||
| 					if err := indexer.Delete(indexerData.RepoID); err != nil { | ||||
| 						log.Error("indexer.Delete: %v", err) | ||||
| 					} | ||||
| 				} else { | ||||
| 					if err := index(indexer, indexerData.RepoID); err != nil { | ||||
| 						log.Error("index: %v", err) | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		indexerQueue = queue.CreateQueue("code_indexer", handler, &IndexerData{}) | ||||
| 		if indexerQueue == nil { | ||||
| 			log.Fatal("Unable to create codes indexer queue") | ||||
| 		} | ||||
| 	default: | ||||
| 		log.Fatal("Unknown codes indexer type; %s", setting.Indexer.RepoType) | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		start := time.Now() | ||||
| 		var ( | ||||
| @@ -139,10 +212,11 @@ func Init() { | ||||
|  | ||||
| 		indexer.set(rIndexer) | ||||
|  | ||||
| 		go processRepoIndexerOperationQueue(indexer) | ||||
| 		// Start processing the queue | ||||
| 		go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) | ||||
|  | ||||
| 		if populate { | ||||
| 			go populateRepoIndexer() | ||||
| 			go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer) | ||||
| 		} | ||||
| 		select { | ||||
| 		case waitChannel <- time.Since(start): | ||||
| @@ -179,3 +253,77 @@ func Init() { | ||||
| 		}() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // DeleteRepoFromIndexer queues the removal of all of a repository's entries from the indexer; a failed push is only logged | ||||
| func DeleteRepoFromIndexer(repo *models.Repository) { | ||||
| 	indexData := &IndexerData{RepoID: repo.ID, IsDelete: true} | ||||
| 	if err := indexerQueue.Push(indexData); err != nil { | ||||
| 		log.Error("Delete repo index data %v failed: %v", indexData, err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // UpdateRepoIndexer queues an update of a repository's entries in the indexer; a failed push is only logged | ||||
| func UpdateRepoIndexer(repo *models.Repository) { | ||||
| 	indexData := &IndexerData{RepoID: repo.ID} | ||||
| 	if err := indexerQueue.Push(indexData); err != nil { | ||||
| 		log.Error("Update repo index data %v failed: %v", indexData, err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // populateRepoIndexer populates the repo indexer with pre-existing data by pushing unindexed repository IDs onto the queue until none remain or ctx is done. This | ||||
| // should only be run when the indexer is created for the first time. | ||||
| func populateRepoIndexer(ctx context.Context) { | ||||
| 	log.Info("Populating the repo indexer with existing repositories") | ||||
|  | ||||
| 	exist, err := models.IsTableNotEmpty("repository") | ||||
| 	if err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} else if !exist { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// if there is any existing repo indexer metadata in the DB, delete it | ||||
| 	// since we are starting afresh. NOTE(review): the 1=1 condition once needed by | ||||
| 	// xorm is not visible here - presumably it lives in models.DeleteAllRecords. | ||||
| 	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	var maxRepoID int64 | ||||
| 	if maxRepoID, err = models.GetMaxID("repository"); err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// start with the maximum existing repo ID and work backwards, so that we | ||||
| 	// don't include repos that are created after gitea starts; such repos will | ||||
| 	// already be added to the indexer, and we don't need to add them again. | ||||
| 	for maxRepoID > 0 { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			log.Info("Repository Indexer population shutdown before completion") | ||||
| 			return | ||||
| 		default: | ||||
| 		} | ||||
| 		ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) | ||||
| 		if err != nil { | ||||
| 			log.Error("populateRepoIndexer: %v", err) | ||||
| 			return | ||||
| 		} else if len(ids) == 0 { | ||||
| 			break | ||||
| 		} | ||||
| 		for _, id := range ids { | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				log.Info("Repository Indexer population shutdown before completion") | ||||
| 				return | ||||
| 			default: | ||||
| 			} | ||||
| 			if err := indexerQueue.Push(&IndexerData{RepoID: id}); err != nil { | ||||
| 				log.Error("indexerQueue.Push: %v", err) | ||||
| 				return | ||||
| 			} | ||||
| 			maxRepoID = id - 1 | ||||
| 		} | ||||
| 	} | ||||
| 	log.Info("Done (re)populating the repo indexer with existing repositories") | ||||
| } | ||||
|   | ||||
| @@ -1,154 +0,0 @@ | ||||
| // Copyright 2019 The Gitea Authors. All rights reserved. | ||||
| // Use of this source code is governed by a MIT-style | ||||
| // license that can be found in the LICENSE file. | ||||
|  | ||||
| package code | ||||
|  | ||||
| import ( | ||||
| 	"os" | ||||
|  | ||||
| 	"code.gitea.io/gitea/models" | ||||
| 	"code.gitea.io/gitea/modules/graceful" | ||||
| 	"code.gitea.io/gitea/modules/log" | ||||
| ) | ||||
|  | ||||
| type repoIndexerOperation struct { | ||||
| 	repoID   int64 | ||||
| 	deleted  bool | ||||
| 	watchers []chan<- error | ||||
| } | ||||
|  | ||||
| var repoIndexerOperationQueue chan repoIndexerOperation | ||||
|  | ||||
| func initQueue(queueLength int) { | ||||
| 	repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength) | ||||
| } | ||||
|  | ||||
| func index(indexer Indexer, repoID int64) error { | ||||
| 	repo, err := models.GetRepositoryByID(repoID) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	sha, err := getDefaultBranchSha(repo) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	changes, err := getRepoChanges(repo, sha) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else if changes == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if err := indexer.Index(repo, sha, changes); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha) | ||||
| } | ||||
|  | ||||
| func processRepoIndexerOperationQueue(indexer Indexer) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case op := <-repoIndexerOperationQueue: | ||||
| 			var err error | ||||
| 			if op.deleted { | ||||
| 				if err = indexer.Delete(op.repoID); err != nil { | ||||
| 					log.Error("indexer.Delete: %v", err) | ||||
| 				} | ||||
| 			} else { | ||||
| 				if err = index(indexer, op.repoID); err != nil { | ||||
| 					log.Error("indexer.Index: %v", err) | ||||
| 				} | ||||
| 			} | ||||
| 			for _, watcher := range op.watchers { | ||||
| 				watcher <- err | ||||
| 			} | ||||
| 		case <-graceful.GetManager().IsShutdown(): | ||||
| 			log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid()) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // DeleteRepoFromIndexer remove all of a repository's entries from the indexer | ||||
| func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) { | ||||
| 	addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers}) | ||||
| } | ||||
|  | ||||
| // UpdateRepoIndexer update a repository's entries in the indexer | ||||
| func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) { | ||||
| 	addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers}) | ||||
| } | ||||
|  | ||||
| func addOperationToQueue(op repoIndexerOperation) { | ||||
| 	select { | ||||
| 	case repoIndexerOperationQueue <- op: | ||||
| 		break | ||||
| 	default: | ||||
| 		go func() { | ||||
| 			repoIndexerOperationQueue <- op | ||||
| 		}() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // populateRepoIndexer populate the repo indexer with pre-existing data. This | ||||
| // should only be run when the indexer is created for the first time. | ||||
| func populateRepoIndexer() { | ||||
| 	log.Info("Populating the repo indexer with existing repositories") | ||||
|  | ||||
| 	isShutdown := graceful.GetManager().IsShutdown() | ||||
|  | ||||
| 	exist, err := models.IsTableNotEmpty("repository") | ||||
| 	if err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} else if !exist { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// if there is any existing repo indexer metadata in the DB, delete it | ||||
| 	// since we are starting afresh. Also, xorm requires deletes to have a | ||||
| 	// condition, and we want to delete everything, thus 1=1. | ||||
| 	if err := models.DeleteAllRecords("repo_indexer_status"); err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	var maxRepoID int64 | ||||
| 	if maxRepoID, err = models.GetMaxID("repository"); err != nil { | ||||
| 		log.Fatal("System error: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// start with the maximum existing repo ID and work backwards, so that we | ||||
| 	// don't include repos that are created after gitea starts; such repos will | ||||
| 	// already be added to the indexer, and we don't need to add them again. | ||||
| 	for maxRepoID > 0 { | ||||
| 		select { | ||||
| 		case <-isShutdown: | ||||
| 			log.Info("Repository Indexer population shutdown before completion") | ||||
| 			return | ||||
| 		default: | ||||
| 		} | ||||
| 		ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50) | ||||
| 		if err != nil { | ||||
| 			log.Error("populateRepoIndexer: %v", err) | ||||
| 			return | ||||
| 		} else if len(ids) == 0 { | ||||
| 			break | ||||
| 		} | ||||
| 		for _, id := range ids { | ||||
| 			select { | ||||
| 			case <-isShutdown: | ||||
| 				log.Info("Repository Indexer population shutdown before completion") | ||||
| 				return | ||||
| 			default: | ||||
| 			} | ||||
| 			repoIndexerOperationQueue <- repoIndexerOperation{ | ||||
| 				repoID:  id, | ||||
| 				deleted: false, | ||||
| 			} | ||||
| 			maxRepoID = id - 1 | ||||
| 		} | ||||
| 	} | ||||
| 	log.Info("Done (re)populating the repo indexer with existing repositories") | ||||
| } | ||||
| @@ -106,7 +106,64 @@ func (*DummyQueue) IsEmpty() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue} | ||||
| // ImmediateType is the type of queue that executes the handler as soon as data is pushed | ||||
| const ImmediateType Type = "immediate" | ||||
|  | ||||
| // NewImmediate creates a new pseudo-queue that executes the handler when data is pushed; opts and exemplar are unused | ||||
| func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error) { | ||||
| 	return &Immediate{ | ||||
| 		handler: handler, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // Immediate represents a direct execution queue: pushed data is handed to the handler immediately | ||||
| type Immediate struct { | ||||
| 	handler HandlerFunc | ||||
| } | ||||
|  | ||||
| // Run does nothing as pushed data is handled inside PushFunc | ||||
| func (*Immediate) Run(_, _ func(context.Context, func())) {} | ||||
|  | ||||
| // Push hands the data straight to the handler by calling PushFunc with a nil function | ||||
| func (q *Immediate) Push(data Data) error { | ||||
| 	return q.PushFunc(data, nil) | ||||
| } | ||||
|  | ||||
| // PushFunc runs f when it is non-nil and, if f fails, returns its error without calling the handler; otherwise it calls the handler with data and returns nil | ||||
| func (q *Immediate) PushFunc(data Data, f func() error) error { | ||||
| 	if f != nil { | ||||
| 		if err := f(); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	q.handler(data) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Has always returns false as this queue handles data on push and never holds it | ||||
| func (*Immediate) Has(Data) (bool, error) { | ||||
| 	return false, nil | ||||
| } | ||||
|  | ||||
| // Flush always returns nil as data is handled on push, leaving nothing to flush | ||||
| func (*Immediate) Flush(time.Duration) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // FlushWithContext always returns nil as data is handled on push, leaving nothing to flush | ||||
| func (*Immediate) FlushWithContext(context.Context) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // IsEmpty always returns true as data is handled on push and never held in the queue | ||||
| func (*Immediate) IsEmpty() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| var queuesMap = map[Type]NewQueueFunc{ | ||||
| 	DummyQueueType: NewDummyQueue, | ||||
| 	ImmediateType:  NewImmediate, | ||||
| } | ||||
|  | ||||
| // RegisteredTypes provides the list of requested types of queues | ||||
| func RegisteredTypes() []Type { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user