Commit b8ab8bed authored by Stan Hu's avatar Stan Hu

Fix stalled HTTP fetches with large payloads

For fetches over HTTP, Workhorse executes git-upload-pack and first
attempts to send all the input data to stdin before reading from the
stdout pipe. However, when the payload is large, the stdout pipe may
fill up, causing git-upload-pack to stop reading from stdin. Workhorse
will then be deadlocked, since it will be waiting to send more data
to a buffer that will never be drained.

An addition side effect is that git-upload-pack processes also get left
around. These processes are cleaned up only after Workhorse is
restarted.

This fix modifies the git-upload-pack behavior to consume the entire
HTTP input first so that reading the data from stdout and sending the
reply can be performed in a separate Goroutine.

Closes #92

Closes gitlab-org/gitlab-ce#25916

Closes gitlab-com/infrastructure#941
parent c8589c13
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"os" "os"
...@@ -15,6 +16,7 @@ import ( ...@@ -15,6 +16,7 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config" "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
...@@ -155,6 +157,17 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) { ...@@ -155,6 +157,17 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
isShallowClone = scanDeepen(bytes.NewReader(buffer.Bytes())) isShallowClone = scanDeepen(bytes.NewReader(buffer.Bytes()))
body = io.MultiReader(buffer, r.Body) body = io.MultiReader(buffer, r.Body)
// Read out the full HTTP request body so that we can reply
buf, err := ioutil.ReadAll(body)
if err != nil {
helper.Fail500(w, r, &copyError{fmt.Errorf("handlePostRPC: full buffer git-upload-pack body: %v", err)})
return
}
body = ioutil.NopCloser(bytes.NewBuffer(buf))
r.Body.Close()
} else { } else {
body = r.Body body = r.Body
} }
...@@ -179,6 +192,28 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) { ...@@ -179,6 +192,28 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
} }
defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up defer helper.CleanUpProcessGroup(cmd) // Ensure brute force subprocess clean-up
stdoutError := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
if action == "git-upload-pack" {
// Start writing the response
writePostRPCHeader(w, action)
go func() {
defer wg.Done()
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)},
)
stdoutError <- err
return
}
}()
}
// Write the client request body to Git's standard input // Write the client request body to Git's standard input
if writtenIn, err = io.Copy(stdin, body); err != nil { if writtenIn, err = io.Copy(stdin, body); err != nil {
helper.Fail500(w, r, fmt.Errorf("handlePostRPC: write to %v: %v", cmd.Args, err)) helper.Fail500(w, r, fmt.Errorf("handlePostRPC: write to %v: %v", cmd.Args, err))
...@@ -187,22 +222,27 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) { ...@@ -187,22 +222,27 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
// Signal to the Git subprocess that no more data is coming // Signal to the Git subprocess that no more data is coming
stdin.Close() stdin.Close()
// It may take a while before we return and the deferred closes happen if action == "git-upload-pack" {
// so let's free up some resources already. wg.Wait()
r.Body.Close()
// Start writing the response if len(stdoutError) > 0 {
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action)) return
w.Header().Set("Cache-Control", "no-cache") }
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return } else {
// It may take a while before we return and the deferred closes happen
// so let's free up some resources already.
r.Body.Close()
// This io.Copy may take a long time, both for Git push and pull. writePostRPCHeader(w, action)
if _, err := io.Copy(w, stdout); err != nil {
helper.LogError( // This io.Copy may take a long time, both for Git push and pull.
r, if _, err := io.Copy(w, stdout); err != nil {
&copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)}, helper.LogError(
) r,
return &copyError{fmt.Errorf("handlePostRPC: copy output of %v: %v", cmd.Args, err)},
)
return
}
} }
if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) { if err := cmd.Wait(); err != nil && !(isExitError(err) && isShallowClone) {
helper.LogError(r, fmt.Errorf("handlePostRPC: wait for %v: %v", cmd.Args, err)) helper.LogError(r, fmt.Errorf("handlePostRPC: wait for %v: %v", cmd.Args, err))
...@@ -210,6 +250,12 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) { ...@@ -210,6 +250,12 @@ func handlePostRPC(rw http.ResponseWriter, r *http.Request, a *api.Response) {
} }
} }
func writePostRPCHeader(w http.ResponseWriter, action string) {
w.Header().Set("Content-Type", fmt.Sprintf("application/x-%s-result", action))
w.Header().Set("Cache-Control", "no-cache")
w.WriteHeader(200) // Don't bother with HTTP 500 from this point on, just return
}
func getService(r *http.Request) string { func getService(r *http.Request) string {
if r.Method == "GET" { if r.Method == "GET" {
return r.URL.Query().Get("service") return r.URL.Query().Get("service")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment