Implement sync push mirror on commit (#19411)

Support synchronizing with the push mirrors whenever new commits are pushed or synced from pull mirror.

Related Issues: #18220

Co-authored-by: delvh <dev.lh@web.de>
Co-authored-by: zeripath <art27@cantab.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
This commit is contained in:
Chongyi Zheng 2022-07-08 15:45:12 -04:00 committed by GitHub
parent 496b8e3990
commit 49f9d43afe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 208 additions and 98 deletions

View File

@ -396,6 +396,8 @@ var migrations = []Migration{
NewMigration("Alter hook_task table TEXT fields to LONGTEXT", alterHookTaskTextFieldsToLongText), NewMigration("Alter hook_task table TEXT fields to LONGTEXT", alterHookTaskTextFieldsToLongText),
// v218 -> v219 // v218 -> v219
NewMigration("Improve Action table indices v2", improveActionTableIndices), NewMigration("Improve Action table indices v2", improveActionTableIndices),
// v219 -> v220
NewMigration("Add sync_on_commit column to push_mirror table", addSyncOnCommitColForPushMirror),
} }
// GetCurrentDBVersion returns the current db version // GetCurrentDBVersion returns the current db version

30
models/migrations/v219.go Normal file
View File

@ -0,0 +1,30 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package migrations
import (
"time"
"code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/xorm"
)
func addSyncOnCommitColForPushMirror(x *xorm.Engine) error {
type PushMirror struct {
ID int64 `xorm:"pk autoincr"`
RepoID int64 `xorm:"INDEX"`
Repo *repo.Repository `xorm:"-"`
RemoteName string
SyncOnCommit bool `xorm:"NOT NULL DEFAULT true"`
Interval time.Duration
CreatedUnix timeutil.TimeStamp `xorm:"created"`
LastUpdateUnix timeutil.TimeStamp `xorm:"INDEX last_update"`
LastError string `xorm:"text"`
}
return x.Sync2(new(PushMirror))
}

View File

@ -23,6 +23,7 @@ type PushMirror struct {
Repo *Repository `xorm:"-"` Repo *Repository `xorm:"-"`
RemoteName string RemoteName string
SyncOnCommit bool `xorm:"NOT NULL DEFAULT true"`
Interval time.Duration Interval time.Duration
CreatedUnix timeutil.TimeStamp `xorm:"created"` CreatedUnix timeutil.TimeStamp `xorm:"created"`
LastUpdateUnix timeutil.TimeStamp `xorm:"INDEX last_update"` LastUpdateUnix timeutil.TimeStamp `xorm:"INDEX last_update"`
@ -93,6 +94,14 @@ func GetPushMirrorsByRepoID(repoID int64) ([]*PushMirror, error) {
return mirrors, db.GetEngine(db.DefaultContext).Where("repo_id=?", repoID).Find(&mirrors) return mirrors, db.GetEngine(db.DefaultContext).Where("repo_id=?", repoID).Find(&mirrors)
} }
// GetPushMirrorsSyncedOnCommit returns push-mirrors for this repo that should be updated by new commits
func GetPushMirrorsSyncedOnCommit(repoID int64) ([]*PushMirror, error) {
mirrors := make([]*PushMirror, 0, 10)
return mirrors, db.GetEngine(db.DefaultContext).
Where("repo_id=? AND sync_on_commit=?", repoID, true).
Find(&mirrors)
}
// PushMirrorsIterate iterates all push-mirror repositories. // PushMirrorsIterate iterates all push-mirror repositories.
func PushMirrorsIterate(limit int, f func(idx int, bean interface{}) error) error { func PushMirrorsIterate(limit int, f func(idx int, bean interface{}) error) error {
return db.GetEngine(db.DefaultContext). return db.GetEngine(db.DefaultContext).

69
modules/mirror/mirror.go Normal file
View File

@ -0,0 +1,69 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mirror
import (
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
)
var mirrorQueue queue.UniqueQueue
// SyncType type of sync request
type SyncType int
const (
// PullMirrorType for pull mirrors
PullMirrorType SyncType = iota
// PushMirrorType for push mirrors
PushMirrorType
)
// SyncRequest for the mirror queue
type SyncRequest struct {
Type SyncType
ReferenceID int64 // RepoID for pull mirror, MirrorID for push mirror
}
// StartSyncMirrors starts a go routine to sync the mirrors
func StartSyncMirrors(queueHandle func(data ...queue.Data) []queue.Data) {
if !setting.Mirror.Enabled {
return
}
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
// AddPullMirrorToQueue adds repoID to mirror queue
func AddPullMirrorToQueue(repoID int64) {
addMirrorToQueue(PullMirrorType, repoID)
}
// AddPushMirrorToQueue adds the push mirror to the queue
func AddPushMirrorToQueue(mirrorID int64) {
addMirrorToQueue(PushMirrorType, mirrorID)
}
func addMirrorToQueue(syncType SyncType, referenceID int64) {
if !setting.Mirror.Enabled {
return
}
go func() {
if err := PushToQueue(syncType, referenceID); err != nil {
log.Error("Unable to push sync request for to the queue for pull mirror repo[%d]. Error: %v", referenceID, err)
}
}()
}
// PushToQueue adds the sync request to the queue
func PushToQueue(mirrorType SyncType, referenceID int64) error {
return mirrorQueue.Push(&SyncRequest{
Type: mirrorType,
ReferenceID: referenceID,
})
}

View File

@ -0,0 +1,45 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package mirror
import (
repo_model "code.gitea.io/gitea/models/repo"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/log"
mirror_module "code.gitea.io/gitea/modules/mirror"
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/repository"
)
type mirrorNotifier struct {
base.NullNotifier
}
var _ base.Notifier = &mirrorNotifier{}
// NewNotifier create a new mirrorNotifier notifier
func NewNotifier() base.Notifier {
return &mirrorNotifier{}
}
func (m *mirrorNotifier) NotifyPushCommits(_ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) {
syncPushMirrorWithSyncOnCommit(repo.ID)
}
func (m *mirrorNotifier) NotifySyncPushCommits(_ *user_model.User, repo *repo_model.Repository, _ *repository.PushUpdateOptions, _ *repository.PushCommits) {
syncPushMirrorWithSyncOnCommit(repo.ID)
}
func syncPushMirrorWithSyncOnCommit(repoID int64) {
pushMirrors, err := repo_model.GetPushMirrorsSyncedOnCommit(repoID)
if err != nil {
log.Error("repo_model.GetPushMirrorsSyncedOnCommit failed: %v", err)
return
}
for _, mirror := range pushMirrors {
mirror_module.AddPushMirrorToQueue(mirror.ID)
}
}

View File

@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/modules/notification/base" "code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/notification/indexer" "code.gitea.io/gitea/modules/notification/indexer"
"code.gitea.io/gitea/modules/notification/mail" "code.gitea.io/gitea/modules/notification/mail"
"code.gitea.io/gitea/modules/notification/mirror"
"code.gitea.io/gitea/modules/notification/ui" "code.gitea.io/gitea/modules/notification/ui"
"code.gitea.io/gitea/modules/notification/webhook" "code.gitea.io/gitea/modules/notification/webhook"
"code.gitea.io/gitea/modules/repository" "code.gitea.io/gitea/modules/repository"
@ -37,6 +38,7 @@ func NewContext() {
RegisterNotifier(indexer.NewNotifier()) RegisterNotifier(indexer.NewNotifier())
RegisterNotifier(webhook.NewNotifier()) RegisterNotifier(webhook.NewNotifier())
RegisterNotifier(action.NewNotifier()) RegisterNotifier(action.NewNotifier())
RegisterNotifier(mirror.NewNotifier())
} }
// NotifyCreateIssueComment notifies issue comment related message to notifiers // NotifyCreateIssueComment notifies issue comment related message to notifiers

View File

@ -861,8 +861,9 @@ default_branch = Default Branch
default_branch_helper = The default branch is the base branch for pull requests and code commits. default_branch_helper = The default branch is the base branch for pull requests and code commits.
mirror_prune = Prune mirror_prune = Prune
mirror_prune_desc = Remove obsolete remote-tracking references mirror_prune_desc = Remove obsolete remote-tracking references
mirror_interval = Mirror Interval (valid time units are 'h', 'm', 's'). 0 to disable automatic sync. (Minimum interval: %s) mirror_interval = Mirror Interval (valid time units are 'h', 'm', 's'). 0 to disable periodic sync. (Minimum interval: %s)
mirror_interval_invalid = The mirror interval is not valid. mirror_interval_invalid = The mirror interval is not valid.
mirror_sync_on_commit = Sync when commits are pushed
mirror_address = Clone From URL mirror_address = Clone From URL
mirror_address_desc = Put any required credentials in the Authorization section. mirror_address_desc = Put any required credentials in the Authorization section.
mirror_address_url_invalid = The provided url is invalid. You must escape all components of the url correctly. mirror_address_url_invalid = The provided url is invalid. You must escape all components of the url correctly.

View File

@ -11,8 +11,8 @@ import (
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit" "code.gitea.io/gitea/models/unit"
"code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/context"
mirror_module "code.gitea.io/gitea/modules/mirror"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
mirror_service "code.gitea.io/gitea/services/mirror"
) )
// MirrorSync adds a mirrored repository to the sync queue // MirrorSync adds a mirrored repository to the sync queue
@ -59,7 +59,7 @@ func MirrorSync(ctx *context.APIContext) {
return return
} }
mirror_service.StartToMirror(repo.ID) mirror_module.AddPullMirrorToQueue(repo.ID)
ctx.Status(http.StatusOK) ctx.Status(http.StatusOK)
} }

View File

@ -29,6 +29,7 @@ import (
"code.gitea.io/gitea/modules/indexer/stats" "code.gitea.io/gitea/modules/indexer/stats"
"code.gitea.io/gitea/modules/lfs" "code.gitea.io/gitea/modules/lfs"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
mirror_module "code.gitea.io/gitea/modules/mirror"
"code.gitea.io/gitea/modules/repository" "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/structs" "code.gitea.io/gitea/modules/structs"
@ -272,7 +273,7 @@ func SettingsPost(ctx *context.Context) {
return return
} }
mirror_service.StartToMirror(repo.ID) mirror_module.AddPullMirrorToQueue(repo.ID)
ctx.Flash.Info(ctx.Tr("repo.settings.mirror_sync_in_progress")) ctx.Flash.Info(ctx.Tr("repo.settings.mirror_sync_in_progress"))
ctx.Redirect(repo.Link() + "/settings") ctx.Redirect(repo.Link() + "/settings")
@ -289,7 +290,7 @@ func SettingsPost(ctx *context.Context) {
return return
} }
mirror_service.AddPushMirrorToQueue(m.ID) mirror_module.AddPushMirrorToQueue(m.ID)
ctx.Flash.Info(ctx.Tr("repo.settings.mirror_sync_in_progress")) ctx.Flash.Info(ctx.Tr("repo.settings.mirror_sync_in_progress"))
ctx.Redirect(repo.Link() + "/settings") ctx.Redirect(repo.Link() + "/settings")
@ -357,10 +358,11 @@ func SettingsPost(ctx *context.Context) {
} }
m := &repo_model.PushMirror{ m := &repo_model.PushMirror{
RepoID: repo.ID, RepoID: repo.ID,
Repo: repo, Repo: repo,
RemoteName: fmt.Sprintf("remote_mirror_%s", remoteSuffix), RemoteName: fmt.Sprintf("remote_mirror_%s", remoteSuffix),
Interval: interval, SyncOnCommit: form.PushMirrorSyncOnCommit,
Interval: interval,
} }
if err := repo_model.InsertPushMirror(m); err != nil { if err := repo_model.InsertPushMirror(m); err != nil {
ctx.ServerError("InsertPushMirror", err) ctx.ServerError("InsertPushMirror", err)

View File

@ -115,23 +115,24 @@ func ParseRemoteAddr(remoteAddr, authUsername, authPassword string) (string, err
// RepoSettingForm form for changing repository settings // RepoSettingForm form for changing repository settings
type RepoSettingForm struct { type RepoSettingForm struct {
RepoName string `binding:"Required;AlphaDashDot;MaxSize(100)"` RepoName string `binding:"Required;AlphaDashDot;MaxSize(100)"`
Description string `binding:"MaxSize(255)"` Description string `binding:"MaxSize(255)"`
Website string `binding:"ValidUrl;MaxSize(255)"` Website string `binding:"ValidUrl;MaxSize(255)"`
Interval string Interval string
MirrorAddress string MirrorAddress string
MirrorUsername string MirrorUsername string
MirrorPassword string MirrorPassword string
LFS bool `form:"mirror_lfs"` LFS bool `form:"mirror_lfs"`
LFSEndpoint string `form:"mirror_lfs_endpoint"` LFSEndpoint string `form:"mirror_lfs_endpoint"`
PushMirrorID string PushMirrorID string
PushMirrorAddress string PushMirrorAddress string
PushMirrorUsername string PushMirrorUsername string
PushMirrorPassword string PushMirrorPassword string
PushMirrorInterval string PushMirrorSyncOnCommit bool
Private bool PushMirrorInterval string
Template bool Private bool
EnablePrune bool Template bool
EnablePrune bool
// Advanced settings // Advanced settings
EnableWiki bool EnableWiki bool

View File

@ -11,38 +11,21 @@ import (
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
mirror_module "code.gitea.io/gitea/modules/mirror"
"code.gitea.io/gitea/modules/queue" "code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
) )
var mirrorQueue queue.UniqueQueue
// SyncType type of sync request
type SyncType int
const (
// PullMirrorType for pull mirrors
PullMirrorType SyncType = iota
// PushMirrorType for push mirrors
PushMirrorType
)
// SyncRequest for the mirror queue
type SyncRequest struct {
Type SyncType
ReferenceID int64 // RepoID for pull mirror, MirrorID fro push mirror
}
// doMirrorSync causes this request to mirror itself // doMirrorSync causes this request to mirror itself
func doMirrorSync(ctx context.Context, req *SyncRequest) { func doMirrorSync(ctx context.Context, req *mirror_module.SyncRequest) {
if req.ReferenceID == 0 { if req.ReferenceID == 0 {
log.Warn("Skipping mirror sync request, no mirror ID was specified") log.Warn("Skipping mirror sync request, no mirror ID was specified")
return return
} }
switch req.Type { switch req.Type {
case PushMirrorType: case mirror_module.PushMirrorType:
_ = SyncPushMirror(ctx, req.ReferenceID) _ = SyncPushMirror(ctx, req.ReferenceID)
case PullMirrorType: case mirror_module.PullMirrorType:
_ = SyncPullMirror(ctx, req.ReferenceID) _ = SyncPullMirror(ctx, req.ReferenceID)
default: default:
log.Error("Unknown Request type in queue: %v for MirrorID[%d]", req.Type, req.ReferenceID) log.Error("Unknown Request type in queue: %v for MirrorID[%d]", req.Type, req.ReferenceID)
@ -60,28 +43,26 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
log.Trace("Doing: Update") log.Trace("Doing: Update")
handler := func(idx int, bean interface{}) error { handler := func(idx int, bean interface{}) error {
var item SyncRequest
var repo *repo_model.Repository var repo *repo_model.Repository
var mirrorType mirror_module.SyncType
var referenceID int64
if m, ok := bean.(*repo_model.Mirror); ok { if m, ok := bean.(*repo_model.Mirror); ok {
if m.GetRepository() == nil { if m.GetRepository() == nil {
log.Error("Disconnected mirror found: %d", m.ID) log.Error("Disconnected mirror found: %d", m.ID)
return nil return nil
} }
repo = m.Repo repo = m.Repo
item = SyncRequest{ mirrorType = mirror_module.PullMirrorType
Type: PullMirrorType, referenceID = m.RepoID
ReferenceID: m.RepoID,
}
} else if m, ok := bean.(*repo_model.PushMirror); ok { } else if m, ok := bean.(*repo_model.PushMirror); ok {
if m.GetRepository() == nil { if m.GetRepository() == nil {
log.Error("Disconnected push-mirror found: %d", m.ID) log.Error("Disconnected push-mirror found: %d", m.ID)
return nil return nil
} }
repo = m.Repo repo = m.Repo
item = SyncRequest{ mirrorType = mirror_module.PushMirrorType
Type: PushMirrorType, referenceID = m.ID
ReferenceID: m.ID,
}
} else { } else {
log.Error("Unknown bean: %v", bean) log.Error("Unknown bean: %v", bean)
return nil return nil
@ -95,9 +76,9 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
} }
// Push to the Queue // Push to the Queue
if err := mirrorQueue.Push(&item); err != nil { if err := mirror_module.PushToQueue(mirrorType, referenceID); err != nil {
if err == queue.ErrAlreadyInQueue { if err == queue.ErrAlreadyInQueue {
if item.Type == PushMirrorType { if mirrorType == mirror_module.PushMirrorType {
log.Trace("PushMirrors for %-v already queued for sync", repo) log.Trace("PushMirrors for %-v already queued for sync", repo)
} else { } else {
log.Trace("PullMirrors for %-v already queued for sync", repo) log.Trace("PullMirrors for %-v already queued for sync", repo)
@ -142,7 +123,7 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
func queueHandle(data ...queue.Data) []queue.Data { func queueHandle(data ...queue.Data) []queue.Data {
for _, datum := range data { for _, datum := range data {
req := datum.(*SyncRequest) req := datum.(*mirror_module.SyncRequest)
doMirrorSync(graceful.GetManager().ShutdownContext(), req) doMirrorSync(graceful.GetManager().ShutdownContext(), req)
} }
return nil return nil
@ -150,43 +131,5 @@ func queueHandle(data ...queue.Data) []queue.Data {
// InitSyncMirrors initializes a go routine to sync the mirrors // InitSyncMirrors initializes a go routine to sync the mirrors
func InitSyncMirrors() { func InitSyncMirrors() {
if !setting.Mirror.Enabled { mirror_module.StartSyncMirrors(queueHandle)
return
}
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(SyncRequest))
go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}
// StartToMirror adds repoID to mirror queue
func StartToMirror(repoID int64) {
if !setting.Mirror.Enabled {
return
}
go func() {
err := mirrorQueue.Push(&SyncRequest{
Type: PullMirrorType,
ReferenceID: repoID,
})
if err != nil {
log.Error("Unable to push sync request for to the queue for pull mirror repo[%d]: Error: %v", repoID, err)
return
}
}()
}
// AddPushMirrorToQueue adds the push mirror to the queue
func AddPushMirrorToQueue(mirrorID int64) {
if !setting.Mirror.Enabled {
return
}
go func() {
err := mirrorQueue.Push(&SyncRequest{
Type: PushMirrorType,
ReferenceID: mirrorID,
})
if err != nil {
log.Error("Unable to push sync request to the queue for pull mirror repo[%d]: Error: %v", mirrorID, err)
}
}()
} }

View File

@ -219,6 +219,12 @@
</div> </div>
</div> </div>
</details> </details>
<div class="field">
<div class="ui checkbox">
<input id="push_mirror_sync_on_commit" name="push_mirror_sync_on_commit" type="checkbox" value="{{.push_mirror_sync_on_commit}}">
<label for="push_mirror_sync_on_commit">{{.locale.Tr "repo.mirror_sync_on_commit"}}</label>
</div>
</div>
<div class="inline field {{if .Err_PushMirrorInterval}}error{{end}}"> <div class="inline field {{if .Err_PushMirrorInterval}}error{{end}}">
<label for="push_mirror_interval">{{.locale.Tr "repo.mirror_interval" .MinimumMirrorInterval}}</label> <label for="push_mirror_interval">{{.locale.Tr "repo.mirror_interval" .MinimumMirrorInterval}}</label>
<input id="push_mirror_interval" name="push_mirror_interval" value="{{if .push_mirror_interval}}{{.push_mirror_interval}}{{else}}{{.DefaultMirrorInterval}}{{end}}"> <input id="push_mirror_interval" name="push_mirror_interval" value="{{if .push_mirror_interval}}{{.push_mirror_interval}}{{else}}{{.DefaultMirrorInterval}}{{end}}">