X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fstream.go;h=3d417f16d65dafef3e9b14fbc6bfd3bf467856fa;hb=5de7f6b85458615fa592a335d45c546397f32c9a;hp=edc3f2b9145dbb2cbf612900fa18e24014119d21;hpb=df67791c6ffc96331f75aec7d3addfe2efca7739;p=govpp.git diff --git a/core/stream.go b/core/stream.go index edc3f2b..3d417f1 100644 --- a/core/stream.go +++ b/core/stream.go @@ -19,7 +19,9 @@ import ( "errors" "fmt" "reflect" + "sync" "sync/atomic" + "time" "git.fd.io/govpp.git/api" ) @@ -29,41 +31,65 @@ type Stream struct { conn *Connection ctx context.Context channel *Channel + // available options + requestSize int + replySize int + replyTimeout time.Duration + // per-request context + pkgPath string + sync.Mutex } -func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { +func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) { if c == nil { return nil, errors.New("nil connection passed in") } - // TODO: add stream options as variadic parameters for customizing: - // - request/reply channel size - // - reply timeout - // - retries - // - ??? + s := &Stream{ + conn: c, + ctx: ctx, + // default options + requestSize: RequestChanBufSize, + replySize: ReplyChanBufSize, + replyTimeout: DefaultReplyTimeout, + } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, 10, 10) + // parse custom options + for _, option := range options { + option(s) + } + // create and store a new channel + s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff + s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize) + s.channel.SetReplyTimeout(s.replyTimeout) // store API channel within the client c.channelsLock.Lock() - c.channels[chID] = channel + c.channels[uint16(s.id)] = s.channel c.channelsLock.Unlock() // Channel.watchRequests are not started here intentionally, because // requests are sent directly by SendMsg. - return &Stream{ - id: uint32(chID), - conn: c, - ctx: ctx, - channel: channel, - }, nil + return s, nil } func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { - // TODO: implement invoke - panic("not implemented") + stream, err := c.NewStream(ctx) + if err != nil { + return err + } + if err := stream.SendMsg(req); err != nil { + return err + } + s := stream.(*Stream) + rep, err := s.recvReply() + if err != nil { + return err + } + if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil { + return err + } + return nil } func (s *Stream) Context() context.Context { @@ -87,10 +113,53 @@ func (s *Stream) SendMsg(msg api.Message) error { if err := s.conn.processRequest(s.channel, req); err != nil { return err } + s.Lock() + s.pkgPath = s.conn.GetMessagePath(msg) + s.Unlock() return nil } func (s *Stream) RecvMsg() (api.Message, error) { + reply, err := s.recvReply() + if err != nil { + return nil, err + } + // resolve message type + s.Lock() + path := s.pkgPath + s.Unlock() + msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID) + if err != nil { + return nil, err + } + // allocate message instance + msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + // decode message data + if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil { + return nil, err + } + return msg, nil +} + +func WithRequestSize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).requestSize = size + } +} + +func WithReplySize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replySize = size + } +} + +func WithReplyTimeout(timeout time.Duration) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replyTimeout = timeout + } +} + +func (s *Stream) recvReply() (*vppReply, error) { if s.conn == nil { return nil, errors.New("stream closed") } @@ -105,18 +174,7 @@ func (s *Stream) RecvMsg() (api.Message, error) { // and stream does not use it return nil, reply.err } - // resolve message type - msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID) - if err != nil { - return nil, err - } - // allocate message instance - msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - // decode message data - if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil { - return nil, err - } - return msg, nil + return reply, nil case <-s.ctx.Done(): return nil, s.ctx.Err()