Commit 532faec4 authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer: New Communicator interface is much simplified

parent ffcb7afb
package packer
import (
"bufio"
"io"
"log"
"sync"
"time"
)
// RemoteCmd represents a remote command being prepared or run.
type RemoteCmd struct {
// Command is the command to run remotely. This is executed as if
// it were a shell command, so you are expected to do any shell escaping
// necessary.
Command string
// Stdin specifies the process's standard input. If Stdin is
// nil, the process reads from an empty bytes.Buffer.
Stdin io.Reader
// Stdout and Stderr represent the process's standard output and
// error.
//
// If either is nil, it will be set to ioutil.Discard.
Stdout io.Writer
Stderr io.Writer
// This will be set to true when the remote command has exited. It
// shouldn't be set manually by the user, but there is no harm in
// doing so.
Exited bool
// Once Exited is true, this will contain the exit code of the process.
ExitStatus int
}
// A Communicator is the interface used to communicate with the machine
// that exists that will eventually be packaged into an image. Communicators
// allow you to execute remote commands, upload files, etc.
......@@ -15,154 +38,7 @@ import (
// Communicators must be safe for concurrency, meaning multiple calls to
// Start or any other method may be called at the same time.
type Communicator interface {
Start(string) (*RemoteCommand, error)
Start(*RemoteCmd) error
Upload(string, io.Reader) error
Download(string, io.Writer) error
}
// This struct contains some information about the remote command being
// executed and can be used to wait for it to complete.
//
// Stdin, Stdout, Stderr are readers and writers to varios IO streams for
// the remote command.
//
// Exited is false until Wait is called. It can be used to check if Wait
// has already been called.
//
// ExitStatus is the exit code of the remote process. It is only available
// once Wait is called.
type RemoteCommand struct {
Stdin io.Writer
Stdout io.Reader
Stderr io.Reader
Exited bool
ExitStatus int
exitChans []chan<- int
exitChanLock sync.Mutex
errChans []chan<- string
errChanLock sync.Mutex
outChans []chan<- string
outChanLock sync.Mutex
}
// StderrStream returns a channel that will be sent all the
func (r *RemoteCommand) StderrChan() <-chan string {
r.errChanLock.Lock()
defer r.errChanLock.Unlock()
// If no channels have been made, make that slice and start
// the goroutine to read and send to them
if r.errChans == nil {
r.errChans = make([]chan<- string, 0, 5)
go r.channelReader(r.Stderr, &r.errChans, &r.errChanLock)
}
// Create the channel, append it to the channels we care about
errChan := make(chan string, 10)
r.errChans = append(r.errChans, errChan)
return errChan
}
// StdoutStream returns a channel that will be sent all the output
// of stdout as it comes. The output isn't guaranteed to be a full line.
// When the channel is closed, the process is exited.
func (r *RemoteCommand) StdoutChan() <-chan string {
r.outChanLock.Lock()
defer r.outChanLock.Unlock()
// If no output channels have been made yet, then make that slice
// and start the goroutine to read and send to them.
if r.outChans == nil {
r.outChans = make([]chan<- string, 0, 5)
go r.channelReader(r.Stdout, &r.outChans, &r.outChanLock)
}
// Create the channel, append it to the channels we care about
outChan := make(chan string, 10)
r.outChans = append(r.outChans, outChan)
return outChan
}
// ExitChan returns a channel that will be sent the exit status once
// the process exits. This can be used in cases such a select statement
// waiting on the process to end.
func (r *RemoteCommand) ExitChan() <-chan int {
r.exitChanLock.Lock()
defer r.exitChanLock.Unlock()
// If we haven't made any channels yet, make that slice
if r.exitChans == nil {
r.exitChans = make([]chan<- int, 0, 5)
go func() {
// Wait for the command to finish
r.Wait()
// Grab the exit chan lock so we can iterate over it and
// message to each channel.
r.exitChanLock.Lock()
defer r.exitChanLock.Unlock()
for _, ch := range r.exitChans {
// Use a select so the send never blocks
select {
case ch <- r.ExitStatus:
default:
log.Println("remote command exit channel wouldn't blocked. Weird.")
}
close(ch)
}
r.exitChans = nil
}()
}
// Append our new channel onto it and return it
exitChan := make(chan int, 1)
r.exitChans = append(r.exitChans, exitChan)
return exitChan
}
// Wait waits for the command to exit.
func (r *RemoteCommand) Wait() {
// Busy wait on being exited. We put a sleep to be kind to the
// Go scheduler, and because we don't really need smaller granularity.
for !r.Exited {
time.Sleep(10 * time.Millisecond)
}
}
// Takes an io.Reader and then writes its data to a slice of channels.
// The channel slice is expected to be protected by the given mutex, and
// after the io.Reader is over, all channels will be closed and the slice
// set to nil.
func (*RemoteCommand) channelReader(r io.Reader, chans *[]chan<- string, chanLock *sync.Mutex) {
buf := bufio.NewReader(r)
var err error
for err != io.EOF {
var data []byte
data, err = buf.ReadSlice('\n')
if len(data) > 0 {
for _, ch := range *chans {
// Note: this blocks if the channel is full (they
// are buffered by default). What to do?
ch <- string(data)
}
}
}
// Clean up the channels by closing them and setting the
// list to nil.
chanLock.Lock()
defer chanLock.Unlock()
for _, ch := range *chans {
close(ch)
}
*chans = nil
}
package packer
import (
"bytes"
"testing"
"time"
)
func TestRemoteCommand_ExitChan(t *testing.T) {
t.Parallel()
rc := &RemoteCommand{}
exitChan := rc.ExitChan()
// Set the exit data so that it is sent
rc.ExitStatus = 42
rc.Exited = true
select {
case exitCode := <-exitChan:
if exitCode != 42 {
t.Fatal("invalid exit code")
}
_, ok := <-exitChan
if ok {
t.Fatal("exit channel should be closed")
}
case <-time.After(500 * time.Millisecond):
t.Fatal("exit channel never sent")
}
}
func TestRemoteCommand_StderrChan(t *testing.T) {
expected := "DATA!!!"
stderrBuf := new(bytes.Buffer)
stderrBuf.WriteString(expected)
rc := &RemoteCommand{}
rc.Stderr = stderrBuf
errChan := rc.StderrChan()
results := new(bytes.Buffer)
for data := range errChan {
results.WriteString(data)
}
if results.String() != expected {
t.Fatalf(
"outputs didn't match:\ngot:\n%s\nexpected:\n%s",
results.String(), stderrBuf.String())
}
}
func TestRemoteCommand_StdoutChan(t *testing.T) {
expected := "DATA!!!"
stdoutBuf := new(bytes.Buffer)
stdoutBuf.WriteString(expected)
rc := &RemoteCommand{}
rc.Stdout = stdoutBuf
outChan := rc.StdoutChan()
results := new(bytes.Buffer)
for data := range outChan {
results.WriteString(data)
}
if results.String() != expected {
t.Fatalf(
"outputs didn't match:\ngot:\n%s\nexpected:\n%s",
results.String(), stdoutBuf.String())
}
}
func TestRemoteCommand_WaitBlocks(t *testing.T) {
t.Parallel()
rc := &RemoteCommand{}
complete := make(chan bool)
// Make a goroutine that never exits. Since this is just in a test,
// this should be okay.
go func() {
rc.Wait()
complete <- true
}()
select {
case <-complete:
t.Fatal("It never should've completed")
case <-time.After(500 * time.Millisecond):
// All is well
}
}
func TestRemoteCommand_WaitCompletes(t *testing.T) {
t.Parallel()
rc := &RemoteCommand{}
complete := make(chan bool)
go func() {
rc.Wait()
complete <- true
}()
// Flag that it completed
rc.Exited = true
select {
case <-complete:
// All is well
case <-time.After(1 * time.Second):
t.Fatal("Timeout waiting for command completion.")
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment