Commit b0798c9b authored by Nick Thomas's avatar Nick Thomas Committed by Jacob Vosmaer (GitLab)

Bridge between Gitaly and GitLab for a new repository snapshot endpoint

parent 9c54fcc3
......@@ -2,6 +2,7 @@ package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
......@@ -22,6 +23,8 @@ import (
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
......@@ -579,6 +582,106 @@ func TestGetPatchProxiedToGitalyInterruptedStream(t *testing.T) {
}
}
func TestGetSnapshotProxiedToGitalySuccessfully(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
expectedBody := testhelper.GitalyGetSnapshotResponseMock
archiveLength := len(expectedBody)
params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
resp, body, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode, "GET %q: status code", resp.Request.URL)
assert.Equal(t, expectedBody, string(body), "GET %q: body", resp.Request.URL)
assert.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL)
testhelper.AssertResponseHeader(t, resp, "Content-Disposition", `attachment; filename="snapshot.tar"`)
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-tar")
testhelper.AssertResponseHeader(t, resp, "Content-Transfer-Encoding", "binary")
testhelper.AssertResponseHeader(t, resp, "Cache-Control", "private")
}
func TestGetSnapshotProxiedToGitalyInterruptedStream(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git"))
resp, _, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params)
require.NoError(t, err)
// This causes the server stream to be interrupted instead of consumed entirely.
resp.Body.Close()
done := make(chan struct{})
go func() {
gitalyServer.WaitGroup.Wait()
close(done)
}()
select {
case <-done:
return
case <-time.After(10 * time.Second):
t.Fatal("time out waiting for gitaly handler to return")
}
}
func buildGetSnapshotParams(gitalyAddress string, repo *pb.Repository) string {
msg := serializedMessage("GetSnapshotRequest", &pb.GetSnapshotRequest{Repository: repo})
return buildGitalyRpcParams(gitalyAddress, msg)
}
type rpcArg struct {
k string
v interface{}
}
// Gitlab asks workhorse to perform some long-running RPCs for it by sending
// the RPC arguments (which are protobuf messages) in HTTP response headers.
// The messages are encoded to JSON objects using pbjson, The strings are then
// re-encoded to JSON strings using json. We must replicate this behaviour here
func buildGitalyRpcParams(gitalyAddress string, rpcArgs ...rpcArg) string {
built := map[string]interface{}{
"GitalyServer": map[string]string{
"Address": gitalyAddress,
"Token": "",
},
}
for _, arg := range rpcArgs {
built[arg.k] = arg.v
}
b, err := json.Marshal(interface{}(built))
if err != nil {
panic(err)
}
return string(b)
}
func buildPbRepo(storageName, relativePath string) *pb.Repository {
return &pb.Repository{
StorageName: storageName,
RelativePath: relativePath,
}
}
func serializedMessage(name string, arg proto.Message) rpcArg {
m := &jsonpb.Marshaler{}
str, err := m.MarshalToString(arg)
if err != nil {
panic(err)
}
return rpcArg{name, str}
}
type combinedServer struct {
*grpc.Server
*testhelper.GitalyTestServer
......
package git
import (
"fmt"
"io"
"net/http"
"github.com/golang/protobuf/jsonpb"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
)
type snapshot struct {
senddata.Prefix
}
type snapshotParams struct {
GitalyServer gitaly.Server
GetSnapshotRequest string
}
var (
SendSnapshot = &snapshot{"git-snapshot:"}
)
func (s *snapshot) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
var params snapshotParams
if err := s.Unpack(&params, sendData); err != nil {
helper.Fail500(w, r, fmt.Errorf("SendSnapshot: unpack sendData: %v", err))
return
}
request := &pb.GetSnapshotRequest{}
if err := jsonpb.UnmarshalString(params.GetSnapshotRequest, request); err != nil {
helper.Fail500(w, r, fmt.Errorf("SendSnapshot: unmarshal GetSnapshotRequest: %v", err))
return
}
c, err := gitaly.NewRepositoryClient(params.GitalyServer)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("SendSnapshot: gitaly.NewRepositoryClient: %v", err))
return
}
reader, err := c.SnapshotReader(r.Context(), request)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("SendSnapshot: client.SnapshotReader: %v", err))
return
}
w.Header().Del("Content-Length")
w.Header().Set("Content-Disposition", `attachment; filename="snapshot.tar"`)
w.Header().Set("Content-Type", "application/x-tar")
w.Header().Set("Content-Transfer-Encoding", "binary")
w.Header().Set("Cache-Control", "private")
w.WriteHeader(http.StatusOK) // Errors aren't detectable beyond this point
if _, err := io.Copy(w, reader); err != nil {
helper.LogError(r, fmt.Errorf("SendSnapshot: copy gitaly output: %v", err))
}
return
}
......@@ -28,3 +28,18 @@ func (client *RepositoryClient) ArchiveReader(ctx context.Context, request *pb.G
return resp.GetData(), err
}), nil
}
// SnapshotReader performs a GetSnapshot Gitaly request and returns an io.Reader
// for the response
func (client *RepositoryClient) SnapshotReader(ctx context.Context, request *pb.GetSnapshotRequest) (io.Reader, error) {
c, err := client.GetSnapshot(ctx, request)
if err != nil {
return nil, fmt.Errorf("RepositoryService::GetSnapshot: %v", err)
}
return streamio.NewReader(func() ([]byte, error) {
resp, err := c.Recv()
return resp.GetData(), err
}), nil
}
......@@ -23,11 +23,14 @@ type GitalyTestServer struct {
}
var (
GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000)
GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000)
GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000)
GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000)
GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
GitalyGetBlobResponseMock = strings.Repeat("Mock Gitaly GetBlobResponse data", 100000)
GitalyGetArchiveResponseMock = strings.Repeat("Mock Gitaly GetArchiveResponse data", 100000)
GitalyGetDiffResponseMock = strings.Repeat("Mock Gitaly GetDiffResponse data", 100000)
GitalyGetPatchResponseMock = strings.Repeat("Mock Gitaly GetPatchResponse data", 100000)
GitalyGetSnapshotResponseMock = strings.Repeat("Mock Gitaly GetSnapshotResponse data", 100000)
GitalyReceivePackResponseMock []byte
GitalyUploadPackResponseMock []byte
)
......@@ -280,6 +283,27 @@ func (s *GitalyTestServer) RawPatch(in *pb.RawPatchRequest, stream pb.DiffServic
return s.finalError()
}
func (s *GitalyTestServer) GetSnapshot(in *pb.GetSnapshotRequest, stream pb.RepositoryService_GetSnapshotServer) error {
s.WaitGroup.Add(1)
defer s.WaitGroup.Done()
if err := validateRepository(in.GetRepository()); err != nil {
return err
}
nSends, err := sendBytes([]byte(GitalyGetSnapshotResponseMock), 100, func(p []byte) error {
return stream.Send(&pb.GetSnapshotResponse{Data: p})
})
if err != nil {
return err
}
if nSends <= 1 {
panic("should have sent more than one message")
}
return s.finalError()
}
func (s *GitalyTestServer) RepositoryExists(context.Context, *pb.RepositoryExistsRequest) (*pb.RepositoryExistsResponse, error) {
return nil, nil
}
......@@ -408,6 +432,10 @@ func (s *GitalyTestServer) Cleanup(context.Context, *pb.CleanupRequest) (*pb.Cle
return nil, nil
}
func (s *GitalyTestServer) CreateRepositoryFromSnapshot(context.Context, *pb.CreateRepositoryFromSnapshotRequest) (*pb.CreateRepositoryFromSnapshotResponse, error) {
return nil, nil
}
// sendBytes returns the number of times the 'sender' function was called and an error.
func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) {
i := 0
......
......@@ -140,6 +140,7 @@ func (u *Upstream) configureRoutes() {
git.SendBlob,
git.SendDiff,
git.SendPatch,
git.SendSnapshot,
artifacts.SendEntry,
sendurl.SendURL,
)
......
......@@ -219,6 +219,10 @@ It has these top-level messages:
GetInfoAttributesResponse
CalculateChecksumRequest
CalculateChecksumResponse
GetSnapshotRequest
GetSnapshotResponse
CreateRepositoryFromSnapshotRequest
CreateRepositoryFromSnapshotResponse
ServerInfoRequest
ServerInfoResponse
Repository
......
......@@ -197,12 +197,12 @@
"revisionTime": "2016-11-17T07:43:51Z"
},
{
"checksumSHA1": "MVrf0BwLEpU3uGHmYOrIWVoJGCk=",
"checksumSHA1": "B+FjK1qIgELTplSb/hl3ZHGHjl0=",
"path": "gitlab.com/gitlab-org/gitaly-proto/go",
"revision": "00d4006669b30b6a0fe66aaa9fa72285d18616fb",
"revisionTime": "2018-04-03T14:25:38Z",
"version": "v0.95.0",
"versionExact": "v0.95.0"
"revision": "8680efed3cdd68f3a2d4ccd56fb6684a6761d59e",
"revisionTime": "2018-04-05T16:15:56Z",
"version": "v0.96.0",
"versionExact": "v0.96.0"
},
{
"checksumSHA1": "dUHJbKas746n5fLzlwxHb6FOCxs=",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment