1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
24 "git.fd.io/govpp.git/api"
// NewStream creates a new api.Stream backed by a freshly allocated channel
// on this connection.
//
// It allocates a channel ID, builds the channel with newChannel and
// registers it in c.channels under that ID. It fails with
// "nil connection passed in" (presumably when c is nil; TODO confirm the
// guard condition).
34 func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
36 return nil, errors.New("nil connection passed in")
38 // TODO: add stream options as variadic parameters for customizing:
39 // - request/reply channel size
// Atomically bump the connection-wide counter and keep only the low 15 bits,
// so the resulting ID always lies in [0, 0x7fff] and repeats once the
// counter passes that range.
// NOTE(review): the map write below does not check for an existing entry, so
// a repeated ID would replace a channel that is still registered; confirm
// whether that can happen in practice.
45 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
// The two trailing 10s are passed straight to newChannel; presumably the
// request/reply channel sizes mentioned in the TODO above (TODO confirm).
46 channel := newChannel(chID, c, c.codec, c, 10, 10)
48 // store API channel within the client
50 c.channels[chID] = channel
// NOTE(review): channelsLock presumably guards c.channels and is expected to
// be acquired right before the map write above; verify the Lock/Unlock pair.
51 c.channelsLock.Unlock()
53 // Channel.watchRequests are not started here intentionally, because
54 // requests are sent directly by SendMsg.
// Invoke performs a single request/reply exchange: it opens a new stream
// with NewStream, sends req with SendMsg and reads one message with RecvMsg.
//
// The received message must have the same message name and CRC string as
// reply; otherwise an "unexpected reply" error is returned. On a match the
// received value is copied into the value that reply points to, so reply
// must be a pointer (Elem is called on it).
//
// NOTE(review): no stream.Close call appears in this function. If the
// stream is never closed, the channel that NewStream registers in c.channels
// stays registered after every Invoke; confirm and release it if so.
64 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
65 stream, err := c.NewStream(ctx)
69 if err := stream.SendMsg(req); err != nil {
72 msg, err := stream.RecvMsg()
// Reject a reply whose type differs from the one the caller expects.
76 if msg.GetMessageName() != reply.GetMessageName() ||
77 msg.GetCrcString() != reply.GetCrcString() {
78 return fmt.Errorf("unexpected reply: %T %+v", msg, msg)
// Copy the decoded message into the caller-supplied reply via reflection.
80 reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(msg).Elem())
// Context returns the context.Context associated with the stream.
// NOTE(review): presumably this is the ctx that was passed to NewStream
// (RecvMsg reads s.ctx); confirm against the method body.
84 func (s *Stream) Context() context.Context {
// Close releases the stream's channel by handing it to the connection's
// releaseAPIChannel.
//
// It returns the error "stream closed" on its guarded path (presumably when
// the stream has already been closed; TODO confirm the guard condition).
88 func (s *Stream) Close() error {
90 return errors.New("stream closed")
92 s.conn.releaseAPIChannel(s.channel)
// SendMsg sends msg over the stream: it wraps msg in a request created by
// the stream's channel and passes that request to the connection's
// processRequest.
//
// It returns the error "stream closed" on its guarded path (presumably when
// the stream has already been closed; TODO confirm the guard condition).
97 func (s *Stream) SendMsg(msg api.Message) error {
99 return errors.New("stream closed")
// The second argument to newRequest is false; presumably it marks the
// request as not multi-part (TODO confirm against newRequest).
101 req := s.channel.newRequest(msg, false)
// The request goes straight to the connection here; NewStream notes that
// Channel.watchRequests is intentionally not started for streams.
102 if err := s.conn.processRequest(s.channel, req); err != nil {
// RecvMsg receives the next reply from the stream's reply channel and
// returns it decoded as an api.Message.
//
// For a received reply it resolves the message type from reply.msgID,
// allocates a new instance of that type and decodes reply.data into it.
//
// Errors visible in this method:
//   - "stream closed" on the guarded path (presumably when the stream has
//     already been closed; TODO confirm the guard condition);
//   - "reply channel closed" (presumably when the receive reports !ok);
//   - reply.err, when the received reply carries an error;
//   - s.ctx.Err() (presumably when the stream's context is done before a
//     reply arrives; TODO confirm the select case).
//
// NOTE(review): failures from LookupByID and DecodeMsg are checked;
// presumably they are returned to the caller (confirm).
108 func (s *Stream) RecvMsg() (api.Message, error) {
110 return nil, errors.New("stream closed")
113 case reply, ok := <-s.channel.replyChan:
115 return nil, fmt.Errorf("reply channel closed")
117 if reply.err != nil {
118 // this case should actually never happen for stream
119 // since reply.err is only filled in watchRequests
120 // and stream does not use it
121 return nil, reply.err
123 // resolve message type
124 msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
128 // allocate message instance
// reflect.New on the element type yields a fresh zero-valued message, so the
// decode below fills a new instance rather than the value LookupByID returned.
129 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
130 // decode message data
131 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
137 return nil, s.ctx.Err()