From a1ae83f36ece5e37c738bd796227cf5990ccc030 Mon Sep 17 00:00:00 2001
From: KN4CK3R <admin@oldschoolhack.me>
Date: Fri, 25 Nov 2022 06:47:46 +0100
Subject: [PATCH] Workaround for container registry push/pull errors (#21862)

This PR addresses #19586

I added a mutex to the upload version creation which will prevent the
push errors when two requests try to create these database entries. I'm
not sure if this should be the final solution for this problem.

I added a workaround to allow a reupload of missing blobs. Normally a
reupload is skipped because the database knows the blob is already
present. The workaround checks if the blob exists on the file system.
This should not be needed anymore with the above fix so I marked this
code to be removed with Gitea v1.20.

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
---
 modules/packages/content_store.go             |  7 ++++
 routers/api/packages/container/blob.go        | 32 ++++++++++++++++++-
 routers/api/packages/container/container.go   | 28 ++++++++++++++--
 routers/api/packages/container/manifest.go    | 12 +++++++
 .../api_packages_container_test.go            | 28 ++++++++++++++++
 5 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/modules/packages/content_store.go b/modules/packages/content_store.go
index be416ac269..13763db986 100644
--- a/modules/packages/content_store.go
+++ b/modules/packages/content_store.go
@@ -32,6 +32,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) {
 	return s.store.Open(KeyToRelativePath(key))
 }
 
+// FIXME: Workaround to be removed in v1.20
+// https://github.com/go-gitea/gitea/issues/19586
+func (s *ContentStore) Has(key BlobHash256Key) error {
+	_, err := s.store.Stat(KeyToRelativePath(key))
+	return err
+}
+
 // Save stores a package blob
 func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error {
 	_, err := s.store.Save(KeyToRelativePath(key), r, size)
diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go
index df6b7aed92..06b450de02 100644
--- a/routers/api/packages/container/blob.go
+++ b/routers/api/packages/container/blob.go
@@ -7,8 +7,11 @@ package container
 import (
 	"context"
 	"encoding/hex"
+	"errors"
 	"fmt"
+	"os"
 	"strings"
+	"sync"
 
 	"code.gitea.io/gitea/models/db"
 	packages_model "code.gitea.io/gitea/models/packages"
@@ -16,9 +19,12 @@ import (
 	"code.gitea.io/gitea/modules/log"
 	packages_module "code.gitea.io/gitea/modules/packages"
 	container_module "code.gitea.io/gitea/modules/packages/container"
+	"code.gitea.io/gitea/modules/util"
 	packages_service "code.gitea.io/gitea/services/packages"
 )
 
+var uploadVersionMutex sync.Mutex
+
 // saveAsPackageBlob creates a package blob from an upload
 // The uploaded blob gets stored in a special upload version to link them to the package/image
 func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) {
@@ -28,6 +34,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
 
 	contentStore := packages_module.NewContentStore()
 
+	var uploadVersion *packages_model.PackageVersion
+
+	// FIXME: Replace usage of mutex with database transaction
+	// https://github.com/go-gitea/gitea/pull/21862
+	uploadVersionMutex.Lock()
 	err := db.WithTx(db.DefaultContext, func(ctx context.Context) error {
 		created := true
 		p := &packages_model.Package{
@@ -68,11 +79,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
 			}
 		}
 
+		uploadVersion = pv
+
+		return nil
+	})
+	uploadVersionMutex.Unlock()
+	if err != nil {
+		return nil, err
+	}
+
+	err = db.WithTx(db.DefaultContext, func(ctx context.Context) error {
 		pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb)
 		if err != nil {
 			log.Error("Error inserting package blob: %v", err)
 			return err
 		}
+		// FIXME: Workaround to be removed in v1.20
+		// https://github.com/go-gitea/gitea/issues/19586
+		if exists {
+			err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256))
+			if err != nil && (errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist)) {
+				log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
+				exists = false
+			}
+		}
 		if !exists {
 			if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil {
 				log.Error("Error saving package blob in content store: %v", err)
@@ -83,7 +113,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
 		filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256))
 
 		pf := &packages_model.PackageFile{
-			VersionID:    pv.ID,
+			VersionID:    uploadVersion.ID,
 			BlobID:       pb.ID,
 			Name:         filename,
 			LowerName:    filename,
diff --git a/routers/api/packages/container/container.go b/routers/api/packages/container/container.go
index 5bc64e1b29..7af06a9171 100644
--- a/routers/api/packages/container/container.go
+++ b/routers/api/packages/container/container.go
@@ -10,6 +10,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"os"
 	"regexp"
 	"strconv"
 	"strings"
@@ -24,6 +25,7 @@ import (
 	container_module "code.gitea.io/gitea/modules/packages/container"
 	"code.gitea.io/gitea/modules/packages/container/oci"
 	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/util"
 	"code.gitea.io/gitea/routers/api/packages/helper"
 	packages_service "code.gitea.io/gitea/services/packages"
 	container_service "code.gitea.io/gitea/services/packages/container"
@@ -193,7 +195,7 @@ func InitiateUploadBlob(ctx *context.Context) {
 	mount := ctx.FormTrim("mount")
 	from := ctx.FormTrim("from")
 	if mount != "" {
-		blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
+		blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
 			Image:  from,
 			Digest: mount,
 		})
@@ -406,7 +408,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri
 		return nil, container_model.ErrContainerBlobNotExist
 	}
 
-	return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
+	return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
 		OwnerID: ctx.Package.Owner.ID,
 		Image:   ctx.Params("image"),
 		Digest:  digest,
@@ -548,7 +550,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe
 		return nil, container_model.ErrContainerBlobNotExist
 	}
 
-	return container_model.GetContainerBlob(ctx, opts)
+	return workaroundGetContainerBlob(ctx, opts)
 }
 
 // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry
@@ -688,3 +690,23 @@ func GetTagList(ctx *context.Context) {
 		Tags: tags,
 	})
 }
+
+// FIXME: Workaround to be removed in v1.20
+// https://github.com/go-gitea/gitea/issues/19586
+func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) {
+	blob, err := container_model.GetContainerBlob(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256))
+	if err != nil {
+		if errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist) {
+			log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256)
+			return nil, container_model.ErrContainerBlobNotExist
+		}
+		return nil, err
+	}
+
+	return blob, nil
+}
diff --git a/routers/api/packages/container/manifest.go b/routers/api/packages/container/manifest.go
index a48b1de3b2..64c958c2d0 100644
--- a/routers/api/packages/container/manifest.go
+++ b/routers/api/packages/container/manifest.go
@@ -6,8 +6,10 @@ package container
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
+	"os"
 	"strings"
 
 	"code.gitea.io/gitea/models/db"
@@ -19,6 +21,7 @@ import (
 	packages_module "code.gitea.io/gitea/modules/packages"
 	container_module "code.gitea.io/gitea/modules/packages/container"
 	"code.gitea.io/gitea/modules/packages/container/oci"
+	"code.gitea.io/gitea/modules/util"
 	packages_service "code.gitea.io/gitea/services/packages"
 )
 
@@ -403,6 +406,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack
 		log.Error("Error inserting package blob: %v", err)
 		return nil, false, "", err
 	}
+	// FIXME: Workaround to be removed in v1.20
+	// https://github.com/go-gitea/gitea/issues/19586
+	if exists {
+		err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256))
+		if err != nil && (errors.Is(err, util.ErrNotExist) || errors.Is(err, os.ErrNotExist)) {
+			log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
+			exists = false
+		}
+	}
 	if !exists {
 		contentStore := packages_module.NewContentStore()
 		if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil {
diff --git a/tests/integration/api_packages_container_test.go b/tests/integration/api_packages_container_test.go
index ba76ee4baa..60cbecd067 100644
--- a/tests/integration/api_packages_container_test.go
+++ b/tests/integration/api_packages_container_test.go
@@ -6,10 +6,12 @@ package integration
 
 import (
 	"bytes"
+	"crypto/sha256"
 	"encoding/base64"
 	"fmt"
 	"net/http"
 	"strings"
+	"sync"
 	"testing"
 
 	"code.gitea.io/gitea/models/db"
@@ -594,6 +596,32 @@ func TestPackageContainer(t *testing.T) {
 		})
 	}
 
+	// https://github.com/go-gitea/gitea/issues/19586
+	t.Run("ParallelUpload", func(t *testing.T) {
+		defer tests.PrintCurrentTest(t)()
+
+		url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name)
+
+		var wg sync.WaitGroup
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+
+			content := []byte{byte(i)}
+			digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content))
+
+			go func() {
+				defer wg.Done()
+
+				req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content))
+				addTokenAuthHeader(req, userToken)
+				resp := MakeRequest(t, req, http.StatusCreated)
+
+				assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest"))
+			}()
+		}
+		wg.Wait()
+	})
+
 	t.Run("OwnerNameChange", func(t *testing.T) {
 		defer tests.PrintCurrentTest(t)()