Introduce Stream - experimental API for low-level access to VPP API
[govpp.git] / core / stream.go
1 //  Copyright (c) 2020 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package core
16
17 import (
18         "context"
19         "errors"
20         "fmt"
21         "reflect"
22         "sync/atomic"
23
24         "git.fd.io/govpp.git/api"
25 )
26
27 type Stream struct {
28         id      uint32
29         conn    *Connection
30         ctx     context.Context
31         channel *Channel
32 }
33
34 func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
35         if c == nil {
36                 return nil, errors.New("nil connection passed in")
37         }
38         // TODO: add stream options as variadic parameters for customizing:
39         // - request/reply channel size
40         // - reply timeout
41         // - retries
42         // - ???
43
44         // create new channel
45         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
46         channel := newChannel(chID, c, c.codec, c, 10, 10)
47
48         // store API channel within the client
49         c.channelsLock.Lock()
50         c.channels[chID] = channel
51         c.channelsLock.Unlock()
52
53         // Channel.watchRequests are not started here intentionally, because
54         // requests are sent directly by SendMsg.
55
56         return &Stream{
57                 id:      uint32(chID),
58                 conn:    c,
59                 ctx:     ctx,
60                 channel: channel,
61         }, nil
62 }
63
64 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
65         // TODO: implement invoke
66         panic("not implemented")
67 }
68
69 func (s *Stream) Context() context.Context {
70         return s.ctx
71 }
72
73 func (s *Stream) Close() error {
74         if s.conn == nil {
75                 return errors.New("stream closed")
76         }
77         s.conn.releaseAPIChannel(s.channel)
78         s.conn = nil
79         return nil
80 }
81
82 func (s *Stream) SendMsg(msg api.Message) error {
83         if s.conn == nil {
84                 return errors.New("stream closed")
85         }
86         req := s.channel.newRequest(msg, false)
87         if err := s.conn.processRequest(s.channel, req); err != nil {
88                 return err
89         }
90         return nil
91 }
92
93 func (s *Stream) RecvMsg() (api.Message, error) {
94         if s.conn == nil {
95                 return nil, errors.New("stream closed")
96         }
97         select {
98         case reply, ok := <-s.channel.replyChan:
99                 if !ok {
100                         return nil, fmt.Errorf("reply channel closed")
101                 }
102                 if reply.err != nil {
103                         // this case should actually never happen for stream
104                         // since reply.err is only filled in watchRequests
105                         // and stream does not use it
106                         return nil, reply.err
107                 }
108                 // resolve message type
109                 msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
110                 if err != nil {
111                         return nil, err
112                 }
113                 // allocate message instance
114                 msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
115                 // decode message data
116                 if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
117                         return nil, err
118                 }
119                 return msg, nil
120
121         case <-s.ctx.Done():
122                 return nil, s.ctx.Err()
123         }
124 }