Stream API options
[govpp.git] / core / stream.go
index 61a9965..abe9d55 100644 (file)
@@ -20,6 +20,7 @@ import (
        "fmt"
        "reflect"
        "sync/atomic"
+       "time"
 
        "git.fd.io/govpp.git/api"
 )
@@ -29,36 +30,43 @@ type Stream struct {
        conn    *Connection
        ctx     context.Context
        channel *Channel
+       // available options
+       requestSize  int
+       replySize    int
+       replyTimeout time.Duration
 }
 
-func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
        if c == nil {
                return nil, errors.New("nil connection passed in")
        }
-       // TODO: add stream options as variadic parameters for customizing:
-       // - request/reply channel size
-       // - reply timeout
-       // - retries
-       // - ???
+       s := &Stream{
+               conn: c,
+               ctx:  ctx,
+               // default options
+               requestSize:  RequestChanBufSize,
+               replySize:    ReplyChanBufSize,
+               replyTimeout: DefaultReplyTimeout,
+       }
 
-       // create new channel
-       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       channel := newChannel(chID, c, c.codec, c, 10, 10)
+       // parse custom options
+       for _, option := range options {
+               option(s)
+       }
+       // create and store a new channel
+       s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
+       s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
+       s.channel.SetReplyTimeout(s.replyTimeout)
 
        // store API channel within the client
        c.channelsLock.Lock()
-       c.channels[chID] = channel
+       c.channels[uint16(s.id)] = s.channel
        c.channelsLock.Unlock()
 
        // Channel.watchRequests are not started here intentionally, because
        // requests are sent directly by SendMsg.
 
-       return &Stream{
-               id:      uint32(chID),
-               conn:    c,
-               ctx:     ctx,
-               channel: channel,
-       }, nil
+       return s, nil
 }
 
 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
@@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) {
        return msg, nil
 }
 
+func WithRequestSize(size int) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).requestSize = size
+       }
+}
+
+func WithReplySize(size int) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).replySize = size
+       }
+}
+
+func WithReplyTimeout(timeout time.Duration) api.StreamOption {
+       return func(stream api.Stream) {
+               stream.(*Stream).replyTimeout = timeout
+       }
+}
+
 func (s *Stream) recvReply() (*vppReply, error) {
        if s.conn == nil {
                return nil, errors.New("stream closed")