Commit 19649275 authored by Kirill Smelkov's avatar Kirill Smelkov

NXD blob/auth: Cache auth backend reply for 30s

[ Sent upstream: https://gitlab.com/gitlab-org/gitlab-workhorse/merge_requests/17

  This patch was sent upstream but was not accepted for "complexity"
  reason of auth cache, despite that provides more than an order of magnitude
  speedup. Just carry it with us as NXD ]

In previous patch we added code to serve blob content via running `git cat-file
...` directly, but for every such request a request to slow RoR-based auth
backend is made, which is bad for performance.

Let's cache auth backend reply for small period of time, e.g. 30 seconds, which
will change the situation dramatically:

If we have a lot of requests to the same repository, we query auth backend only
for every Nth request and with e.g. 100 raw blob request/s N=3000 which means
that previous load to RoR code essentially goes away.

On the other hand as we query auth backend only once in a while and refresh the
cache, we will not miss potential changes in project settings. I mean potential
e.g. 25 seconds delay for a project to become public, or vise versa to become
private does no real harm.

The cache is done with the idea to allow the read side codepath to execute in
parallel and to be not blocked by eventual cache updates.

Overall this improves performance a lot:

  (on a 8-CPU i7-3770S with 16GB of RAM, 2001:67c:1254:e:8b::c776 is on localhost)

  # request is handled by gitlab-workhorse, but without auth caching
  $ ./wrk -c40 -d10 -t1 --latency http://[2001:67c:1254:e:8b::c776]:7777/nexedi/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ http://[2001:67c:1254:e:8b::c776]:7777/nexedi/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency   458.42ms   66.26ms 766.12ms   84.76%
      Req/Sec    85.38     16.59   120.00     82.00%
    Latency Distribution
       50%  459.26ms
       75%  490.09ms
       90%  523.95ms
       99%  611.33ms
    853 requests in 10.01s, 1.51MB read
  Requests/sec:     85.18
  Transfer/sec:    154.90KB

  # request goes to gitlab-workhorse with auth caching (this patch)
  $ ./wrk -c40 -d10 -t1 --latency http://[2001:67c:1254:e:8b::c776]:7777/nexedi/slapos/raw/master/software/wendelin/software.cfg
  Running 10s test @ http://[2001:67c:1254:e:8b::c776]:7777/nexedi/slapos/raw/master/software/wendelin/software.cfg
    1 threads and 40 connections
    Thread Stats   Avg      Stdev     Max   +/- Stdev
      Latency    34.52ms   19.28ms 288.63ms   74.74%
      Req/Sec     1.20k   127.21     1.39k    85.00%
    Latency Distribution
       50%   32.67ms
       75%   42.73ms
       90%   56.26ms
       99%   99.86ms
    11961 requests in 10.01s, 21.24MB read
  Requests/sec:   1194.51
  Transfer/sec:      2.12MB

i.e. it is ~ 14x improvement.
parent 387a2d45
...@@ -13,20 +13,23 @@ import ( ...@@ -13,20 +13,23 @@ import (
) )
type API struct { type API struct {
Client *http.Client Client *http.Client
URL *url.URL URL *url.URL
Version string Version string
authCache *AuthCache
} }
func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTripper) *API { func NewAPI(myURL *url.URL, version string, roundTripper *badgateway.RoundTripper) *API {
if roundTripper == nil { if roundTripper == nil {
roundTripper = badgateway.NewRoundTripper("", 0) roundTripper = badgateway.NewRoundTripper("", 0)
} }
return &API{ a := &API{
Client: &http.Client{Transport: roundTripper}, Client: &http.Client{Transport: roundTripper},
URL: myURL, URL: myURL,
Version: version, Version: version,
} }
a.authCache = NewAuthCache(a)
return a
} }
type HandleFunc func(http.ResponseWriter, *http.Request, *Response) type HandleFunc func(http.ResponseWriter, *http.Request, *Response)
......
...@@ -10,12 +10,16 @@ import ( ...@@ -10,12 +10,16 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"strings"
"sync"
"time"
) )
// Reply from auth backend, e.g. for "download from repo" authorization request // Reply from auth backend, e.g. for "download from repo" authorization request
type AuthReply struct { type AuthReply struct {
// raw reply from auth backend & PreAuthorizeHandler(). // raw reply from auth backend & PreAuthorizeHandler().
// recorded so we can replay it to client in full // recorded so we can replay it from auth cache to each client in full
// if e.g. access is rejected. // if e.g. access is rejected.
RawReply *httptest.ResponseRecorder RawReply *httptest.ResponseRecorder
...@@ -23,6 +27,192 @@ type AuthReply struct { ...@@ -23,6 +27,192 @@ type AuthReply struct {
Response Response
} }
// Entry in authorization reply cache
type AuthCacheEntry struct {
// FIXME we need to lock the entry only to "correctly" update Nhit on
// read side, but we can tolerate some looses in Nhit and update it
// without mutex or atomic. Only -race complains...
// ( we could use atomic.Value for atomic whole cache entry updates from
// refresher without need for locks on readers side, but the need to do
// .Nhit++ on readers side ruins that )
sync.Mutex
AuthReply
Tauth int64 // in seconds
Nhit int64 // how many times this entry was hit when querying auth cache during the last refresh period.
ready chan struct{} // closed when entry is ready
}
// Entries are keyed by project + credentials
type AuthCacheKey struct {
project string
query string // e.g. with passing in private_token=...
header string // request header url-encoded, e.g. PRIVATE-TOKEN=...
}
// Authorization reply cache
// {} AuthCacheKey -> *AuthCacheEntry
type AuthCache struct {
a *API // for which API we cache auth
mu sync.RWMutex // guards .cached
cached map[AuthCacheKey]*AuthCacheEntry
}
func NewAuthCache(a *API) *AuthCache {
return &AuthCache{a: a, cached: make(map[AuthCacheKey]*AuthCacheEntry)}
}
// Verify that download access is ok or not.
// first we try to use the cache; if information is not there -> ask auth backend
// download is ok if AuthReply.RepoPath != ""
func (c *AuthCache) VerifyDownloadAccess(project string, query string, header http.Header) AuthReply {
// Use only tokens from query/header and selected cookies to minimize cache and avoid
// creating redundant cache entries because of e.g. unrelated headers.
queryValues, _ := url.ParseQuery(query) // this is what URL.Query() does
q := url.Values{}
for k, v := range queryValues {
if strings.HasSuffix(k, "_token") {
q[k] = v
}
}
h := url.Values{}
for k, v := range header {
if strings.HasSuffix(strings.ToUpper(k), "-TOKEN") {
h[k] = v
}
}
// r: net.readCookies() is private - workaround via http.Request.Cookies().
// rc: cookie composition is exposed to http.Request only - use it.
r := http.Request{Header: header}
rc := http.Request{Header: make(http.Header)}
for _, c := range r.Cookies() {
switch c.Name {
case
"_gitlab_session":
rc.AddCookie(c)
}
}
if hc := rc.Header.Get("Cookie"); hc != "" {
h["Cookie"] = []string{hc}
}
key := AuthCacheKey{project, q.Encode(), h.Encode()}
return c.verifyDownloadAccess(key)
}
func (c *AuthCache) verifyDownloadAccess(key AuthCacheKey) AuthReply {
var authReply AuthReply
// first try to read from cache in parallel with other readers
c.mu.RLock()
auth := c.cached[key]
c.mu.RUnlock()
have_entry:
// entry in cache - use it
if auth != nil {
<-auth.ready // wait until it is ready
auth.Lock()
auth.Nhit++
//log.Printf("authReply for %v cached ago: %v (hits: %v)",
// key,
// time.Since(time.Unix(auth.Tauth, 0)),
// auth.Nhit)
authReply = auth.AuthReply
auth.Unlock()
} else {
// no entry - relock the cache in exclusive mode, create empty entry,
// and start filling it
c.mu.Lock()
// another ex-reader could be trying to create this entry
// simultaneously with us - recheck
auth = c.cached[key]
if auth != nil {
c.mu.Unlock()
goto have_entry
}
// new not-yet-ready entry
auth = &AuthCacheEntry{ready: make(chan struct{})}
c.cached[key] = auth
c.mu.Unlock()
// this goroutine becomes responsible for querying auth backend
auth.AuthReply = c.askAuthBackend(key)
auth.Tauth = time.Now().Unix()
auth.Nhit = 0
authReply = auth.AuthReply
// broadcast to other goroutines that this entry is ready
close(auth.ready)
// launch entry refresher
go c.refreshEntry(auth, key)
}
return authReply
}
// Time period for refreshing / removing unused entires in authCache
const authCacheRefresh = 30 * time.Second
// Goroutine to refresh auth cache entry periodically while it is used.
// if the entry is detected to be not used - remove it from cache and stop refreshing.
func (c *AuthCache) refreshEntry(auth *AuthCacheEntry, key AuthCacheKey) {
for {
time.Sleep(authCacheRefresh)
auth.Lock()
nhit := auth.Nhit
auth.Unlock()
// clear cache entry if it is not used
//log.Printf("AUTH refresh - %v #hit: %v", key, nhit)
if nhit == 0 { // not used - we can remove and stop refreshing
//log.Printf("AUTH - removing %v", key)
// NOTE it is ok even if someone gets this auth in this time window
// and use it for some time
c.mu.Lock()
delete(c.cached, key)
c.mu.Unlock()
break
}
//log.Printf("AUTH - refreshing %v", key)
authReply := c.askAuthBackend(key)
auth.Lock()
auth.AuthReply = authReply
auth.Tauth = time.Now().Unix()
auth.Nhit = 0
auth.Unlock()
}
}
// Ask auth backend about cache key
func (c *AuthCache) askAuthBackend(key AuthCacheKey) AuthReply {
// key.header -> url.Values -> http.Header
hv, err := url.ParseQuery(key.header)
if err != nil {
// we prepared key.header ourselves via url-encoding in AuthCache.VerifyDownloadAccess().
// It must be ok
panic(err)
}
header := make(http.Header)
for k, v := range hv {
header[k] = v
}
return c.a.verifyDownloadAccess(key.project, key.query, header)
}
// for detecting whether archive download is ok via senddata mechanism // for detecting whether archive download is ok via senddata mechanism
type testDownloadOkViaSendArchive struct { type testDownloadOkViaSendArchive struct {
senddata.Prefix senddata.Prefix
...@@ -44,7 +234,14 @@ func (aok *testDownloadOkViaSendArchive) Inject(w http.ResponseWriter, r *http.R ...@@ -44,7 +234,14 @@ func (aok *testDownloadOkViaSendArchive) Inject(w http.ResponseWriter, r *http.R
// Ask auth backend about whether download is ok for a project. // Ask auth backend about whether download is ok for a project.
// Authorization is approved if AuthReply.RepoPath != "" on return // Authorization is approved if AuthReply.RepoPath != "" on return
// Raw auth backend response is emitted to AuthReply.RawReply // Raw auth backend response is emitted to AuthReply.RawReply
//
// Replies from authentication backend are cached for 30 seconds as each
// request to Rails code is heavy and slow.
func (a *API) VerifyDownloadAccess(project, query string, header http.Header) AuthReply { func (a *API) VerifyDownloadAccess(project, query string, header http.Header) AuthReply {
return a.authCache.VerifyDownloadAccess(project, query, header)
}
func (a *API) verifyDownloadAccess(project, query string, header http.Header) AuthReply {
authReply := AuthReply{ authReply := AuthReply{
RawReply: httptest.NewRecorder(), RawReply: httptest.NewRecorder(),
} }
......
...@@ -49,7 +49,9 @@ func handleGetBlobRaw(a *api.API, w http.ResponseWriter, r *http.Request) { ...@@ -49,7 +49,9 @@ func handleGetBlobRaw(a *api.API, w http.ResponseWriter, r *http.Request) {
w.Header()[k] = v w.Header()[k] = v
} }
w.WriteHeader(authReply.RawReply.Code) w.WriteHeader(authReply.RawReply.Code)
_, err := io.Copy(w, authReply.RawReply.Body) // NOTE do not consume authReply.RawReply.Body with io.Copy() -
// this way it will be read one time only and next reads will be empty.
_, err := w.Write(authReply.RawReply.Body.Bytes())
if err != nil { if err != nil {
helper.LogError(fmt.Errorf("writing authReply.RawReply.Body: %v", err)) helper.LogError(fmt.Errorf("writing authReply.RawReply.Body: %v", err))
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment