Commit fe1def6e authored by Ahmad Sherif's avatar Ahmad Sherif

Migrate Send{Diff,Patch} to Gitaly

Closes gitaly#622
Closes gitaly#612
parent 17bd7545
......@@ -340,6 +340,46 @@ func TestGetBlobProxiedToGitalySuccessfully(t *testing.T) {
testhelper.AssertResponseHeader(t, resp, "Content-Length", strconv.Itoa(blobLength))
}
func TestGetDiffProxiedToGitalySuccessfully(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
repoStorage := "default"
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
expectedBody := testhelper.GitalyGetDiffResponseMock
resp, body, err := doSendDataRequest("/something", "git-diff", jsonParams)
require.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
assert.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
}
func TestGetPatchProxiedToGitalySuccessfully(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
repoStorage := "default"
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
expectedBody := testhelper.GitalyGetPatchResponseMock
resp, body, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
require.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
assert.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
}
func TestGetBlobProxiedToGitalyInterruptedStream(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
......@@ -427,6 +467,70 @@ func TestGetArchiveProxiedToGitalyInterruptedStream(t *testing.T) {
}
}
func TestGetDiffProxiedToGitalyInterruptedStream(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
repoStorage := "default"
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
resp, _, err := doSendDataRequest("/something", "git-diff", jsonParams)
require.NoError(t, err)
// This causes the server stream to be interrupted instead of consumed entirely.
resp.Body.Close()
done := make(chan struct{})
go func() {
gitalyServer.WaitGroup.Wait()
close(done)
}()
select {
case <-done:
return
case <-time.After(10 * time.Second):
t.Fatal("time out waiting for gitaly handler to return")
}
}
func TestGetPatchProxiedToGitalyInterruptedStream(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
repoStorage := "default"
rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e"
leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab"
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`,
gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit)
resp, _, err := doSendDataRequest("/something", "git-format-patch", jsonParams)
require.NoError(t, err)
// This causes the server stream to be interrupted instead of consumed entirely.
resp.Body.Close()
done := make(chan struct{})
go func() {
gitalyServer.WaitGroup.Wait()
close(done)
}()
select {
case <-done:
return
case <-time.After(10 * time.Second):
t.Fatal("time out waiting for gitaly handler to return")
}
}
type combinedServer struct {
*grpc.Server
*testhelper.GitalyTestServer
......@@ -445,6 +549,7 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ
pb.RegisterSmartHTTPServiceServer(server, gitalyServer)
pb.RegisterBlobServiceServer(server, gitalyServer)
pb.RegisterRepositoryServiceServer(server, gitalyServer)
pb.RegisterDiffServiceServer(server, gitalyServer)
go server.Serve(listener)
......
......@@ -6,8 +6,13 @@ import (
"log"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"github.com/golang/protobuf/jsonpb"
)
type diff struct{ senddata.Prefix }
......@@ -15,6 +20,8 @@ type diffParams struct {
RepoPath string
ShaFrom string
ShaTo string
GitalyServer gitaly.Server
RawDiffRequest string
}
var SendDiff = &diff{"git-diff:"}
......@@ -26,6 +33,33 @@ func (d *diff) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
return
}
if params.GitalyServer.Address != "" {
handleSendDiffWithGitaly(w, r, &params)
} else {
handleSendDiffLocally(w, r, &params)
}
}
func handleSendDiffWithGitaly(w http.ResponseWriter, r *http.Request, params *diffParams) {
request := &pb.RawDiffRequest{}
if err := jsonpb.UnmarshalString(params.RawDiffRequest, request); err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawDiff: %v", err))
}
diffClient, err := gitaly.NewDiffClient(params.GitalyServer)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawDiff: %v", err))
}
if err := diffClient.SendRawDiff(r.Context(), w, request); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("diff.RawDiff: request=%v, err=%v", request, err)},
)
}
}
func handleSendDiffLocally(w http.ResponseWriter, r *http.Request, params *diffParams) {
log.Printf("SendDiff: sending diff between %q and %q for %q", params.ShaFrom, params.ShaTo, r.URL.Path)
gitDiffCmd := gitCommand("git", "--git-dir="+params.RepoPath, "diff", params.ShaFrom, params.ShaTo)
......
......@@ -6,8 +6,13 @@ import (
"log"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"github.com/golang/protobuf/jsonpb"
)
type patch struct{ senddata.Prefix }
......@@ -15,6 +20,8 @@ type patchParams struct {
RepoPath string
ShaFrom string
ShaTo string
GitalyServer gitaly.Server
RawPatchRequest string
}
var SendPatch = &patch{"git-format-patch:"}
......@@ -25,7 +32,33 @@ func (p *patch) Inject(w http.ResponseWriter, r *http.Request, sendData string)
helper.Fail500(w, r, fmt.Errorf("SendPatch: unpack sendData: %v", err))
return
}
if params.GitalyServer.Address != "" {
handleSendPatchWithGitaly(w, r, &params)
} else {
handleSendPatchLocally(w, r, &params)
}
}
func handleSendPatchWithGitaly(w http.ResponseWriter, r *http.Request, params *patchParams) {
request := &pb.RawPatchRequest{}
if err := jsonpb.UnmarshalString(params.RawPatchRequest, request); err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawPatch: %v", err))
}
diffClient, err := gitaly.NewDiffClient(params.GitalyServer)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("diff.RawPatch: %v", err))
}
if err := diffClient.SendRawPatch(r.Context(), w, request); err != nil {
helper.LogError(
r,
&copyError{fmt.Errorf("diff.RawPatch: request=%v, err=%v", request, err)},
)
}
}
func handleSendPatchLocally(w http.ResponseWriter, r *http.Request, params *patchParams) {
log.Printf("SendPatch: sending patch between %q and %q for %q", params.ShaFrom, params.ShaTo, r.URL.Path)
gitRange := fmt.Sprintf("%s..%s", params.ShaFrom, params.ShaTo)
......
package gitaly
import (
"context"
"fmt"
"io"
"net/http"
"gitlab.com/gitlab-org/gitaly/streamio"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
)
type DiffClient struct {
pb.DiffServiceClient
}
func (client *DiffClient) SendRawDiff(ctx context.Context, w http.ResponseWriter, request *pb.RawDiffRequest) error {
c, err := client.RawDiff(ctx, request)
if err != nil {
return fmt.Errorf("rpc failed: %v", err)
}
w.Header().Del("Content-Length")
rr := streamio.NewReader(func() ([]byte, error) {
resp, err := c.Recv()
return resp.GetData(), err
})
if _, err := io.Copy(w, rr); err != nil {
return fmt.Errorf("copy rpc data: %v", err)
}
return nil
}
func (client *DiffClient) SendRawPatch(ctx context.Context, w http.ResponseWriter, request *pb.RawPatchRequest) error {
c, err := client.RawPatch(ctx, request)
if err != nil {
return fmt.Errorf("rpc failed: %v", err)
}
w.Header().Del("Content-Length")
rr := streamio.NewReader(func() ([]byte, error) {
resp, err := c.Recv()
return resp.GetData(), err
})
if _, err := io.Copy(w, rr); err != nil {
return fmt.Errorf("copy rpc data: %v", err)
}
return nil
}
......@@ -50,6 +50,15 @@ func NewRepositoryClient(server Server) (*RepositoryClient, error) {
return &RepositoryClient{grpcClient}, nil
}
func NewDiffClient(server Server) (*DiffClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, err
}
grpcClient := pb.NewDiffServiceClient(conn)
return &DiffClient{grpcClient}, nil
}
func getOrCreateConnection(server Server) (*grpc.ClientConn, error) {
cache.RLock()
conn := cache.connections[server]
......
......@@ -25,6 +25,8 @@ var (
GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000)
GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000)
GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000)
GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000)
GitalyReceivePackResponseMock []byte
GitalyUploadPackResponseMock []byte
)
......@@ -230,6 +232,45 @@ func (s *GitalyTestServer) GetArchive(in *pb.GetArchiveRequest, stream pb.Reposi
return s.finalError()
}
func (s *GitalyTestServer) RawDiff(in *pb.RawDiffRequest, stream pb.DiffService_RawDiffServer) error {
nSends, err := sendBytes([]byte(GitalyGetDiffResponseMock), 100, func(p []byte) error {
return stream.Send(&pb.RawDiffResponse{
Data: p,
})
})
if err != nil {
return err
}
if nSends <= 1 {
panic("should have sent more than one message")
}
return s.finalError()
}
func (s *GitalyTestServer) RawPatch(in *pb.RawPatchRequest, stream pb.DiffService_RawPatchServer) error {
s.WaitGroup.Add(1)
defer s.WaitGroup.Done()
if err := validateRepository(in.GetRepository()); err != nil {
return err
}
nSends, err := sendBytes([]byte(GitalyGetPatchResponseMock), 100, func(p []byte) error {
return stream.Send(&pb.RawPatchResponse{
Data: p,
})
})
if err != nil {
return err
}
if nSends <= 1 {
panic("should have sent more than one message")
}
return s.finalError()
}
func (s *GitalyTestServer) RepositoryExists(context.Context, *pb.RepositoryExistsRequest) (*pb.RepositoryExistsResponse, error) {
return nil, nil
}
......@@ -266,6 +307,18 @@ func (s *GitalyTestServer) Exists(context.Context, *pb.RepositoryExistsRequest)
return nil, nil
}
func (s *GitalyTestServer) CommitDelta(in *pb.CommitDeltaRequest, stream pb.DiffService_CommitDeltaServer) error {
return nil
}
func (s *GitalyTestServer) CommitDiff(in *pb.CommitDiffRequest, stream pb.DiffService_CommitDiffServer) error {
return nil
}
func (s *GitalyTestServer) CommitPatch(in *pb.CommitPatchRequest, stream pb.DiffService_CommitPatchServer) error {
return nil
}
// sendBytes returns the number of times the 'sender' function was called and an error.
func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) {
i := 0
......
This diff is collapsed.
This diff is collapsed.
// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package google.protobuf;
option csharp_namespace = "Google.Protobuf.WellKnownTypes";
option cc_enable_arenas = true;
option go_package = "github.com/golang/protobuf/ptypes/struct;structpb";
option java_package = "com.google.protobuf";
option java_outer_classname = "StructProto";
option java_multiple_files = true;
option objc_class_prefix = "GPB";
// `Struct` represents a structured data value, consisting of fields
// which map to dynamically typed values. In some languages, `Struct`
// might be supported by a native representation. For example, in
// scripting languages like JS a struct is represented as an
// object. The details of that representation are described together
// with the proto support for the language.
//
// The JSON representation for `Struct` is JSON object.
message Struct {
// Unordered map of dynamically typed values.
map<string, Value> fields = 1;
}
// `Value` represents a dynamically typed value which can be either
// null, a number, a string, a boolean, a recursive struct value, or a
// list of values. A producer of value is expected to set one of that
// variants, absence of any variant indicates an error.
//
// The JSON representation for `Value` is JSON value.
message Value {
// The kind of value.
oneof kind {
// Represents a null value.
NullValue null_value = 1;
// Represents a double value.
double number_value = 2;
// Represents a string value.
string string_value = 3;
// Represents a boolean value.
bool bool_value = 4;
// Represents a structured value.
Struct struct_value = 5;
// Represents a repeated `Value`.
ListValue list_value = 6;
}
}
// `NullValue` is a singleton enumeration to represent the null value for the
// `Value` type union.
//
// The JSON representation for `NullValue` is JSON `null`.
enum NullValue {
// Null value.
NULL_VALUE = 0;
}
// `ListValue` is a wrapper around a repeated field of values.
//
// The JSON representation for `ListValue` is JSON array.
message ListValue {
// Repeated field of dynamically typed values.
repeated Value values = 1;
}
......@@ -57,6 +57,10 @@ It has these top-level messages:
CommitDeltaResponse
CommitPatchRequest
CommitPatchResponse
RawDiffRequest
RawDiffResponse
RawPatchRequest
RawPatchResponse
AddNamespaceRequest
RemoveNamespaceRequest
RenameNamespaceRequest
......
......@@ -42,11 +42,23 @@
"path": "github.com/getsentry/raven-go",
"revision": "379f8d0a68ca237cf8893a1cdfd4f574125e2c51"
},
{
"checksumSHA1": "eG45Hg4ZnB4b1KuqsMknc5eAx4A=",
"path": "github.com/golang/protobuf/jsonpb",
"revision": "130e6b02ab059e7b717a096f397c5b60111cae74",
"revisionTime": "2017-09-20T22:06:47Z"
},
{
"checksumSHA1": "kBeNcaKk56FguvPSUCEaH6AxpRc=",
"path": "github.com/golang/protobuf/proto",
"revision": "8ee79997227bf9b34611aee7946ae64735e6fd93"
},
{
"checksumSHA1": "Ylq6kq3KWBy6mu68oyEwenhNMdg=",
"path": "github.com/golang/protobuf/ptypes/struct",
"revision": "130e6b02ab059e7b717a096f397c5b60111cae74",
"revisionTime": "2017-09-20T22:06:47Z"
},
{
"checksumSHA1": "sfoot+dHmmOgWZS6GJ5X79ClZM0=",
"path": "github.com/golang/protobuf/ptypes/timestamp",
......@@ -152,12 +164,12 @@
"revisionTime": "2016-11-17T07:43:51Z"
},
{
"checksumSHA1": "tisAil16tojFqhqWYbs2kXwBYyk=",
"checksumSHA1": "I7Kz+IncdenJ5tOCo/5qJlQ4U7k=",
"path": "gitlab.com/gitlab-org/gitaly-proto/go",
"revision": "12872bd8dad9dc72328b2c590386e67a17c65612",
"revisionTime": "2017-09-27T21:53:01Z",
"version": "v0.38.0",
"versionExact": "v0.38.0"
"revision": "8ac6c4c6cb2c124ff95cdd0f97bc4624055edc8f",
"revisionTime": "2017-10-02T18:55:11Z",
"version": "v0.39.0",
"versionExact": "v0.39.0"
},
{
"checksumSHA1": "dUHJbKas746n5fLzlwxHb6FOCxs=",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment