Expose version info control flags
[govpp.git] / core / stream.go
1 //  Copyright (c) 2020 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package core
16
17 import (
18         "context"
19         "errors"
20         "fmt"
21         "reflect"
22         "sync/atomic"
23
24         "git.fd.io/govpp.git/api"
25 )
26
27 type Stream struct {
28         id      uint32
29         conn    *Connection
30         ctx     context.Context
31         channel *Channel
32 }
33
34 func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
35         if c == nil {
36                 return nil, errors.New("nil connection passed in")
37         }
38         // TODO: add stream options as variadic parameters for customizing:
39         // - request/reply channel size
40         // - reply timeout
41         // - retries
42         // - ???
43
44         // create new channel
45         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
46         channel := newChannel(chID, c, c.codec, c, 10, 10)
47
48         // store API channel within the client
49         c.channelsLock.Lock()
50         c.channels[chID] = channel
51         c.channelsLock.Unlock()
52
53         // Channel.watchRequests are not started here intentionally, because
54         // requests are sent directly by SendMsg.
55
56         return &Stream{
57                 id:      uint32(chID),
58                 conn:    c,
59                 ctx:     ctx,
60                 channel: channel,
61         }, nil
62 }
63
64 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
65         stream, err := c.NewStream(ctx)
66         if err != nil {
67                 return err
68         }
69         if err := stream.SendMsg(req); err != nil {
70                 return err
71         }
72         msg, err := stream.RecvMsg()
73         if err != nil {
74                 return err
75         }
76         if msg.GetMessageName() != reply.GetMessageName() ||
77                 msg.GetCrcString() != reply.GetCrcString() {
78                 return fmt.Errorf("unexpected reply: %T %+v", msg, msg)
79         }
80         reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(msg).Elem())
81         return nil
82 }
83
84 func (s *Stream) Context() context.Context {
85         return s.ctx
86 }
87
88 func (s *Stream) Close() error {
89         if s.conn == nil {
90                 return errors.New("stream closed")
91         }
92         s.conn.releaseAPIChannel(s.channel)
93         s.conn = nil
94         return nil
95 }
96
97 func (s *Stream) SendMsg(msg api.Message) error {
98         if s.conn == nil {
99                 return errors.New("stream closed")
100         }
101         req := s.channel.newRequest(msg, false)
102         if err := s.conn.processRequest(s.channel, req); err != nil {
103                 return err
104         }
105         return nil
106 }
107
108 func (s *Stream) RecvMsg() (api.Message, error) {
109         if s.conn == nil {
110                 return nil, errors.New("stream closed")
111         }
112         select {
113         case reply, ok := <-s.channel.replyChan:
114                 if !ok {
115                         return nil, fmt.Errorf("reply channel closed")
116                 }
117                 if reply.err != nil {
118                         // this case should actually never happen for stream
119                         // since reply.err is only filled in watchRequests
120                         // and stream does not use it
121                         return nil, reply.err
122                 }
123                 // resolve message type
124                 msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
125                 if err != nil {
126                         return nil, err
127                 }
128                 // allocate message instance
129                 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
130                 // decode message data
131                 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
132                         return nil, err
133                 }
134                 return msg, nil
135
136         case <-s.ctx.Done():
137                 return nil, s.ctx.Err()
138         }
139 }