Commit 2ac629c9 authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: get PostProcessor working

parent ec68a3fd
......@@ -15,14 +15,12 @@ type Client struct {
}
func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
// Create the MuxConn around the RWC and get the client to server stream.
// This is the primary stream that we use to communicate with the
// remote RPC server. On the remote side Server.ServeConn also listens
// on this stream ID.
mux := NewMuxConn(rwc)
clientConn, err := mux.Dial(0)
return NewClientWithMux(NewMuxConn(rwc), 0)
}
func NewClientWithMux(mux *MuxConn, streamId uint32) (*Client, error) {
clientConn, err := mux.Dial(streamId)
if err != nil {
mux.Close()
return nil, err
}
......@@ -37,7 +35,7 @@ func (c *Client) Close() error {
return err
}
return c.mux.Close()
return nil
}
func (c *Client) Artifact() packer.Artifact {
......@@ -56,12 +54,13 @@ func (c *Client) Cache() packer.Cache {
func (c *Client) PostProcessor() packer.PostProcessor {
return &postProcessor{
client: c.client,
mux: c.mux,
}
}
func (c *Client) Ui() packer.Ui {
return &Ui{
client: c.client,
client: c.client,
endpoint: DefaultUiEndpoint,
}
}
......@@ -21,7 +21,7 @@ type MuxConn struct {
curId uint32
rwc io.ReadWriteCloser
streams map[uint32]*Stream
mu sync.RWMutex
mu sync.Mutex
wlock sync.Mutex
}
......
......@@ -9,14 +9,14 @@ import (
// executed over an RPC connection.
type postProcessor struct {
client *rpc.Client
server *rpc.Server
mux *MuxConn
}
// PostProcessorServer wraps a packer.PostProcessor implementation and makes it
// exportable as part of a Golang RPC server.
type PostProcessorServer struct {
client *rpc.Client
server *rpc.Server
mux *MuxConn
p packer.PostProcessor
}
......@@ -24,15 +24,10 @@ type PostProcessorConfigureArgs struct {
Configs []interface{}
}
type PostProcessorPostProcessArgs struct {
ArtifactEndpoint string
UiEndpoint string
}
type PostProcessorProcessResponse struct {
Err error
Keep bool
ArtifactEndpoint string
Err error
Keep bool
StreamId uint32
}
func PostProcessor(client *rpc.Client) *postProcessor {
......@@ -49,21 +44,14 @@ func (p *postProcessor) Configure(raw ...interface{}) (err error) {
}
func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) {
artifactEndpoint := registerComponent(p.server, "Artifact", &ArtifactServer{
artifact: a,
}, true)
uiEndpoint := registerComponent(p.server, "Ui", &UiServer{
ui: ui,
}, true)
args := PostProcessorPostProcessArgs{
ArtifactEndpoint: artifactEndpoint,
UiEndpoint: uiEndpoint,
}
nextId := p.mux.NextId()
server := NewServerWithMux(p.mux, nextId)
server.RegisterArtifact(a)
server.RegisterUi(ui)
go server.Serve()
var response PostProcessorProcessResponse
if err := p.client.Call("PostProcessor.PostProcess", &args, &response); err != nil {
if err := p.client.Call("PostProcessor.PostProcess", nextId, &response); err != nil {
return nil, false, err
}
......@@ -71,14 +59,16 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art
return nil, false, response.Err
}
if response.ArtifactEndpoint == "" {
if response.StreamId == 0 {
return nil, false, nil
}
return &artifact{
client: p.client,
endpoint: response.ArtifactEndpoint,
}, response.Keep, nil
client, err := NewClientWithMux(p.mux, response.StreamId)
if err != nil {
return nil, false, err
}
return client.Artifact(), response.Keep, nil
}
func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply *error) error {
......@@ -90,23 +80,20 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply
return nil
}
func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, reply *PostProcessorProcessResponse) error {
artifact := &artifact{
client: p.client,
endpoint: args.ArtifactEndpoint,
}
ui := &Ui{
client: p.client,
endpoint: args.UiEndpoint,
func (p *PostProcessorServer) PostProcess(streamId uint32, reply *PostProcessorProcessResponse) error {
client, err := NewClientWithMux(p.mux, streamId)
if err != nil {
return NewBasicError(err)
}
defer client.Close()
var artifactEndpoint string
artifactResult, keep, err := p.p.PostProcess(ui, artifact)
streamId = 0
artifactResult, keep, err := p.p.PostProcess(client.Ui(), client.Artifact())
if err == nil && artifactResult != nil {
artifactEndpoint = registerComponent(p.server, "Artifact", &ArtifactServer{
artifact: artifactResult,
}, true)
streamId = p.mux.NextId()
server := NewServerWithMux(p.mux, streamId)
server.RegisterArtifact(artifactResult)
go server.Serve()
}
if err != nil {
......@@ -114,9 +101,9 @@ func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, re
}
*reply = PostProcessorProcessResponse{
Err: err,
Keep: keep,
ArtifactEndpoint: artifactEndpoint,
Err: err,
Keep: keep,
StreamId: streamId,
}
return nil
......
......@@ -13,6 +13,7 @@ type TestPostProcessor struct {
configVal []interface{}
ppCalled bool
ppArtifact packer.Artifact
ppArtifactId string
ppUi packer.Ui
}
......@@ -25,6 +26,7 @@ func (pp *TestPostProcessor) Configure(v ...interface{}) error {
func (pp *TestPostProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) {
pp.ppCalled = true
pp.ppArtifact = a
pp.ppArtifactId = a.Id()
pp.ppUi = ui
return testPostProcessorArtifact, false, nil
}
......@@ -70,8 +72,8 @@ func TestPostProcessorRPC(t *testing.T) {
t.Fatal("postprocess should be called")
}
if p.ppArtifact.Id() != "ppTestId" {
t.Fatal("unknown artifact")
if p.ppArtifactId != "ppTestId" {
t.Fatalf("unknown artifact: %s", p.ppArtifact.Id())
}
if artifact.Id() != "id" {
......
......@@ -21,15 +21,21 @@ const (
// Server represents an RPC server for Packer. This must be paired on
// the other side with a Client.
type Server struct {
mux *MuxConn
server *rpc.Server
mux *MuxConn
streamId uint32
server *rpc.Server
}
// NewServer returns a new Packer RPC server.
func NewServer(conn io.ReadWriteCloser) *Server {
return NewServerWithMux(NewMuxConn(conn), 0)
}
func NewServerWithMux(mux *MuxConn, streamId uint32) *Server {
return &Server{
mux: NewMuxConn(conn),
server: rpc.NewServer(),
mux: mux,
streamId: streamId,
server: rpc.NewServer(),
}
}
......@@ -51,7 +57,8 @@ func (s *Server) RegisterCache(c packer.Cache) {
func (s *Server) RegisterPostProcessor(p packer.PostProcessor) {
s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{
p: p,
mux: s.mux,
p: p,
})
}
......@@ -66,7 +73,7 @@ func (s *Server) RegisterUi(ui packer.Ui) {
func (s *Server) Serve() {
// Accept a connection on stream ID 0, which is always used for
// normal client to server connections.
stream, err := s.mux.Accept(0)
stream, err := s.mux.Accept(s.streamId)
defer stream.Close()
if err != nil {
log.Printf("[ERR] Error retrieving stream for serving: %s", err)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment