Merge pull request '[Port] Make LFS http_client parallel within a batch' (#6170) from fnetx/lfs-concurrency into forgejo

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/6170
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
This commit is contained in:
Earl Warren 2024-12-06 05:40:37 +00:00
commit 902102ef24
7 changed files with 186 additions and 153 deletions

View file

@ -2668,9 +2668,15 @@ LEVEL = Info
;; override the minio base path if storage type is minio ;; override the minio base path if storage type is minio
;MINIO_BASE_PATH = lfs/ ;MINIO_BASE_PATH = lfs/
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; settings for Gitea's LFS client (eg: mirroring an upstream lfs endpoint)
;;
;[lfs_client] ;[lfs_client]
;; When mirroring an upstream lfs endpoint, limit the number of pointers in each batch request to this number ;; Limit the number of pointers in each batch request to this number
;BATCH_SIZE = 20 ;BATCH_SIZE = 20
;; Limit the number of concurrent upload/download operations within a batch
;BATCH_OPERATION_CONCURRENCY = 8
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

2
go.mod
View file

@ -108,6 +108,7 @@ require (
golang.org/x/image v0.23.0 golang.org/x/image v0.23.0
golang.org/x/net v0.31.0 golang.org/x/net v0.31.0
golang.org/x/oauth2 v0.23.0 golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0 golang.org/x/sys v0.28.0
golang.org/x/text v0.21.0 golang.org/x/text v0.21.0
golang.org/x/tools v0.26.0 golang.org/x/tools v0.26.0
@ -284,7 +285,6 @@ require (
go.uber.org/zap v1.27.0 // indirect go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/mod v0.21.0 // indirect golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/time v0.5.0 // indirect golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect

View file

@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"golang.org/x/sync/errgroup"
) )
// HTTPClient is used to communicate with the LFS server // HTTPClient is used to communicate with the LFS server
@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback) return c.performOperation(ctx, objects, nil, callback)
} }
// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error { func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 { if len(objects) == 0 {
return nil return nil
@ -133,71 +136,90 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer) return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
} }
if setting.LFSClient.BatchOperationConcurrency <= 0 {
panic("BatchOperationConcurrency must be greater than 0, forgot to init?")
}
errGroup, groupCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(setting.LFSClient.BatchOperationConcurrency)
for _, object := range result.Objects { for _, object := range result.Objects {
if object.Error != nil { errGroup.Go(func() error {
log.Trace("Error on object %v: %v", object.Pointer, object.Error) return performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
if uc != nil { })
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
}
link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
content, err := uc(object.Pointer, nil)
if err != nil {
return err
}
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}
link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
} }
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
// the response from a lfs batch api request for this specific object id contained an error
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
// this was an 'upload' request inside the batch request
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
// if the callback returns no err, then the error could be ignored, and the operations should continue
return nil
}
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}
link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}
content, err := uc(object.Pointer, nil)
if err != nil {
return err
}
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}
link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil return nil
} }

View file

@ -12,6 +12,8 @@ import (
"testing" "testing"
"code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/test"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -184,93 +186,84 @@ func TestHTTPClientDownload(t *testing.T) {
cases := []struct { cases := []struct {
endpoint string endpoint string
expectederror string expectedError string
}{ }{
// case 0
{ {
endpoint: "https://status-not-ok.io", endpoint: "https://status-not-ok.io",
expectederror: io.ErrUnexpectedEOF.Error(), expectedError: io.ErrUnexpectedEOF.Error(),
}, },
// case 1
{ {
endpoint: "https://invalid-json-response.io", endpoint: "https://invalid-json-response.io",
expectederror: "invalid json", expectedError: "invalid json",
}, },
// case 2
{ {
endpoint: "https://valid-batch-request-download.io", endpoint: "https://valid-batch-request-download.io",
expectederror: "", expectedError: "",
}, },
// case 3
{ {
endpoint: "https://response-no-objects.io", endpoint: "https://response-no-objects.io",
expectederror: "", expectedError: "",
}, },
// case 4
{ {
endpoint: "https://unknown-transfer-adapter.io", endpoint: "https://unknown-transfer-adapter.io",
expectederror: "TransferAdapter not found: ", expectedError: "TransferAdapter not found: ",
}, },
// case 5
{ {
endpoint: "https://error-in-response-objects.io", endpoint: "https://error-in-response-objects.io",
expectederror: "Object not found", expectedError: "Object not found",
}, },
// case 6
{ {
endpoint: "https://empty-actions-map.io", endpoint: "https://empty-actions-map.io",
expectederror: "missing action 'download'", expectedError: "missing action 'download'",
}, },
// case 7
{ {
endpoint: "https://download-actions-map.io", endpoint: "https://download-actions-map.io",
expectederror: "", expectedError: "",
}, },
// case 8
{ {
endpoint: "https://upload-actions-map.io", endpoint: "https://upload-actions-map.io",
expectederror: "missing action 'download'", expectedError: "missing action 'download'",
}, },
// case 9
{ {
endpoint: "https://verify-actions-map.io", endpoint: "https://verify-actions-map.io",
expectederror: "missing action 'download'", expectedError: "missing action 'download'",
}, },
// case 10
{ {
endpoint: "https://unknown-actions-map.io", endpoint: "https://unknown-actions-map.io",
expectederror: "missing action 'download'", expectedError: "missing action 'download'",
}, },
// case 11
{ {
endpoint: "https://legacy-batch-request-download.io", endpoint: "https://legacy-batch-request-download.io",
expectederror: "", expectedError: "",
}, },
} }
for n, c := range cases { defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 8)()
client := &HTTPClient{ for _, c := range cases {
client: hc, t.Run(c.endpoint, func(t *testing.T) {
endpoint: c.endpoint, client := &HTTPClient{
transfers: map[string]TransferAdapter{ client: hc,
"dummy": dummy, endpoint: c.endpoint,
}, transfers: map[string]TransferAdapter{
} "dummy": dummy,
},
err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error { }
if objectError != nil {
return objectError err := client.Download(context.Background(), []Pointer{p}, func(p Pointer, content io.ReadCloser, objectError error) error {
if objectError != nil {
return objectError
}
b, err := io.ReadAll(content)
require.NoError(t, err)
assert.Equal(t, []byte("dummy"), b)
return nil
})
if c.expectedError != "" {
assert.ErrorContains(t, err, c.expectedError)
} else {
require.NoError(t, err)
} }
b, err := io.ReadAll(content)
require.NoError(t, err)
assert.Equal(t, []byte("dummy"), b)
return nil
}) })
if len(c.expectederror) > 0 {
assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
} else {
require.NoError(t, err, "case %d", n)
}
} }
} }
@ -297,81 +290,73 @@ func TestHTTPClientUpload(t *testing.T) {
cases := []struct { cases := []struct {
endpoint string endpoint string
expectederror string expectedError string
}{ }{
// case 0
{ {
endpoint: "https://status-not-ok.io", endpoint: "https://status-not-ok.io",
expectederror: io.ErrUnexpectedEOF.Error(), expectedError: io.ErrUnexpectedEOF.Error(),
}, },
// case 1
{ {
endpoint: "https://invalid-json-response.io", endpoint: "https://invalid-json-response.io",
expectederror: "invalid json", expectedError: "invalid json",
}, },
// case 2
{ {
endpoint: "https://valid-batch-request-upload.io", endpoint: "https://valid-batch-request-upload.io",
expectederror: "", expectedError: "",
}, },
// case 3
{ {
endpoint: "https://response-no-objects.io", endpoint: "https://response-no-objects.io",
expectederror: "", expectedError: "",
}, },
// case 4
{ {
endpoint: "https://unknown-transfer-adapter.io", endpoint: "https://unknown-transfer-adapter.io",
expectederror: "TransferAdapter not found: ", expectedError: "TransferAdapter not found: ",
}, },
// case 5
{ {
endpoint: "https://error-in-response-objects.io", endpoint: "https://error-in-response-objects.io",
expectederror: "Object not found", expectedError: "Object not found",
}, },
// case 6
{ {
endpoint: "https://empty-actions-map.io", endpoint: "https://empty-actions-map.io",
expectederror: "", expectedError: "",
}, },
// case 7
{ {
endpoint: "https://download-actions-map.io", endpoint: "https://download-actions-map.io",
expectederror: "missing action 'upload'", expectedError: "missing action 'upload'",
}, },
// case 8
{ {
endpoint: "https://upload-actions-map.io", endpoint: "https://upload-actions-map.io",
expectederror: "", expectedError: "",
}, },
// case 9
{ {
endpoint: "https://verify-actions-map.io", endpoint: "https://verify-actions-map.io",
expectederror: "missing action 'upload'", expectedError: "missing action 'upload'",
}, },
// case 10
{ {
endpoint: "https://unknown-actions-map.io", endpoint: "https://unknown-actions-map.io",
expectederror: "missing action 'upload'", expectedError: "missing action 'upload'",
}, },
} }
for n, c := range cases { defer test.MockVariableValue(&setting.LFSClient.BatchOperationConcurrency, 8)()
client := &HTTPClient{ for _, c := range cases {
client: hc, t.Run(c.endpoint, func(t *testing.T) {
endpoint: c.endpoint, client := &HTTPClient{
transfers: map[string]TransferAdapter{ client: hc,
"dummy": dummy, endpoint: c.endpoint,
}, transfers: map[string]TransferAdapter{
} "dummy": dummy,
},
}
err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) { err := client.Upload(context.Background(), []Pointer{p}, func(p Pointer, objectError error) (io.ReadCloser, error) {
return io.NopCloser(new(bytes.Buffer)), objectError return io.NopCloser(new(bytes.Buffer)), objectError
})
if c.expectedError != "" {
assert.ErrorContains(t, err, c.expectedError)
} else {
require.NoError(t, err)
}
}) })
if len(c.expectederror) > 0 {
assert.Contains(t, err.Error(), c.expectederror, "case %d: '%s' should contain '%s'", n, err.Error(), c.expectederror)
} else {
require.NoError(t, err, "case %d", n)
}
} }
} }

View file

@ -182,11 +182,12 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
downloadObjects := func(pointers []lfs.Pointer) error { downloadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
if errors.Is(objectError, lfs.ErrObjectNotExist) {
log.Warn("Ignoring missing upstream LFS object %-v: %v", p, objectError)
return nil
}
if objectError != nil { if objectError != nil {
if errors.Is(objectError, lfs.ErrObjectNotExist) {
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
return nil
}
return objectError return objectError
} }

View file

@ -27,7 +27,8 @@ var LFS = struct {
// LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS // LFSClient represents configuration for Gitea's LFS clients, for example: mirroring upstream Git LFS
var LFSClient = struct { var LFSClient = struct {
BatchSize int `ini:"BATCH_SIZE"` BatchSize int `ini:"BATCH_SIZE"`
BatchOperationConcurrency int `ini:"BATCH_OPERATION_CONCURRENCY"`
}{} }{}
func loadLFSFrom(rootCfg ConfigProvider) error { func loadLFSFrom(rootCfg ConfigProvider) error {
@ -65,6 +66,11 @@ func loadLFSFrom(rootCfg ConfigProvider) error {
LFSClient.BatchSize = 20 LFSClient.BatchSize = 20
} }
if LFSClient.BatchOperationConcurrency < 1 {
// match the default git-lfs's `lfs.concurrenttransfers` https://github.com/git-lfs/git-lfs/blob/main/docs/man/git-lfs-config.adoc#upload-and-download-transfer-settings
LFSClient.BatchOperationConcurrency = 8
}
LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour) LFS.HTTPAuthExpiry = sec.Key("LFS_HTTP_AUTH_EXPIRY").MustDuration(24 * time.Hour)
if !LFS.StartServer || !InstallLock { if !LFS.StartServer || !InstallLock {

View file

@ -115,4 +115,17 @@ BATCH_SIZE = 0
assert.NoError(t, loadLFSFrom(cfg)) assert.NoError(t, loadLFSFrom(cfg))
assert.EqualValues(t, 100, LFS.MaxBatchSize) assert.EqualValues(t, 100, LFS.MaxBatchSize)
assert.EqualValues(t, 20, LFSClient.BatchSize) assert.EqualValues(t, 20, LFSClient.BatchSize)
assert.EqualValues(t, 8, LFSClient.BatchOperationConcurrency)
iniStr = `
[lfs_client]
BATCH_SIZE = 50
BATCH_OPERATION_CONCURRENCY = 10
`
cfg, err = NewConfigProviderFromData(iniStr)
assert.NoError(t, err)
assert.NoError(t, loadLFSFrom(cfg))
assert.EqualValues(t, 50, LFSClient.BatchSize)
assert.EqualValues(t, 10, LFSClient.BatchOperationConcurrency)
} }