1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/api"
35 replyTimeout time.Duration
36 // per-request context
41 func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
43 return nil, errors.New("nil connection passed in")
49 requestSize: RequestChanBufSize,
50 replySize: ReplyChanBufSize,
51 replyTimeout: DefaultReplyTimeout,
54 // parse custom options
55 for _, option := range options {
59 ch, err := c.newChannel(s.requestSize, s.replySize)
64 s.channel.SetReplyTimeout(s.replyTimeout)
66 // Channel.watchRequests are not started here intentionally, because
67 // requests are sent directly by SendMsg.
72 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
73 stream, err := c.NewStream(ctx)
77 defer func() { _ = stream.Close() }()
78 if err := stream.SendMsg(req); err != nil {
82 rep, err := s.recvReply()
86 if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
92 func (s *Stream) Context() context.Context {
96 func (s *Stream) Close() error {
98 return errors.New("stream closed")
100 s.conn.releaseAPIChannel(s.channel)
105 func (s *Stream) SendMsg(msg api.Message) error {
107 return errors.New("stream closed")
109 req := s.channel.newRequest(msg, false)
110 if err := s.conn.processRequest(s.channel, req); err != nil {
114 s.pkgPath = s.conn.GetMessagePath(msg)
119 func (s *Stream) RecvMsg() (api.Message, error) {
120 reply, err := s.recvReply()
124 // resolve message type
128 msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
132 // allocate message instance
133 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
134 // decode message data
135 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
141 func WithRequestSize(size int) api.StreamOption {
142 return func(stream api.Stream) {
143 stream.(*Stream).requestSize = size
147 func WithReplySize(size int) api.StreamOption {
148 return func(stream api.Stream) {
149 stream.(*Stream).replySize = size
153 func WithReplyTimeout(timeout time.Duration) api.StreamOption {
154 return func(stream api.Stream) {
155 stream.(*Stream).replyTimeout = timeout
159 func (s *Stream) recvReply() (*vppReply, error) {
161 return nil, errors.New("stream closed")
164 case reply, ok := <-s.channel.replyChan:
166 return nil, fmt.Errorf("reply channel closed")
168 if reply.err != nil {
169 // this case should actually never happen for stream
170 // since reply.err is only filled in watchRequests
171 // and stream does not use it
172 return nil, reply.err
177 return nil, s.ctx.Err()