Commit 8badef00 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'process_file' into 'master'

Handle Object Store upload in upload.HandleFileUploads

See merge request gitlab-org/gitlab-workhorse!238
parents d78f0997 c4221772
#!/bin/sh #!/bin/sh
git grep 'context.\(Background\|TODO\)' | \ git grep 'context.\(Background\|TODO\)' | \
grep -v -e '^[^:]*_test\.go:' -e '^vendor/' -e '^_support/' | \ grep -v -e '^[^:]*_test\.go:' -e '^vendor/' -e '^_support/' -e '^cmd/[^:]*/main.go' | \
grep -e '^[^:]*\.go' | \ grep -e '^[^:]*\.go' | \
awk '{ awk '{
print "Found disallowed use of context.Background or TODO" print "Found disallowed use of context.Background or TODO"
......
...@@ -2,16 +2,11 @@ package main ...@@ -2,16 +2,11 @@ package main
import ( import (
"archive/zip" "archive/zip"
"context"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"net"
"net/http"
"os" "os"
"strings"
"time"
"github.com/jfbus/httprs"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts" "gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts"
...@@ -23,67 +18,6 @@ var Version = "unknown" ...@@ -23,67 +18,6 @@ var Version = "unknown"
var printVersion = flag.Bool("version", false, "Print version and exit") var printVersion = flag.Bool("version", false, "Print version and exit")
var httpClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
},
}
func isURL(path string) bool {
return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://")
}
func openHTTPArchive(archivePath string) (*zip.Reader, func()) {
scrubbedArchivePath := helper.ScrubURLParams(archivePath)
resp, err := httpClient.Get(archivePath)
if err != nil {
fatalError(fmt.Errorf("HTTP GET %q: %v", scrubbedArchivePath, err))
} else if resp.StatusCode == http.StatusNotFound {
notFoundError(fmt.Errorf("HTTP GET %q: not found", scrubbedArchivePath))
} else if resp.StatusCode != http.StatusOK {
fatalError(fmt.Errorf("HTTP GET %q: %d: %v", scrubbedArchivePath, resp.StatusCode, resp.Status))
}
rs := httprs.NewHttpReadSeeker(resp, httpClient)
archive, err := zip.NewReader(rs, resp.ContentLength)
if err != nil {
notFoundError(fmt.Errorf("open %q: %v", scrubbedArchivePath, err))
}
return archive, func() {
resp.Body.Close()
rs.Close()
}
}
func openFileArchive(archivePath string) (*zip.Reader, func()) {
archive, err := zip.OpenReader(archivePath)
if err != nil {
notFoundError(fmt.Errorf("open %q: %v", archivePath, err))
}
return &archive.Reader, func() {
archive.Close()
}
}
func openArchive(archivePath string) (*zip.Reader, func()) {
if isURL(archivePath) {
return openHTTPArchive(archivePath)
}
return openFileArchive(archivePath)
}
func main() { func main() {
flag.Parse() flag.Parse()
...@@ -110,8 +44,17 @@ func main() { ...@@ -110,8 +44,17 @@ func main() {
fatalError(fmt.Errorf("decode entry %q: %v", encodedFileName, err)) fatalError(fmt.Errorf("decode entry %q: %v", encodedFileName, err))
} }
archive, cleanFn := openArchive(archivePath) ctx, cancel := context.WithCancel(context.Background())
defer cleanFn() defer cancel()
archive, err := zipartifacts.OpenArchive(ctx, archivePath)
if err != nil {
oaError := fmt.Errorf("OpenArchive: %v", err)
if err == zipartifacts.ErrArchiveNotFound {
notFoundError(oaError)
}
fatalError(oaError)
}
file := findFileInZip(fileName, archive) file := findFileInZip(fileName, archive)
if file == nil { if file == nil {
......
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"os" "os"
...@@ -27,11 +28,24 @@ func main() { ...@@ -27,11 +28,24 @@ func main() {
fmt.Fprintf(os.Stderr, "Usage: %s FILE.ZIP\n", progName) fmt.Fprintf(os.Stderr, "Usage: %s FILE.ZIP\n", progName)
os.Exit(1) os.Exit(1)
} }
if err := zipartifacts.GenerateZipMetadataFromFile(os.Args[1], os.Stdout); err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", progName, err) ctx, cancel := context.WithCancel(context.Background())
if err == os.ErrInvalid { defer cancel()
os.Exit(zipartifacts.StatusNotZip)
} archive, err := zipartifacts.OpenArchive(ctx, os.Args[1])
os.Exit(1) if err != nil {
fatalError(err)
}
if err := zipartifacts.GenerateZipMetadata(os.Stdout, archive); err != nil {
fatalError(err)
}
}
func fatalError(err error) {
fmt.Fprintf(os.Stderr, "%s: %v\n", progName, err)
if err == zipartifacts.ErrNotAZip {
os.Exit(zipartifacts.StatusNotZip)
} }
os.Exit(1)
} }
package artifacts
import (
"context"
"fmt"
"mime/multipart"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
)
func (a *artifactsUploadProcessor) storeFile(ctx context.Context, formName, fileName string, writer *multipart.Writer) error {
if !a.opts.IsRemote() {
return nil
}
if a.stored {
return nil
}
fh, err := filestore.SaveFileFromDisk(ctx, fileName, a.opts)
if err != nil {
return fmt.Errorf("Uploading to object store failed. %s", err)
}
for field, value := range fh.GitLabFinalizeFields(formName) {
writer.WriteField(field, value)
}
// Allow to upload only once using given credentials
a.stored = true
return nil
}
...@@ -48,6 +48,13 @@ func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) { ...@@ -48,6 +48,13 @@ func createTestMultipartForm(t *testing.T, data []byte) (bytes.Buffer, string) {
return buffer, writer.FormDataContentType() return buffer, writer.FormDataContentType()
} }
func testUploadArtifactsFromTestZip(t *testing.T, ts *httptest.Server) *httptest.ResponseRecorder {
archiveData, _ := createTestZipArchive(t)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
return testUploadArtifacts(contentType, &contentBuffer, t, ts)
}
func TestUploadHandlerSendingToExternalStorage(t *testing.T) { func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
tempPath, err := ioutil.TempDir("", "uploads") tempPath, err := ioutil.TempDir("", "uploads")
if err != nil { if err != nil {
...@@ -123,10 +130,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes ...@@ -123,10 +130,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes
ts := testArtifactsUploadServer(t, authResponse, responseProcessor) ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close() defer ts.Close()
archiveData, _ := createTestZipArchive(t) response := testUploadArtifactsFromTestZip(t, ts)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
} }
...@@ -152,10 +156,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) ...@@ -152,10 +156,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T)
ts := testArtifactsUploadServer(t, authResponse, responseProcessor) ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close() defer ts.Close()
archiveData, _ := createTestZipArchive(t) response := testUploadArtifactsFromTestZip(t, ts)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
} }
...@@ -193,10 +194,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) ...@@ -193,10 +194,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T)
ts := testArtifactsUploadServer(t, authResponse, responseProcessor) ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close() defer ts.Close()
archiveData, _ := createTestZipArchive(t) response := testUploadArtifactsFromTestZip(t, ts)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once") assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
} }
...@@ -237,10 +235,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin ...@@ -237,10 +235,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
ts := testArtifactsUploadServer(t, authResponse, responseProcessor) ts := testArtifactsUploadServer(t, authResponse, responseProcessor)
defer ts.Close() defer ts.Close()
archiveData, _ := createTestZipArchive(t) response := testUploadArtifactsFromTestZip(t, ts)
contentBuffer, contentType := createTestMultipartForm(t, archiveData)
response := testUploadArtifacts(contentType, &contentBuffer, t, ts)
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
assert.Equal(t, 1, putCalledTimes, "upload should be called only once") assert.Equal(t, 1, putCalledTimes, "upload should be called only once")
} }
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os" "os"
...@@ -24,28 +23,55 @@ type artifactsUploadProcessor struct { ...@@ -24,28 +23,55 @@ type artifactsUploadProcessor struct {
stored bool stored bool
} }
func (a *artifactsUploadProcessor) generateMetadataFromZip(fileName string, metadataFile io.Writer) (bool, error) { func (a *artifactsUploadProcessor) generateMetadataFromZip(ctx context.Context, file *filestore.FileHandler) (*filestore.FileHandler, error) {
// Generate metadata and save to file metaReader, metaWriter := io.Pipe()
zipMd := exec.Command("gitlab-zip-metadata", fileName) defer metaWriter.Close()
metaOpts := &filestore.SaveFileOpts{
LocalTempPath: a.opts.LocalTempPath,
TempFilePrefix: "metadata.gz",
}
fileName := file.LocalPath
if fileName == "" {
fileName = file.RemoteURL
}
zipMd := exec.CommandContext(ctx, "gitlab-zip-metadata", fileName)
zipMd.Stderr = os.Stderr zipMd.Stderr = os.Stderr
zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} zipMd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
zipMd.Stdout = metadataFile zipMd.Stdout = metaWriter
if err := zipMd.Start(); err != nil { if err := zipMd.Start(); err != nil {
return false, err return nil, err
} }
defer helper.CleanUpProcessGroup(zipMd) defer helper.CleanUpProcessGroup(zipMd)
type saveResult struct {
error
*filestore.FileHandler
}
done := make(chan saveResult)
go func() {
var result saveResult
result.FileHandler, result.error = filestore.SaveFileFromReader(ctx, metaReader, -1, metaOpts)
done <- result
}()
if err := zipMd.Wait(); err != nil { if err := zipMd.Wait(); err != nil {
if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip { if st, ok := helper.ExitStatus(err); ok && st == zipartifacts.StatusNotZip {
return false, nil return nil, nil
} }
return false, err return nil, err
} }
return true, nil metaWriter.Close()
result := <-done
return result.FileHandler, result.error
} }
func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName, fileName string, writer *multipart.Writer) error { func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error {
// ProcessFile for artifacts requires file form-data field name to eq `file` // ProcessFile for artifacts requires file form-data field name to eq `file`
if formName != "file" { if formName != "file" {
...@@ -55,28 +81,22 @@ func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName, fi ...@@ -55,28 +81,22 @@ func (a *artifactsUploadProcessor) ProcessFile(ctx context.Context, formName, fi
return fmt.Errorf("Artifacts request contains more than one file!") return fmt.Errorf("Artifacts request contains more than one file!")
} }
// Create temporary file for metadata and store it's path select {
tempFile, err := ioutil.TempFile(a.opts.LocalTempPath, "metadata_") case <-ctx.Done():
if err != nil { return fmt.Errorf("ProcessFile: context done")
return err
}
defer tempFile.Close()
a.metadataFile = tempFile.Name()
generatedMetadata, err := a.generateMetadataFromZip(fileName, tempFile)
if err != nil {
return fmt.Errorf("generateMetadataFromZip: %v", err)
}
if generatedMetadata { default:
// Pass metadata file path to Rails // TODO: can we rely on disk for shipping metadata? Not if we split workhorse and rails in 2 different PODs
writer.WriteField("metadata.path", a.metadataFile) metadata, err := a.generateMetadataFromZip(ctx, file)
writer.WriteField("metadata.name", "metadata.gz") if err != nil {
} return fmt.Errorf("generateMetadataFromZip: %v", err)
}
if err := a.storeFile(ctx, formName, fileName, writer); err != nil { if metadata != nil {
return fmt.Errorf("storeFile: %v", err) for k, v := range metadata.GitLabFinalizeFields("metadata") {
writer.WriteField(k, v)
}
}
} }
return nil return nil
} }
...@@ -93,12 +113,6 @@ func (a *artifactsUploadProcessor) Name() string { ...@@ -93,12 +113,6 @@ func (a *artifactsUploadProcessor) Name() string {
return "artifacts" return "artifacts"
} }
func (a *artifactsUploadProcessor) Cleanup() {
if a.metadataFile != "" {
os.Remove(a.metadataFile)
}
}
func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler { func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) { return myAPI.PreAuthorizeHandler(func(w http.ResponseWriter, r *http.Request, a *api.Response) {
if a.TempPath == "" { if a.TempPath == "" {
...@@ -107,8 +121,7 @@ func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler { ...@@ -107,8 +121,7 @@ func UploadArtifacts(myAPI *api.API, h http.Handler) http.Handler {
} }
mg := &artifactsUploadProcessor{opts: filestore.GetOpts(a)} mg := &artifactsUploadProcessor{opts: filestore.GetOpts(a)}
defer mg.Cleanup()
upload.HandleFileUploads(w, r, h, a.TempPath, mg) upload.HandleFileUploads(w, r, h, a, mg)
}, "/authorize") }, "/authorize")
} }
...@@ -98,12 +98,6 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts ...@@ -98,12 +98,6 @@ func SaveFileFromReader(ctx context.Context, reader io.Reader, size int64, opts
}() }()
if opts.IsRemote() { if opts.IsRemote() {
// Unknown ContentLength must be implemented in order to achieve Artifact Uploading
if size == -1 && !opts.isGoogleCloudStorage() {
// TODO add support for artifact upload to S3-compatible object storage
return nil, errors.New("Not implemented")
}
object, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Timeout, size) object, err = objectstore.NewObject(ctx, opts.PresignedPut, opts.PresignedDelete, opts.Timeout, size)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -42,7 +42,6 @@ func IsGoogleCloudStorage(u *url.URL) bool { ...@@ -42,7 +42,6 @@ func IsGoogleCloudStorage(u *url.URL) bool {
return strings.ToLower(u.Host) == "storage.googleapis.com" return strings.ToLower(u.Host) == "storage.googleapis.com"
} }
type MissingContentLengthError error
type StatusCodeError error type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service. // Object represents an object on a S3 compatible Object Store service.
...@@ -79,14 +78,7 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat ...@@ -79,14 +78,7 @@ func NewObject(ctx context.Context, putURL, deleteURL string, timeout time.Durat
objectStorageUploadRequestsRequestFailed.Inc() objectStorageUploadRequestsRequestFailed.Inc()
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err) return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(o.PutURL), err)
} }
if size == -1 { req.ContentLength = size
if !IsGoogleCloudStorage(req.URL) {
objectStorageUploadRequestsRequestFailed.Inc()
return nil, MissingContentLengthError(fmt.Errorf("Unknown Content-Length not allowed on %s", req.URL.Host))
}
} else {
req.ContentLength = size
}
req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Type", "application/octet-stream")
if timeout == 0 { if timeout == 0 {
......
...@@ -97,13 +97,3 @@ func TestObjectUpload404(t *testing.T) { ...@@ -97,13 +97,3 @@ func TestObjectUpload404(t *testing.T) {
assert.True(isStatusCodeError, "Should fail with StatusCodeError") assert.True(isStatusCodeError, "Should fail with StatusCodeError")
assert.Contains(err.Error(), "404") assert.Contains(err.Error(), "404")
} }
func TestUnknownSizeUpload(t *testing.T) {
assert := assert.New(t)
object, err := objectstore.NewObject(context.Background(), "http://example.com/bucket/object", "", 0, -1)
assert.Error(err)
_, isMissingContentLengthError := err.(objectstore.MissingContentLengthError)
assert.True(isMissingContentLengthError, "Should fail with MissingContentLengthError")
assert.Nil(object)
}
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab-workhorse/internal/secret"
jwt "github.com/dgrijalva/jwt-go" jwt "github.com/dgrijalva/jwt-go"
...@@ -24,17 +26,19 @@ type MultipartClaims struct { ...@@ -24,17 +26,19 @@ type MultipartClaims struct {
} }
func Accelerate(tempDir string, h http.Handler) http.Handler { func Accelerate(tempDir string, h http.Handler) http.Handler {
// TODO: for Object Store this will need a authorize call
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
localOnlyPreAuth := &api.Response{TempPath: tempDir}
s := &savedFileTracker{request: r} s := &savedFileTracker{request: r}
HandleFileUploads(w, r, h, tempDir, s) HandleFileUploads(w, r, h, localOnlyPreAuth, s)
}) })
} }
func (s *savedFileTracker) ProcessFile(_ context.Context, fieldName, fileName string, _ *multipart.Writer) error { func (s *savedFileTracker) ProcessFile(_ context.Context, fieldName string, file *filestore.FileHandler, _ *multipart.Writer) error {
if s.rewrittenFields == nil { if s.rewrittenFields == nil {
s.rewrittenFields = make(map[string]string) s.rewrittenFields = make(map[string]string)
} }
s.rewrittenFields[fieldName] = fileName s.rewrittenFields[fieldName] = file.LocalPath
return nil return nil
} }
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore" "gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
) )
...@@ -41,9 +42,9 @@ var ( ...@@ -41,9 +42,9 @@ var (
) )
type rewriter struct { type rewriter struct {
writer *multipart.Writer writer *multipart.Writer
tempPath string preauth *api.Response
filter MultipartFormProcessor filter MultipartFormProcessor
} }
func init() { func init() {
...@@ -52,7 +53,7 @@ func init() { ...@@ -52,7 +53,7 @@ func init() {
prometheus.MustRegister(multipartFiles) prometheus.MustRegister(multipartFiles)
} }
func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, tempPath string, filter MultipartFormProcessor) error { func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, preauth *api.Response, filter MultipartFormProcessor) error {
// Create multipart reader // Create multipart reader
reader, err := r.MultipartReader() reader, err := r.MultipartReader()
if err != nil { if err != nil {
...@@ -66,9 +67,9 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, te ...@@ -66,9 +67,9 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, te
multipartUploadRequests.WithLabelValues(filter.Name()).Inc() multipartUploadRequests.WithLabelValues(filter.Name()).Inc()
rew := &rewriter{ rew := &rewriter{
writer: writer, writer: writer,
tempPath: tempPath, preauth: preauth,
filter: filter, filter: filter,
} }
for { for {
...@@ -88,7 +89,6 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, te ...@@ -88,7 +89,6 @@ func rewriteFormFilesFromMultipart(r *http.Request, writer *multipart.Writer, te
// Copy form field // Copy form field
if p.FileName() != "" { if p.FileName() != "" {
err = rew.handleFilePart(r.Context(), name, p) err = rew.handleFilePart(r.Context(), name, p)
} else { } else {
err = rew.copyPart(r.Context(), name, p) err = rew.copyPart(r.Context(), name, p)
} }
...@@ -110,10 +110,8 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa ...@@ -110,10 +110,8 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
return fmt.Errorf("illegal filename: %q", filename) return fmt.Errorf("illegal filename: %q", filename)
} }
opts := &filestore.SaveFileOpts{ opts := filestore.GetOpts(rew.preauth)
LocalTempPath: rew.tempPath, opts.TempFilePrefix = filename
TempFilePrefix: filename,
}
fh, err := filestore.SaveFileFromReader(ctx, p, -1, opts) fh, err := filestore.SaveFileFromReader(ctx, p, -1, opts)
if err != nil { if err != nil {
...@@ -126,7 +124,7 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa ...@@ -126,7 +124,7 @@ func (rew *rewriter) handleFilePart(ctx context.Context, name string, p *multipa
multipartFileUploadBytes.WithLabelValues(rew.filter.Name()).Add(float64(fh.Size)) multipartFileUploadBytes.WithLabelValues(rew.filter.Name()).Add(float64(fh.Size))
return rew.filter.ProcessFile(ctx, name, fh.LocalPath, rew.writer) return rew.filter.ProcessFile(ctx, name, fh, rew.writer)
} }
func (rew *rewriter) copyPart(ctx context.Context, name string, p *multipart.Part) error { func (rew *rewriter) copyPart(ctx context.Context, name string, p *multipart.Part) error {
......
...@@ -8,19 +8,21 @@ import ( ...@@ -8,19 +8,21 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
// These methods are allowed to have thread-unsafe implementations. // These methods are allowed to have thread-unsafe implementations.
type MultipartFormProcessor interface { type MultipartFormProcessor interface {
ProcessFile(ctx context.Context, formName, fileName string, writer *multipart.Writer) error ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error
ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error ProcessField(ctx context.Context, formName string, writer *multipart.Writer) error
Finalize(ctx context.Context) error Finalize(ctx context.Context) error
Name() string Name() string
} }
func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, tempPath string, filter MultipartFormProcessor) { func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, preauth *api.Response, filter MultipartFormProcessor) {
if tempPath == "" { if preauth.TempPath == "" {
helper.Fail500(w, r, fmt.Errorf("handleFileUploads: tempPath empty")) helper.Fail500(w, r, fmt.Errorf("handleFileUploads: tempPath empty"))
return return
} }
...@@ -30,7 +32,7 @@ func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, t ...@@ -30,7 +32,7 @@ func HandleFileUploads(w http.ResponseWriter, r *http.Request, h http.Handler, t
defer writer.Close() defer writer.Close()
// Rewrite multipart form data // Rewrite multipart form data
err := rewriteFormFilesFromMultipart(r, writer, tempPath, filter) err := rewriteFormFilesFromMultipart(r, writer, preauth, filter)
if err != nil { if err != nil {
if err == http.ErrNotMultipart { if err == http.ErrNotMultipart {
h.ServeHTTP(w, r) h.ServeHTTP(w, r)
......
...@@ -16,7 +16,9 @@ import ( ...@@ -16,7 +16,9 @@ import (
"testing" "testing"
"time" "time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/badgateway" "gitlab.com/gitlab-org/gitlab-workhorse/internal/badgateway"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/filestore"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy" "gitlab.com/gitlab-org/gitlab-workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper"
...@@ -26,8 +28,8 @@ var nilHandler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}) ...@@ -26,8 +28,8 @@ var nilHandler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
type testFormProcessor struct{} type testFormProcessor struct{}
func (a *testFormProcessor) ProcessFile(ctx context.Context, formName, fileName string, writer *multipart.Writer) error { func (a *testFormProcessor) ProcessFile(ctx context.Context, formName string, file *filestore.FileHandler, writer *multipart.Writer) error {
if formName != "file" && fileName != "my.file" { if formName != "file" && file.LocalPath != "my.file" {
return errors.New("illegal file") return errors.New("illegal file")
} }
return nil return nil
...@@ -54,7 +56,7 @@ func TestUploadTempPathRequirement(t *testing.T) { ...@@ -54,7 +56,7 @@ func TestUploadTempPathRequirement(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
HandleFileUploads(response, request, nilHandler, "", nil) HandleFileUploads(response, request, nilHandler, &api.Response{}, nil)
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
} }
...@@ -89,7 +91,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) { ...@@ -89,7 +91,7 @@ func TestUploadHandlerForwardingRawData(t *testing.T) {
response := httptest.NewRecorder() response := httptest.NewRecorder()
handler := newProxy(ts.URL) handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, tempPath, nil) HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, nil)
testhelper.AssertResponseCode(t, response, 202) testhelper.AssertResponseCode(t, response, 202)
if response.Body.String() != "RESPONSE" { if response.Body.String() != "RESPONSE" {
t.Fatal("Expected RESPONSE in response body") t.Fatal("Expected RESPONSE in response body")
...@@ -115,30 +117,25 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { ...@@ -115,30 +117,25 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(r.MultipartForm.Value) != 8 {
t.Fatal("Expected to receive exactly 8 values")
}
if len(r.MultipartForm.File) != 0 { if len(r.MultipartForm.File) != 0 {
t.Fatal("Expected to not receive any files") t.Error("Expected to not receive any files")
} }
if r.FormValue("token") != "test" { if r.FormValue("token") != "test" {
t.Fatal("Expected to receive token") t.Error("Expected to receive token")
} }
if r.FormValue("file.name") != "my.file" { if r.FormValue("file.name") != "my.file" {
t.Fatal("Expected to receive a filename") t.Error("Expected to receive a filename")
} }
filePath = r.FormValue("file.path") filePath = r.FormValue("file.path")
if !strings.HasPrefix(filePath, tempPath) {
if !strings.HasPrefix(r.FormValue("file.path"), tempPath) { t.Error("Expected to the file to be in tempPath")
t.Fatal("Expected to the file to be in tempPath")
} }
if r.FormValue("file.size") != "4" { if r.FormValue("file.size") != "4" {
t.Fatal("Expected to receive the file size") t.Error("Expected to receive the file size")
} }
hashes := map[string]string{ hashes := map[string]string{
...@@ -150,10 +147,14 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { ...@@ -150,10 +147,14 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
for algo, hash := range hashes { for algo, hash := range hashes {
if r.FormValue("file."+algo) != hash { if r.FormValue("file."+algo) != hash {
t.Fatalf("Expected to receive file %s hash", algo) t.Errorf("Expected to receive file %s hash", algo)
} }
} }
if valueCnt := len(r.MultipartForm.Value); valueCnt != 8 {
t.Fatal("Expected to receive exactly 8 values but got", valueCnt)
}
w.WriteHeader(202) w.WriteHeader(202)
fmt.Fprint(w, "RESPONSE") fmt.Fprint(w, "RESPONSE")
}) })
...@@ -182,7 +183,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) { ...@@ -182,7 +183,7 @@ func TestUploadHandlerRewritingMultiPartData(t *testing.T) {
response := httptest.NewRecorder() response := httptest.NewRecorder()
handler := newProxy(ts.URL) handler := newProxy(ts.URL)
HandleFileUploads(response, httpRequest, handler, tempPath, &testFormProcessor{}) HandleFileUploads(response, httpRequest, handler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
testhelper.AssertResponseCode(t, response, 202) testhelper.AssertResponseCode(t, response, 202)
cancel() // this will trigger an async cleanup cancel() // this will trigger an async cleanup
...@@ -221,7 +222,7 @@ func TestUploadProcessingField(t *testing.T) { ...@@ -221,7 +222,7 @@ func TestUploadProcessingField(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType()) httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder() response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, tempPath, &testFormProcessor{}) HandleFileUploads(response, httpRequest, nilHandler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
} }
...@@ -249,7 +250,7 @@ func TestUploadProcessingFile(t *testing.T) { ...@@ -249,7 +250,7 @@ func TestUploadProcessingFile(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType()) httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder() response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, tempPath, &testFormProcessor{}) HandleFileUploads(response, httpRequest, nilHandler, &api.Response{TempPath: tempPath}, &testFormProcessor{})
testhelper.AssertResponseCode(t, response, 500) testhelper.AssertResponseCode(t, response, 500)
} }
...@@ -289,7 +290,7 @@ func TestInvalidFileNames(t *testing.T) { ...@@ -289,7 +290,7 @@ func TestInvalidFileNames(t *testing.T) {
httpRequest.Header.Set("Content-Type", writer.FormDataContentType()) httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
response := httptest.NewRecorder() response := httptest.NewRecorder()
HandleFileUploads(response, httpRequest, nilHandler, tempPath, &savedFileTracker{request: httpRequest}) HandleFileUploads(response, httpRequest, nilHandler, &api.Response{TempPath: tempPath}, &savedFileTracker{request: httpRequest})
testhelper.AssertResponseCode(t, response, testCase.code) testhelper.AssertResponseCode(t, response, testCase.code)
} }
} }
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"io" "io"
"os"
"path" "path"
"sort" "sort"
"strconv" "strconv"
...@@ -60,7 +59,10 @@ func writeZipEntryMetadata(output io.Writer, path string, entry *zip.File) error ...@@ -60,7 +59,10 @@ func writeZipEntryMetadata(output io.Writer, path string, entry *zip.File) error
return nil return nil
} }
func generateZipMetadata(output io.Writer, archive *zip.Reader) error { func GenerateZipMetadata(w io.Writer, archive *zip.Reader) error {
output := gzip.NewWriter(w)
defer output.Close()
if err := writeString(output, MetadataHeader); err != nil { if err := writeString(output, MetadataHeader); err != nil {
return err return err
} }
...@@ -101,20 +103,6 @@ func generateZipMetadata(output io.Writer, archive *zip.Reader) error { ...@@ -101,20 +103,6 @@ func generateZipMetadata(output io.Writer, archive *zip.Reader) error {
return nil return nil
} }
func GenerateZipMetadataFromFile(fileName string, w io.Writer) error {
archive, err := zip.OpenReader(fileName)
if err != nil {
// Ignore non-zip archives
return os.ErrInvalid
}
defer archive.Close()
gz := gzip.NewWriter(w)
defer gz.Close()
return generateZipMetadata(gz, &archive.Reader)
}
func writeBytes(output io.Writer, data []byte) error { func writeBytes(output io.Writer, data []byte) error {
err := binary.Write(output, binary.BigEndian, uint32(len(data))) err := binary.Write(output, binary.BigEndian, uint32(len(data)))
if err == nil { if err == nil {
......
package zipartifacts package zipartifacts_test
import ( import (
"archive/zip" "archive/zip"
"bytes" "bytes"
"encoding/binary" "compress/gzip"
"context"
"fmt" "fmt"
"io"
"io/ioutil"
"os"
"testing" "testing"
)
func TestMissingMetadataEntries(t *testing.T) { "github.com/stretchr/testify/assert"
var zipBuffer, metaBuffer bytes.Buffer "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/zipartifacts"
)
archive := zip.NewWriter(&zipBuffer) func generateTestArchive(w io.Writer) error {
archive := zip.NewWriter(w)
// non-POSIX paths are here just to test if we never enter infinite loop // non-POSIX paths are here just to test if we never enter infinite loop
files := []string{"file1", "some/file/dir/", "some/file/dir/file2", "../../test12/test", files := []string{"file1", "some/file/dir/", "some/file/dir/file2", "../../test12/test",
...@@ -20,23 +27,78 @@ func TestMissingMetadataEntries(t *testing.T) { ...@@ -20,23 +27,78 @@ func TestMissingMetadataEntries(t *testing.T) {
for _, file := range files { for _, file := range files {
archiveFile, err := archive.Create(file) archiveFile, err := archive.Create(file)
if err != nil { if err != nil {
t.Fatal(err) return err
} }
fmt.Fprint(archiveFile, file) fmt.Fprint(archiveFile, file)
} }
archive.Close() return archive.Close()
}
func validateMetadata(r io.Reader) error {
gz, err := gzip.NewReader(r)
if err != nil {
return err
}
zipReader := bytes.NewReader(zipBuffer.Bytes()) meta, err := ioutil.ReadAll(gz)
zipArchiveReader, _ := zip.NewReader(zipReader, int64(binary.Size(zipBuffer.Bytes()))) if err != nil {
if err := generateZipMetadata(&metaBuffer, zipArchiveReader); err != nil { return err
t.Fatal("zipartifacts: generateZipMetadata failed", err)
} }
paths := []string{"file1", "some/", "some/file/", "some/file/dir/", "some/file/dir/file2"} paths := []string{"file1", "some/", "some/file/", "some/file/dir/", "some/file/dir/file2"}
for _, path := range paths { for _, path := range paths {
if !bytes.Contains(metaBuffer.Bytes(), []byte(path+"\x00")) { if !bytes.Contains(meta, []byte(path+"\x00")) {
t.Fatal("zipartifacts: metadata for path", path, "not found") return fmt.Errorf(fmt.Sprintf("zipartifacts: metadata for path %q not found", path))
} }
} }
return nil
}
func TestGenerateZipMetadataFromFile(t *testing.T) {
var metaBuffer bytes.Buffer
require := require.New(t)
f, err := ioutil.TempFile("", "workhorse-metadata.zip-")
if f != nil {
defer os.Remove(f.Name())
}
require.NoError(err)
defer f.Close()
err = generateTestArchive(f)
require.NoError(err)
f.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
archive, err := zipartifacts.OpenArchive(ctx, f.Name())
require.NoError(err, "zipartifacts: OpenArchive failed")
err = zipartifacts.GenerateZipMetadata(&metaBuffer, archive)
require.NoError(err, "zipartifacts: GenerateZipMetadata failed")
err = validateMetadata(&metaBuffer)
require.NoError(err)
}
func TestErrNotAZip(t *testing.T) {
f, err := ioutil.TempFile("", "workhorse-metadata.zip-")
if f != nil {
defer os.Remove(f.Name())
}
require.NoError(t, err)
defer f.Close()
_, err = fmt.Fprint(f, "Not a zip file")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err = zipartifacts.OpenArchive(ctx, f.Name())
assert.Equal(t, zipartifacts.ErrNotAZip, err, "OpenArchive requires a zip file")
} }
package zipartifacts
import (
"archive/zip"
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/jfbus/httprs"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// ErrNotAZip will be used when the file is not a zip archive
var ErrNotAZip = errors.New("not a zip")
// ErrNotAZip will be used when the file can't be found
var ErrArchiveNotFound = errors.New("archive not found")
var httpClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
},
}
// OpenArchive will open a zip.Reader from a local path or a remote object store URL
// in case of remote url it will make use of ranged requestes to support seeking.
// If the path do not exists error will be ErrArchiveNotFound,
// if the file isn't a zip archive error will be ErrNotAZip
func OpenArchive(ctx context.Context, archivePath string) (*zip.Reader, error) {
if isURL(archivePath) {
return openHTTPArchive(ctx, archivePath)
}
return openFileArchive(ctx, archivePath)
}
func isURL(path string) bool {
return strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://")
}
func openHTTPArchive(ctx context.Context, archivePath string) (*zip.Reader, error) {
scrubbedArchivePath := helper.ScrubURLParams(archivePath)
req, err := http.NewRequest(http.MethodGet, archivePath, nil)
if err != nil {
return nil, fmt.Errorf("Can't create HTTP GET %q: %v", scrubbedArchivePath, err)
}
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("HTTP GET %q: %v", scrubbedArchivePath, err)
} else if resp.StatusCode == http.StatusNotFound {
return nil, ErrArchiveNotFound
} else if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP GET %q: %d: %v", scrubbedArchivePath, resp.StatusCode, resp.Status)
}
rs := httprs.NewHttpReadSeeker(resp, httpClient)
go func() {
<-ctx.Done()
resp.Body.Close()
rs.Close()
}()
archive, err := zip.NewReader(rs, resp.ContentLength)
if err != nil {
return nil, ErrNotAZip
}
return archive, nil
}
func openFileArchive(ctx context.Context, archivePath string) (*zip.Reader, error) {
archive, err := zip.OpenReader(archivePath)
if err != nil {
if os.IsNotExist(err) {
return nil, ErrArchiveNotFound
}
return nil, ErrNotAZip
}
go func() {
<-ctx.Done()
// We close the archive from this goroutine so that we can safely return a *zip.Reader instead of a *zip.ReadCloser
archive.Close()
}()
return &archive.Reader, nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment