1 // Copyright (c) 2020 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
24 "git.fd.io/govpp.git/api"
34 func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
36 return nil, errors.New("nil connection passed in")
38 // TODO: add stream options as variadic parameters for customizing:
39 // - request/reply channel size
45 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
46 channel := newChannel(chID, c, c.codec, c, 10, 10)
48 // store API channel within the client
50 c.channels[chID] = channel
51 c.channelsLock.Unlock()
53 // Channel.watchRequests is intentionally not started here, because
54 // requests are sent directly by SendMsg.
64 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
65 // TODO: implement invoke
66 panic("not implemented")
69 func (s *Stream) Context() context.Context {
73 func (s *Stream) Close() error {
75 return errors.New("stream closed")
77 s.conn.releaseAPIChannel(s.channel)
82 func (s *Stream) SendMsg(msg api.Message) error {
84 return errors.New("stream closed")
86 req := s.channel.newRequest(msg, false)
87 if err := s.conn.processRequest(s.channel, req); err != nil {
93 func (s *Stream) RecvMsg() (api.Message, error) {
95 return nil, errors.New("stream closed")
98 case reply, ok := <-s.channel.replyChan:
100 return nil, fmt.Errorf("reply channel closed")
102 if reply.err != nil {
103 // this case should actually never happen for stream
104 // since reply.err is only filled in watchRequests
105 // and stream does not use it
106 return nil, reply.err
108 // resolve message type
109 msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
113 // allocate message instance
114 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
115 // decode message data
116 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
122 return nil, s.ctx.Err()