Commit 1fb0c809 authored by Jacob Vosmaer (GitLab)

Merge branch 'git-archive-gitaly-prep' into 'master'

Refactor Git archive creation

See merge request !190
parents 0345579d e503209f
...@@ -10,10 +10,8 @@ import ( ...@@ -10,10 +10,8 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
"os/exec"
"path" "path"
"path/filepath" "path/filepath"
"syscall"
"time" "time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
...@@ -52,7 +50,6 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string ...@@ -52,7 +50,6 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
return return
} }
var format string
urlPath := r.URL.Path urlPath := r.URL.Path
format, ok := parseBasename(filepath.Base(urlPath)) format, ok := parseBasename(filepath.Base(urlPath))
if !ok { if !ok {
...@@ -87,64 +84,21 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string ...@@ -87,64 +84,21 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
defer tempFile.Close() defer tempFile.Close()
defer os.Remove(tempFile.Name()) defer os.Remove(tempFile.Name())
compressCmd, archiveFormat := parseArchiveFormat(format) archiveReader, err := newArchiveReader(r.Context(), params.RepoPath, format, params.ArchivePrefix, params.CommitId)
archiveCmd := gitCommand("", "", "git", "--git-dir="+params.RepoPath, "archive", "--format="+archiveFormat, "--prefix="+params.ArchivePrefix+"/", params.CommitId)
archiveStdout, err := archiveCmd.StdoutPipe()
if err != nil { if err != nil {
helper.Fail500(w, r, fmt.Errorf("SendArchive: archive stdout: %v", err)) helper.Fail500(w, r, err)
return
}
defer archiveStdout.Close()
if err := archiveCmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("SendArchive: start %v: %v", archiveCmd.Args, err))
return return
} }
defer helper.CleanUpProcessGroup(archiveCmd) // Ensure brute force subprocess clean-up
var stdout io.ReadCloser reader := io.TeeReader(archiveReader, tempFile)
if compressCmd == nil {
stdout = archiveStdout
} else {
compressCmd.Stdin = archiveStdout
compressCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
stdout, err = compressCmd.StdoutPipe()
if err != nil {
helper.Fail500(w, r, fmt.Errorf("SendArchive: compress stdout: %v", err))
return
}
defer stdout.Close()
if err := compressCmd.Start(); err != nil {
helper.Fail500(w, r, fmt.Errorf("SendArchive: start %v: %v", compressCmd.Args, err))
return
}
defer helper.CleanUpProcessGroup(compressCmd)
archiveStdout.Close()
}
// Every Read() from stdout will be synchronously written to tempFile
// before it comes out the TeeReader.
archiveReader := io.TeeReader(stdout, tempFile)
// Start writing the response // Start writing the response
setArchiveHeaders(w, format, archiveFilename) setArchiveHeaders(w, format, archiveFilename)
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
if _, err := io.Copy(w, archiveReader); err != nil { if _, err := io.Copy(w, reader); err != nil {
helper.LogError(r, &copyError{fmt.Errorf("SendArchive: copy 'git archive' output: %v", err)}) helper.LogError(r, &copyError{fmt.Errorf("SendArchive: copy 'git archive' output: %v", err)})
return return
} }
if err := archiveCmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("SendArchive: archiveCmd: %v", err))
return
}
if compressCmd != nil {
if err := compressCmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("SendArchive: compressCmd: %v", err))
return
}
}
if err := finalizeCachedArchive(tempFile, params.ArchivePath); err != nil { if err := finalizeCachedArchive(tempFile, params.ArchivePath); err != nil {
helper.LogError(r, fmt.Errorf("SendArchive: finalize cached archive: %v", err)) helper.LogError(r, fmt.Errorf("SendArchive: finalize cached archive: %v", err))
...@@ -152,10 +106,10 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string ...@@ -152,10 +106,10 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
} }
} }
func setArchiveHeaders(w http.ResponseWriter, format string, archiveFilename string) { func setArchiveHeaders(w http.ResponseWriter, format ArchiveFormat, archiveFilename string) {
w.Header().Del("Content-Length") w.Header().Del("Content-Length")
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, archiveFilename)) w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, archiveFilename))
if format == "zip" { if format == ZipFormat {
w.Header().Set("Content-Type", "application/zip") w.Header().Set("Content-Type", "application/zip")
} else { } else {
w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Type", "application/octet-stream")
...@@ -164,20 +118,6 @@ func setArchiveHeaders(w http.ResponseWriter, format string, archiveFilename str ...@@ -164,20 +118,6 @@ func setArchiveHeaders(w http.ResponseWriter, format string, archiveFilename str
w.Header().Set("Cache-Control", "private") w.Header().Set("Cache-Control", "private")
} }
func parseArchiveFormat(format string) (*exec.Cmd, string) {
switch format {
case "tar":
return nil, "tar"
case "tar.gz":
return exec.Command("gzip", "-c", "-n"), "tar"
case "tar.bz2":
return exec.Command("bzip2", "-c"), "tar"
case "zip":
return nil, "zip"
}
return nil, "unknown"
}
func prepareArchiveTempfile(dir string, prefix string) (*os.File, error) { func prepareArchiveTempfile(dir string, prefix string) (*os.File, error) {
if err := os.MkdirAll(dir, 0700); err != nil { if err := os.MkdirAll(dir, 0700); err != nil {
return nil, err return nil, err
...@@ -196,20 +136,20 @@ func finalizeCachedArchive(tempFile *os.File, archivePath string) error { ...@@ -196,20 +136,20 @@ func finalizeCachedArchive(tempFile *os.File, archivePath string) error {
return nil return nil
} }
func parseBasename(basename string) (string, bool) { func parseBasename(basename string) (ArchiveFormat, bool) {
var format string var format ArchiveFormat
switch basename { switch basename {
case "archive.zip": case "archive.zip":
format = "zip" format = ZipFormat
case "archive.tar": case "archive.tar":
format = "tar" format = TarFormat
case "archive", "archive.tar.gz", "archive.tgz", "archive.gz": case "archive", "archive.tar.gz", "archive.tgz", "archive.gz":
format = "tar.gz" format = TarGzFormat
case "archive.tar.bz2", "archive.tbz", "archive.tbz2", "archive.tb2", "archive.bz2": case "archive.tar.bz2", "archive.tbz", "archive.tbz2", "archive.tb2", "archive.bz2":
format = "tar.bz2" format = TarBz2Format
default: default:
return "", false return InvalidFormat, false
} }
return format, true return format, true
......
...@@ -9,16 +9,19 @@ import ( ...@@ -9,16 +9,19 @@ import (
) )
func TestParseBasename(t *testing.T) { func TestParseBasename(t *testing.T) {
for _, testCase := range []struct{ in, out string }{ for _, testCase := range []struct {
{"", "tar.gz"}, in string
{".tar.gz", "tar.gz"}, out ArchiveFormat
{".tgz", "tar.gz"}, }{
{".gz", "tar.gz"}, {"", TarGzFormat},
{".tar.bz2", "tar.bz2"}, {".tar.gz", TarGzFormat},
{".tbz", "tar.bz2"}, {".tgz", TarGzFormat},
{".tbz2", "tar.bz2"}, {".gz", TarGzFormat},
{".tb2", "tar.bz2"}, {".tar.bz2", TarBz2Format},
{".bz2", "tar.bz2"}, {".tbz", TarBz2Format},
{".tbz2", TarBz2Format},
{".tb2", TarBz2Format},
{".bz2", TarBz2Format},
} { } {
basename := "archive" + testCase.in basename := "archive" + testCase.in
out, ok := parseBasename(basename) out, ok := parseBasename(basename)
...@@ -47,11 +50,13 @@ func TestFinalizeArchive(t *testing.T) { ...@@ -47,11 +50,13 @@ func TestFinalizeArchive(t *testing.T) {
} }
func TestSetArchiveHeaders(t *testing.T) { func TestSetArchiveHeaders(t *testing.T) {
for _, testCase := range []struct{ in, out string }{ for _, testCase := range []struct {
{"zip", "application/zip"}, in ArchiveFormat
{"zippy", "application/octet-stream"}, out string
{"rezip", "application/octet-stream"}, }{
{"_anything_", "application/octet-stream"}, {ZipFormat, "application/zip"},
{TarFormat, "application/octet-stream"},
{InvalidFormat, "application/octet-stream"},
} { } {
w := httptest.NewRecorder() w := httptest.NewRecorder()
......
package git
import (
"context"
"fmt"
"io"
"os/exec"
"syscall"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// ArchiveFormat enumerates the archive output formats this package can
// produce.
type ArchiveFormat int
const (
// InvalidFormat is the zero value of ArchiveFormat and does not name a
// usable format.
InvalidFormat ArchiveFormat = iota
// ZipFormat is a zip archive.
ZipFormat
// TarFormat is an uncompressed tar archive.
TarFormat
// TarGzFormat is a tar archive compressed with gzip.
TarGzFormat
// TarBz2Format is a tar archive compressed with bzip2.
TarBz2Format
)
// parseArchiveFormat translates an ArchiveFormat into the two pieces needed
// to build the archive pipeline: an optional compression command (nil when
// the output needs no extra compression step) and the value for the
// "--format=" argument of "git archive".
//
// Any format that is not one of the known constants yields a nil command and
// the placeholder string "invalid format".
func parseArchiveFormat(format ArchiveFormat) (*exec.Cmd, string) {
	// Start from the "unknown format" answer and refine it below.
	var compressor *exec.Cmd
	gitFormat := "invalid format"

	switch format {
	case ZipFormat:
		gitFormat = "zip"
	case TarFormat:
		gitFormat = "tar"
	case TarGzFormat:
		// -n keeps the original name and timestamp out of the gzip header.
		gitFormat = "tar"
		compressor = exec.Command("gzip", "-c", "-n")
	case TarBz2Format:
		gitFormat = "tar"
		compressor = exec.Command("bzip2", "-c")
	}

	return compressor, gitFormat
}
// archiveReader is an io.Reader over the output of an archive command
// pipeline. When the underlying stream reports io.EOF, Read waits for every
// command in waitCmds and returns a command failure in place of io.EOF.
type archiveReader struct {
// waitCmds holds the started commands that must be waited on once the
// stream is exhausted.
waitCmds []*exec.Cmd
// stdout is the stream Read pulls from: the stdout pipe of the last command
// in the pipeline.
stdout io.Reader
}
// Read implements io.Reader. Data and non-EOF errors from the underlying
// stream are passed through unchanged. When the stream signals io.EOF, the
// pipeline commands are waited on first: if any of them failed, that error is
// returned instead of io.EOF so a truncated archive is not mistaken for a
// complete one.
func (a *archiveReader) Read(p []byte) (int, error) {
	n, readErr := a.stdout.Read(p)
	if readErr == io.EOF {
		// End of stream: only report a clean EOF if every command exited
		// successfully.
		if waitErr := a.wait(); waitErr != nil {
			return n, waitErr
		}
		return n, io.EOF
	}
	return n, readErr
}
// wait calls Wait on every command in a.waitCmds, in order, and returns the
// first non-nil error encountered (nil if all commands succeeded). It never
// stops early: every command is waited on even after one has failed.
func (a *archiveReader) wait() error {
	var firstErr error
	for _, cmd := range a.waitCmds {
		// Always call Wait, but remember only the earliest failure.
		if err := cmd.Wait(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// newArchiveReader starts a "git archive" process for commitId in the
// repository at repoPath and returns a reader over the resulting archive.
// When parseArchiveFormat returns a compression command for format, that
// command is started as well, with the git output as its stdin, and the
// returned reader yields the compressed stream instead.
//
// Every started command is registered in the reader's waitCmds and handed to
// a ctxKill goroutine tied to ctx. On failure a nil reader and a descriptive
// error are returned, and the pipes opened so far are closed.
func newArchiveReader(ctx context.Context, repoPath string, format ArchiveFormat, archivePrefix string, commitId string) (a *archiveReader, err error) {
a = &archiveReader{}
compressCmd, formatArg := parseArchiveFormat(format)
archiveCmd := gitCommand("", "", "git", "--git-dir="+repoPath, "archive", "--format="+formatArg, "--prefix="+archivePrefix+"/", commitId)
var archiveStdout io.ReadCloser
archiveStdout, err = archiveCmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("SendArchive: archive stdout: %v", err)
}
// err is the named result, so every "return nil, <error>" below sets it
// before this deferred func runs; the pipe is closed only on failure.
defer func() {
if err != nil {
archiveStdout.Close()
}
}()
// Without a compressor the caller reads git's output directly.
a.stdout = archiveStdout
if compressCmd != nil {
// Put the compressor in its own process group.
// NOTE(review): presumably so helper.CleanUpProcessGroup (via ctxKill) can
// target it; confirm against that helper.
compressCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
compressCmd.Stdin = archiveStdout
var compressStdout io.ReadCloser
compressStdout, err = compressCmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("SendArchive: compress stdout: %v", err)
}
// Same failure-only cleanup as above, for the compressor's pipe.
defer func() {
if err != nil {
compressStdout.Close()
}
}()
// The compressor is started before git archive.
if err := compressCmd.Start(); err != nil {
return nil, fmt.Errorf("SendArchive: start %v: %v", compressCmd.Args, err)
}
go ctxKill(ctx, compressCmd)
a.waitCmds = append(a.waitCmds, compressCmd)
// From here on the caller reads the compressed stream.
a.stdout = compressStdout
// The started compressor has archiveStdout as its stdin; this process no
// longer reads from it, so release our handle.
archiveStdout.Close()
}
if err := archiveCmd.Start(); err != nil {
return nil, fmt.Errorf("SendArchive: start %v: %v", archiveCmd.Args, err)
}
go ctxKill(ctx, archiveCmd)
a.waitCmds = append(a.waitCmds, archiveCmd)
return a, nil
}
// ctxKill blocks until ctx is done, then hands cmd to
// helper.CleanUpProcessGroup and waits on it. It is meant to run in its own
// goroutine, one per started command.
func ctxKill(ctx context.Context, cmd *exec.Cmd) {
	done := ctx.Done()
	<-done

	helper.CleanUpProcessGroup(cmd)

	// The result is deliberately discarded: by now the command may already
	// have been waited on elsewhere, and there is no caller to report to.
	_ = cmd.Wait()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment