Add sync repository function

harryzcy 2022-07-15 20:35:39 -04:00
parent 9ac5518ee6
commit 2ae6e4ce98
No known key found for this signature in database
GPG Key ID: CC2953E050C19686
1 changed file with 287 additions and 0 deletions

@@ -12,6 +12,7 @@ import (
	"net/url"
	"path/filepath"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	admin_model "code.gitea.io/gitea/models/admin"
@@ -462,6 +463,292 @@ func migrateRepository(downloader base.Downloader, uploader base.Uploader, opts
	return uploader.Finish()
}

// SyncRepository syncs a repository according to MigrateOptions
func SyncRepository(ctx context.Context, doer *user_model.User, ownerName string, opts base.MigrateOptions, messenger base.Messenger, lastSynced time.Time) (*repo_model.Repository, error) {
	downloader, err := newDownloader(ctx, ownerName, opts)
	if err != nil {
		return nil, err
	}

	uploader := NewGiteaLocalUploader(ctx, doer, ownerName, opts.RepoName)
	uploader.gitServiceType = opts.GitServiceType

	if err := syncRepository(downloader, uploader, opts, messenger, lastSynced); err != nil {
		if err1 := uploader.Rollback(); err1 != nil {
			log.Error("rollback failed: %v", err1)
		}
		if err2 := admin_model.CreateRepositoryNotice(fmt.Sprintf("Syncing repository from %s failed: %v", opts.OriginalURL, err)); err2 != nil {
			log.Error("create repository notice failed: %v", err2)
		}
		return nil, err
	}
	return uploader.repo, nil
}
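
For orientation, a minimal sketch of how a periodic task might call the new entry point, assuming it lives in the same package as migrate.go. The task itself, the repository fields used to build the options, and the enabled options are illustrative assumptions, not part of this commit:

// Hypothetical caller (not part of this diff): re-sync a previously
// migrated repository, picking up anything newer than lastSynced.
func syncMirroredRepo(ctx context.Context, doer *user_model.User, repo *repo_model.Repository, lastSynced time.Time) error {
	opts := base.MigrateOptions{
		CloneAddr:      repo.OriginalURL,
		OriginalURL:    repo.OriginalURL,
		GitServiceType: repo.OriginalServiceType,
		RepoName:       repo.Name,
		Milestones:     true,
		Labels:         true,
		Releases:       true,
		Issues:         true,
		PullRequests:   true,
		Comments:       true,
		// Auth fields omitted; a real caller would fill them in.
	}
	// SyncRepository already records failures as an admin notice,
	// so the caller only needs to decide whether to retry later.
	_, err := SyncRepository(ctx, doer, repo.OwnerName, opts, base.NilMessenger, lastSynced)
	return err
}
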
// syncRepository will download new information and then upload it to the Uploader.
func syncRepository(downloader base.Downloader, uploader base.Uploader, opts base.MigrateOptions, messenger base.Messenger, lastSynced time.Time) error {
	if messenger == nil {
		messenger = base.NilMessenger
	}
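
	// Topics are synced unconditionally; the remaining item types are gated by opts below.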
log.Trace("syncing topics")
messenger("repo.migrate.syncing_topics")
topics, err := downloader.GetTopics()
if err != nil {
if !base.IsErrNotSupported(err) {
return err
}
log.Warn("syncing topics is not supported, ignored")
}
if len(topics) != 0 {
if err = uploader.UpdateTopics(topics...); err != nil {
return err
}
}
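
	// Milestones: fetch the full list from the source and push updates through the
	// uploader, skipping if the source does not support milestones.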
	if opts.Milestones {
		log.Trace("syncing milestones")
		messenger("repo.migrate.syncing_milestones")
		milestones, err := downloader.GetMilestones()
		if err != nil {
			if !base.IsErrNotSupported(err) {
				return err
			}
			log.Warn("syncing milestones is not supported, ignored")
		}

		msBatchSize := uploader.MaxBatchInsertSize("milestone")
		for len(milestones) > 0 {
			if len(milestones) < msBatchSize {
				msBatchSize = len(milestones)
			}

			if err := uploader.UpdateMilestones(milestones...); err != nil {
				return err
			}
			milestones = milestones[msBatchSize:]
		}
	}
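
	// Labels: same pattern as milestones.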
	if opts.Labels {
		log.Trace("syncing labels")
		messenger("repo.migrate.syncing_labels")
		labels, err := downloader.GetLabels()
		if err != nil {
			if !base.IsErrNotSupported(err) {
				return err
			}
			log.Warn("syncing labels is not supported, ignored")
		}

		lbBatchSize := uploader.MaxBatchInsertSize("label")
		for len(labels) > 0 {
			if len(labels) < lbBatchSize {
				lbBatchSize = len(labels)
			}

			if err := uploader.UpdateLabels(labels...); err != nil {
				return err
			}
			labels = labels[lbBatchSize:]
		}
	}
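
	// Releases are patched in batches; afterwards, remaining non-release tags are synced.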
	if opts.Releases {
		log.Trace("syncing releases")
		messenger("repo.migrate.syncing_releases")
		releases, err := downloader.GetReleases()
		if err != nil {
			if !base.IsErrNotSupported(err) {
				return err
			}
			log.Warn("syncing releases is not supported, ignored")
		}

		relBatchSize := uploader.MaxBatchInsertSize("release")
		for len(releases) > 0 {
			if len(releases) < relBatchSize {
				relBatchSize = len(releases)
			}

			if err = uploader.PatchReleases(releases[:relBatchSize]...); err != nil {
				return err
			}
			releases = releases[relBatchSize:]
		}

		// Once all releases (if any) are inserted, sync any remaining non-release tags
		if err = uploader.SyncTags(); err != nil {
			return err
		}
	}
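
	// Comment and review batch sizes are shared by the issue and pull request sync loops below.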
	var (
		commentBatchSize = uploader.MaxBatchInsertSize("comment")
		reviewBatchSize  = uploader.MaxBatchInsertSize("review")
	)

	supportAllComments := downloader.SupportGetRepoComments()
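
	// Issues: page through issues newer than lastSynced and patch them; per-issue
	// comments are fetched here when the source cannot return all repository
	// comments in one pass.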
	if opts.Issues {
		log.Trace("syncing issues and comments")
		messenger("repo.migrate.syncing_issues")
		issueBatchSize := uploader.MaxBatchInsertSize("issue")

		for i := 1; ; i++ {
			issues, isEnd, err := downloader.GetNewIssues(i, issueBatchSize, lastSynced)
			if err != nil {
				if !base.IsErrNotSupported(err) {
					return err
				}
				log.Warn("syncing issues is not supported, ignored")
				break
			}

			if err := uploader.PatchIssues(issues...); err != nil {
				return err
			}

			if opts.Comments && !supportAllComments {
				allComments := make([]*base.Comment, 0, commentBatchSize)
				for _, issue := range issues {
					log.Trace("syncing issue %d's comments", issue.Number)
					comments, _, err := downloader.GetComments(issue)
					if err != nil {
						if !base.IsErrNotSupported(err) {
							return err
						}
						log.Warn("syncing comments is not supported, ignored")
					}

					allComments = append(allComments, comments...)
					if len(allComments) >= commentBatchSize {
						if err = uploader.PatchComments(allComments[:commentBatchSize]...); err != nil {
							return err
						}
						allComments = allComments[commentBatchSize:]
					}
				}

				if len(allComments) > 0 {
					if err = uploader.PatchComments(allComments...); err != nil {
						return err
					}
				}
			}

			if isEnd {
				break
			}
		}
	}
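
	// Pull requests: same paging pattern as issues, plus per-PR comments (when needed) and reviews.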
	if opts.PullRequests {
		log.Trace("syncing pull requests and comments")
		messenger("repo.migrate.syncing_pulls")
		prBatchSize := uploader.MaxBatchInsertSize("pullrequest")

		for i := 1; ; i++ {
			prs, isEnd, err := downloader.GetNewPullRequests(i, prBatchSize, lastSynced)
			if err != nil {
				if !base.IsErrNotSupported(err) {
					return err
				}
				log.Warn("syncing pull requests is not supported, ignored")
				break
			}

			if err := uploader.PatchPullRequests(prs...); err != nil {
				return err
			}

			if opts.Comments {
				if !supportAllComments {
					// plain comments
					allComments := make([]*base.Comment, 0, commentBatchSize)
					for _, pr := range prs {
						log.Trace("syncing pull request %d's comments", pr.Number)
						comments, _, err := downloader.GetComments(pr)
						if err != nil {
							if !base.IsErrNotSupported(err) {
								return err
							}
							log.Warn("syncing comments is not supported, ignored")
						}

						allComments = append(allComments, comments...)
						if len(allComments) >= commentBatchSize {
							if err = uploader.PatchComments(allComments[:commentBatchSize]...); err != nil {
								return err
							}
							allComments = allComments[commentBatchSize:]
						}
					}

					if len(allComments) > 0 {
						if err = uploader.PatchComments(allComments...); err != nil {
							return err
						}
					}
				}

				// migrate reviews
				allReviews := make([]*base.Review, 0, reviewBatchSize)
				for _, pr := range prs {
					reviews, err := downloader.GetReviews(pr)
					if err != nil {
						if !base.IsErrNotSupported(err) {
							return err
						}
						log.Warn("syncing reviews is not supported, ignored")
						break
					}

					allReviews = append(allReviews, reviews...)
					if len(allReviews) >= reviewBatchSize {
						if err = uploader.PatchReviews(allReviews[:reviewBatchSize]...); err != nil {
							return err
						}
						allReviews = allReviews[reviewBatchSize:]
					}
				}

				if len(allReviews) > 0 {
					if err = uploader.PatchReviews(allReviews...); err != nil {
						return err
					}
				}
			}

			if isEnd {
				break
			}
		}
	}
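
	// When the source can return every repository comment in one stream, sync comments
	// here instead of per issue/PR.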
	if opts.Comments && supportAllComments {
		log.Trace("syncing comments")
		for i := 1; ; i++ {
			comments, isEnd, err := downloader.GetAllComments(i, commentBatchSize)
			if err != nil {
				return err
			}

			if err := uploader.PatchComments(comments...); err != nil {
				return err
			}

			if isEnd {
				break
			}
		}
	}

	return uploader.Finish()
}
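
syncRepository leans on downloader and uploader methods that are not part of upstream Gitea's base.Downloader and base.Uploader interfaces (GetNewIssues, GetNewPullRequests, the Patch*/Update* upserts, SyncTags). A rough sketch of the shape those companion additions would need, inferred only from the call sites above; the method names come from the diff, but the exact signatures, semantics, and interface grouping are assumptions:

// Assumed companion interface additions (same package and imports as migrate.go);
// inferred from syncRepository's call sites, not part of this diff.
type syncDownloader interface {
	base.Downloader

	// Paged fetches of items changed since the last successful sync;
	// the bool result mirrors isEnd in the loops above.
	GetNewIssues(page, perPage int, updatedAfter time.Time) ([]*base.Issue, bool, error)
	GetNewPullRequests(page, perPage int, updatedAfter time.Time) ([]*base.PullRequest, bool, error)
}

type syncUploader interface {
	base.Uploader

	// Presumed upsert-style operations: create the record if new, otherwise update it.
	PatchIssues(issues ...*base.Issue) error
	PatchPullRequests(prs ...*base.PullRequest) error
	PatchComments(comments ...*base.Comment) error
	PatchReviews(reviews ...*base.Review) error
	PatchReleases(releases ...*base.Release) error
	UpdateTopics(topics ...string) error
	UpdateMilestones(milestones ...*base.Milestone) error
	UpdateLabels(labels ...*base.Label) error
	SyncTags() error
}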

// Init migrations service
func Init() error {
	// TODO: maybe we can deprecate these legacy ALLOWED_DOMAINS/ALLOW_LOCALNETWORKS/BLOCKED_DOMAINS, use ALLOWED_HOST_LIST/BLOCKED_HOST_LIST instead