Skip to content

Commit

Permalink
Some changes
Browse files Browse the repository at this point in the history
1. update plugin
2. if unsupported, disconnected
3. update ‘PullCmd’
  • Loading branch information
andeya committed Nov 15, 2017
1 parent 1f6ed08 commit b44aecc
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 109 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ func (p *AliasPlugin) Name() string {
return "AliasPlugin"
}

// PostReadHeader converts the alias of this service.
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror {
// PostReadPullHeader converts the alias of this service.
func (p *AliasPlugin) PostReadPullHeader(ctx tp.ReadCtx) tp.Xerror {
var u = ctx.Input().Header.Uri
if p.Aliases != nil {
if a = p.Aliases[u]; a != "" {
Expand Down Expand Up @@ -422,8 +422,8 @@ func main() {
&reply,
)

if pullcmd.Xerror != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror.Error())
if pullcmd.Xerror() != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror().Error())
}
tp.Infof("9090reply: %#v", reply)
}
Expand All @@ -441,8 +441,8 @@ func main() {
&reply,
)

if pullcmd.Xerror != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror.Error())
if pullcmd.Xerror() != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror().Error())
}
tp.Infof("9091reply test_unknown: %#v", reply)
}
Expand Down
12 changes: 6 additions & 6 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (p *AliasPlugin) Name() string {
return "AliasPlugin"
}

// PostReadHeader converts the alias of this service.
func (p *AliasPlugin) PostReadHeader(ctx tp.ReadCtx) tp.Xerror {
// PostReadPullHeader converts the alias of this service.
func (p *AliasPlugin) PostReadPullHeader(ctx tp.ReadCtx) tp.Xerror {
var u = ctx.Input().Header.Uri
if p.Aliases != nil {
if a = p.Aliases[u]; a != "" {
Expand Down Expand Up @@ -425,8 +425,8 @@ func main() {
&reply,
)

if pullcmd.Xerror != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror.Error())
if pullcmd.Xerror() != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror().Error())
}
tp.Infof("9090reply: %#v", reply)
}
Expand All @@ -444,8 +444,8 @@ func main() {
&reply,
)

if pullcmd.Xerror != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror.Error())
if pullcmd.Xerror() != nil {
tp.Fatalf("pull error: %v", pullcmd.Xerror().Error())
}
tp.Infof("9091reply test_unknown: %#v", reply)
}
Expand Down
82 changes: 37 additions & 45 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,23 @@ func (c *readHandleCtx) handle() {
c.handlePull()

default:
c.handleUnsupported()
// if unsupported, disconnected.
c.output.Header.StatusCode = StatusUnsupportedTx
c.output.Header.Status = StatusText(StatusUnsupportedTx)
if c.sess.peer.printBody {
logformat := "disconnect(%s) due to unsupported type: %d |\nuri: %-30s |\nRECV:\n size: %d\n body[-json]: %s\n"
Errorf(logformat, c.Ip(), c.input.Header.Type, c.input.Header.Uri, c.input.Size, bodyLogBytes(c.input))
} else {
logformat := "disconnect(%s) due to unsupported type: %d |\nuri: %-30s |\nRECV:\n size: %d\n"
Errorf(logformat, c.Ip(), c.input.Header.Type, c.input.Header.Uri, c.input.Size)
}
go c.sess.Close()
}
}

func (c *readHandleCtx) bindPush(header *socket.Header) interface{} {
var err error
err = c.pluginContainer.PostReadHeader(c)
err = c.pluginContainer.PostReadPushHeader(c)
if err != nil {
return nil
}
Expand All @@ -275,7 +285,7 @@ func (c *readHandleCtx) bindPush(header *socket.Header) interface{} {
c.arg = reflect.New(c.apiType.argElem)
c.input.Body = c.arg.Interface()

err = c.pluginContainer.PreReadBody(c)
err = c.pluginContainer.PreReadPushBody(c)
if err != nil {
return nil
}
Expand All @@ -297,7 +307,7 @@ func (c *readHandleCtx) handlePush() {
return
}

err := c.pluginContainer.PostReadBody(c)
err := c.pluginContainer.PostReadPushBody(c)
if err != nil {
return
}
Expand All @@ -316,7 +326,7 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} {
c.output.HeaderCodec = c.input.HeaderCodec
c.output.Header.Gzip = header.Gzip

xerr := c.pluginContainer.PostReadHeader(c)
xerr := c.pluginContainer.PostReadPullHeader(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
Expand Down Expand Up @@ -346,7 +356,7 @@ func (c *readHandleCtx) bindPull(header *socket.Header) interface{} {
c.input.Body = c.arg.Interface()
}

xerr = c.pluginContainer.PreReadBody(c)
xerr = c.pluginContainer.PreReadPullBody(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
Expand All @@ -370,7 +380,7 @@ func (c *readHandleCtx) handlePull() {

// handle pull
if c.output.Header.StatusCode == StatusOK {
xerr := c.pluginContainer.PostReadBody(c)
xerr := c.pluginContainer.PostReadPullBody(c)
if xerr != nil {
c.output.Header.StatusCode = xerr.Code()
c.output.Header.Status = xerr.Text()
Expand Down Expand Up @@ -403,14 +413,14 @@ func (c *readHandleCtx) bindReply(header *socket.Header) interface{} {
c.pullCmd = pullCmd.(*PullCmd)
c.public = c.pullCmd.public

xerr := c.pluginContainer.PostReadHeader(c)
xerr := c.pluginContainer.PostReadReplyHeader(c)
if xerr != nil {
c.pullCmd.Xerror = xerr
c.pullCmd.xerror = xerr
return nil
}
xerr = c.pluginContainer.PreReadBody(c)
xerr = c.pluginContainer.PreReadReplyBody(c)
if xerr != nil {
c.pullCmd.Xerror = xerr
c.pullCmd.xerror = xerr
return nil
}
return c.pullCmd.reply
Expand All @@ -430,42 +440,14 @@ func (c *readHandleCtx) handleReply() {
c.pullCmd.cost = time.Since(c.pullCmd.start)
c.sess.runlog(c.pullCmd.cost, c.input, c.pullCmd.output)
}()
if c.pullCmd.Xerror != nil {
if c.pullCmd.xerror != nil {
return
}
if c.input.Header.StatusCode != StatusOK {
c.pullCmd.Xerror = NewXerror(c.input.Header.StatusCode, c.input.Header.Status)
} else if xerr := c.pluginContainer.PostReadBody(c); xerr != nil {
c.pullCmd.Xerror = xerr
}
}

func (c *readHandleCtx) handleUnsupported() {
defer func() {
if p := recover(); p != nil {
Errorf("panic:\n%v\n%s", p, goutil.PanicTrace(1))
}
c.cost = time.Since(c.start)
c.sess.runlog(c.cost, c.input, c.output)
}()
c.pluginContainer.PostReadBody(c)
c.output.Header.StatusCode = StatusUnsupportedTx
c.output.Header.Status = StatusText(StatusUnsupportedTx)
c.output.Body = nil

if len(c.output.BodyCodec) == 0 {
c.output.BodyCodec = c.input.BodyCodec
c.pullCmd.xerror = NewXerror(c.input.Header.StatusCode, c.input.Header.Status)
} else if xerr := c.pluginContainer.PostReadReplyBody(c); xerr != nil {
c.pullCmd.xerror = xerr
}

c.pluginContainer.PreWriteReply(c)

err := c.sess.write(c.output)
if err != nil {
c.output.Header.StatusCode = StatusWriteFailed
c.output.Header.Status = StatusText(StatusWriteFailed) + ": " + err.Error()
}

c.pluginContainer.PostWriteReply(c)
}

// InputBodyBytes if the input body binder is []byte type, returns it, else returns nil.
Expand Down Expand Up @@ -495,11 +477,11 @@ type PullCmd struct {
sess *session
output *socket.Packet
reply interface{}
xerror Xerror
doneChan chan *PullCmd // Strobes when pull is complete.
start time.Time
cost time.Duration
public goutil.Map
Xerror Xerror
}

var _ WriteCtx = new(PullCmd)
Expand Down Expand Up @@ -534,11 +516,21 @@ func (c *PullCmd) Output() *socket.Packet {
return c.output
}

// Result returns the pull result.
func (c *PullCmd) Result() (interface{}, Xerror) {
return c.reply, c.xerror
}

// Xerror returns the pull error.
func (c *PullCmd) Xerror() Xerror {
return c.xerror
}

func (p *PullCmd) cancel() {
defer func() {
recover()
}()
p.Xerror = NewXerror(StatusConnClosed, StatusText(StatusConnClosed))
p.xerror = NewXerror(StatusConnClosed, StatusText(StatusConnClosed))
p.doneChan <- p
p.sess.pullCmdMap.Delete(p.output.Header.Seq)
{
Expand Down
2 changes: 1 addition & 1 deletion parameter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package tp

// Packet Header types
// Packet types
const (
TypeUndefined int32 = 0
TypePull int32 = 1
Expand Down
Loading

0 comments on commit b44aecc

Please sign in to comment.