1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
26 "git.fd.io/govpp.git/api"
37 replyTimeout time.Duration
38 // per-request context
// NewStream sets up a stream on connection c. The visible code seeds the
// stream with the default request size, reply size and reply timeout,
// iterates over the supplied options, allocates a channel ID, creates the
// backing channel with the configured buffer sizes and reply timeout, and
// registers that channel in c.channels.
//
// It returns an error with the text "nil connection passed in"; the guarding
// condition is not visible in this listing (presumably c == nil — confirm).
//
// NOTE(review): this listing omits several original lines (the head of the
// Stream literal, the body of the options loop, the lock acquisition that
// pairs with the Unlock below, and the final return); verify against the
// full file before relying on this description.
43 func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
45 return nil, errors.New("nil connection passed in")
// Defaults; they are set before the options loop below runs.
51 requestSize: RequestChanBufSize,
52 replySize: ReplyChanBufSize,
53 replyTimeout: DefaultReplyTimeout,
56 // parse custom options
// presumably each option is applied to the new stream here — loop body not visible.
57 for _, option := range options {
60 // create and store a new channel
// The counter is incremented atomically and masked with 0x7fff, so the ID
// fits in 15 bits before it is narrowed to uint16 on the next line.
61 s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
62 s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
63 s.channel.SetReplyTimeout(s.replyTimeout)
65 // store API channel within the client
67 c.channels[uint16(s.id)] = s.channel
68 c.channelsLock.Unlock()
70 // Channel.watchRequests is intentionally not started here, because
71 // requests are sent directly by SendMsg.
// Invoke performs a single request/reply exchange: it opens a stream with
// NewStream, sends req with SendMsg, waits for one reply with recvReply and
// decodes the reply payload into reply using the channel's message codec.
// Each visible step has an error check; the error-handling bodies are not
// visible in this listing (presumably the error is returned — confirm).
//
// NOTE(review): no visible line closes the stream after the exchange, so the
// channel registered by NewStream may stay in c.channels — confirm in the
// full file whether Stream.Close (or an equivalent release) is called.
76 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
77 stream, err := c.NewStream(ctx)
81 if err := stream.SendMsg(req); err != nil {
// s is not defined in the visible lines; presumably it is stream asserted to *Stream.
85 rep, err := s.recvReply()
89 if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
// Context returns a context.Context associated with the stream.
// NOTE(review): the body is not visible in this listing; presumably it
// returns the ctx passed to NewStream — confirm.
95 func (s *Stream) Context() context.Context {
// Close releases the stream's API channel by handing it to the connection's
// releaseAPIChannel. It can return an error with the text "stream closed";
// the guarding condition is not visible in this listing (presumably the
// stream was already closed or has no connection — confirm).
99 func (s *Stream) Close() error {
101 return errors.New("stream closed")
103 s.conn.releaseAPIChannel(s.channel)
// SendMsg wraps msg in a request created by the stream's channel and passes
// it straight to the connection's processRequest. It then records the
// message path reported by the connection for msg in s.pkgPath.
//
// It can return an error with the text "stream closed"; the guarding
// condition is not visible in this listing. What happens when processRequest
// fails is also not visible (presumably the error is returned — confirm).
108 func (s *Stream) SendMsg(msg api.Message) error {
110 return errors.New("stream closed")
// The meaning of the second argument (false) is defined by newRequest, which is outside this listing.
112 req := s.channel.newRequest(msg, false)
113 if err := s.conn.processRequest(s.channel, req); err != nil {
// presumably kept so a later RecvMsg can resolve the reply type — confirm against RecvMsg.
117 s.pkgPath = s.conn.GetMessagePath(msg)
// RecvMsg waits for the next reply with recvReply, looks up the message type
// for the reply's message ID through the channel's message identifier,
// allocates a fresh instance of that type and decodes the reply payload into
// it with the channel's message codec.
//
// NOTE(review): the error-handling bodies and the final return are not
// visible in this listing; presumably errors are returned to the caller and
// the decoded message is returned on success — confirm.
122 func (s *Stream) RecvMsg() (api.Message, error) {
123 reply, err := s.recvReply()
127 // resolve message type
// path is not defined in the visible lines; presumably derived from s.pkgPath — confirm.
131 msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
135 // allocate message instance
// reflect.New on the element type creates a new zero value, so decoding
// below writes into a fresh instance instead of the value returned by the lookup.
136 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
137 // decode message data
138 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
// WithRequestSize returns a stream option that sets the stream's requestSize
// field to size. NewStream passes that field to newChannel.
//
// The option type-asserts its argument to *Stream without the comma-ok form,
// so it panics if applied to any other api.Stream implementation.
144 func WithRequestSize(size int) api.StreamOption {
145 return func(stream api.Stream) {
146 stream.(*Stream).requestSize = size
// WithReplySize returns a stream option that sets the stream's replySize
// field to size. NewStream passes that field to newChannel.
//
// The option type-asserts its argument to *Stream without the comma-ok form,
// so it panics if applied to any other api.Stream implementation.
150 func WithReplySize(size int) api.StreamOption {
151 return func(stream api.Stream) {
152 stream.(*Stream).replySize = size
// WithReplyTimeout returns a stream option that sets the stream's
// replyTimeout field to timeout. NewStream applies that field to the channel
// through SetReplyTimeout.
//
// The option type-asserts its argument to *Stream without the comma-ok form,
// so it panics if applied to any other api.Stream implementation.
156 func WithReplyTimeout(timeout time.Duration) api.StreamOption {
157 return func(stream api.Stream) {
158 stream.(*Stream).replyTimeout = timeout
// recvReply waits for the next raw reply on the stream's channel.
//
// Visible outcomes:
//   - an error with the text "stream closed" (guarding condition not visible
//     in this listing);
//   - an error "reply channel closed" (presumably when the receive from
//     replyChan reports the channel closed — the ok check is not visible);
//   - reply.err when the received reply carries an error;
//   - s.ctx.Err() (presumably from a branch waiting on the stream context's
//     Done channel — that case line is not visible; confirm).
//
// The successful return of the reply itself is not visible in this listing.
162 func (s *Stream) recvReply() (*vppReply, error) {
164 return nil, errors.New("stream closed")
167 case reply, ok := <-s.channel.replyChan:
169 return nil, fmt.Errorf("reply channel closed")
171 if reply.err != nil {
172 // this case should actually never happen for stream
173 // since reply.err is only filled in watchRequests
174 // and stream does not use it
175 return nil, reply.err
180 return nil, s.ctx.Err()