Commit c82a66e1 authored by Quang-Minh Nguyen's avatar Quang-Minh Nguyen

Implement PostUploadPackWithSidechannel client in Workhorse

Issue: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1220
Changelog: added
parent a0032b72
---
name: workhorse_use_sidechannel
introduced_by_url:
rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1193
milestone: '14.4'
type: development
group: 'group::scalability'
default_enabled: false
......@@ -32,7 +32,8 @@ module Gitlab
GitalyServer: {
address: Gitlab::GitalyClient.address(repository.storage),
token: Gitlab::GitalyClient.token(repository.storage),
features: Feature::Gitaly.server_feature_flags(repository.project)
features: Feature::Gitaly.server_feature_flags(repository.project),
sidechannel: Feature.enabled?(:workhorse_use_sidechannel, repository.project, default_enabled: :yaml)
}
}
......
......@@ -244,13 +244,15 @@ RSpec.describe Gitlab::Workhorse do
GitalyServer: {
features: { 'gitaly-feature-enforce-requests-limits' => 'true' },
address: Gitlab::GitalyClient.address('default'),
token: Gitlab::GitalyClient.token('default')
token: Gitlab::GitalyClient.token('default'),
sidechannel: false
}
}
end
before do
allow(Gitlab.config.gitaly).to receive(:enabled).and_return(true)
stub_feature_flags(workhorse_use_sidechannel: false)
end
it 'includes a Repository param' do
......@@ -332,6 +334,46 @@ RSpec.describe Gitlab::Workhorse do
it { expect { subject }.to raise_exception('Unsupported action: download') }
end
context 'when workhorse_use_sidechannel flag is set' do
context 'when a feature flag is set globally' do
before do
stub_feature_flags(workhorse_use_sidechannel: true)
end
it 'sets the flag to true' do
response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(true)
end
end
context 'when a feature flag is set for a single project' do
before do
stub_feature_flags(workhorse_use_sidechannel: project)
end
it 'sets the flag to true for that project' do
response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(true)
end
it 'sets the flag to false for other projects' do
other_project = create(:project, :public, :repository)
response = described_class.git_http_ok(other_project.repository, Gitlab::GlRepository::PROJECT, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(false)
end
it 'sets the flag to false when there is no project' do
snippet = create(:personal_snippet, :repository)
response = described_class.git_http_ok(snippet.repository, Gitlab::GlRepository::SNIPPET, user, action)
expect(response.dig(:GitalyServer, :sidechannel)).to eq(false)
end
end
end
end
context 'when receive_max_input_size has been updated' do
......
......@@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
......@@ -20,12 +21,16 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/git"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
......@@ -375,12 +380,24 @@ func TestPostReceivePackRouting(t *testing.T) {
}
}
type gitalyServerStarter func(*testing.T, codes.Code) (*combinedServer, string)
// ReaderFunc is an adapter to turn a conforming function into an io.Reader.
type ReaderFunc func(b []byte) (int, error)
func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) }
func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
testPostUploadPackProxiedToGitalySuccessfully(t, startGitalyServer, gitOkBody(t))
}
func TestPostUploadPackWithSidechannelProxiedToGitalySuccessfully(t *testing.T) {
testPostUploadPackProxiedToGitalySuccessfully(
t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), gitOkBodyWithSidechannel(t),
)
}
func testPostUploadPackProxiedToGitalySuccessfully(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
for i, tc := range []struct {
showAllRefs bool
code codes.Code
......@@ -391,10 +408,9 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
{false, codes.Unavailable},
} {
t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) {
apiResponse := gitOkBody(t)
apiResponse.ShowAllRefs = tc.showAllRefs
gitalyServer, socketPath := startGitalyServer(t, tc.code)
gitalyServer, socketPath := startGitaly(t, tc.code)
defer gitalyServer.GracefulStop()
apiResponse.GitalyServer.Address = "unix:" + socketPath
......@@ -460,8 +476,16 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
apiResponse := gitOkBody(t)
testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServer, apiResponse)
}
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
func TestPostUploadPackWithSidechannelProxiedToGitalyInterrupted(t *testing.T) {
apiResponse := gitOkBodyWithSidechannel(t)
testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse)
}
func testPostUploadPackProxiedToGitalyInterrupted(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
gitalyServer, socketPath := startGitaly(t, codes.OK)
defer gitalyServer.GracefulStop()
apiResponse.GitalyServer.Address = "unix:" + socketPath
......@@ -493,10 +517,19 @@ func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) {
}
func TestPostUploadPackRouting(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
apiResponse := gitOkBody(t)
testPostUploadPackRouting(t, startGitalyServer, apiResponse)
}
func TestPostUploadPackWithSidechannelRouting(t *testing.T) {
apiResponse := gitOkBodyWithSidechannel(t)
testPostUploadPackRouting(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse)
}
func testPostUploadPackRouting(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) {
gitalyServer, socketPath := startGitaly(t, codes.OK)
defer gitalyServer.GracefulStop()
apiResponse := gitOkBody(t)
apiResponse.GitalyServer.Address = "unix:" + socketPath
ts := testAuthServer(t, nil, nil, 200, apiResponse)
defer ts.Close()
......@@ -869,3 +902,21 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ
return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
}
func startGitalyServerWithSideChannel(handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error) gitalyServerStarter {
return func(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) {
socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int()))
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
t.Fatal(err)
}
server := grpc.NewServer(gitalyclient.TestSidechannelServer(logrus.NewEntry(logrus.StandardLogger()), insecure.NewCredentials(), handler)...)
listener, err := net.Listen("unix", socketPath)
require.NoError(t, err)
gitalyServer := testhelper.NewGitalyServer(finalMessageCode)
go server.Serve(listener)
return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath
}
}
......@@ -11,6 +11,7 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
......@@ -23,15 +24,19 @@ import (
)
type Server struct {
Address string `json:"address"`
Token string `json:"token"`
Features map[string]string `json:"features"`
Address string `json:"address"`
Token string `json:"token"`
Features map[string]string `json:"features"`
Sidechannel bool `json:"sidechannel"`
}
type cacheKey struct{ address, token string }
type cacheKey struct {
address, token string
sidechannel bool
}
func (server Server) cacheKey() cacheKey {
return cacheKey{address: server.Address, token: server.Token}
return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel}
}
type connectionsCache struct {
......@@ -41,9 +46,17 @@ type connectionsCache struct {
var (
jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true}
cache = connectionsCache{
// This connection cache map contains two types of connections:
// - Normal gRPC connections
// - Sidechannel connections. When client dials to the Gitaly server, the
// server multiplexes the connection using Yamux. In the future, the server
// can open another stream to transfer data without gRPC. Besides, we apply
// a framing protocol to add the half-close capability to Yamux streams.
// Hence, we cannot use those connections interchangeably.
cache = connectionsCache{
connections: make(map[cacheKey]*grpc.ClientConn),
}
sidechannelRegistry *gitalyclient.SidechannelRegistry
connectionsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
......@@ -54,6 +67,12 @@ var (
)
)
func InitializeSidechannelRegistry(logger *logrus.Logger) {
if sidechannelRegistry == nil {
sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger))
}
}
func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context {
md := metadata.New(nil)
for k, v := range features {
......@@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S
return nil, nil, err
}
grpcClient := gitalypb.NewSmartHTTPServiceClient(conn)
return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil
smartHTTPClient := &SmartHTTPClient{
SmartHTTPServiceClient: grpcClient,
sidechannelRegistry: sidechannelRegistry,
useSidechannel: server.Sidechannel,
}
return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil
}
func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) {
......@@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
),
)
conn, connErr := gitalyclient.Dial(server.Address, connOpts)
var conn *grpc.ClientConn
var connErr error
if server.Sidechannel {
conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background
} else {
conn, connErr = gitalyclient.Dial(server.Address, connOpts)
}
label := "ok"
if connErr != nil {
......
......@@ -4,14 +4,32 @@ import (
"context"
"testing"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)
func TestNewSmartHTTPClient(t *testing.T) {
ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture())
ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture())
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
require.False(t, client.useSidechannel)
require.Nil(t, client.sidechannelRegistry)
}
func TestNewSmartHTTPClientWithSidechannel(t *testing.T) {
InitializeSidechannelRegistry(logrus.StandardLogger())
fixture := serverFixture()
fixture.Sidechannel = true
ctx, client, err := NewSmartHTTPClient(context.Background(), fixture)
require.NoError(t, err)
testOutgoingMetadata(t, ctx)
require.True(t, client.useSidechannel)
require.NotNil(t, client.sidechannelRegistry)
}
func TestNewBlobClient(t *testing.T) {
......
......@@ -5,11 +5,14 @@ import (
"fmt"
"io"
gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
)
type SmartHTTPClient struct {
useSidechannel bool
sidechannelRegistry *gitalyclient.SidechannelRegistry
gitalypb.SmartHTTPServiceClient
}
......@@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R
}
func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
if client.useSidechannel {
return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
}
return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol)
}
func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
stream, err := client.PostUploadPack(ctx)
if err != nil {
return err
......@@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re
return nil
}
func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error {
ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error {
if _, err := io.Copy(conn, clientRequest); err != nil {
return err
}
if err := conn.CloseWrite(); err != nil {
return fmt.Errorf("fail to signal sidechannel half-close: %w", err)
}
if _, err := io.Copy(clientResponse, conn); err != nil {
return err
}
return nil
})
defer waiter.Close()
rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{
Repository: repo,
GitConfigOptions: gitConfigOptions,
GitProtocol: gitProtocol,
}
if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil {
return err
}
if err := waiter.Close(); err != nil {
return fmt.Errorf("fail to close sidechannel connection: %w", err)
}
return nil
}
package testhelper
import (
"bytes"
"fmt"
"io"
"io/ioutil"
......@@ -11,6 +12,7 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
......@@ -23,6 +25,7 @@ type GitalyTestServer struct {
finalMessageCode codes.Code
sync.WaitGroup
LastIncomingMetadata metadata.MD
gitalypb.UnimplementedSmartHTTPServiceServer
gitalypb.UnimplementedRepositoryServiceServer
gitalypb.UnimplementedBlobServiceServer
gitalypb.UnimplementedDiffServiceServer
......@@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return err
}
jsonString, err := marshalJSON(req)
if err != nil {
marshaler := &jsonpb.Marshaler{}
jsonBytes := &bytes.Buffer{}
if err := marshaler.Marshal(jsonBytes, req); err != nil {
return err
}
if err := stream.Send(&gitalypb.PostUploadPackResponse{
Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"),
Data: append(jsonBytes.Bytes(), 0),
}); err != nil {
return err
}
......@@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU
return s.finalError()
}
// PostUploadPackWithSidechannel should be a part of smarthttp server in real
// server. In workhorse, setting up a real sidechannel server is troublesome.
// Therefore, we bring up a sidechannel server with a mock server exported via
// gitalyclient.TestSidechannelServer. This is the handler for that mock
// server.
func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error {
if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" {
return fmt.Errorf("unexpected method: %s", method)
}
var req gitalypb.PostUploadPackWithSidechannelRequest
if err := stream.RecvMsg(&req); err != nil {
return err
}
if err := validateRepository(req.GetRepository()); err != nil {
return err
}
marshaler := &jsonpb.Marshaler{}
jsonBytes := &bytes.Buffer{}
if err := marshaler.Marshal(jsonBytes, &req); err != nil {
return err
}
// Bounce back all data back to the client, plus flushing bytes
if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil {
return err
}
if _, err := io.Copy(conn, conn); err != nil {
return err
}
return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
}
func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) {
return nil, nil
}
......
......@@ -18,6 +18,7 @@ import (
"gitlab.com/gitlab-org/labkit/tracing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/queueing"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
......@@ -233,6 +234,8 @@ func run(boot bootConfig, cfg config.Config) error {
}
defer accessCloser.Close()
gitaly.InitializeSidechannelRegistry(accessLogger)
up := wrapRaven(upstream.NewUpstream(cfg, accessLogger))
done := make(chan os.Signal, 1)
......
......@@ -814,6 +814,8 @@ func startWorkhorseServerWithConfig(cfg *config.Config) *httptest.Server {
testhelper.ConfigureSecret()
u := upstream.NewUpstream(*cfg, logrus.StandardLogger())
gitaly.InitializeSidechannelRegistry(logrus.StandardLogger())
return httptest.NewServer(u)
}
......@@ -834,6 +836,20 @@ func gitOkBody(t *testing.T) *api.Response {
}
}
func gitOkBodyWithSidechannel(t *testing.T) *api.Response {
return &api.Response{
GL_ID: "user-123",
GL_USERNAME: "username",
Repository: gitalypb.Repository{
StorageName: "default",
RelativePath: "foo/bar.git",
},
GitalyServer: gitaly.Server{
Sidechannel: true,
},
}
}
func httpGet(t *testing.T, url string, headers map[string]string) (*http.Response, string) {
req, err := http.NewRequest("GET", url, nil)
require.NoError(t, err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment