Generate VPE HTTP handler
[govpp.git] / core / stream.go
1 //  Copyright (c) 2020 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package core
16
17 import (
18         "context"
19         "errors"
20         "fmt"
21         "reflect"
22         "sync/atomic"
23
24         "git.fd.io/govpp.git/api"
25 )
26
27 type Stream struct {
28         id      uint32
29         conn    *Connection
30         ctx     context.Context
31         channel *Channel
32 }
33
34 func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
35         if c == nil {
36                 return nil, errors.New("nil connection passed in")
37         }
38         // TODO: add stream options as variadic parameters for customizing:
39         // - request/reply channel size
40         // - reply timeout
41         // - retries
42         // - ???
43
44         // create new channel
45         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
46         channel := newChannel(chID, c, c.codec, c, 10, 10)
47
48         // store API channel within the client
49         c.channelsLock.Lock()
50         c.channels[chID] = channel
51         c.channelsLock.Unlock()
52
53         // Channel.watchRequests are not started here intentionally, because
54         // requests are sent directly by SendMsg.
55
56         return &Stream{
57                 id:      uint32(chID),
58                 conn:    c,
59                 ctx:     ctx,
60                 channel: channel,
61         }, nil
62 }
63
64 func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
65         stream, err := c.NewStream(ctx)
66         if err != nil {
67                 return err
68         }
69         if err := stream.SendMsg(req); err != nil {
70                 return err
71         }
72         s := stream.(*Stream)
73         rep, err := s.recvReply()
74         if err != nil {
75                 return err
76         }
77         if err := s.channel.msgCodec.DecodeMsg(rep.data, reply); err != nil {
78                 return err
79         }
80         return nil
81 }
82
83 func (s *Stream) Context() context.Context {
84         return s.ctx
85 }
86
87 func (s *Stream) Close() error {
88         if s.conn == nil {
89                 return errors.New("stream closed")
90         }
91         s.conn.releaseAPIChannel(s.channel)
92         s.conn = nil
93         return nil
94 }
95
96 func (s *Stream) SendMsg(msg api.Message) error {
97         if s.conn == nil {
98                 return errors.New("stream closed")
99         }
100         req := s.channel.newRequest(msg, false)
101         if err := s.conn.processRequest(s.channel, req); err != nil {
102                 return err
103         }
104         return nil
105 }
106
107 func (s *Stream) RecvMsg() (api.Message, error) {
108         reply, err := s.recvReply()
109         if err != nil {
110                 return nil, err
111         }
112         // resolve message type
113         msg, err := s.channel.msgIdentifier.LookupByID(reply.msgID)
114         if err != nil {
115                 return nil, err
116         }
117         // allocate message instance
118         msg = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
119         // decode message data
120         if err := s.channel.msgCodec.DecodeMsg(reply.data, msg); err != nil {
121                 return nil, err
122         }
123         return msg, nil
124 }
125
126 func (s *Stream) recvReply() (*vppReply, error) {
127         if s.conn == nil {
128                 return nil, errors.New("stream closed")
129         }
130         select {
131         case reply, ok := <-s.channel.replyChan:
132                 if !ok {
133                         return nil, fmt.Errorf("reply channel closed")
134                 }
135                 if reply.err != nil {
136                         // this case should actually never happen for stream
137                         // since reply.err is only filled in watchRequests
138                         // and stream does not use it
139                         return nil, reply.err
140                 }
141                 return reply, nil
142
143         case <-s.ctx.Done():
144                 return nil, s.ctx.Err()
145         }
146 }