X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fstream.go;h=86bb99e290a3802e8601b99df8e3787085e12bd3;hb=refs%2Fheads%2Fmaster;hp=61a996539eb1b0aa12ce43dbbece43f397ffb887;hpb=e3cea0cd4447be264374b56788da0e93b2e67a7e;p=govpp.git diff --git a/core/stream.go b/core/stream.go index 61a9965..86bb99e 100644 --- a/core/stream.go +++ b/core/stream.go @@ -19,46 +19,54 @@ import ( "errors" "fmt" "reflect" - "sync/atomic" + "sync" + "time" - "git.fd.io/govpp.git/api" + "go.fd.io/govpp/api" ) type Stream struct { - id uint32 conn *Connection ctx context.Context channel *Channel + // available options + requestSize int + replySize int + replyTimeout time.Duration + // per-request context + pkgPath string + sync.Mutex } -func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { +func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) { if c == nil { return nil, errors.New("nil connection passed in") } - // TODO: add stream options as variadic parameters for customizing: - // - request/reply channel size - // - reply timeout - // - retries - // - ??? + s := &Stream{ + conn: c, + ctx: ctx, + // default options + requestSize: RequestChanBufSize, + replySize: ReplyChanBufSize, + replyTimeout: DefaultReplyTimeout, + } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, 10, 10) + // parse custom options + for _, option := range options { + option(s) + } - // store API channel within the client - c.channelsLock.Lock() - c.channels[chID] = channel - c.channelsLock.Unlock() + ch, err := c.newChannel(s.requestSize, s.replySize) + if err != nil { + return nil, err + } + s.channel = ch + s.channel.SetReplyTimeout(s.replyTimeout) // Channel.watchRequests are not started here intentionally, because // requests are sent directly by SendMsg. - return &Stream{ - id: uint32(chID), - conn: c, - ctx: ctx, - channel: channel, - }, nil + return s, nil } func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { @@ -66,6 +74,7 @@ func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Mess if err != nil { return err } + defer func() { _ = stream.Close() }() if err := stream.SendMsg(req); err != nil { return err } @@ -101,6 +110,9 @@ func (s *Stream) SendMsg(msg api.Message) error { if err := s.conn.processRequest(s.channel, req); err != nil { return err } + s.Lock() + s.pkgPath = s.conn.GetMessagePath(msg) + s.Unlock() return nil } @@ -110,7 +122,10 @@ func (s *Stream) RecvMsg() (api.Message, error) { return nil, err } // resolve message type - msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID) + s.Lock() + path := s.pkgPath + s.Unlock() + msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID) if err != nil { return nil, err } @@ -123,6 +138,24 @@ func (s *Stream) RecvMsg() (api.Message, error) { return msg, nil } +func WithRequestSize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).requestSize = size + } +} + +func WithReplySize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replySize = size + } +} + +func WithReplyTimeout(timeout time.Duration) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replyTimeout = timeout + } +} + func (s *Stream) recvReply() (*vppReply, error) { if s.conn == nil { return nil, errors.New("stream closed")