1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/api"
35 replyTimeout time.Duration
36 // per-request context
41 func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
43 return nil, errors.New("nil connection passed in")
49 requestSize: RequestChanBufSize,
50 replySize: ReplyChanBufSize,
51 replyTimeout: DefaultReplyTimeout,
54 // parse custom options
55 for _, option := range options {
59 s.channel = c.newChannel(s.requestSize, s.replySize)
60 s.channel.SetReplyTimeout(s.replyTimeout)
62 // Channel.watchRequests are not started here intentionally, because
63 // requests are sent directly by SendMsg.
68 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
69 stream, err := c.NewStream(ctx)
73 if err := stream.SendMsg(req); err != nil {
77 rep, err := s.recvReply()
81 if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
87 func (s *Stream) Context() context.Context {
91 func (s *Stream) Close() error {
93 return errors.New("stream closed")
95 s.conn.releaseAPIChannel(s.channel)
100 func (s *Stream) SendMsg(msg api.Message) error {
102 return errors.New("stream closed")
104 req := s.channel.newRequest(msg, false)
105 if err := s.conn.processRequest(s.channel, req); err != nil {
109 s.pkgPath = s.conn.GetMessagePath(msg)
114 func (s *Stream) RecvMsg() (api.Message, error) {
115 reply, err := s.recvReply()
119 // resolve message type
123 msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
127 // allocate message instance
128 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
129 // decode message data
130 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
136 func WithRequestSize(size int) api.StreamOption {
137 return func(stream api.Stream) {
138 stream.(*Stream).requestSize = size
142 func WithReplySize(size int) api.StreamOption {
143 return func(stream api.Stream) {
144 stream.(*Stream).replySize = size
148 func WithReplyTimeout(timeout time.Duration) api.StreamOption {
149 return func(stream api.Stream) {
150 stream.(*Stream).replyTimeout = timeout
154 func (s *Stream) recvReply() (*vppReply, error) {
156 return nil, errors.New("stream closed")
159 case reply, ok := <-s.channel.replyChan:
161 return nil, fmt.Errorf("reply channel closed")
163 if reply.err != nil {
164 // this case should actually never happen for stream
165 // since reply.err is only filled in watchRequests
166 // and stream does not use it
167 return nil, reply.err
172 return nil, s.ctx.Err()