1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/api"
36 replyTimeout time.Duration
// NewStream opens a new stream on the connection. The stream starts with the
// default request/reply buffer sizes and reply timeout, the supplied options
// are then processed, and a dedicated channel is created for the stream and
// registered in c.channels.
// NOTE(review): the guard for the "nil connection passed in" error is not
// visible in this excerpt - presumably a nil check on c; confirm.
39 func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
41 return nil, errors.New("nil connection passed in")
// defaults for the new stream; see the With* option constructors for overrides
47 requestSize: RequestChanBufSize,
48 replySize: ReplyChanBufSize,
49 replyTimeout: DefaultReplyTimeout,
52 // parse custom options
53 for _, option := range options {
56 // create and store a new channel
// the ID is taken from an atomically incremented counter and masked with
// 0x7fff, so it always fits the uint16 conversions below
57 s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
58 s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
59 s.channel.SetReplyTimeout(s.replyTimeout)
61 // store API channel within the client
63 c.channels[uint16(s.id)] = s.channel
64 c.channelsLock.Unlock()
66 // Channel.watchRequests is intentionally not started here, because
67 // requests are sent directly by SendMsg.
// Invoke performs a single request/reply exchange: it opens a stream with
// NewStream, sends req through SendMsg, waits for one reply via recvReply and
// decodes the reply data into reply using the channel's message codec.
// NOTE(review): how s is obtained from stream, and whether the stream is
// closed afterwards, is not visible in this excerpt - confirm.
72 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
73 stream, err := c.NewStream(ctx)
77 if err := stream.SendMsg(req); err != nil {
81 rep, err := s.recvReply()
// decode directly into the caller-provided reply message
85 if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
// Context returns a context.Context for the stream.
// NOTE(review): the body is not visible here - presumably it returns the
// stream's ctx field (the one recvReply consults); confirm.
91 func (s *Stream) Context() context.Context {
// Close releases the stream's API channel back to the connection via
// releaseAPIChannel. It can return a "stream closed" error.
// NOTE(review): the condition that triggers the error is not visible in this
// excerpt - presumably the stream was already closed; confirm.
95 func (s *Stream) Close() error {
97 return errors.New("stream closed")
99 s.conn.releaseAPIChannel(s.channel)
// SendMsg wraps msg in a new request on the stream's channel and hands it to
// the connection's processRequest. It can return a "stream closed" error
// (guard not visible in this excerpt) or the error from processRequest.
104 func (s *Stream) SendMsg(msg api.Message) error {
106 return errors.New("stream closed")
// second argument of newRequest is false
// NOTE(review): presumably marks the request as not multipart - confirm
108 req := s.channel.newRequest(msg, false)
109 if err := s.conn.processRequest(s.channel, req); err != nil {
// RecvMsg waits for the next reply on the stream, looks up the message type
// registered for the reply's message ID, allocates a fresh instance of that
// type and decodes the reply data into it.
115 func (s *Stream) RecvMsg() (api.Message, error) {
116 reply, err := s.recvReply()
120 // resolve message type
121 msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
125 // allocate message instance
// a new value of the looked-up message's underlying type is created via
// reflection and decoded into, instead of decoding into the looked-up value
126 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
127 // decode message data
128 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
// WithRequestSize returns a StreamOption that sets the stream's requestSize
// (passed to newChannel by NewStream) to size.
// The option uses an unchecked type assertion, so it panics if the api.Stream
// it is applied to is not a *Stream.
134 func WithRequestSize(size int) api.StreamOption {
135 return func(stream api.Stream) {
136 stream.(*Stream).requestSize = size
// WithReplySize returns a StreamOption that sets the stream's replySize
// (passed to newChannel by NewStream) to size.
// The option uses an unchecked type assertion, so it panics if the api.Stream
// it is applied to is not a *Stream.
140 func WithReplySize(size int) api.StreamOption {
141 return func(stream api.Stream) {
142 stream.(*Stream).replySize = size
// WithReplyTimeout returns a StreamOption that sets the stream's replyTimeout
// (applied to the channel with SetReplyTimeout by NewStream) to timeout.
// The option uses an unchecked type assertion, so it panics if the api.Stream
// it is applied to is not a *Stream.
146 func WithReplyTimeout(timeout time.Duration) api.StreamOption {
147 return func(stream api.Stream) {
148 stream.(*Stream).replyTimeout = timeout
// recvReply receives the next raw reply from the stream channel's replyChan.
// It can fail with "stream closed", with "reply channel closed", with the
// error carried by the reply itself, or with the stream context's error.
// NOTE(review): the select header, the ok check and the ctx.Done case are not
// visible in this excerpt - the exact conditions should be confirmed.
152 func (s *Stream) recvReply() (*vppReply, error) {
154 return nil, errors.New("stream closed")
157 case reply, ok := <-s.channel.replyChan:
159 return nil, fmt.Errorf("reply channel closed")
161 if reply.err != nil {
162 // this case should actually never happen for stream
163 // since reply.err is only filled in watchRequests
164 // and stream does not use it
165 return nil, reply.err
// presumably reached when the stream's context is done (case line elided)
170 return nil, s.ctx.Err()