1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/api"
35 replyTimeout time.Duration
36 // per-request context
// NewStream creates a stream backed by a channel obtained from c.newChannel.
// The stream starts with the defaults RequestChanBufSize, ReplyChanBufSize and
// DefaultReplyTimeout; the supplied options are then processed before the
// channel is created, so the channel sees the final sizes and reply timeout.
//
// It returns the error "nil connection passed in" on a guarded path.
// NOTE(review): the guard condition is not visible in this excerpt; presumably
// it checks c == nil — confirm.
41 func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
43 return nil, errors.New("nil connection passed in")
// defaults, possibly overridden by options below
49 requestSize: RequestChanBufSize,
50 replySize: ReplyChanBufSize,
51 replyTimeout: DefaultReplyTimeout,
54 // parse custom options
// NOTE(review): the loop body is not visible here; presumably each option is
// invoked on the new stream — confirm.
55 for _, option := range options {
// the channel is created only after the options ran, using the resulting sizes
59 s.channel = c.newChannel(s.requestSize, s.replySize)
60 s.channel.SetReplyTimeout(s.replyTimeout)
62 // Channel.watchRequests is intentionally not started here, because
63 // requests are sent directly by SendMsg.
// Invoke performs a single request/reply exchange: it opens a new stream with
// ctx, sends req on it, waits for one reply and decodes the reply data into
// reply using the channel's message codec.
//
// The stream is closed when Invoke returns; the error from Close is
// deliberately discarded.
// NOTE(review): the error branches and the definition of s are not visible in
// this excerpt; presumably s is the *Stream behind stream — confirm.
68 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
69 stream, err := c.NewStream(ctx)
// best-effort cleanup: the Close error is intentionally ignored
73 defer func() { _ = stream.Close() }()
74 if err := stream.SendMsg(req); err != nil {
78 rep, err := s.recvReply()
// decode the raw reply bytes into the caller-supplied message
82 if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
// Context returns a context.Context for the stream.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// s.ctx — confirm.
88 func (s *Stream) Context() context.Context {
// Close hands the stream's channel back to the connection via
// s.conn.releaseAPIChannel.
//
// It returns the error "stream closed" on a guarded path.
// NOTE(review): the guard condition is not visible in this excerpt; presumably
// it detects an already-closed stream — confirm.
92 func (s *Stream) Close() error {
94 return errors.New("stream closed")
96 s.conn.releaseAPIChannel(s.channel)
// SendMsg wraps msg in a request created by s.channel.newRequest(msg, false)
// and passes it straight to s.conn.processRequest. It also records the path
// reported by s.conn.GetMessagePath(msg) in s.pkgPath.
//
// It returns the error "stream closed" on a guarded path.
// NOTE(review): the guard condition and the error branch of processRequest are
// not visible in this excerpt — confirm.
101 func (s *Stream) SendMsg(msg api.Message) error {
103 return errors.New("stream closed")
105 req := s.channel.newRequest(msg, false)
106 if err := s.conn.processRequest(s.channel, req); err != nil {
// remember the path of the message that was sent
// NOTE(review): presumably consulted by RecvMsg when resolving replies — confirm.
110 s.pkgPath = s.conn.GetMessagePath(msg)
// RecvMsg waits for the next reply via recvReply, looks the message up by path
// and reply.msgID, allocates a fresh instance of the same concrete type and
// decodes reply.data into it with the channel's message codec.
// NOTE(review): how path is derived and what is returned on each error branch
// are not visible in this excerpt — confirm.
115 func (s *Stream) RecvMsg() (api.Message, error) {
116 reply, err := s.recvReply()
120 // resolve message type
124 msg, err := s.channel.msgIdentifier.LookupByID(path, reply.msgID)
128 // allocate message instance
// reflect.TypeOf(msg).Elem() requires msg to be a pointer; reflect.New then
// yields a pointer to a new zero value of the pointed-to type, so the value
// returned by the lookup is not decoded into
129 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
130 // decode message data
131 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
// WithRequestSize returns a stream option that sets the stream's requestSize
// to size. NewStream passes requestSize as the first argument to c.newChannel.
//
// The option uses an unchecked type assertion and therefore panics if the
// api.Stream it is applied to is not a *Stream.
137 func WithRequestSize(size int) api.StreamOption {
138 return func(stream api.Stream) {
139 stream.(*Stream).requestSize = size
// WithReplySize returns a stream option that sets the stream's replySize to
// size. NewStream passes replySize as the second argument to c.newChannel.
//
// The option uses an unchecked type assertion and therefore panics if the
// api.Stream it is applied to is not a *Stream.
143 func WithReplySize(size int) api.StreamOption {
144 return func(stream api.Stream) {
145 stream.(*Stream).replySize = size
// WithReplyTimeout returns a stream option that sets the stream's replyTimeout
// to timeout. NewStream applies replyTimeout to the channel through
// SetReplyTimeout.
//
// The option uses an unchecked type assertion and therefore panics if the
// api.Stream it is applied to is not a *Stream.
149 func WithReplyTimeout(timeout time.Duration) api.StreamOption {
150 return func(stream api.Stream) {
151 stream.(*Stream).replyTimeout = timeout
// recvReply receives the next reply from s.channel.replyChan.
//
// Errors visible in this excerpt:
//   - "stream closed" on a guarded path (condition not visible here);
//   - "reply channel closed" in the branch following the channel receive
//     (NOTE(review): presumably when ok is false — confirm);
//   - reply.err when the received reply carries an error;
//   - s.ctx.Err() (NOTE(review): presumably from a ctx.Done() case whose
//     case line is not visible in this excerpt — confirm).
155 func (s *Stream) recvReply() (*vppReply, error) {
157 return nil, errors.New("stream closed")
160 case reply, ok := <-s.channel.replyChan:
162 return nil, fmt.Errorf("reply channel closed")
164 if reply.err != nil {
165 // this case should actually never happen for stream
166 // since reply.err is only filled in watchRequests
167 // and stream does not use it
168 return nil, reply.err
173 return nil, s.ctx.Err()