Commit df4559f8 authored by Nick Thomas's avatar Nick Thomas

Merge branch...

Merge branch 'qmnguyen0711/1220-implement-postuploadpackwithsidechannel-client-in-workhorse' into 'master'

Implement PostUploadPackWithSidechannel client in Workhorse

See merge request gitlab-org/gitlab!71047
parents 197b1c3e c82a66e1
---
name: workhorse_use_sidechannel
introduced_by_url:
rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1193
milestone: '14.4'
type: development
group: 'group::scalability'
default_enabled: false
...@@ -32,7 +32,8 @@ module Gitlab ...@@ -32,7 +32,8 @@ module Gitlab
GitalyServer: { GitalyServer: {
address: Gitlab::GitalyClient.address(repository.storage), address: Gitlab::GitalyClient.address(repository.storage),
token: Gitlab::GitalyClient.token(repository.storage), token: Gitlab::GitalyClient.token(repository.storage),
features: Feature::Gitaly.server_feature_flags(repository.project) features: Feature::Gitaly.server_feature_flags(repository.project),
sidechannel: Feature.enabled?(:workhorse_use_sidechannel, repository.project, default_enabled: :yaml)
} }
} }
......
...@@ -244,13 +244,15 @@ RSpec.describe Gitlab::Workhorse do ...@@ -244,13 +244,15 @@ RSpec.describe Gitlab::Workhorse do
GitalyServer: { GitalyServer: {
features: { 'gitaly-feature-enforce-requests-limits' => 'true' }, features: { 'gitaly-feature-enforce-requests-limits' => 'true' },
address: Gitlab::GitalyClient.address('default'), address: Gitlab::GitalyClient.address('default'),
token: Gitlab::GitalyClient.token('default') token: Gitlab::GitalyClient.token('default'),
sidechannel: false
} }
} }
end end
before do before do
allow(Gitlab.config.gitaly).to receive(:enabled).and_return(true) allow(Gitlab.config.gitaly).to receive(:enabled).and_return(true)
stub_feature_flags(workhorse_use_sidechannel: false)
end end
it 'includes a Repository param' do it 'includes a Repository param' do
...@@ -332,6 +334,46 @@ RSpec.describe Gitlab::Workhorse do ...@@ -332,6 +334,46 @@ RSpec.describe Gitlab::Workhorse do
it { expect { subject }.to raise_exception('Unsupported action: download') } it { expect { subject }.to raise_exception('Unsupported action: download') }
end end
context 'when workhorse_use_sidechannel flag is set' do
context 'when a feature flag is set globally' do
before do
stub_feature_flags(workhorse_use_sidechannel: true)
end
it 'sets the flag to true' do
response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(true)
end
end
context 'when a feature flag is set for a single project' do
before do
stub_feature_flags(workhorse_use_sidechannel: project)
end
it 'sets the flag to true for that project' do
response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(true)
end
it 'sets the flag to false for other projects' do
other_project = create(:project, :public, :repository)
response = described_class.git_http_ok(other_project.repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(false)
end
it 'sets the flag to false when there is no project' do
snippet = create(:personal_snippet, :repository)
response = described_class.git_http_ok(snippet.repository, Gitlab::GlRepository::SNIPPET, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(false)
end
end
end
end end
context 'when receive_max_input_size has been updated' do context 'when receive_max_input_size has been updated' do
......
...@@ -230,7 +230,7 @@ func TestAllowedGetGitArchiveOldPayload(t *testing.T) { ...@@ -230,7 +230,7 @@ func TestAllowedGetGitArchiveOldPayload(t *testing.T) {
// Create the repository in the Gitaly server // Create the repository in the Gitaly server
apiResponse := realGitalyOkBody(t) apiResponse := realGitalyOkBody(t)
repo := apiResponse.Repository repo := &apiResponse.Repository
require.NoError(t, ensureGitalyRepository(t, apiResponse)) require.NoError(t, ensureGitalyRepository(t, apiResponse))
archivePath := path.Join(scratchDir, "my/path") archivePath := path.Join(scratchDir, "my/path")
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net" "net"
...@@ -20,12 +21,16 @@ import ( ...@@ -20,12 +21,16 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/git" "gitlab.com/gitlab-org/gitlab/workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
...@@ -375,12 +380,24 @@ func TestPostReceivePackRouting(t *testing.T) { ...@@ -375,12 +380,24 @@ func TestPostReceivePackRouting(t *testing.T) {
} }
} }
type gitalyServerStarter func(*testing.T, codes.Code) (*combinedServer, string)
// ReaderFunc is an adapter to turn a conforming function into an io.Reader. // ReaderFunc is an adapter to turn a conforming function into an io.Reader.
type ReaderFunc func(b []byte) (int, error) type ReaderFunc func(b []byte) (int, error)
func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) } func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) }
func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
testPostUploadPackProxiedToGitalySuccessfully(t, startGitalyServer, gitOkBody(t))
}
func TestPostUploadPackWithSidechannelProxiedToGitalySuccessfully(t *testing.T) {
testPostUploadPackProxiedToGitalySuccessfully(
t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), gitOkBodyWithSidechannel(t),
)
}
func testPostUploadPackProxiedToGitalySuccessfully(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
for i, tc := range []struct { for i, tc := range []struct {
showAllRefs bool showAllRefs bool
code codes.Code code codes.Code
...@@ -391,10 +408,9 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { ...@@ -391,10 +408,9 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
{false, codes.Unavailable}, {false, codes.Unavailable},
} { } {
t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) { t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) {
apiResponse := gitOkBody(t)
apiResponse.ShowAllRefs = tc.showAllRefs apiResponse.ShowAllRefs = tc.showAllRefs
gitalyServer, socketPath := startGitalyServer(t, tc.code) gitalyServer, socketPath := startGitaly(t, tc.code)
defer gitalyServer.GracefulStop() defer gitalyServer.GracefulStop()
apiResponse.GitalyServer.Address = "unix:" + socketPath apiResponse.GitalyServer.Address = "unix:" + socketPath
...@@ -460,8 +476,16 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { ...@@ -460,8 +476,16 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) { func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
apiResponse := gitOkBody(t) apiResponse := gitOkBody(t)
testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServer, apiResponse)
}
gitalyServer, socketPath := startGitalyServer(t, codes.OK) func TestPostUploadPackWithSidechannelProxiedToGitalyInterrupted(t *testing.T) {
apiResponse := gitOkBodyWithSidechannel(t)
testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse)
}
func testPostUploadPackProxiedToGitalyInterrupted(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
gitalyServer, socketPath := startGitaly(t, codes.OK)
defer gitalyServer.GracefulStop() defer gitalyServer.GracefulStop()
apiResponse.GitalyServer.Address = "unix:" + socketPath apiResponse.GitalyServer.Address = "unix:" + socketPath
...@@ -493,10 +517,19 @@ func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) { ...@@ -493,10 +517,19 @@ func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
} }
func TestPostUploadPackRouting(t *testing.T) { func TestPostUploadPackRouting(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK) apiResponse := gitOkBody(t)
testPostUploadPackRouting(t, startGitalyServer, apiResponse)
}
func TestPostUploadPackWithSidechannelRouting(t *testing.T) {
apiResponse := gitOkBodyWithSidechannel(t)
testPostUploadPackRouting(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse)
}
func testPostUploadPackRouting(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
gitalyServer, socketPath := startGitaly(t, codes.OK)
defer gitalyServer.GracefulStop() defer gitalyServer.GracefulStop()
apiResponse := gitOkBody(t)
apiResponse.GitalyServer.Address = "unix:" + socketPath apiResponse.GitalyServer.Address = "unix:" + socketPath
ts := testAuthServer(t, nil, nil, 200, apiResponse) ts := testAuthServer(t, nil, nil, 200, apiResponse)
defer ts.Close() defer ts.Close()
...@@ -869,3 +902,21 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ ...@@ -869,3 +902,21 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ
return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
} }
func startGitalyServerWithSideChannel(handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error) gitalyServerStarter {
return func(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) {
socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int()))
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
t.Fatal(err)
}
server := grpc.NewServer(gitalyclient.TestSidechannelServer(logrus.NewEntry(logrus.StandardLogger()), insecure.NewCredentials(), handler)...)
listener, err := net.Listen("unix", socketPath)
require.NoError(t, err)
gitalyServer := testhelper.NewGitalyServer(finalMessageCode)
go server.Serve(listener)
return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
}
}
...@@ -3,11 +3,11 @@ module gitlab.com/gitlab-org/gitlab/workhorse ...@@ -3,11 +3,11 @@ module gitlab.com/gitlab-org/gitlab/workhorse
go 1.16 go 1.16
require ( require (
github.com/Azure/azure-storage-blob-go v0.11.1-0.20201209121048-6df5d9af221d github.com/Azure/azure-storage-blob-go v0.13.0
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/FZambia/sentinel v1.0.0 github.com/FZambia/sentinel v1.0.0
github.com/alecthomas/chroma v0.7.3 github.com/alecthomas/chroma v0.7.3
github.com/aws/aws-sdk-go v1.37.0 github.com/aws/aws-sdk-go v1.38.35
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 // indirect github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 // indirect
github.com/disintegration/imaging v1.6.2 github.com/disintegration/imaging v1.6.2
github.com/getsentry/raven-go v0.2.0 github.com/getsentry/raven-go v0.2.0
...@@ -28,14 +28,14 @@ require ( ...@@ -28,14 +28,14 @@ require (
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v1.6.4 github.com/smartystreets/goconvey v1.6.4
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
gitlab.com/gitlab-org/gitaly/v14 v14.0.0-rc1 gitlab.com/gitlab-org/gitaly/v14 v14.3.0-rc2.0.20211007055622-df7dadcc3f74
gitlab.com/gitlab-org/labkit v1.6.0 gitlab.com/gitlab-org/labkit v1.6.0
gocloud.dev v0.21.1-0.20201223184910-5094f54ed8bb gocloud.dev v0.23.0
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 golang.org/x/net v0.0.0-20210505214959-0714010a04ed
golang.org/x/tools v0.1.0 golang.org/x/tools v0.1.0
google.golang.org/grpc v1.37.0 google.golang.org/grpc v1.38.0
gopkg.in/DataDog/dd-trace-go.v1 v1.31.0 // indirect gopkg.in/DataDog/dd-trace-go.v1 v1.31.0 // indirect
honnef.co/go/tools v0.1.3 honnef.co/go/tools v0.1.3
) )
This diff is collapsed.
...@@ -102,11 +102,11 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) { ...@@ -102,11 +102,11 @@ func TestUploadHandlerSendingToExternalStorage(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
preauth api.Response preauth *api.Response
}{ }{
{ {
name: "ObjectStore Upload", name: "ObjectStore Upload",
preauth: api.Response{ preauth: &api.Response{
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put" + qs, StoreURL: storeServer.URL + "/url/put" + qs,
ID: "store-id", ID: "store-id",
...@@ -145,7 +145,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes ...@@ -145,7 +145,7 @@ func TestUploadHandlerSendingToExternalStorageAndStorageServerUnreachable(t *tes
t.Fatal("it should not be called") t.Fatal("it should not be called")
} }
authResponse := api.Response{ authResponse := &api.Response{
TempPath: tempPath, TempPath: tempPath,
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
StoreURL: "http://localhost:12323/invalid/url", StoreURL: "http://localhost:12323/invalid/url",
...@@ -171,7 +171,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T) ...@@ -171,7 +171,7 @@ func TestUploadHandlerSendingToExternalStorageAndInvalidURLIsUsed(t *testing.T)
t.Fatal("it should not be called") t.Fatal("it should not be called")
} }
authResponse := api.Response{ authResponse := &api.Response{
TempPath: tempPath, TempPath: tempPath,
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
StoreURL: "htt:////invalid-url", StoreURL: "htt:////invalid-url",
...@@ -203,7 +203,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T) ...@@ -203,7 +203,7 @@ func TestUploadHandlerSendingToExternalStorageAndItReturnsAnError(t *testing.T)
storeServer := httptest.NewServer(storeServerMux) storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close() defer storeServer.Close()
authResponse := api.Response{ authResponse := &api.Response{
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put", StoreURL: storeServer.URL + "/url/put",
ID: "store-id", ID: "store-id",
...@@ -236,7 +236,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin ...@@ -236,7 +236,7 @@ func TestUploadHandlerSendingToExternalStorageAndSupportRequestTimeout(t *testin
storeServer := httptest.NewServer(storeServerMux) storeServer := httptest.NewServer(storeServerMux)
defer storeServer.Close() defer storeServer.Close()
authResponse := api.Response{ authResponse := &api.Response{
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
StoreURL: storeServer.URL + "/url/put", StoreURL: storeServer.URL + "/url/put",
ID: "store-id", ID: "store-id",
...@@ -262,7 +262,7 @@ func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) { ...@@ -262,7 +262,7 @@ func TestUploadHandlerMultipartUploadSizeLimit(t *testing.T) {
objectURL := server.URL + test.ObjectPath objectURL := server.URL + test.ObjectPath
uploadSize := 10 uploadSize := 10
preauth := api.Response{ preauth := &api.Response{
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
ID: "store-id", ID: "store-id",
MultipartUpload: &api.MultipartUploadParams{ MultipartUpload: &api.MultipartUploadParams{
...@@ -304,7 +304,7 @@ func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) { ...@@ -304,7 +304,7 @@ func TestUploadHandlerMultipartUploadMaximumSizeFromApi(t *testing.T) {
uploadSize := int64(10) uploadSize := int64(10)
maxSize := uploadSize - 1 maxSize := uploadSize - 1
preauth := api.Response{ preauth := &api.Response{
MaximumSize: maxSize, MaximumSize: maxSize,
RemoteObject: api.RemoteObject{ RemoteObject: api.RemoteObject{
ID: "store-id", ID: "store-id",
......
...@@ -35,7 +35,7 @@ const ( ...@@ -35,7 +35,7 @@ const (
Path = "/url/path" Path = "/url/path"
) )
func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server { func testArtifactsUploadServer(t *testing.T, authResponse *api.Response, bodyProcessor func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(Path+"/authorize", func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" { if r.Method != "POST" {
...@@ -51,7 +51,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProc ...@@ -51,7 +51,7 @@ func testArtifactsUploadServer(t *testing.T, authResponse api.Response, bodyProc
w.Write(data) w.Write(data)
}) })
mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(Path, func(w http.ResponseWriter, r *http.Request) {
opts, err := filestore.GetOpts(&authResponse) opts, err := filestore.GetOpts(authResponse)
require.NoError(t, err) require.NoError(t, err)
if r.Method != "POST" { if r.Method != "POST" {
...@@ -128,7 +128,7 @@ func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format ...@@ -128,7 +128,7 @@ func setupWithTmpPath(t *testing.T, filename string, includeFormat bool, format
authResponse = &api.Response{TempPath: tempPath} authResponse = &api.Response{TempPath: tempPath}
} }
ts := testArtifactsUploadServer(t, *authResponse, bodyProcessor) ts := testArtifactsUploadServer(t, authResponse, bodyProcessor)
var buffer bytes.Buffer var buffer bytes.Buffer
writer := multipart.NewWriter(&buffer) writer := multipart.NewWriter(&buffer)
......
...@@ -141,21 +141,21 @@ func TestGetOpts(t *testing.T) { ...@@ -141,21 +141,21 @@ func TestGetOpts(t *testing.T) {
func TestGetOptsFail(t *testing.T) { func TestGetOptsFail(t *testing.T) {
testCases := []struct { testCases := []struct {
desc string desc string
in api.Response in *api.Response
}{ }{
{ {
desc: "neither local nor remote", desc: "neither local nor remote",
in: api.Response{}, in: &api.Response{},
}, },
{ {
desc: "both local and remote", desc: "both local and remote",
in: api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}}, in: &api.Response{TempPath: "/foobar", RemoteObject: api.RemoteObject{ID: "id"}},
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
_, err := filestore.GetOpts(&tc.in) _, err := filestore.GetOpts(tc.in)
require.Error(t, err, "expect input to be rejected") require.Error(t, err, "expect input to be rejected")
}) })
} }
......
...@@ -102,7 +102,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string ...@@ -102,7 +102,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
var archiveReader io.Reader var archiveReader io.Reader
archiveReader, err = handleArchiveWithGitaly(r, params, format) archiveReader, err = handleArchiveWithGitaly(r, &params, format)
if err != nil { if err != nil {
helper.Fail500(w, r, fmt.Errorf("operations.GetArchive: %v", err)) helper.Fail500(w, r, fmt.Errorf("operations.GetArchive: %v", err))
return return
...@@ -130,7 +130,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string ...@@ -130,7 +130,7 @@ func (a *archive) Inject(w http.ResponseWriter, r *http.Request, sendData string
} }
} }
func handleArchiveWithGitaly(r *http.Request, params archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) { func handleArchiveWithGitaly(r *http.Request, params *archiveParams, format gitalypb.GetArchiveRequest_Format) (io.Reader, error) {
var request *gitalypb.GetArchiveRequest var request *gitalypb.GetArchiveRequest
ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer) ctx, c, err := gitaly.NewRepositoryClient(r.Context(), params.GitalyServer)
if err != nil { if err != nil {
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
...@@ -23,15 +24,19 @@ import ( ...@@ -23,15 +24,19 @@ import (
) )
type Server struct { type Server struct {
Address string `json:"address"` Address string `json:"address"`
Token string `json:"token"` Token string `json:"token"`
Features map[string]string `json:"features"` Features map[string]string `json:"features"`
Sidechannel bool `json:"sidechannel"`
} }
type cacheKey struct{ address, token string } type cacheKey struct {
address, token string
sidechannel bool
}
func (server Server) cacheKey() cacheKey { func (server Server) cacheKey() cacheKey {
return cacheKey{address: server.Address, token: server.Token} return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
} }
type connectionsCache struct { type connectionsCache struct {
...@@ -41,9 +46,17 @@ type connectionsCache struct { ...@@ -41,9 +46,17 @@ type connectionsCache struct {
var ( var (
jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
cache = connectionsCache{ // This connection cache map contains two types of connections:
// - Normal gRPC connections
// - Sidechannel connections. When client dials to the Gitaly server, the
// server multiplexes the connection using Yamux. In the future, the server
// can open another stream to transfer data without gRPC. Besides, we apply
// a framing protocol to add the half-close capability to Yamux streams.
// Hence, we cannot use those connections interchangeably.
cache = connectionsCache{
connections: make(map[cacheKey]*grpc.ClientConn), connections: make(map[cacheKey]*grpc.ClientConn),
} }
sidechannelRegistry *gitalyclient.SidechannelRegistry
connectionsTotal = promauto.NewCounterVec( connectionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
...@@ -54,6 +67,12 @@ var ( ...@@ -54,6 +67,12 @@ var (
) )
) )
func InitializeSidechannelRegistry(logger *logrus.Logger) {
if sidechannelRegistry == nil {
sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger))
}
}
func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
md := metadata.New(nil) md := metadata.New(nil)
for k, v := range features { for k, v := range features {
...@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S ...@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
return nil, nil, err return nil, nil, err
} }
grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil smartHTTPClient := &SmartHTTPClient{
SmartHTTPServiceClient: grpcClient,
sidechannelRegistry: sidechannelRegistry,
useSidechannel: server.Sidechannel,
}
return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
} }
func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
...@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ...@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
), ),
) )
conn, connErr := gitalyclient.Dial(server.Address, connOpts) var conn *grpc.ClientConn
var connErr error
if server.Sidechannel {
conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
} else {
conn, connErr = gitalyclient.Dial(server.Address, connOpts)
}
label := "ok" label := "ok"
if connErr != nil { if connErr != nil {
......
...@@ -4,14 +4,32 @@ import ( ...@@ -4,14 +4,32 @@ import (
"context" "context"
"testing" "testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
func TestNewSmartHTTPClient(t *testing.T) { func TestNewSmartHTTPClient(t *testing.T) {
ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture()) ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture())
require.NoError(t, err) require.NoError(t, err)
testOutgoingMetadata(t, ctx) testOutgoingMetadata(t, ctx)
require.False(t, client.useSidechannel)
require.Nil(t, client.sidechannelRegistry)
}
func TestNewSmartHTTPClientWithSidechannel(t *testing.T) {
InitializeSidechannelRegistry(logrus.StandardLogger())
fixture := serverFixture()
fixture.Sidechannel = true
ctx, client, err := NewSmartHTTPClient(context.Background(), fixture)
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
require.True(t, client.useSidechannel)
require.NotNil(t, client.sidechannelRegistry)
} }
func TestNewBlobClient(t *testing.T) { func TestNewBlobClient(t *testing.T) {
......
...@@ -5,11 +5,14 @@ import ( ...@@ -5,11 +5,14 @@ import (
"fmt" "fmt"
"io" "io"
gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio" "gitlab.com/gitlab-org/gitaly/v14/streamio"
) )
type SmartHTTPClient struct { type SmartHTTPClient struct {
useSidechannel bool
sidechannelRegistry *gitalyclient.SidechannelRegistry
gitalypb.SmartHTTPServiceClient gitalypb.SmartHTTPServiceClient
} }
...@@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R ...@@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R
} }
func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
if client.useSidechannel {
return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
}
return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
}
func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
stream, err := client.PostUploadPack(ctx) stream, err := client.PostUploadPack(ctx)
if err != nil { if err != nil {
return err return err
...@@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re ...@@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re
return nil return nil
} }
func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error {
if _, err := io.Copy(conn, clientRequest); err != nil {
return err
}
if err := conn.CloseWrite(); err != nil {
return fmt.Errorf("fail to signal sidechannel half-close: %w", err)
}
if _, err := io.Copy(clientResponse, conn); err != nil {
return err
}
return nil
})
defer waiter.Close()
rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{
Repository: repo,
GitConfigOptions: gitConfigOptions,
GitProtocol: gitProtocol,
}
if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil {
return err
}
if err := waiter.Close(); err != nil {
return fmt.Errorf("fail to close sidechannel connection: %w", err)
}
return nil
}
...@@ -3,6 +3,7 @@ package gitaly ...@@ -3,6 +3,7 @@ package gitaly
import ( import (
"testing" "testing"
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
) )
...@@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) { ...@@ -11,25 +12,25 @@ func TestUnmarshalJSON(t *testing.T) {
testCases := []struct { testCases := []struct {
desc string desc string
in string in string
out gitalypb.Repository out *gitalypb.Repository
}{ }{
{ {
desc: "basic example", desc: "basic example",
in: `{"relative_path":"foo/bar.git"}`, in: `{"relative_path":"foo/bar.git"}`,
out: gitalypb.Repository{RelativePath: "foo/bar.git"}, out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
}, },
{ {
desc: "unknown field", desc: "unknown field",
in: `{"relative_path":"foo/bar.git","unknown_field":12345}`, in: `{"relative_path":"foo/bar.git","unknown_field":12345}`,
out: gitalypb.Repository{RelativePath: "foo/bar.git"}, out: &gitalypb.Repository{RelativePath: "foo/bar.git"},
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
result := gitalypb.Repository{} result := &gitalypb.Repository{}
require.NoError(t, UnmarshalJSON(tc.in, &result)) require.NoError(t, UnmarshalJSON(tc.in, result))
require.Equal(t, tc.out, result) require.True(t, proto.Equal(tc.out, result))
}) })
} }
} }
package testhelper package testhelper
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
...@@ -11,6 +12,7 @@ import ( ...@@ -11,6 +12,7 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
...@@ -23,6 +25,7 @@ type GitalyTestServer struct { ...@@ -23,6 +25,7 @@ type GitalyTestServer struct {
finalMessageCode codes.Code finalMessageCode codes.Code
sync.WaitGroup sync.WaitGroup
LastIncomingMetadata metadata.MD LastIncomingMetadata metadata.MD
gitalypb.UnimplementedSmartHTTPServiceServer
gitalypb.UnimplementedRepositoryServiceServer gitalypb.UnimplementedRepositoryServiceServer
gitalypb.UnimplementedBlobServiceServer gitalypb.UnimplementedBlobServiceServer
gitalypb.UnimplementedDiffServiceServer gitalypb.UnimplementedDiffServiceServer
...@@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU ...@@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return err return err
} }
jsonString, err := marshalJSON(req) marshaler := &jsonpb.Marshaler{}
if err != nil { jsonBytes := &bytes.Buffer{}
if err := marshaler.Marshal(jsonBytes, req); err != nil {
return err return err
} }
if err := stream.Send(&gitalypb.PostUploadPackResponse{ if err := stream.Send(&gitalypb.PostUploadPackResponse{
Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"), Data: append(jsonBytes.Bytes(), 0),
}); err != nil { }); err != nil {
return err return err
} }
...@@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU ...@@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return s.finalError() return s.finalError()
} }
// PostUploadPackWithSidechannel should be a part of smarthttp server in real
// server. In workhorse, setting up a real sidechannel server is troublesome.
// Therefore, we bring up a sidechannel server with a mock server exported via
// gitalyclient.TestSidechannelServer. This is the handler for that mock
// server.
func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error {
if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" {
return fmt.Errorf("unexpected method: %s", method)
}
var req gitalypb.PostUploadPackWithSidechannelRequest
if err := stream.RecvMsg(&req); err != nil {
return err
}
if err := validateRepository(req.GetRepository()); err != nil {
return err
}
marshaler := &jsonpb.Marshaler{}
jsonBytes := &bytes.Buffer{}
if err := marshaler.Marshal(jsonBytes, &req); err != nil {
return err
}
// Bounce back all data back to the client, plus flushing bytes
if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil {
return err
}
if _, err := io.Copy(conn, conn); err != nil {
return err
}
return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
}
func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) { func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
return nil, nil return nil, nil
} }
......
...@@ -271,19 +271,19 @@ func TestUploadProcessingFile(t *testing.T) { ...@@ -271,19 +271,19 @@ func TestUploadProcessingFile(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
preauth api.Response preauth *api.Response
}{ }{
{ {
name: "FileStore Upload", name: "FileStore Upload",
preauth: api.Response{TempPath: tempPath}, preauth: &api.Response{TempPath: tempPath},
}, },
{ {
name: "ObjectStore Upload", name: "ObjectStore Upload",
preauth: api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}}, preauth: &api.Response{RemoteObject: api.RemoteObject{StoreURL: storeUrl}},
}, },
{ {
name: "ObjectStore and FileStore Upload", name: "ObjectStore and FileStore Upload",
preauth: api.Response{ preauth: &api.Response{
TempPath: tempPath, TempPath: tempPath,
RemoteObject: api.RemoteObject{StoreURL: storeUrl}, RemoteObject: api.RemoteObject{StoreURL: storeUrl},
}, },
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/labkit/tracing" "gitlab.com/gitlab-org/labkit/tracing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/queueing" "gitlab.com/gitlab-org/gitlab/workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/redis" "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
...@@ -233,6 +234,8 @@ func run(boot bootConfig, cfg config.Config) error { ...@@ -233,6 +234,8 @@ func run(boot bootConfig, cfg config.Config) error {
} }
defer accessCloser.Close() defer accessCloser.Close()
gitaly.InitializeSidechannelRegistry(accessLogger)
up := wrapRaven(upstream.NewUpstream(cfg, accessLogger)) up := wrapRaven(upstream.NewUpstream(cfg, accessLogger))
done := make(chan os.Signal, 1) done := make(chan os.Signal, 1)
......
...@@ -814,6 +814,8 @@ func startWorkhorseServerWithConfig(cfg *config.Config) *httptest.Server { ...@@ -814,6 +814,8 @@ func startWorkhorseServerWithConfig(cfg *config.Config) *httptest.Server {
testhelper.ConfigureSecret() testhelper.ConfigureSecret()
u := upstream.NewUpstream(*cfg, logrus.StandardLogger()) u := upstream.NewUpstream(*cfg, logrus.StandardLogger())
gitaly.InitializeSidechannelRegistry(logrus.StandardLogger())
return httptest.NewServer(u) return httptest.NewServer(u)
} }
...@@ -834,6 +836,20 @@ func gitOkBody(t *testing.T) *api.Response { ...@@ -834,6 +836,20 @@ func gitOkBody(t *testing.T) *api.Response {
} }
} }
func gitOkBodyWithSidechannel(t *testing.T) *api.Response {
return &api.Response{
GL_ID: "user-123",
GL_USERNAME: "username",
Repository: gitalypb.Repository{
StorageName: "default",
RelativePath: "foo/bar.git",
},
GitalyServer: gitaly.Server{
Sidechannel: true,
},
}
}
func httpGet(t *testing.T, url string, headers map[string]string) (*http.Response, string) { func httpGet(t *testing.T, url string, headers map[string]string) (*http.Response, string) {
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
require.NoError(t, err) require.NoError(t, err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment