Commit 9262a85e authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: Communicator.Upload

parent daa431af
......@@ -34,6 +34,11 @@ type CommunicatorStartResponse struct {
RemoteCommandAddress string
}
type CommunicatorUploadArgs struct {
Path string
ReaderAddress string
}
func Communicator(client *rpc.Client) *communicator {
return &communicator{client}
}
......@@ -87,8 +92,30 @@ func (c *communicator) Start(cmd string) (rc *packer.RemoteCommand, err error) {
return
}
func (c *communicator) Upload(string, io.Reader) error {
return nil
func (c *communicator) Upload(path string, r io.Reader) (err error) {
readerL := netListenerInRange(portRangeMin, portRangeMax)
if readerL == nil {
err = errors.New("couldn't allocate listener for upload reader")
return
}
// Make sure at the end of this call, we close the listener
defer readerL.Close()
// Pipe the reader through to the connection
go serveSingleCopy("uploadReader", readerL, nil, r)
args := CommunicatorUploadArgs{
path,
readerL.Addr().String(),
}
cerr := c.client.Call("Communicator.Upload", &args, &err)
if cerr != nil {
err = cerr
}
return
}
func (c *communicator) Download(string, io.Writer) error {
......@@ -133,6 +160,18 @@ func (c *CommunicatorServer) Start(cmd *string, reply *CommunicatorStartResponse
return
}
func (c *CommunicatorServer) Upload(args *CommunicatorUploadArgs, reply *interface{}) (err error) {
readerC, err := net.Dial("tcp", args.ReaderAddress)
if err != nil {
return
}
defer readerC.Close()
err = c.c.Upload(args.Path, readerC)
return
}
func (rc *RemoteCommandServer) Wait(args *interface{}, reply *int) error {
rc.rc.Wait()
*reply = rc.rc.ExitStatus
......
......@@ -18,6 +18,10 @@ type testCommunicator struct {
startErr *io.PipeWriter
startExited *bool
startExitStatus *int
uploadCalled bool
uploadPath string
uploadReader io.Reader
}
func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) {
......@@ -45,7 +49,11 @@ func (t *testCommunicator) Start(cmd string) (*packer.RemoteCommand, error) {
return rc, nil
}
func (t *testCommunicator) Upload(string, io.Reader) error {
func (t *testCommunicator) Upload(path string, reader io.Reader) error {
t.uploadCalled = true
t.uploadPath = path
t.uploadReader = reader
return nil
}
......@@ -99,6 +107,21 @@ func TestCommunicatorRPC(t *testing.T) {
*c.startExited = true
rc.Wait()
assert.Equal(rc.ExitStatus, 42, "should have proper exit status")
// Test that we can upload things
uploadR, uploadW := io.Pipe()
err = remote.Upload("foo", uploadR)
assert.Nil(err, "should not error")
assert.True(c.uploadCalled, "should be called")
assert.Equal(c.uploadPath, "foo", "should be correct path")
return
// Test the upload reader
uploadW.Write([]byte("uploadfoo\n"))
bufUpR := bufio.NewReader(c.uploadReader)
data, err = bufUpR.ReadString('\n')
assert.Nil(err, "should not error")
assert.Equal(data, "uploadfoo\n", "should have the proper data")
}
func TestCommunicator_ImplementsCommunicator(t *testing.T) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment