/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"errors"
	"io"
Ben Kochie's avatar
Ben Kochie committed
24 25
	"math"
	"strconv"
26 27 28 29 30
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/trace"
31
	"google.golang.org/grpc/balancer"
32
	"google.golang.org/grpc/codes"
Jacob Vosmaer's avatar
Jacob Vosmaer committed
33
	"google.golang.org/grpc/encoding"
Ben Kochie's avatar
Ben Kochie committed
34
	"google.golang.org/grpc/grpclog"
Ben Kochie's avatar
Ben Kochie committed
35
	"google.golang.org/grpc/internal/channelz"
Ben Kochie's avatar
Ben Kochie committed
36 37
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
38 39
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
40
	"google.golang.org/grpc/status"
41 42 43
)

// StreamHandler defines the handler called by gRPC server to complete the
Ben Kochie's avatar
Ben Kochie committed
44 45 46 47
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
48 49 50 51 52 53 54 55 56 57 58 59 60
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
//
// StreamName and Handler identify the method and the StreamHandler that
// serves it. ClientStreams and ServerStreams describe the streaming
// direction(s): in this file, a stream whose descriptor has ClientStreams
// unset marks its first SendMsg as the last message, and a stream whose
// descriptor has ServerStreams unset is finished after a single RecvMsg.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}

// Stream defines the common interface a client or server stream has to satisfy.
Ben Kochie's avatar
Ben Kochie committed
61
//
Ben Kochie's avatar
Ben Kochie committed
62
// Deprecated: See ClientStream and ServerStream documentation instead.
63
type Stream interface {
Ben Kochie's avatar
Ben Kochie committed
64
	// Deprecated: See ClientStream and ServerStream documentation instead.
65
	Context() context.Context
Ben Kochie's avatar
Ben Kochie committed
66
	// Deprecated: See ClientStream and ServerStream documentation instead.
67
	SendMsg(m interface{}) error
Ben Kochie's avatar
Ben Kochie committed
68
	// Deprecated: See ClientStream and ServerStream documentation instead.
69 70 71
	RecvMsg(m interface{}) error
}

Ben Kochie's avatar
Ben Kochie committed
72 73 74 75
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
76 77 78 79 80 81 82 83 84 85 86
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met.
	CloseSend() error
Ben Kochie's avatar
Ben Kochie committed
87 88 89 90 91 92 93 94 95 96 97 98 99 100
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
Ben Kochie's avatar
Ben Kochie committed
101
	//
Ben Kochie's avatar
Ben Kochie committed
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
119 120
}

Jacob Vosmaer's avatar
Jacob Vosmaer committed
121
// NewStream creates a new Stream for the client side. This is typically
Ben Kochie's avatar
Ben Kochie committed
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//      1. Call Close on the ClientConn.
//      2. Cancel the context provided.
//      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//         client-streaming RPC, for instance, might use the helper function
//         CloseAndRecv (note that CloseSend does not Recv, therefore is not
//         guaranteed to release all resources).
//      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
Jacob Vosmaer's avatar
Jacob Vosmaer committed
137
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
Ben Kochie's avatar
Ben Kochie committed
138 139 140 141
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

142 143 144 145 146 147
	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}

Ben Kochie's avatar
Ben Kochie committed
148
// NewClientStream is a wrapper for ClientConn.NewStream.
Jacob Vosmaer's avatar
Jacob Vosmaer committed
149 150 151 152
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

153
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
Ben Kochie's avatar
Ben Kochie committed
154 155 156 157 158 159 160 161
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
162 163 164 165 166 167
	c := defaultCallInfo()
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

Ben Kochie's avatar
Ben Kochie committed
168 169 170 171 172 173
	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
Jacob Vosmaer's avatar
Jacob Vosmaer committed
174
	if mc.Timeout != nil && *mc.Timeout >= 0 {
175
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
Ben Kochie's avatar
Ben Kochie committed
176 177
	} else {
		ctx, cancel = context.WithCancel(ctx)
178
	}
Ben Kochie's avatar
Ben Kochie committed
179 180 181 182 183
	defer func() {
		if err != nil {
			cancel()
		}
	}()
184

185
	for _, o := range opts {
186
		if err := o.before(c); err != nil {
187 188 189
			return nil, toRPCErr(err)
		}
	}
190 191
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
Ben Kochie's avatar
Ben Kochie committed
192 193 194
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}
195

196
	callHdr := &transport.CallHdr{
Ben Kochie's avatar
Ben Kochie committed
197 198
		Host:           cc.authority,
		Method:         method,
Ben Kochie's avatar
Ben Kochie committed
199
		ContentSubtype: c.contentSubtype,
200
	}
Jacob Vosmaer's avatar
Jacob Vosmaer committed
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set.  In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
217
		callHdr.SendCompress = cc.dopts.cp.Type()
Jacob Vosmaer's avatar
Jacob Vosmaer committed
218
		cp = cc.dopts.cp
219
	}
220 221 222
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
223 224 225 226 227 228 229 230 231 232
	var trInfo traceInfo
	if EnableTracing {
		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		trInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
233
	ctx = newContextWithRPCInfo(ctx, c.failFast)
234
	sh := cc.dopts.copts.StatsHandler
Ben Kochie's avatar
Ben Kochie committed
235
	var beginTime time.Time
236
	if sh != nil {
237
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
Ben Kochie's avatar
Ben Kochie committed
238
		beginTime = time.Now()
239 240
		begin := &stats.Begin{
			Client:    true,
Ben Kochie's avatar
Ben Kochie committed
241
			BeginTime: beginTime,
242 243 244 245
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}
Jacob Vosmaer's avatar
Jacob Vosmaer committed
246

Ben Kochie's avatar
Ben Kochie committed
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}
273

Ben Kochie's avatar
Ben Kochie committed
274 275 276 277
	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
278
	}
Jacob Vosmaer's avatar
Jacob Vosmaer committed
279

Ben Kochie's avatar
Ben Kochie committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293
	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context.  In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
294 295 296 297
	}
	return cs, nil
}

Ben Kochie's avatar
Ben Kochie committed
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
// newAttemptLocked replaces cs.attempt with a fresh csAttempt and then picks
// a transport for it. sh and trInfo are the stats handler and trace info the
// new attempt should report to.
//
// The attempt is installed on cs before any step that can fail, so cs.attempt
// is non-nil even when an error is returned; in that case its transport and
// done callback are left unset.
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
	attempt := &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}
	cs.attempt = attempt

	// Do not bother picking a transport for a context that is already done.
	if ctxErr := cs.ctx.Err(); ctxErr != nil {
		return toRPCErr(ctxErr)
	}
	tr, doneFn, pickErr := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if pickErr != nil {
		return pickErr
	}
	attempt.t, attempt.done = tr, doneFn
	return nil
}

// newStream opens a transport stream for this attempt on the attempt's
// transport and installs it, together with a parser reading from it, on the
// attempt. The call header's PreviousAttempts is set to the number of retries
// performed so far. Transport errors are converted with toRPCErr.
func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		return toRPCErr(err)
	}
	// Store the stream on the receiver rather than on cs.attempt: the stream
	// was created on a.t, so it belongs to a, and this avoids reading
	// cs.attempt without holding cs.mu when invoked through withRetry.
	a.s = s
	a.p = &parser{r: s}
	return nil
}

330 331
// clientStream implements a client side Stream.
type clientStream struct {
Ben Kochie's avatar
Ben Kochie committed
332 333 334 335 336
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc
Ben Kochie's avatar
Ben Kochie committed
337 338 339 340 341 342 343

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

Ben Kochie's avatar
Ben Kochie committed
344 345 346 347 348 349
	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing
Ben Kochie's avatar
Ben Kochie committed
350

Ben Kochie's avatar
Ben Kochie committed
351
	retryThrottler *retryThrottler // The throttler active when the RPC began.
Ben Kochie's avatar
Ben Kochie committed
352

Ben Kochie's avatar
Ben Kochie committed
353 354 355 356 357 358
	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
Ben Kochie's avatar
Ben Kochie committed
359
	// TODO(hedging): hedging will have multiple attempts simultaneously.
Ben Kochie's avatar
Ben Kochie committed
360 361 362
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
Ben Kochie's avatar
Ben Kochie committed
363 364 365 366 367 368
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
Jacob Vosmaer's avatar
Jacob Vosmaer committed
369 370 371
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser
Ben Kochie's avatar
Ben Kochie committed
372
	done func(balancer.DoneInfo)
Jacob Vosmaer's avatar
Jacob Vosmaer committed
373

Ben Kochie's avatar
Ben Kochie committed
374
	finished  bool
Jacob Vosmaer's avatar
Jacob Vosmaer committed
375 376 377 378
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

Ben Kochie's avatar
Ben Kochie committed
379 380 381
	mu sync.Mutex // guards trInfo.tr
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
382 383 384
	trInfo traceInfo

	statsHandler stats.Handler
Ben Kochie's avatar
Ben Kochie committed
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515
}

// commitAttemptLocked marks the active attempt as committed and drops the
// replay buffer, since a committed attempt is never retried.
func (cs *clientStream) commitAttemptLocked() {
	cs.committed, cs.buffer = true, nil
}

// commitAttempt is the locking variant of commitAttemptLocked: it acquires
// cs.mu for the duration of the commit.
func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	defer cs.mu.Unlock()
	cs.commitAttemptLocked()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
//
// It is called from retryLocked, which withRetry invokes with cs.mu held.
// When a policy-driven retry is allowed, this function also waits out the
// backoff (or server pushback) delay before returning nil, and returns a
// status error derived from cs.ctx if the context is done first. It may
// therefore block: first on the attempt's stream being done, then on the
// backoff timer.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	// Transparent retry is only considered once per RPC.
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		// Only trailers-only responses are candidates for a policy retry.
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil {
			// Context error; stop now.
			return toErr
		} else if !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			// An unparsable or negative pushback value means "do not retry".
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	// Use the stream's final status when a stream exists; otherwise derive the
	// code from the error that triggered this retry decision.
	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

	var dur time.Duration
	if hasPushback {
		// Server-provided delay (milliseconds) overrides the computed backoff
		// and restarts the exponential sequence.
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		// Exponential backoff: initialBackoff * multiplier^n, capped at
		// maxBackoff, then randomized below that bound.
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		// NOTE(review): presumably grpcrand.Int63n mirrors math/rand.Int63n
		// and requires a positive argument, so this assumes the policy's
		// backoff values are > 0 — confirm against service config validation.
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
516 517 518
}

func (cs *clientStream) Context() context.Context {
Ben Kochie's avatar
Ben Kochie committed
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	return cs.attempt.s.Context()
}

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
553 554 555
}

func (cs *clientStream) Header() (metadata.MD, error) {
Ben Kochie's avatar
Ben Kochie committed
556 557 558 559 560 561
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
562
	if err != nil {
Ben Kochie's avatar
Ben Kochie committed
563
		cs.finish(err)
564 565 566 567 568
	}
	return m, err
}

func (cs *clientStream) Trailer() metadata.MD {
Ben Kochie's avatar
Ben Kochie committed
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603
	// On RPC failure, we never need to retry, because usage requires that
	// RecvMsg() returned a non-nil error before calling this function is valid.
	// We would have retried earlier if necessary.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}

// replayBufferLocked re-runs every buffered operation, in order, against the
// current attempt. It stops at and returns the first error; nil means all
// buffered operations succeeded.
func (cs *clientStream) replayBufferLocked() error {
	current := cs.attempt
	ops := cs.buffer
	for i := range ops {
		if replayErr := ops[i](current); replayErr != nil {
			return replayErr
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we still will buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
604 605 606
}

func (cs *clientStream) SendMsg(m interface{}) (err error) {
Ben Kochie's avatar
Ben Kochie committed
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client.  (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}
	data, err := encode(cs.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, cs.cp, cs.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
Ben Kochie's avatar
Ben Kochie committed
644 645
}

Ben Kochie's avatar
Ben Kochie committed
646 647 648 649 650 651 652 653 654
func (cs *clientStream) RecvMsg(m interface{}) error {
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m)
	}, cs.commitAttemptLocked)
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)
	}
	return err
Ben Kochie's avatar
Ben Kochie committed
655 656 657
}

func (cs *clientStream) CloseSend() error {
Ben Kochie's avatar
Ben Kochie committed
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg.  This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	// We never returned an error here for reasons.
Ben Kochie's avatar
Ben Kochie committed
673 674 675 676 677 678 679 680 681 682
	return nil
}

func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
683
		cs.mu.Unlock()
Ben Kochie's avatar
Ben Kochie committed
684 685 686
		return
	}
	cs.finished = true
Ben Kochie's avatar
Ben Kochie committed
687
	cs.commitAttemptLocked()
Ben Kochie's avatar
Ben Kochie committed
688
	cs.mu.Unlock()
Ben Kochie's avatar
Ben Kochie committed
689 690 691
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
Ben Kochie's avatar
Ben Kochie committed
692 693 694 695 696 697
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
698
	}
Ben Kochie's avatar
Ben Kochie committed
699 700 701 702 703 704 705 706
	if cs.attempt != nil {
		cs.attempt.finish(err)
	}
	// after functions all rely upon having a stream.
	if cs.attempt.s != nil {
		for _, o := range cs.opts {
			o.after(cs.callInfo)
		}
Ben Kochie's avatar
Ben Kochie committed
707 708 709 710
	}
	cs.cancel()
}

Ben Kochie's avatar
Ben Kochie committed
711
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
Ben Kochie's avatar
Ben Kochie committed
712 713 714 715 716
	cs := a.cs
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
717
		}
Ben Kochie's avatar
Ben Kochie committed
718
		a.mu.Unlock()
719
	}
Ben Kochie's avatar
Ben Kochie committed
720 721 722 723 724 725 726 727
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it.  finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
728
	}
Ben Kochie's avatar
Ben Kochie committed
729 730
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
731
	}
Ben Kochie's avatar
Ben Kochie committed
732 733
	if channelz.IsOn() {
		a.t.IncrMsgSent()
Ben Kochie's avatar
Ben Kochie committed
734
	}
Ben Kochie's avatar
Ben Kochie committed
735
	return nil
736 737
}

Ben Kochie's avatar
Ben Kochie committed
738 739
func (a *csAttempt) recvMsg(m interface{}) (err error) {
	cs := a.cs
740
	var inPayload *stats.InPayload
Ben Kochie's avatar
Ben Kochie committed
741
	if a.statsHandler != nil {
742 743 744 745
		inPayload = &stats.InPayload{
			Client: true,
		}
	}
Ben Kochie's avatar
Ben Kochie committed
746
	if !a.decompSet {
Jacob Vosmaer's avatar
Jacob Vosmaer committed
747
		// Block until we receive headers containing received message encoding.
Ben Kochie's avatar
Ben Kochie committed
748 749
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
Jacob Vosmaer's avatar
Jacob Vosmaer committed
750 751
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
Ben Kochie's avatar
Ben Kochie committed
752 753
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
Jacob Vosmaer's avatar
Jacob Vosmaer committed
754 755 756
			}
		} else {
			// No compression is used; disable our decompressor.
Ben Kochie's avatar
Ben Kochie committed
757
			a.dc = nil
Jacob Vosmaer's avatar
Jacob Vosmaer committed
758 759
		}
		// Only initialize this state once per stream.
Ben Kochie's avatar
Ben Kochie committed
760
		a.decompSet = true
761
	}
Ben Kochie's avatar
Ben Kochie committed
762
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp)
Ben Kochie's avatar
Ben Kochie committed
763
	if err != nil {
764
		if err == io.EOF {
Ben Kochie's avatar
Ben Kochie committed
765 766
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
767
			}
Ben Kochie's avatar
Ben Kochie committed
768
			return io.EOF // indicates successful end of stream.
769 770 771
		}
		return toRPCErr(err)
	}
Ben Kochie's avatar
Ben Kochie committed
772 773 774 775
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
776
		}
Ben Kochie's avatar
Ben Kochie committed
777
		a.mu.Unlock()
778
	}
Ben Kochie's avatar
Ben Kochie committed
779
	if inPayload != nil {
Ben Kochie's avatar
Ben Kochie committed
780
		a.statsHandler.HandleRPC(cs.ctx, inPayload)
Ben Kochie's avatar
Ben Kochie committed
781 782 783 784 785 786
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
787 788
		return nil
	}
Ben Kochie's avatar
Ben Kochie committed
789 790 791

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
Ben Kochie's avatar
Ben Kochie committed
792
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
Ben Kochie's avatar
Ben Kochie committed
793 794
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
795
	}
Ben Kochie's avatar
Ben Kochie committed
796 797 798 799
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
800 801
}

Ben Kochie's avatar
Ben Kochie committed
802 803
func (a *csAttempt) finish(err error) {
	a.mu.Lock()
Ben Kochie's avatar
Ben Kochie committed
804 805 806 807 808 809 810 811 812 813 814 815
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if a.s != nil {
		a.t.CloseStream(a.s, err)
	}
Ben Kochie's avatar
Ben Kochie committed
816 817

	if a.done != nil {
Ben Kochie's avatar
Ben Kochie committed
818 819 820 821 822 823
		br := false
		var tr metadata.MD
		if a.s != nil {
			br = a.s.BytesReceived()
			tr = a.s.Trailer()
		}
Ben Kochie's avatar
Ben Kochie committed
824
		a.done(balancer.DoneInfo{
Jacob Vosmaer's avatar
Jacob Vosmaer committed
825
			Err:           err,
Ben Kochie's avatar
Ben Kochie committed
826 827 828
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
829 830
		})
	}
Ben Kochie's avatar
Ben Kochie committed
831
	if a.statsHandler != nil {
832
		end := &stats.End{
Ben Kochie's avatar
Ben Kochie committed
833
			Client:    true,
Ben Kochie's avatar
Ben Kochie committed
834
			BeginTime: a.cs.beginTime,
Ben Kochie's avatar
Ben Kochie committed
835 836
			EndTime:   time.Now(),
			Error:     err,
837
		}
Ben Kochie's avatar
Ben Kochie committed
838
		a.statsHandler.HandleRPC(a.cs.ctx, end)
839
	}
Ben Kochie's avatar
Ben Kochie committed
840 841 842
	if a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
843
		} else {
Ben Kochie's avatar
Ben Kochie committed
844 845
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
846
		}
Ben Kochie's avatar
Ben Kochie committed
847 848
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
849
	}
Ben Kochie's avatar
Ben Kochie committed
850
	a.mu.Unlock()
851 852
}

Ben Kochie's avatar
Ben Kochie committed
853 854 855 856
// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When call multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
Ben Kochie's avatar
Ben Kochie committed
872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
898 899 900 901
}

// serverStream implements a server side Stream.
type serverStream struct {
Ben Kochie's avatar
Ben Kochie committed
902
	ctx   context.Context
Jacob Vosmaer's avatar
Jacob Vosmaer committed
903 904 905
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
Ben Kochie's avatar
Ben Kochie committed
906
	codec baseCodec
Jacob Vosmaer's avatar
Jacob Vosmaer committed
907 908 909 910 911 912

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

913 914 915
	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo
916 917 918 919 920 921 922

	statsHandler stats.Handler

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
Ben Kochie's avatar
Ben Kochie committed
923
	return ss.ctx
924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
}

// SetHeader forwards md to the underlying transport stream's SetHeader and
// returns its result. Empty metadata is not forwarded and yields nil.
func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() != 0 {
		return ss.s.SetHeader(md)
	}
	return nil
}

// SendHeader asks the server transport to write md as the header for this
// stream and returns whatever error the transport reports.
func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := ss.t.WriteHeader(ss.s, md)
	return err
}

// SetTrailer forwards md to the underlying transport stream's SetTrailer.
// Empty metadata is not forwarded.
func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() != 0 {
		ss.s.SetTrailer(md)
	}
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
958 959 960 961
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
		}
Ben Kochie's avatar
Ben Kochie committed
962 963 964
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
965
	}()
Ben Kochie's avatar
Ben Kochie committed
966 967 968
	data, err := encode(ss.codec, m)
	if err != nil {
		return err
969
	}
Ben Kochie's avatar
Ben Kochie committed
970
	compData, err := compress(data, ss.cp, ss.comp)
971 972 973
	if err != nil {
		return err
	}
Ben Kochie's avatar
Ben Kochie committed
974 975 976 977
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
978
	}
Ben Kochie's avatar
Ben Kochie committed
979
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
980 981
		return toRPCErr(err)
	}
Ben Kochie's avatar
Ben Kochie committed
982 983
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001
	}
	return nil
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
1002 1003 1004 1005
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
		}
Ben Kochie's avatar
Ben Kochie committed
1006 1007 1008
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
1009 1010 1011 1012 1013
	}()
	var inPayload *stats.InPayload
	if ss.statsHandler != nil {
		inPayload = &stats.InPayload{}
	}
Jacob Vosmaer's avatar
Jacob Vosmaer committed
1014
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
1015 1016 1017 1018
		if err == io.EOF {
			return err
		}
		if err == io.ErrUnexpectedEOF {
Jacob Vosmaer's avatar
Jacob Vosmaer committed
1019
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
1020 1021 1022 1023 1024 1025 1026 1027
		}
		return toRPCErr(err)
	}
	if inPayload != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
	}
	return nil
}
Jacob Vosmaer's avatar
Jacob Vosmaer committed
1028 1029 1030 1031

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
Ben Kochie's avatar
Ben Kochie committed
1032
	return Method(stream.Context())
Jacob Vosmaer's avatar
Jacob Vosmaer committed
1033
}