pipe_test.go 9.06 KB
Newer Older
Russ Cox's avatar
io.Pipe  
Russ Cox committed
1 2 3 4
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

5
package io_test
Russ Cox's avatar
io.Pipe  
Russ Cox committed
6 7

import (
Joe Tsai's avatar
Joe Tsai committed
8
	"bytes"
9 10
	"fmt"
	. "io"
Joe Tsai's avatar
Joe Tsai committed
11 12
	"sort"
	"strings"
13 14
	"testing"
	"time"
Russ Cox's avatar
io.Pipe  
Russ Cox committed
15 16
)

17
// checkWrite writes data to w and reports a test error through t if the
// write returns an error or writes fewer bytes than len(data). It then sends
// 0 on c to signal that the write has finished.
func checkWrite(t *testing.T, w Writer, data []byte, c chan int) {
	written, werr := w.Write(data)
	if werr != nil {
		t.Errorf("write: %v", werr)
	}
	if want := len(data); written != want {
		t.Errorf("short write: %d != %d", written, want)
	}
	c <- 0
}

// Test a single read/write pair.
func TestPipe1(t *testing.T) {
30 31 32
	c := make(chan int)
	r, w := Pipe()
	var buf = make([]byte, 64)
Russ Cox's avatar
Russ Cox committed
33
	go checkWrite(t, w, []byte("hello, world"), c)
34
	n, err := r.Read(buf)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
35
	if err != nil {
36
		t.Errorf("read: %v", err)
37
	} else if n != 12 || string(buf[0:12]) != "hello, world" {
38
		t.Errorf("bad read: got %q", buf[0:n])
Russ Cox's avatar
io.Pipe  
Russ Cox committed
39
	}
40 41 42
	<-c
	r.Close()
	w.Close()
Russ Cox's avatar
io.Pipe  
Russ Cox committed
43 44
}

45
// reader repeatedly reads from r into a 64-byte buffer and sends the byte
// count of every read on c. When a read returns EOF it sends a final 0 and
// returns. Any other error is reported through t; the count is still sent
// and the loop keeps going.
func reader(t *testing.T, r Reader, c chan int) {
	scratch := make([]byte, 64)
	for {
		count, rerr := r.Read(scratch)
		switch {
		case rerr == EOF:
			c <- 0
			return
		case rerr != nil:
			t.Errorf("read: %v", rerr)
		}
		c <- count
	}
}

// Test a sequence of read/write pairs.
func TestPipe2(t *testing.T) {
62 63 64 65
	c := make(chan int)
	r, w := Pipe()
	go reader(t, r, c)
	var buf = make([]byte, 64)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
66
	for i := 0; i < 5; i++ {
67 68
		p := buf[0 : 5+i*10]
		n, err := w.Write(p)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
69
		if n != len(p) {
70
			t.Errorf("wrote %d, got %d", len(p), n)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
71 72
		}
		if err != nil {
73
			t.Errorf("write: %v", err)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
74
		}
75
		nn := <-c
Russ Cox's avatar
io.Pipe  
Russ Cox committed
76
		if nn != n {
77
			t.Errorf("wrote %d, read got %d", n, nn)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
78 79
		}
	}
80 81
	w.Close()
	nn := <-c
Russ Cox's avatar
io.Pipe  
Russ Cox committed
82
	if nn != 0 {
83
		t.Errorf("final read got %d", nn)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
84 85 86
	}
}

87
// pipeReturn carries the result of a single Write call across a channel.
type pipeReturn struct {
	n   int   // byte count returned by the Write
	err error // error returned by the Write
}

Russ Cox's avatar
io.Pipe  
Russ Cox committed
92
// Test a large write that requires multiple reads to satisfy.
93
func writer(w WriteCloser, buf []byte, c chan pipeReturn) {
94 95 96
	n, err := w.Write(buf)
	w.Close()
	c <- pipeReturn{n, err}
Russ Cox's avatar
io.Pipe  
Russ Cox committed
97 98 99
}

func TestPipe3(t *testing.T) {
100 101 102
	c := make(chan pipeReturn)
	r, w := Pipe()
	var wdat = make([]byte, 128)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
103
	for i := 0; i < len(wdat); i++ {
104
		wdat[i] = byte(i)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
105
	}
106 107 108
	go writer(w, wdat, c)
	var rdat = make([]byte, 1024)
	tot := 0
Russ Cox's avatar
io.Pipe  
Russ Cox committed
109
	for n := 1; n <= 256; n *= 2 {
110
		nn, err := r.Read(rdat[tot : tot+n])
Russ Cox's avatar
Russ Cox committed
111
		if err != nil && err != EOF {
112
			t.Fatalf("read: %v", err)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
113 114 115
		}

		// only final two reads should be short - 1 byte, then 0
116
		expect := n
Russ Cox's avatar
io.Pipe  
Russ Cox committed
117
		if n == 128 {
118
			expect = 1
Russ Cox's avatar
io.Pipe  
Russ Cox committed
119
		} else if n == 256 {
120
			expect = 0
Russ Cox's avatar
Russ Cox committed
121
			if err != EOF {
122
				t.Fatalf("read at end: %v", err)
123
			}
Russ Cox's avatar
io.Pipe  
Russ Cox committed
124 125
		}
		if nn != expect {
126
			t.Fatalf("read %d, expected %d, got %d", n, expect, nn)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
127
		}
128
		tot += nn
Russ Cox's avatar
io.Pipe  
Russ Cox committed
129
	}
130
	pr := <-c
Russ Cox's avatar
io.Pipe  
Russ Cox committed
131
	if pr.n != 128 || pr.err != nil {
132
		t.Fatalf("write 128: %d, %v", pr.n, pr.err)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
133 134
	}
	if tot != 128 {
135
		t.Fatalf("total read %d != 128", tot)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
136 137 138
	}
	for i := 0; i < 128; i++ {
		if rdat[i] != byte(i) {
139
			t.Fatalf("rdat[%d] = %d", i, rdat[i])
Russ Cox's avatar
io.Pipe  
Russ Cox committed
140 141 142 143 144 145
		}
	}
}

// Test read after/before writer close.

146
// closer is the pair of close operations the close tests need, so that
// delayClose can shut down either half of a pipe with or without an
// explicit error.
type closer interface {
	Close() error
	CloseWithError(err error) error
}

151
// pipeTest describes one scenario for the close tests. The field order is
// significant because pipeTests is written with positional literals.
type pipeTest struct {
	// async selects whether delayClose runs in its own goroutine or is
	// called synchronously before the Read or Write under test.
	async bool
	// err is the error handed to CloseWithError; a nil value means the
	// tests expect the default error for the operation instead.
	err error
	// closeWithError selects CloseWithError(err) rather than plain Close.
	closeWithError bool
}

157
// String renders the scenario's three fields as space-separated key=value
// pairs.
func (p pipeTest) String() string {
	const layout = "async=%v err=%v closeWithError=%v"
	return fmt.Sprintf(layout, p.async, p.err, p.closeWithError)
}
Russ Cox's avatar
io.Pipe  
Russ Cox committed
160

161
var pipeTests = []pipeTest{
Robert Griesemer's avatar
Robert Griesemer committed
162 163 164 165 166 167
	{true, nil, false},
	{true, nil, true},
	{true, ErrShortWrite, true},
	{false, nil, false},
	{false, nil, true},
	{false, ErrShortWrite, true},
168 169 170
}

func delayClose(t *testing.T, cl closer, ch chan int, tt pipeTest) {
171
	time.Sleep(1 * time.Millisecond)
Russ Cox's avatar
Russ Cox committed
172
	var err error
173
	if tt.closeWithError {
174
		err = cl.CloseWithError(tt.err)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
175
	} else {
176
		err = cl.Close()
Russ Cox's avatar
io.Pipe  
Russ Cox committed
177
	}
178
	if err != nil {
179
		t.Errorf("delayClose: %v", err)
Russ Cox's avatar
io.Pipe  
Russ Cox committed
180
	}
181
	ch <- 0
Russ Cox's avatar
io.Pipe  
Russ Cox committed
182 183
}

184 185
func TestPipeReadClose(t *testing.T) {
	for _, tt := range pipeTests {
186 187
		c := make(chan int, 1)
		r, w := Pipe()
188
		if tt.async {
189
			go delayClose(t, w, c, tt)
190
		} else {
191
			delayClose(t, w, c, tt)
192
		}
193 194 195 196
		var buf = make([]byte, 64)
		n, err := r.Read(buf)
		<-c
		want := tt.err
197
		if want == nil {
Russ Cox's avatar
Russ Cox committed
198
			want = EOF
199 200
		}
		if err != want {
201
			t.Errorf("read from closed pipe: %v want %v", err, want)
202 203
		}
		if n != 0 {
204
			t.Errorf("read on closed pipe returned %d", n)
205 206
		}
		if err = r.Close(); err != nil {
207
			t.Errorf("r.Close: %v", err)
208 209
		}
	}
Russ Cox's avatar
io.Pipe  
Russ Cox committed
210 211
}

212 213 214 215 216 217 218
// Test close on Read side during Read.
func TestPipeReadClose2(t *testing.T) {
	c := make(chan int, 1)
	r, _ := Pipe()
	go delayClose(t, r, c, pipeTest{})
	n, err := r.Read(make([]byte, 64))
	<-c
Russ Cox's avatar
Russ Cox committed
219 220
	if n != 0 || err != ErrClosedPipe {
		t.Errorf("read from closed pipe: %v, %v want %v, %v", n, err, 0, ErrClosedPipe)
221 222 223
	}
}

224
// Test write after/before reader close.
Russ Cox's avatar
io.Pipe  
Russ Cox committed
225

226 227
func TestPipeWriteClose(t *testing.T) {
	for _, tt := range pipeTests {
228 229
		c := make(chan int, 1)
		r, w := Pipe()
230
		if tt.async {
231
			go delayClose(t, r, c, tt)
232
		} else {
233
			delayClose(t, r, c, tt)
234
		}
235 236 237
		n, err := WriteString(w, "hello, world")
		<-c
		expect := tt.err
238
		if expect == nil {
Russ Cox's avatar
Russ Cox committed
239
			expect = ErrClosedPipe
240 241
		}
		if err != expect {
242
			t.Errorf("write on closed pipe: %v want %v", err, expect)
243 244
		}
		if n != 0 {
245
			t.Errorf("write on closed pipe returned %d", n)
246 247
		}
		if err = w.Close(); err != nil {
248
			t.Errorf("w.Close: %v", err)
249 250
		}
	}
Russ Cox's avatar
io.Pipe  
Russ Cox committed
251
}
Russ Cox's avatar
Russ Cox committed
252

253 254 255 256 257 258 259 260 261 262 263 264
// TestPipeWriteClose2 tests a close on the write side during a Write: the
// writer is closed from another goroutine and the Write must return
// (0, ErrClosedPipe).
func TestPipeWriteClose2(t *testing.T) {
	closed := make(chan int, 1)
	_, pw := Pipe()
	go delayClose(t, pw, closed, pipeTest{})

	wrote, err := pw.Write(make([]byte, 64))
	<-closed
	if !(wrote == 0 && err == ErrClosedPipe) {
		t.Errorf("write to closed pipe: %v, %v want %v, %v", wrote, err, 0, ErrClosedPipe)
	}
}

Russ Cox's avatar
Russ Cox committed
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
// TestWriteEmpty checks that writing an empty, non-nil slice to a pipe and
// then closing the writer neither hangs nor delivers data: the Write must
// succeed, and a ReadFull on the other end must return no bytes and EOF.
func TestWriteEmpty(t *testing.T) {
	r, w := Pipe()
	defer r.Close()

	// Buffered so the writer goroutine can always finish, even if the test
	// has already failed and stopped receiving.
	done := make(chan error, 1)
	go func() {
		_, err := w.Write([]byte{})
		w.Close()
		done <- err
	}()

	var b [2]byte
	if n, err := ReadFull(r, b[:]); n != 0 || err != EOF {
		t.Errorf("ReadFull = (%d, %v); want (0, %v)", n, err, EOF)
	}
	if err := <-done; err != nil {
		t.Errorf("Write of empty slice: %v", err)
	}
}

// TestWriteNil checks that writing a nil slice to a pipe and then closing
// the writer neither hangs nor delivers data: the Write must succeed, and a
// ReadFull on the other end must return no bytes and EOF.
func TestWriteNil(t *testing.T) {
	r, w := Pipe()
	defer r.Close()

	// Buffered so the writer goroutine can always finish, even if the test
	// has already failed and stopped receiving.
	done := make(chan error, 1)
	go func() {
		_, err := w.Write(nil)
		w.Close()
		done <- err
	}()

	var b [2]byte
	if n, err := ReadFull(r, b[:]); n != 0 || err != EOF {
		t.Errorf("ReadFull = (%d, %v); want (0, %v)", n, err, EOF)
	}
	if err := <-done; err != nil {
		t.Errorf("Write of nil slice: %v", err)
	}
}
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

// TestWriteAfterWriterClose checks that data written before the writer is
// closed still reaches the reader, and that a Write issued after the close
// fails with ErrClosedPipe.
func TestWriteAfterWriterClose(t *testing.T) {
	pr, pw := Pipe()

	finished := make(chan bool)
	var lateErr error // written by the goroutine, read only after <-finished
	go func() {
		if _, err := pw.Write([]byte("hello")); err != nil {
			t.Errorf("got error: %q; expected none", err)
		}
		pw.Close()
		_, lateErr = pw.Write([]byte("world"))
		finished <- true
	}()

	// The buffer is larger than the data, so ReadFull is expected to stop
	// early with ErrUnexpectedEOF once the writer closes.
	data := make([]byte, 100)
	n, err := ReadFull(pr, data)
	if err != nil && err != ErrUnexpectedEOF {
		t.Fatalf("got: %q; want: %q", err, ErrUnexpectedEOF)
	}
	got := string(data[:n])
	<-finished

	if want := "hello"; got != want {
		t.Errorf("got: %q; want: %q", got, want)
	}
	if lateErr != ErrClosedPipe {
		t.Errorf("got: %q; want: %q", lateErr, ErrClosedPipe)
	}
}
Joe Tsai's avatar
Joe Tsai committed
318

319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
// TestPipeCloseError checks which error the opposite half of a pipe observes
// after CloseWithError has been called more than once on the same half. The
// io package documents that CloseWithError never overwrites a previously
// stored error, so the error from the first call must keep being reported.
func TestPipeCloseError(t *testing.T) {
	type testError1 struct{ error }
	type testError2 struct{ error }

	// Closing the reader: subsequent writes see the reader's first error.
	r, w := Pipe()
	r.CloseWithError(testError1{})
	if _, err := w.Write(nil); err != (testError1{}) {
		t.Errorf("Write error: got %T, want testError1", err)
	}
	// A second CloseWithError must not replace the first error.
	r.CloseWithError(testError2{})
	if _, err := w.Write(nil); err != (testError1{}) {
		t.Errorf("Write error: got %T, want testError1", err)
	}

	// Closing the writer: subsequent reads see the writer's first error.
	r, w = Pipe()
	w.CloseWithError(testError1{})
	if _, err := r.Read(nil); err != (testError1{}) {
		t.Errorf("Read error: got %T, want testError1", err)
	}
	// A second CloseWithError must not replace the first error.
	w.CloseWithError(testError2{})
	if _, err := r.Read(nil); err != (testError1{}) {
		t.Errorf("Read error: got %T, want testError1", err)
	}
}

Joe Tsai's avatar
Joe Tsai committed
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
// TestPipeConcurrent exercises a pipe with several goroutines on one side:
// the "Write" subtest runs concurrent writers against a single reader, and
// the "Read" subtest runs concurrent readers against a single writer.
func TestPipeConcurrent(t *testing.T) {
	const (
		input    = "0123456789abcdef"
		count    = 8
		readSize = 2
	)

	t.Run("Write", func(t *testing.T) {
		pr, pw := Pipe()

		for i := 0; i < count; i++ {
			go func() {
				time.Sleep(time.Millisecond) // Increase probability of race
				n, err := pw.Write([]byte(input))
				if n != len(input) || err != nil {
					t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
				}
			}()
		}

		out := make([]byte, count*len(input))
		for off := 0; off < len(out); off += readSize {
			n, err := pr.Read(out[off : off+readSize])
			if n != readSize || err != nil {
				t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
			}
		}

		// Since each Write is fully gated, if multiple Read calls were needed,
		// the contents of Write should still appear together in the output.
		if got, want := string(out), strings.Repeat(input, count); got != want {
			t.Errorf("got: %q; want: %q", got, want)
		}
	})

	t.Run("Read", func(t *testing.T) {
		pr, pw := Pipe()

		// One buffered slot per reader goroutine, so no reader blocks on send.
		results := make(chan []byte, count*len(input)/readSize)
		for i := 0; i < cap(results); i++ {
			go func() {
				time.Sleep(time.Millisecond) // Increase probability of race
				chunk := make([]byte, readSize)
				n, err := pr.Read(chunk)
				if n != readSize || err != nil {
					t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
				}
				results <- chunk
			}()
		}

		for i := 0; i < count; i++ {
			n, err := pw.Write([]byte(input))
			if n != len(input) || err != nil {
				t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
			}
		}

		// Since each read is independent, the only guarantee about the output
		// is that it is a permutation of the input in readSized groups.
		got := make([]byte, 0, count*len(input))
		for i := 0; i < cap(results); i++ {
			got = append(got, (<-results)...)
		}
		got = sortBytesInGroups(got, readSize)
		want := sortBytesInGroups(bytes.Repeat([]byte(input), count), readSize)
		if string(got) != string(want) {
			t.Errorf("got: %q; want: %q", got, want)
		}
	})
}

// sortBytesInGroups splits b into consecutive groups of n bytes, sorts the
// groups lexicographically, and returns their concatenation as a new slice.
// The bytes of b are not modified.
//
// If len(b) is not a multiple of n, the trailing bytes form a final, shorter
// group instead of triggering an out-of-range slice. It panics if n is not
// positive, because no grouping exists in that case (a zero n would
// otherwise never consume any input).
func sortBytesInGroups(b []byte, n int) []byte {
	if n <= 0 {
		panic("sortBytesInGroups: group size must be positive")
	}

	groups := make([][]byte, 0, (len(b)+n-1)/n)
	for len(b) > 0 {
		size := n
		if size > len(b) {
			size = len(b) // short final group
		}
		groups = append(groups, b[:size])
		b = b[size:]
	}
	sort.Slice(groups, func(i, j int) bool { return bytes.Compare(groups[i], groups[j]) < 0 })
	return bytes.Join(groups, nil)
}