d1c5d73d6896e88b0a46c654ee71aca45ac8af98
[govpp.git] / proxy / client.go
1 //  Copyright (c) 2021 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package proxy
16
17 import (
18         "context"
19         "fmt"
20         "net/rpc"
21         "reflect"
22         "time"
23
24         "git.fd.io/govpp.git/api"
25         "git.fd.io/govpp.git/core"
26 )
27
28 type Client struct {
29         serverAddr string
30         rpc        *rpc.Client
31 }
32
33 // Connect dials remote proxy server on given address and
34 // returns new client if successful.
35 func Connect(addr string) (*Client, error) {
36         client, err := rpc.DialHTTP("tcp", addr)
37         if err != nil {
38                 return nil, fmt.Errorf("connection error:%v", err)
39         }
40         c := &Client{
41                 serverAddr: addr,
42                 rpc:        client,
43         }
44         return c, nil
45 }
46
47 // NewStatsClient returns new StatsClient which implements api.StatsProvider.
48 func (c *Client) NewStatsClient() (*StatsClient, error) {
49         stats := &StatsClient{
50                 rpc: c.rpc,
51         }
52         return stats, nil
53 }
54
55 // NewBinapiClient returns new BinapiClient which implements api.Channel.
56 func (c *Client) NewBinapiClient() (*BinapiClient, error) {
57         binapi := &BinapiClient{
58                 rpc:     c.rpc,
59                 timeout: core.DefaultReplyTimeout,
60         }
61         return binapi, nil
62 }
63
64 type StatsClient struct {
65         rpc *rpc.Client
66 }
67
68 func (s *StatsClient) GetSystemStats(sysStats *api.SystemStats) error {
69         // we need to start with a clean, zeroed item before decoding
70         // 'cause if the new values are 'zero' for the type, they will be ignored
71         // by the decoder. (i.e the old values will be left unchanged).
72         req := StatsRequest{StatsType: "system"}
73         resp := StatsResponse{SysStats: new(api.SystemStats)}
74         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
75                 return err
76         }
77         *sysStats = *resp.SysStats
78         return nil
79 }
80
81 func (s *StatsClient) GetNodeStats(nodeStats *api.NodeStats) error {
82         req := StatsRequest{StatsType: "node"}
83         resp := StatsResponse{NodeStats: new(api.NodeStats)}
84         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
85                 return err
86         }
87         *nodeStats = *resp.NodeStats
88         return nil
89 }
90
91 func (s *StatsClient) GetInterfaceStats(ifaceStats *api.InterfaceStats) error {
92         req := StatsRequest{StatsType: "interface"}
93         resp := StatsResponse{IfaceStats: new(api.InterfaceStats)}
94         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
95                 return err
96         }
97         *ifaceStats = *resp.IfaceStats
98         return nil
99 }
100
101 func (s *StatsClient) GetErrorStats(errStats *api.ErrorStats) error {
102         req := StatsRequest{StatsType: "error"}
103         resp := StatsResponse{ErrStats: new(api.ErrorStats)}
104         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
105                 return err
106         }
107         *errStats = *resp.ErrStats
108         return nil
109 }
110
111 func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error {
112         req := StatsRequest{StatsType: "buffer"}
113         resp := StatsResponse{BufStats: new(api.BufferStats)}
114         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
115                 return err
116         }
117         *bufStats = *resp.BufStats
118         return nil
119 }
120
121 func (s *StatsClient) GetMemoryStats(memStats *api.MemoryStats) error {
122         req := StatsRequest{StatsType: "memory"}
123         resp := StatsResponse{MemStats: new(api.MemoryStats)}
124         if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
125                 return err
126         }
127         *memStats = *resp.MemStats
128         return nil
129 }
130
131 type BinapiClient struct {
132         rpc     *rpc.Client
133         timeout time.Duration
134 }
135
136 // RPCStream is a stream for forwarding requests to BinapiRPC's stream.
137 type RPCStream struct {
138         rpc *rpc.Client
139         id  uint32
140 }
141
142 func (s *RPCStream) SendMsg(msg api.Message) error {
143         req := RPCStreamReqResp{
144                 ID:  s.id,
145                 Msg: msg,
146         }
147         resp := RPCStreamReqResp{}
148         err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp)
149         if err != nil {
150                 return fmt.Errorf("RPC SendMessage call failed: %v", err)
151         }
152         return nil
153 }
154
155 func (s *RPCStream) RecvMsg() (api.Message, error) {
156         req := RPCStreamReqResp{
157                 ID: s.id,
158         }
159         resp := RPCStreamReqResp{}
160         err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp)
161         if err != nil {
162                 return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err)
163         }
164         return resp.Msg, nil
165 }
166
167 func (s *RPCStream) Close() error {
168         req := RPCStreamReqResp{
169                 ID: s.id,
170         }
171         resp := RPCStreamReqResp{}
172         err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp)
173         if err != nil {
174                 return fmt.Errorf("RPC CloseStream call failed: %v", err)
175         }
176         return nil
177 }
178
179 func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) {
180         stream := &RPCStream{
181                 rpc: b.rpc,
182         }
183         req := RPCStreamReqResp{}
184         resp := RPCStreamReqResp{}
185         err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp)
186         if err != nil {
187                 return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err)
188         }
189         stream.id = resp.ID
190         return stream, err
191 }
192
193 func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error {
194         return invokeInternal(b.rpc, request, reply, b.timeout)
195 }
196
197 func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
198         req := &requestCtx{
199                 rpc:     b.rpc,
200                 timeout: b.timeout,
201                 req:     msg,
202         }
203         log.Debugf("SendRequest: %T %+v", msg, msg)
204         return req
205 }
206
207 type requestCtx struct {
208         rpc     *rpc.Client
209         req     api.Message
210         timeout time.Duration
211 }
212
213 func (r *requestCtx) ReceiveReply(msg api.Message) error {
214         return invokeInternal(r.rpc, r.req, msg, r.timeout)
215 }
216
217 func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error {
218         req := BinapiRequest{
219                 Msg:      msgIn,
220                 ReplyMsg: msgOut,
221                 Timeout:  timeout,
222         }
223         resp := BinapiResponse{}
224
225         err := rpc.Call("BinapiRPC.Invoke", req, &resp)
226         if err != nil {
227                 return fmt.Errorf("RPC call failed: %v", err)
228         }
229
230         // we set the value of msg to the value from response
231         reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
232
233         return nil
234 }
235
236 func (b *BinapiClient) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
237         req := &multiRequestCtx{
238                 rpc:     b.rpc,
239                 timeout: b.timeout,
240                 req:     msg,
241         }
242         log.Debugf("SendMultiRequest: %T %+v", msg, msg)
243         return req
244 }
245
246 type multiRequestCtx struct {
247         rpc     *rpc.Client
248         req     api.Message
249         timeout time.Duration
250
251         index   int
252         replies []api.Message
253 }
254
255 func (r *multiRequestCtx) ReceiveReply(msg api.Message) (stop bool, err error) {
256         // we call Invoke only on first ReceiveReply
257         if r.index == 0 {
258                 req := BinapiRequest{
259                         Msg:      r.req,
260                         ReplyMsg: msg,
261                         IsMulti:  true,
262                         Timeout:  r.timeout,
263                 }
264                 resp := BinapiResponse{}
265
266                 err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
267                 if err != nil {
268                         return false, fmt.Errorf("RPC call failed: %v", err)
269                 }
270
271                 r.replies = resp.Msgs
272         }
273
274         if r.index >= len(r.replies) {
275                 return true, nil
276         }
277
278         // we set the value of msg to the value from response
279         reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(r.replies[r.index]).Elem())
280         r.index++
281
282         return false, nil
283 }
284
285 func (b *BinapiClient) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
286         panic("implement me")
287 }
288
289 func (b *BinapiClient) SetReplyTimeout(timeout time.Duration) {
290         b.timeout = timeout
291 }
292
293 func (b *BinapiClient) CheckCompatiblity(msgs ...api.Message) error {
294         msgNamesCrscs := make([]string, 0, len(msgs))
295
296         for _, msg := range msgs {
297                 msgNamesCrscs = append(msgNamesCrscs, msg.GetMessageName()+"_"+msg.GetCrcString())
298         }
299
300         req := BinapiCompatibilityRequest{MsgNameCrcs: msgNamesCrscs}
301         resp := BinapiCompatibilityResponse{}
302
303         if err := b.rpc.Call("BinapiRPC.Compatibility", req, &resp); err != nil {
304                 return err
305         }
306
307         return nil
308 }
309
310 func (b *BinapiClient) Close() {
311         b.rpc.Close()
312 }