Commit 3ad81e62 authored by Alessio Caiazza's avatar Alessio Caiazza Committed by Jacob Vosmaer (GitLab)

Objectstorage ETag checking

parent a8ade9b0
......@@ -83,7 +83,7 @@ func (fh *FileHandler) GitLabFinalizeFields(prefix string) map[string]string {
// SaveFileFromReader persists the provided reader content to all the location specified in opts. A cleanup will be performed once ctx is Done
// Make sure the provided context will not expire before finalizing upload with GitLab Rails.
func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts *SaveFileOpts) (fh *FileHandler, err error) {
var remoteWriter io.WriteCloser
var remoteWriter objectstore.Upload
fh = &FileHandler{
Name: opts.TempFilePrefix,
RemoteID: opts.RemoteID,
......@@ -149,6 +149,9 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}
return nil, err
}
etag := remoteWriter.ETag()
fh.hashes["etag"] = etag
}
return fh, err
......
......@@ -80,6 +80,52 @@ func TestSaveFromDiskNotExistingFile(t *testing.T) {
assert.Nil(fh, "On error FileHandler should be nil")
}
func TestSaveFileWrongETag(t *testing.T) {
tests := []struct {
name string
multipart bool
}{
{name: "single part"},
{name: "multi part", multipart: true},
}
for _, spec := range tests {
t.Run(spec.name, func(t *testing.T) {
assert := assert.New(t)
osStub, ts := test.StartObjectStoreWithCustomMD5(map[string]string{test.ObjectPath: "brokenMD5"})
defer ts.Close()
objectURL := ts.URL + test.ObjectPath
opts := &filestore.SaveFileOpts{
RemoteID: "test-file",
RemoteURL: objectURL,
PresignedPut: objectURL + "?Signature=ASignature",
PresignedDelete: objectURL + "?Signature=AnotherSignature",
Deadline: testDeadline(),
}
if spec.multipart {
opts.PresignedParts = []string{objectURL + "?partNumber=1"}
opts.PresignedCompleteMultipart = objectURL + "?Signature=CompleteSig"
opts.PresignedAbortMultipart = objectURL + "?Signature=AbortSig"
opts.PartSize = test.ObjectSize
osStub.InitiateMultipartUpload(test.ObjectPath)
}
ctx, cancel := context.WithCancel(context.Background())
fh, err := filestore.SaveFileFromReader(ctx, strings.NewReader(test.ObjectContent), test.ObjectSize, opts)
assert.Nil(fh)
assert.Error(err)
assert.Equal(1, osStub.PutsCnt(), "File not uploaded")
cancel() // this will trigger an async cleanup
assertObjectStoreDeletedAsync(t, 1, osStub)
assert.False(spec.multipart && osStub.IsMultipartUpload(test.ObjectPath), "there must be no multipart upload in progress now")
})
}
}
func TestSaveFileFromDiskToLocalPath(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
......@@ -111,7 +157,8 @@ func TestSaveFileFromDiskToLocalPath(t *testing.T) {
func TestSaveFile(t *testing.T) {
type remote int
const (
remoteSingle remote = iota
notRemote remote = iota
remoteSingle
remoteMultipart
)
......@@ -221,6 +268,11 @@ func TestSaveFile(t *testing.T) {
assert.Equal(test.ObjectSHA1, fields["file.sha1"])
assert.Equal(test.ObjectSHA256, fields["file.sha256"])
assert.Equal(test.ObjectSHA512, fields["file.sha512"])
if spec.remote == notRemote {
assert.NotContains(fields, "file.etag")
} else {
assert.Contains(fields, "file.etag")
}
})
}
}
......
......@@ -151,6 +151,27 @@ func (m *Multipart) complete(cmu *CompleteMultipartUpload) error {
return result
}
if result.CompleteMultipartUploadResult == nil {
return fmt.Errorf("Cannot read CompleteMultipartUpload answer")
}
m.extractETag(result.ETag)
if err := m.verifyETag(cmu); err != nil {
return fmt.Errorf("ETag verification failure: %v", err)
}
return nil
}
func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error {
expectedChecksum, err := cmu.BuildMultipartUploadETag()
if err != nil {
return err
}
if expectedChecksum != m.etag {
return fmt.Errorf("got %q expected %q", m.etag, expectedChecksum)
}
return nil
}
......@@ -205,7 +226,7 @@ func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string,
return "", err
}
return part.MD5(), nil
return part.ETag(), nil
}
func (m *Multipart) delete() {
......
......@@ -42,8 +42,6 @@ type Object struct {
PutURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
// md5 is the checksum provided by the Object Store
md5 string
uploader
}
......@@ -71,7 +69,7 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
uploader: newUploader(uploadCtx, pw),
uploader: newMD5Uploader(uploadCtx, pw),
}
if metrics {
......@@ -120,28 +118,16 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
return
}
o.extractMD5(resp.Header)
o.extractETag(resp.Header.Get("ETag"))
if o.etag != o.md5Sum() {
o.uploadError = fmt.Errorf("ETag mismatch. expected %q got %q", o.md5Sum(), o.etag)
return
}
}()
return o, nil
}
// MD5 returns the md5sum of the uploaded returned by the Object Store provider via ETag Header.
// This method will wait until upload context is done before returning.
func (o *Object) MD5() string {
<-o.ctx.Done()
return o.md5
}
func (o *Object) extractMD5(h http.Header) {
etag := h.Get("ETag")
if etag != "" && etag[0] == '"' {
etag = etag[1 : len(etag)-1]
}
o.md5 = etag
}
func (o *Object) delete() {
o.syncAndDelete(o.DeleteURL)
}
......@@ -47,7 +47,7 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
assert.NoError(err)
// Checking MD5 extraction
assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.MD5())
assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag())
// Checking cleanup
cancel()
......
package objectstore
import (
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
)
......@@ -49,3 +51,25 @@ type compoundCompleteMultipartUploadResult struct {
func (c *compoundCompleteMultipartUploadResult) isError() bool {
return c.CompleteMultipartUploadError != nil
}
// BuildMultipartUploadETag creates an S3 compatible ETag for MultipartUpload
// Given the MD5 hash for each uploaded part of the file, concatenate
// the hashes into a single binary string and calculate the MD5 hash of that result,
// the append "-len(etags)"
// http://permalink.gmane.org/gmane.comp.file-systems.s3.s3tools/583
func (cmu *CompleteMultipartUpload) BuildMultipartUploadETag() (string, error) {
hasher := md5.New()
for _, part := range cmu.Part {
checksum, err := hex.DecodeString(part.ETag)
if err != nil {
return "", err
}
_, err = hasher.Write(checksum)
if err != nil {
return "", err
}
}
multipartChecksum := hasher.Sum(nil)
return fmt.Sprintf("%s-%d", hex.EncodeToString(multipartChecksum), len(cmu.Part)), nil
}
package objectstore
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMultipartUploadETag(t *testing.T) {
cmu := CompleteMultipartUpload{
Part: []*completeMultipartUploadPart{
{PartNumber: 1, ETag: "550cf6b6e60f65a0e3104a26e70fea42"},
{PartNumber: 2, ETag: "920b914bca0a70780b40881b8f376135"},
{PartNumber: 3, ETag: "175719e13d23c021058bc7376696f26f"},
},
}
expectedETag := "1dc6ab8f946f699770f14f46a8739671-3"
etag, err := cmu.BuildMultipartUploadETag()
require.NoError(t, err)
require.Equal(t, expectedETag, etag)
}
......@@ -204,7 +204,11 @@ func (o *ObjectstoreStub) completeMultipartUpload(w http.ResponseWriter, r *http
etag, overwritten := o.overwriteMD5[objectPath]
if !overwritten {
etag = "not an md5 hash"
etag, err = msg.BuildMultipartUploadETag()
if err != nil {
http.Error(w, "cannot compute ETag", 400)
return
}
}
o.bucket[objectPath] = etag
......
......@@ -106,6 +106,7 @@ func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
contentMD5: "920b914bca0a70780b40881b8f376135",
},
}
expectedETag := "2f2f82eceacf5bd0ac5d7c3d3d388849-2"
stub.InitiateMultipartUpload(ObjectPath)
......@@ -148,7 +149,7 @@ func TestObjectStoreCompleteMultipartUpload(t *testing.T) {
assert.Equal(len(parts), stub.PutsCnt())
assert.Equal(0, stub.DeletesCnt())
assert.NotEmpty(stub.GetObjectMD5(ObjectPath), "MultipartUpload not completed")
assert.Equal(expectedETag, stub.GetObjectMD5(ObjectPath))
assert.False(stub.IsMultipartUpload(ObjectPath), "MultipartUpload is still in progress")
}
......
......@@ -2,6 +2,9 @@ package objectstore
import (
"context"
"crypto/md5"
"encoding/hex"
"hash"
"io"
"net/http"
......@@ -10,10 +13,22 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// Upload represents an upload to an ObjectStorage provider
type Upload interface {
io.WriteCloser
ETag() string
}
// uploader is an io.WriteCloser that can be used as write end of the uploading pipe.
type uploader struct {
// writeCloser is the writer bound to the request body
io.WriteCloser
// etag is the object storage provided checksum
etag string
// md5 is an optional hasher for calculating md5 on the fly
md5 hash.Hash
w io.Writer
c io.Closer
// uploadError is the last error occourred during upload
uploadError error
......@@ -22,13 +37,19 @@ type uploader struct {
}
func newUploader(ctx context.Context, w io.WriteCloser) uploader {
return uploader{WriteCloser: w, ctx: ctx}
return uploader{w: w, c: w, ctx: ctx}
}
func newMD5Uploader(ctx context.Context, w io.WriteCloser) uploader {
hasher := md5.New()
mw := io.MultiWriter(w, hasher)
return uploader{w: mw, c: w, md5: hasher, ctx: ctx}
}
// Close implements the standard io.Closer interface: it closes the http client request.
// This method will also wait for the connection to terminate and return any error occurred during the upload
func (u *uploader) Close() error {
if err := u.WriteCloser.Close(); err != nil {
if err := u.c.Close(); err != nil {
return err
}
......@@ -41,6 +62,10 @@ func (u *uploader) Close() error {
return u.uploadError
}
func (u *uploader) Write(p []byte) (int, error) {
return u.w.Write(p)
}
// syncAndDelete wait for Context to be Done and then performs the requested HTTP call
func (u *uploader) syncAndDelete(url string) {
if url == "" {
......@@ -63,3 +88,27 @@ func (u *uploader) syncAndDelete(url string) {
}
resp.Body.Close()
}
func (u *uploader) extractETag(rawETag string) {
if rawETag != "" && rawETag[0] == '"' {
rawETag = rawETag[1 : len(rawETag)-1]
}
u.etag = rawETag
}
func (u *uploader) md5Sum() string {
if u.md5 == nil {
return ""
}
checksum := u.md5.Sum(nil)
return hex.EncodeToString(checksum)
}
// ETag returns the checksum of the uploaded object returned by the ObjectStorage provider via ETag Header.
// This method will wait until upload context is done before returning.
func (u *uploader) ETag() string {
<-u.ctx.Done()
return u.etag
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment