diff --git a/models/repo/mirror.go b/models/repo/mirror.go index e73226534ea..8494331ff71 100644 --- a/models/repo/mirror.go +++ b/models/repo/mirror.go @@ -120,11 +120,12 @@ func DeleteMirrorByRepoID(repoID int64) error { } // MirrorsIterate iterates all mirror repositories. -func MirrorsIterate(f func(idx int, bean interface{}) error) error { +func MirrorsIterate(limit int, f func(idx int, bean interface{}) error) error { return db.GetEngine(db.DefaultContext). Where("next_update_unix<=?", time.Now().Unix()). And("next_update_unix!=0"). OrderBy("updated_unix ASC"). + Limit(limit). Iterate(new(Mirror), f) } diff --git a/models/repo/pushmirror.go b/models/repo/pushmirror.go index bf39bb1ac07..b5c6411bd66 100644 --- a/models/repo/pushmirror.go +++ b/models/repo/pushmirror.go @@ -101,10 +101,11 @@ func GetPushMirrorsByRepoID(repoID int64) ([]*PushMirror, error) { } // PushMirrorsIterate iterates all push-mirror repositories. -func PushMirrorsIterate(f func(idx int, bean interface{}) error) error { +func PushMirrorsIterate(limit int, f func(idx int, bean interface{}) error) error { return db.GetEngine(db.DefaultContext). Where("last_update + (`interval` / ?) <= ?", time.Second, time.Now().Unix()). And("`interval` != 0"). OrderBy("last_update ASC"). + Limit(limit). Iterate(new(PushMirror), f) } diff --git a/models/repo/pushmirror_test.go b/models/repo/pushmirror_test.go index eff31fbac25..83cf86131f4 100644 --- a/models/repo/pushmirror_test.go +++ b/models/repo/pushmirror_test.go @@ -40,7 +40,7 @@ func TestPushMirrorsIterate(t *testing.T) { time.Sleep(1 * time.Millisecond) - PushMirrorsIterate(func(idx int, bean interface{}) error { + PushMirrorsIterate(1, func(idx int, bean interface{}) error { m, ok := bean.(*PushMirror) assert.True(t, ok) assert.Equal(t, "test-1", m.RemoteName) diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go index 5639a08f964..ed3a878d019 100644 --- a/services/mirror/mirror.go +++ b/services/mirror/mirror.go @@ -55,9 +55,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { } log.Trace("Doing: Update") - requested := 0 - - handler := func(idx int, bean interface{}, limit int) error { + handler := func(idx int, bean interface{}) error { var item SyncRequest var repo *repo_model.Repository if m, ok := bean.(*repo_model.Mirror); ok { @@ -104,35 +102,35 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error { } return err } - - requested++ - if limit > 0 && requested > limit { - return errLimit - } return nil } pullMirrorsRequested := 0 if pullLimit != 0 { - requested = 0 - if err := repo_model.MirrorsIterate(func(idx int, bean interface{}) error { - return handler(idx, bean, pullLimit) + if err := repo_model.MirrorsIterate(pullLimit, func(idx int, bean interface{}) error { + if err := handler(idx, bean); err != nil { + return err + } + pullMirrorsRequested++ + return nil }); err != nil && err != errLimit { log.Error("MirrorsIterate: %v", err) return err } - pullMirrorsRequested, requested = requested, 0 } + pushMirrorsRequested := 0 if pushLimit != 0 { - requested = 0 - if err := repo_model.PushMirrorsIterate(func(idx int, bean interface{}) error { - return handler(idx, bean, pushLimit) + if err := repo_model.PushMirrorsIterate(pushLimit, func(idx int, bean interface{}) error { + if err := handler(idx, bean); err != nil { + return err + } + pushMirrorsRequested++ + return nil }); err != nil && err != errLimit { log.Error("PushMirrorsIterate: %v", err) return err } - pushMirrorsRequested, requested = requested, 0 } log.Trace("Finished: Update: %d pull mirrors and %d push mirrors queued", pullMirrorsRequested, pushMirrorsRequested) return nil