Commit 7231f85e authored by Stan Hu's avatar Stan Hu

Support adding PUT headers for object storage from Rails

As revealed in https://gitlab.com/gitlab-org/gitlab-ce/issues/49957, Rails
generates a signed URL with a fixed HTTP header with `Content-Type:
application/octet-stream`. However, if we change or remove that for
some reason in Workhorse, this breaks the upload with a 403 Unauthorized because
the signed URL is not valid.

We can make this more robust by doing the following:

1. In the `/uploads/authorize` request, Rails can return a `StoreHeaders` key-value
pair in the JSON response containing the required headers that the PUT
request must include.
2. Use those HTTP headers if that value is present.
3. For backwards compatibility, if that key is not present, default to
the old behavior of sending the fixed `Content-Type` header.
parent 0e72f606
...@@ -85,6 +85,10 @@ type RemoteObject struct { ...@@ -85,6 +85,10 @@ type RemoteObject struct {
DeleteURL string DeleteURL string
// StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file // StoreURL is the temporary presigned S3 PutObject URL to which upload the first found file
StoreURL string StoreURL string
// Boolean to indicate whether to use headers included in PutHeaders
CustomPutHeaders bool
// PutHeaders are HTTP headers (e.g. Content-Type) to be sent with StoreURL
PutHeaders map[string]string
// ID is a unique identifier of object storage upload // ID is a unique identifier of object storage upload
ID string ID string
// Timeout is a number that represents timeout in seconds for sending data to StoreURL // Timeout is a number that represents timeout in seconds for sending data to StoreURL
......
...@@ -100,14 +100,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -100,14 +100,14 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}() }()
if opts.IsMultipart() { if opts.IsMultipart() {
remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.Deadline, opts.PartSize) remoteWriter, err = objectstore.NewMultipart(ctx, opts.PresignedParts, opts.PresignedCompleteMultipart, opts.PresignedAbortMultipart, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, opts.PartSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
writers = append(writers, remoteWriter) writers = append(writers, remoteWriter)
} else if opts.IsRemote() { } else if opts.IsRemote() {
remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Deadline, size) remoteWriter, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.PutHeaders, opts.Deadline, size)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -23,6 +23,9 @@ type SaveFileOpts struct { ...@@ -23,6 +23,9 @@ type SaveFileOpts struct {
PresignedPut string PresignedPut string
// PresignedDelete is a presigned S3 DeleteObject compatible URL. // PresignedDelete is a presigned S3 DeleteObject compatible URL.
PresignedDelete string PresignedDelete string
// HTTP headers to be sent along with PUT request
PutHeaders map[string]string
// Deadline it the S3 operation deadline, the upload will be aborted if not completed in time // Deadline it the S3 operation deadline, the upload will be aborted if not completed in time
Deadline time.Time Deadline time.Time
...@@ -65,9 +68,17 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts { ...@@ -65,9 +68,17 @@ func GetOpts(apiResponse *api.Response) *SaveFileOpts {
RemoteURL: apiResponse.RemoteObject.GetURL, RemoteURL: apiResponse.RemoteObject.GetURL,
PresignedPut: apiResponse.RemoteObject.StoreURL, PresignedPut: apiResponse.RemoteObject.StoreURL,
PresignedDelete: apiResponse.RemoteObject.DeleteURL, PresignedDelete: apiResponse.RemoteObject.DeleteURL,
PutHeaders: apiResponse.RemoteObject.PutHeaders,
Deadline: time.Now().Add(timeout), Deadline: time.Now().Add(timeout),
} }
// Backwards compatibility to ensure API servers that do not include the
// CustomPutHeaders flag will default to the original content type.
if !apiResponse.RemoteObject.CustomPutHeaders {
opts.PutHeaders = make(map[string]string)
opts.PutHeaders["Content-Type"] = "application/octet-stream"
}
if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil { if multiParams := apiResponse.RemoteObject.MultipartUpload; multiParams != nil {
opts.PartSize = multiParams.PartSize opts.PartSize = multiParams.PartSize
opts.PresignedCompleteMultipart = multiParams.CompleteURL opts.PresignedCompleteMultipart = multiParams.CompleteURL
......
...@@ -78,6 +78,8 @@ func TestGetOpts(t *testing.T) { ...@@ -78,6 +78,8 @@ func TestGetOpts(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
multipart *api.MultipartUploadParams multipart *api.MultipartUploadParams
customPutHeaders bool
putHeaders map[string]string
}{ }{
{ {
name: "Single upload", name: "Single upload",
...@@ -90,6 +92,21 @@ func TestGetOpts(t *testing.T) { ...@@ -90,6 +92,21 @@ func TestGetOpts(t *testing.T) {
PartURLs: []string{"http://part1", "http://part2"}, PartURLs: []string{"http://part1", "http://part2"},
}, },
}, },
{
name: "Single upload with custom content type",
customPutHeaders: true,
putHeaders: map[string]string{"Content-Type": "image/jpeg"},
}, {
name: "Multipart upload with custom content type",
multipart: &api.MultipartUploadParams{
PartSize: 10,
CompleteURL: "http://complete",
AbortURL: "http://abort",
PartURLs: []string{"http://part1", "http://part2"},
},
customPutHeaders: true,
putHeaders: map[string]string{"Content-Type": "image/jpeg"},
},
} }
for _, test := range tests { for _, test := range tests {
...@@ -105,6 +122,8 @@ func TestGetOpts(t *testing.T) { ...@@ -105,6 +122,8 @@ func TestGetOpts(t *testing.T) {
StoreURL: "http://store", StoreURL: "http://store",
DeleteURL: "http://delete", DeleteURL: "http://delete",
MultipartUpload: test.multipart, MultipartUpload: test.multipart,
CustomPutHeaders: test.customPutHeaders,
PutHeaders: test.putHeaders,
}, },
} }
deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second) deadline := time.Now().Add(time.Duration(apiResponse.RemoteObject.Timeout) * time.Second)
...@@ -116,6 +135,12 @@ func TestGetOpts(t *testing.T) { ...@@ -116,6 +135,12 @@ func TestGetOpts(t *testing.T) {
assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL) assert.Equal(apiResponse.RemoteObject.GetURL, opts.RemoteURL)
assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut) assert.Equal(apiResponse.RemoteObject.StoreURL, opts.PresignedPut)
assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete) assert.Equal(apiResponse.RemoteObject.DeleteURL, opts.PresignedDelete)
if test.customPutHeaders {
assert.Equal(opts.PutHeaders, apiResponse.RemoteObject.PutHeaders)
} else {
assert.Equal(opts.PutHeaders, map[string]string{"Content-Type": "application/octet-stream"})
}
if test.multipart == nil { if test.multipart == nil {
assert.False(opts.IsMultipart()) assert.False(opts.IsMultipart())
assert.Empty(opts.PresignedCompleteMultipart) assert.Empty(opts.PresignedCompleteMultipart)
......
...@@ -36,7 +36,7 @@ type Multipart struct { ...@@ -36,7 +36,7 @@ type Multipart struct {
// NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes // NewMultipart provides Multipart pointer that can be used for uploading. Data written will be split buffered on disk up to size bytes
// then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent. // then uploaded with S3 Upload Part. Once Multipart is Closed a final call to CompleteMultipartUpload will be sent.
// In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources // In case of any error a call to AbortMultipartUpload will be made to cleanup all the resources
func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, deadline time.Time, partSize int64) (*Multipart, error) { func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, deleteURL string, putHeaders map[string]string, deadline time.Time, partSize int64) (*Multipart, error) {
pr, pw := io.Pipe() pr, pw := io.Pipe()
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
m := &Multipart{ m := &Multipart{
...@@ -62,7 +62,7 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL, ...@@ -62,7 +62,7 @@ func NewMultipart(ctx context.Context, partURLs []string, completeURL, abortURL,
cmu := &CompleteMultipartUpload{} cmu := &CompleteMultipartUpload{}
for i, partURL := range partURLs { for i, partURL := range partURLs {
src := io.LimitReader(pr, partSize) src := io.LimitReader(pr, partSize)
part, err := m.readAndUploadOnePart(partURL, src, i+1) part, err := m.readAndUploadOnePart(partURL, putHeaders, src, i+1)
if err != nil { if err != nil {
m.uploadError = err m.uploadError = err
return return
...@@ -175,7 +175,7 @@ func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error { ...@@ -175,7 +175,7 @@ func (m *Multipart) verifyETag(cmu *CompleteMultipartUpload) error {
return nil return nil
} }
func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) { func (m *Multipart) readAndUploadOnePart(partURL string, putHeaders map[string]string, src io.Reader, partNumber int) (*completeMultipartUploadPart, error) {
file, err := ioutil.TempFile("", "part-buffer") file, err := ioutil.TempFile("", "part-buffer")
if err != nil { if err != nil {
return nil, fmt.Errorf("Unable to create a temporary file for buffering: %v", err) return nil, fmt.Errorf("Unable to create a temporary file for buffering: %v", err)
...@@ -198,20 +198,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumb ...@@ -198,20 +198,20 @@ func (m *Multipart) readAndUploadOnePart(partURL string, src io.Reader, partNumb
return nil, fmt.Errorf("Cannot rewind part %d temporary dump : %v", partNumber, err) return nil, fmt.Errorf("Cannot rewind part %d temporary dump : %v", partNumber, err)
} }
etag, err := m.uploadPart(partURL, file, n) etag, err := m.uploadPart(partURL, putHeaders, file, n)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot upload part %d: %v", partNumber, err) return nil, fmt.Errorf("Cannot upload part %d: %v", partNumber, err)
} }
return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil return &completeMultipartUploadPart{PartNumber: partNumber, ETag: etag}, nil
} }
func (m *Multipart) uploadPart(url string, body io.Reader, size int64) (string, error) { func (m *Multipart) uploadPart(url string, headers map[string]string, body io.Reader, size int64) (string, error) {
deadline, ok := m.ctx.Deadline() deadline, ok := m.ctx.Deadline()
if !ok { if !ok {
return "", fmt.Errorf("Missing deadline") return "", fmt.Errorf("Missing deadline")
} }
part, err := newObject(m.ctx, url, "", deadline, size, false) part, err := newObject(m.ctx, url, "", headers, deadline, size, false)
if err != nil { if err != nil {
return "", err return "", err
} }
......
...@@ -47,11 +47,11 @@ type Object struct { ...@@ -47,11 +47,11 @@ type Object struct {
} }
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading. // NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64) (*Object, error) { func NewObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, deadline, size, true) return newObject(ctx, putURL, deleteURL, putHeaders, deadline, size, true)
} }
func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64, metrics bool) (*Object, error) { func newObject(ctx context.Context, putURL, deleteURL string, putHeaders map[string]string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now() started := time.Now()
pr, pw := io.Pipe() pr, pw := io.Pipe()
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err) // we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
...@@ -63,7 +63,10 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time ...@@ -63,7 +63,10 @@ func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err) return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err)
} }
req.ContentLength = size req.ContentLength = size
req.Header.Set("Content-Type", "application/octet-stream")
for k, v := range putHeaders {
req.Header.Set(k, v)
}
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline) uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{ o := &Object{
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
const testTimeout = 10 * time.Second const testTimeout = 10 * time.Second
func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool, contentType string) {
assert := assert.New(t) assert := assert.New(t)
osStub, ts := test.StartObjectStore() osStub, ts := test.StartObjectStore()
...@@ -30,11 +30,13 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { ...@@ -30,11 +30,13 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
deleteURL = objectURL deleteURL = objectURL
} }
putHeaders := map[string]string{"Content-Type": contentType}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
deadline := time.Now().Add(testTimeout) deadline := time.Now().Add(testTimeout)
object, err := objectstore.NewObject(ctx, objectURL, deleteURL, deadline, test.ObjectSize) object, err := objectstore.NewObject(ctx, objectURL, deleteURL, putHeaders, deadline, test.ObjectSize)
require.NoError(t, err) require.NoError(t, err)
// copy data // copy data
...@@ -46,6 +48,8 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { ...@@ -46,6 +48,8 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
err = object.Close() err = object.Close()
assert.NoError(err) assert.NoError(err)
assert.Equal(contentType, osStub.GetHeader(test.ObjectPath, "Content-Type"))
// Checking MD5 extraction // Checking MD5 extraction
assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag()) assert.Equal(osStub.GetObjectMD5(test.ObjectPath), object.ETag())
...@@ -73,8 +77,9 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) { ...@@ -73,8 +77,9 @@ func testObjectUploadNoErrors(t *testing.T, useDeleteURL bool) {
} }
func TestObjectUpload(t *testing.T) { func TestObjectUpload(t *testing.T) {
t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true) }) t.Run("with delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, true, "application/octet-stream") })
t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false) }) t.Run("without delete URL", func(t *testing.T) { testObjectUploadNoErrors(t, false, "application/octet-stream") })
t.Run("with custom content type", func(t *testing.T) { testObjectUploadNoErrors(t, false, "image/jpeg") })
} }
func TestObjectUpload404(t *testing.T) { func TestObjectUpload404(t *testing.T) {
...@@ -89,7 +94,7 @@ func TestObjectUpload404(t *testing.T) { ...@@ -89,7 +94,7 @@ func TestObjectUpload404(t *testing.T) {
deadline := time.Now().Add(testTimeout) deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, test.ObjectSize) object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, test.ObjectSize)
require.NoError(err) require.NoError(err)
_, err = io.Copy(object, strings.NewReader(test.ObjectContent)) _, err = io.Copy(object, strings.NewReader(test.ObjectContent))
...@@ -134,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) { ...@@ -134,7 +139,7 @@ func TestObjectUploadBrokenConnection(t *testing.T) {
deadline := time.Now().Add(testTimeout) deadline := time.Now().Add(testTimeout)
objectURL := ts.URL + test.ObjectPath objectURL := ts.URL + test.ObjectPath
object, err := objectstore.NewObject(ctx, objectURL, "", deadline, -1) object, err := objectstore.NewObject(ctx, objectURL, "", map[string]string{}, deadline, -1)
require.NoError(t, err) require.NoError(t, err)
_, copyErr := io.Copy(object, &endlessReader{}) _, copyErr := io.Copy(object, &endlessReader{})
......
...@@ -27,6 +27,8 @@ type ObjectstoreStub struct { ...@@ -27,6 +27,8 @@ type ObjectstoreStub struct {
overwriteMD5 map[string]string overwriteMD5 map[string]string
// multipart is a map of MultipartUploads // multipart is a map of MultipartUploads
multipart map[string]partsEtagMap multipart map[string]partsEtagMap
// HTTP header sent along request
headers map[string]*http.Header
puts int puts int
deletes int deletes int
...@@ -45,6 +47,7 @@ func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStu ...@@ -45,6 +47,7 @@ func StartObjectStoreWithCustomMD5(md5Hashes map[string]string) (*ObjectstoreStu
bucket: make(map[string]string), bucket: make(map[string]string),
multipart: make(map[string]partsEtagMap), multipart: make(map[string]partsEtagMap),
overwriteMD5: make(map[string]string), overwriteMD5: make(map[string]string),
headers: make(map[string]*http.Header),
} }
for k, v := range md5Hashes { for k, v := range md5Hashes {
...@@ -79,6 +82,18 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string { ...@@ -79,6 +82,18 @@ func (o *ObjectstoreStub) GetObjectMD5(path string) string {
return o.bucket[path] return o.bucket[path]
} }
// GetHeader returns a given HTTP header of the object uploaded to the path
func (o *ObjectstoreStub) GetHeader(path, key string) string {
o.m.Lock()
defer o.m.Unlock()
if val, ok := o.headers[path]; ok {
return val.Get(key)
}
return ""
}
// InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path // InitiateMultipartUpload prepare the ObjectstoreStob to receive a MultipartUpload on path
// It will return an error if a MultipartUpload is already in progress on that path // It will return an error if a MultipartUpload is already in progress on that path
// InitiateMultipartUpload is only used during test setup. // InitiateMultipartUpload is only used during test setup.
...@@ -146,6 +161,7 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) { ...@@ -146,6 +161,7 @@ func (o *ObjectstoreStub) putObject(w http.ResponseWriter, r *http.Request) {
etag = hex.EncodeToString(checksum) etag = hex.EncodeToString(checksum)
} }
o.headers[objectPath] = &r.Header
o.puts++ o.puts++
if o.isMultipartUpload(objectPath) { if o.isMultipartUpload(objectPath) {
pNumber := r.URL.Query().Get("partNumber") pNumber := r.URL.Query().Get("partNumber")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment