Commit 47edcb11 authored by Jacob Vosmaer's avatar Jacob Vosmaer

Fix another upload-pack deadlock

Thanks Nick for pointing out that exec.Cmd can take care of all the
copying for us.
parent 815be1cc
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
...@@ -27,19 +28,18 @@ func UploadPack(a *api.API) http.Handler { ...@@ -27,19 +28,18 @@ func UploadPack(a *api.API) http.Handler {
return postRPCHandler(a, "handleUploadPack", handleUploadPack) return postRPCHandler(a, "handleUploadPack", handleUploadPack)
} }
func postRPCHandler(a *api.API, name string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) http.Handler { func postRPCHandler(a *api.API, name string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) error) http.Handler {
return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) { return repoPreAuthorizeHandler(a, func(rw http.ResponseWriter, r *http.Request, ar *api.Response) {
var writtenIn int64 cr := &countReadCloser{ReadCloser: r.Body}
var err error r.Body = cr
w := NewGitHttpResponseWriter(rw) w := NewGitHttpResponseWriter(rw)
defer func() { defer func() {
w.Log(r, writtenIn) w.Log(r, cr.Count())
}() }()
writtenIn, err = handler(w, r, ar) if err := handler(w, r, ar); err != nil {
if err != nil { helper.Fail500(w, r, fmt.Errorf("%s: %v", name, err))
helper.LogError(r, fmt.Errorf("%s: %v", name, err))
} }
}) })
} }
...@@ -70,46 +70,25 @@ func repoPreAuthorizeHandler(myAPI *api.API, handleFunc api.HandleFunc) http.Han ...@@ -70,46 +70,25 @@ func repoPreAuthorizeHandler(myAPI *api.API, handleFunc api.HandleFunc) http.Han
}, "") }, "")
} }
func setupGitCommand(action string, a *api.Response, options ...string) (cmd *exec.Cmd, stdin io.WriteCloser, stdout io.ReadCloser, err error) { func startGitCommand(a *api.Response, stdin io.Reader, stdout io.Writer, action string, options ...string) (cmd *exec.Cmd, err error) {
// Don't leak pipes when we return early after an error
defer func() {
if err == nil {
return
}
if stdin != nil {
stdin.Close()
}
if stdout != nil {
stdout.Close()
}
}()
// Prepare our Git subprocess // Prepare our Git subprocess
args := []string{subCommand(action), "--stateless-rpc"} args := []string{subCommand(action), "--stateless-rpc"}
args = append(args, options...) args = append(args, options...)
args = append(args, a.RepoPath) args = append(args, a.RepoPath)
cmd = gitCommand(a.GL_ID, "git", args...) cmd = gitCommand(a.GL_ID, "git", args...)
stdout, err = cmd.StdoutPipe() cmd.Stdin = stdin
if err != nil { cmd.Stdout = stdout
return nil, nil, nil, fmt.Errorf("stdout pipe: %v", err)
}
stdin, err = cmd.StdinPipe()
if err != nil {
return nil, nil, nil, fmt.Errorf("stdin pipe: %v", err)
}
if err = cmd.Start(); err != nil { if err = cmd.Start(); err != nil {
return nil, nil, nil, fmt.Errorf("start %v: %v", cmd.Args, err) return nil, fmt.Errorf("start %v: %v", cmd.Args, err)
} }
return cmd, stdin, stdout, nil return cmd, nil
} }
func writePostRPCHeader(w http.ResponseWriter, action string) { func writePostRPCHeader(w http.ResponseWriter, action string) {
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action)) w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action))
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
} }
func getService(r *http.Request) string { func getService(r *http.Request) string {
...@@ -127,3 +106,25 @@ func isExitError(err error) bool { ...@@ -127,3 +106,25 @@ func isExitError(err error) bool {
func subCommand(rpc string) string { func subCommand(rpc string) string {
return strings.TrimPrefix(rpc, "git-") return strings.TrimPrefix(rpc, "git-")
} }
type countReadCloser struct {
n int64
io.ReadCloser
sync.Mutex
}
func (c *countReadCloser) Read(p []byte) (n int, err error) {
n, err = c.ReadCloser.Read(p)
c.Lock()
defer c.Unlock()
c.n += int64(n)
return n, err
}
func (c *countReadCloser) Count() int64 {
c.Lock()
defer c.Unlock()
return c.n
}
...@@ -39,7 +39,7 @@ func TestHandleReceivePack(t *testing.T) { ...@@ -39,7 +39,7 @@ func TestHandleReceivePack(t *testing.T) {
testHandlePostRpc(t, "git-receive-pack", handleReceivePack) testHandlePostRpc(t, "git-receive-pack", handleReceivePack)
} }
func testHandlePostRpc(t *testing.T, action string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) (int64, error)) { func testHandlePostRpc(t *testing.T, action string, handler func(*GitHttpResponseWriter, *http.Request, *api.Response) error) {
execCommand = fakeExecCommand execCommand = fakeExecCommand
defer func() { execCommand = exec.Command }() defer func() { execCommand = exec.Command }()
......
...@@ -2,7 +2,6 @@ package git ...@@ -2,7 +2,6 @@ package git
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"path" "path"
...@@ -45,36 +44,31 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response) ...@@ -45,36 +44,31 @@ func handleGetInfoRefs(rw http.ResponseWriter, r *http.Request, a *api.Response)
return return
} }
cmd, stdin, stdout, err := setupGitCommand(rpc, a, "--advertise-refs")
if err != nil {
helper.Fail500(w, r, fmt.Errorf("handleGetInfoRefs: setupGitCommand: %v", err))
return
}
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdin.Close() // Not needed for this request
defer stdout.Close()
// Start writing the response
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-advertisement", rpc)) w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-advertisement", rpc))
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
if err := writeBody(w, a, rpc); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: %v", err))
}
}
func writeBody(w http.ResponseWriter, a *api.Response, rpc string) error {
if err := pktLine(w, fmt.Sprintf("# service=%s\n", rpc)); err != nil { if err := pktLine(w, fmt.Sprintf("# service=%s\n", rpc)); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktLine: %v", err)) return fmt.Errorf("pktLine: %v", err)
return
} }
if err := pktFlush(w); err != nil { if err := pktFlush(w); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: pktFlush: %v", err)) return fmt.Errorf("pktFlush: %v", err)
return
} }
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError( cmd, err := startGitCommand(a, nil, w, rpc, "--advertise-refs")
r, if err != nil {
&copyError{fmt.Errorf("handleGetInfoRefs: copy output of %v: %v", cmd.Args, err)}, return fmt.Errorf("startGitCommand: %v", err)
)
return
} }
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("handleGetInfoRefs: wait for %v: %v", cmd.Args, err)) return fmt.Errorf("wait for %v: %v", cmd.Args, err)
return
} }
return nil
} }
...@@ -2,53 +2,29 @@ package git ...@@ -2,53 +2,29 @@ package git
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) { // Will not return a non-nil error after the response body has been
// written to.
func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) error {
action := getService(r) action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
}
defer stdout.Close()
defer stdin.Close()
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
writtenIn, err = io.Copy(stdin, r.Body)
if err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
writePostRPCHeader(w, action) writePostRPCHeader(w, action)
// This io.Copy may take a long time, both for Git push and pull. cmd, err := startGitCommand(a, r.Body, w, action)
_, err = io.Copy(w, stdout)
if err != nil { if err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)} return fmt.Errorf("startGitCommand: %v", err)
} }
defer helper.CleanUpProcessGroup(cmd)
err = cmd.Wait() if err := cmd.Wait(); err != nil {
helper.LogError(r, fmt.Errorf("wait for %v: %v", cmd.Args, err))
if err != nil { // Return nil because the response body has been written to already.
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err) return nil
} }
return writtenIn, nil return nil
} }
...@@ -9,68 +9,38 @@ import ( ...@@ -9,68 +9,38 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper" "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
) )
func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) (writtenIn int64, err error) { // Will not return a non-nil error after the response body has been
// written to.
func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response) error {
// The body will consist almost entirely of 'have XXX' and 'want XXX' // The body will consist almost entirely of 'have XXX' and 'want XXX'
// lines; these are about 50 bytes long. With a limit of 10MB the client // lines; these are about 50 bytes long. With a limit of 10MB the client
// can send over 200,000 have/want lines. // can send over 200,000 have/want lines.
buffer, err := helper.ReadAllTempfile(io.LimitReader(r.Body, 10*1024*1024)) buffer, err := helper.ReadAllTempfile(io.LimitReader(r.Body, 10*1024*1024))
if err != nil { if err != nil {
fail500(w) return fmt.Errorf("ReadAllTempfile: %v", err)
return writtenIn, fmt.Errorf("ReadAllTempfile: %v", err)
} }
defer buffer.Close() defer buffer.Close()
r.Body.Close() r.Body.Close()
isShallowClone := scanDeepen(buffer) isShallowClone := scanDeepen(buffer)
if _, err := buffer.Seek(0, 0); err != nil { if _, err := buffer.Seek(0, 0); err != nil {
fail500(w) return fmt.Errorf("seek tempfile: %v", err)
return writtenIn, fmt.Errorf("seek tempfile: %v", err)
} }
action := getService(r) action := getService(r)
cmd, stdin, stdout, err := setupGitCommand(action, a) writePostRPCHeader(w, action)
cmd, err := startGitCommand(a, buffer, w, action)
if err != nil { if err != nil {
fail500(w) return fmt.Errorf("startGitCommand: %v", err)
return writtenIn, fmt.Errorf("setupGitCommand: %v", err)
} }
defer helper.CleanUpProcessGroup(cmd)
defer stdout.Close() if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) {
defer stdin.Close() helper.LogError(r, fmt.Errorf("wait for %v: %v", cmd.Args, err))
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up // Return nil because the response body has been written to already.
return nil
stdoutError := make(chan error, 1)
go func() {
writePostRPCHeader(w, action)
// Start reading from stdout already to avoid blocking while writing to
// stdin below.
_, err := io.Copy(w, stdout)
// This error may be lost if some other error prevents us from <-ing on this channel.
stdoutError <- err
}()
// Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, buffer); err != nil {
fail500(w)
return writtenIn, fmt.Errorf("write to %v: %v", cmd.Args, err)
}
// Signal to the Git subprocess that no more data is coming
stdin.Close()
if err := <-stdoutError; err != nil {
return writtenIn, &copyError{fmt.Errorf("copy output of %v: %v", cmd.Args, err)}
} }
err = cmd.Wait() return nil
if err != nil && !(isExitError(err) && isShallowClone) {
return writtenIn, fmt.Errorf("wait for %v: %v", cmd.Args, err)
}
return writtenIn, nil
}
func fail500(w http.ResponseWriter) {
helper.Fail500(w, nil, nil)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment