3e8b307b47cab3c0402fd597b2c7695690d80527
[govpp.git] / proxy / server.go
1 //  Copyright (c) 2021 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package proxy
16
17 import (
18         "context"
19         "errors"
20         "fmt"
21         "reflect"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         "git.fd.io/govpp.git/adapter"
27         "git.fd.io/govpp.git/api"
28         "git.fd.io/govpp.git/core"
29 )
30
31 const (
32         binapiErrorMsg = `
33 ------------------------------------------------------------
34  received binapi request while VPP connection is down!
35   - is VPP running ?
36   - have you called Connect on the binapi RPC ?
37 ------------------------------------------------------------
38 `
39         statsErrorMsg = `
40 ------------------------------------------------------------
41  received stats request while stats connection is down!
42   - is VPP running ?
43   - is the correct socket name configured ?
44   - have you called Connect on the stats RPC ?
45 ------------------------------------------------------------
46 `
47 )
48
49 type StatsRequest struct {
50         StatsType string
51 }
52
53 type StatsResponse struct {
54         SysStats   *api.SystemStats
55         NodeStats  *api.NodeStats
56         IfaceStats *api.InterfaceStats
57         ErrStats   *api.ErrorStats
58         BufStats   *api.BufferStats
59         MemStats   *api.MemoryStats
60 }
61
62 // StatsRPC is a RPC server for proxying client request to api.StatsProvider.
63 type StatsRPC struct {
64         statsConn *core.StatsConnection
65         stats     adapter.StatsAPI
66
67         done chan struct{}
68         // non-zero if the RPC service is available
69         available uint32
70         // non-zero if connected to stats file.
71         isConnected uint32
72         // synchronizes access to statsConn.
73         mu sync.Mutex
74 }
75
76 // NewStatsRPC returns new StatsRPC to be used as RPC server
77 // proxying request to given api.StatsProvider.
78 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
79         rpc := new(StatsRPC)
80         if err := rpc.connect(stats); err != nil {
81                 return nil, err
82         }
83         return rpc, nil
84 }
85
86 func (s *StatsRPC) watchConnection() {
87         heartbeatTicker := time.NewTicker(10 * time.Second).C
88         atomic.StoreUint32(&s.available, 1)
89         log.Debugln("enabling statsRPC service")
90
91         count := 0
92         prev := new(api.SystemStats)
93
94         s.mu.Lock()
95         if err := s.statsConn.GetSystemStats(prev); err != nil {
96                 atomic.StoreUint32(&s.available, 0)
97                 log.Warnf("disabling statsRPC service, reason: %v", err)
98         }
99         s.mu.Unlock()
100
101         for {
102                 select {
103                 case <-heartbeatTicker:
104                         // If disconnect was called exit.
105                         if atomic.LoadUint32(&s.isConnected) == 0 {
106                                 atomic.StoreUint32(&s.available, 0)
107                                 return
108                         }
109
110                         curr := new(api.SystemStats)
111
112                         s.mu.Lock()
113                         if err := s.statsConn.GetSystemStats(curr); err != nil {
114                                 atomic.StoreUint32(&s.available, 0)
115                                 log.Warnf("disabling statsRPC service, reason: %v", err)
116                         }
117                         s.mu.Unlock()
118
119                         if curr.Heartbeat <= prev.Heartbeat {
120                                 count++
121                                 // vpp might have crashed/reset... try reconnecting
122                                 if count == 5 {
123                                         count = 0
124                                         atomic.StoreUint32(&s.available, 0)
125                                         log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
126                                         s.statsConn.Disconnect()
127                                         for {
128                                                 var err error
129                                                 s.statsConn, err = core.ConnectStats(s.stats)
130                                                 if err == nil {
131                                                         atomic.StoreUint32(&s.available, 1)
132                                                         log.Debugln("enabling statsRPC service")
133                                                         break
134                                                 }
135                                                 time.Sleep(5 * time.Second)
136                                         }
137                                 }
138                         } else {
139                                 count = 0
140                         }
141
142                         prev = curr
143                 case <-s.done:
144                         return
145                 }
146         }
147 }
148
149 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
150         if atomic.LoadUint32(&s.isConnected) == 1 {
151                 return errors.New("connection already exists")
152         }
153         s.stats = stats
154         var err error
155         s.statsConn, err = core.ConnectStats(s.stats)
156         if err != nil {
157                 return err
158         }
159         s.done = make(chan struct{})
160         atomic.StoreUint32(&s.isConnected, 1)
161
162         go s.watchConnection()
163         return nil
164 }
165
166 func (s *StatsRPC) disconnect() {
167         if atomic.LoadUint32(&s.isConnected) == 1 {
168                 atomic.StoreUint32(&s.isConnected, 0)
169                 close(s.done)
170                 s.statsConn.Disconnect()
171                 s.statsConn = nil
172         }
173 }
174
175 func (s *StatsRPC) serviceAvailable() bool {
176         return atomic.LoadUint32(&s.available) == 1
177 }
178
179 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
180         if !s.serviceAvailable() {
181                 log.Print(statsErrorMsg)
182                 return errors.New("server does not support 'get stats' at this time, try again later")
183         }
184         log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
185
186         s.mu.Lock()
187         defer s.mu.Unlock()
188
189         switch req.StatsType {
190         case "system":
191                 resp.SysStats = new(api.SystemStats)
192                 return s.statsConn.GetSystemStats(resp.SysStats)
193         case "node":
194                 resp.NodeStats = new(api.NodeStats)
195                 return s.statsConn.GetNodeStats(resp.NodeStats)
196         case "interface":
197                 resp.IfaceStats = new(api.InterfaceStats)
198                 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
199         case "error":
200                 resp.ErrStats = new(api.ErrorStats)
201                 return s.statsConn.GetErrorStats(resp.ErrStats)
202         case "buffer":
203                 resp.BufStats = new(api.BufferStats)
204                 return s.statsConn.GetBufferStats(resp.BufStats)
205         case "memory":
206                 resp.MemStats = new(api.MemoryStats)
207                 return s.statsConn.GetMemoryStats(resp.MemStats)
208         default:
209                 return fmt.Errorf("unknown stats type: %s", req.StatsType)
210         }
211 }
212
213 type BinapiRequest struct {
214         Msg      api.Message
215         IsMulti  bool
216         ReplyMsg api.Message
217         Timeout  time.Duration
218 }
219
220 type BinapiResponse struct {
221         Msg  api.Message
222         Msgs []api.Message
223 }
224
225 type BinapiCompatibilityRequest struct {
226         MsgNameCrcs []string
227 }
228
229 type BinapiCompatibilityResponse struct {
230         CompatibleMsgs   map[string][]string
231         IncompatibleMsgs map[string][]string
232 }
233
234 // BinapiRPC is a RPC server for proxying client request to api.Channel
235 // or api.Stream.
236 type BinapiRPC struct {
237         binapiConn *core.Connection
238         binapi     adapter.VppAPI
239
240         streamsLock sync.Mutex
241         // local ID, different from api.Stream ID
242         maxStreamID uint32
243         streams     map[uint32]api.Stream
244
245         events chan core.ConnectionEvent
246         done   chan struct{}
247         // non-zero if the RPC service is available
248         available uint32
249         // non-zero if connected to vpp.
250         isConnected uint32
251 }
252
253 // NewBinapiRPC returns new BinapiRPC to be used as RPC server
254 // proxying request to given api.Channel.
255 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
256         rpc := new(BinapiRPC)
257         if err := rpc.connect(binapi); err != nil {
258                 return nil, err
259         }
260         return rpc, nil
261 }
262
263 func (s *BinapiRPC) watchConnection() {
264         for {
265                 select {
266                 case e := <-s.events:
267                         // If disconnect was called exit.
268                         if atomic.LoadUint32(&s.isConnected) == 0 {
269                                 atomic.StoreUint32(&s.available, 0)
270                                 return
271                         }
272
273                         switch e.State {
274                         case core.Connected:
275                                 if !s.serviceAvailable() {
276                                         atomic.StoreUint32(&s.available, 1)
277                                         log.Debugln("enabling binapiRPC service")
278                                 }
279                         case core.Disconnected:
280                                 if s.serviceAvailable() {
281                                         atomic.StoreUint32(&s.available, 0)
282                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
283                                 }
284                         case core.Failed:
285                                 if s.serviceAvailable() {
286                                         atomic.StoreUint32(&s.available, 0)
287                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
288                                 }
289                                 // vpp might have crashed/reset... reconnect
290                                 s.binapiConn.Disconnect()
291
292                                 var err error
293                                 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
294                                 if err != nil {
295                                         log.Println(err)
296                                 }
297                         }
298                 case <-s.done:
299                         return
300                 }
301         }
302 }
303
304 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
305         if atomic.LoadUint32(&s.isConnected) == 1 {
306                 return errors.New("connection already exists")
307         }
308         s.binapi = binapi
309         var err error
310         s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
311         if err != nil {
312                 return err
313         }
314         s.done = make(chan struct{})
315         atomic.StoreUint32(&s.isConnected, 1)
316
317         go s.watchConnection()
318         return nil
319 }
320
321 func (s *BinapiRPC) disconnect() {
322         if atomic.LoadUint32(&s.isConnected) == 1 {
323                 atomic.StoreUint32(&s.isConnected, 0)
324                 close(s.done)
325                 s.binapiConn.Disconnect()
326                 s.binapiConn = nil
327         }
328 }
329
330 func (s *BinapiRPC) serviceAvailable() bool {
331         return atomic.LoadUint32(&s.available) == 1
332 }
333
334 type RPCStreamReqResp struct {
335         ID  uint32
336         Msg api.Message
337 }
338
339 func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
340         if !s.serviceAvailable() {
341                 log.Print(binapiErrorMsg)
342                 return errors.New("server does not support RPC calls at this time, try again later")
343         }
344         log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req)
345
346         stream, err := s.binapiConn.NewStream(context.Background())
347         if err != nil {
348                 return err
349         }
350
351         if s.streams == nil {
352                 s.streams = make(map[uint32]api.Stream)
353         }
354
355         s.streamsLock.Lock()
356         s.maxStreamID++
357         s.streams[s.maxStreamID] = stream
358         resp.ID = s.maxStreamID
359         s.streamsLock.Unlock()
360
361         return nil
362 }
363
364 func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
365         if !s.serviceAvailable() {
366                 log.Print(binapiErrorMsg)
367                 return errors.New("server does not support RPC calls at this time, try again later")
368         }
369         log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req)
370
371         stream, err := s.getStream(req.ID)
372         if err != nil {
373                 return err
374         }
375
376         return stream.SendMsg(req.Msg)
377 }
378
379 func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
380         if !s.serviceAvailable() {
381                 log.Print(binapiErrorMsg)
382                 return errors.New("server does not support RPC calls at this time, try again later")
383         }
384         log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req)
385
386         stream, err := s.getStream(req.ID)
387         if err != nil {
388                 return err
389         }
390
391         resp.Msg, err = stream.RecvMsg()
392         return err
393 }
394
395 func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
396         if !s.serviceAvailable() {
397                 log.Print(binapiErrorMsg)
398                 return errors.New("server does not support RPC calls at this time, try again later")
399         }
400         log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req)
401
402         stream, err := s.getStream(req.ID)
403         if err != nil {
404                 return err
405         }
406
407         s.streamsLock.Lock()
408         delete(s.streams, req.ID)
409         s.streamsLock.Unlock()
410
411         return stream.Close()
412 }
413
414 func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) {
415         s.streamsLock.Lock()
416         stream := s.streams[id]
417         s.streamsLock.Unlock()
418
419         if stream == nil || reflect.ValueOf(stream).IsNil() {
420                 s.streamsLock.Lock()
421                 // delete the stream in case it is still in the map
422                 delete(s.streams, id)
423                 s.streamsLock.Unlock()
424                 return nil, errors.New("BinapiRPC stream closed")
425         }
426         return stream, nil
427 }
428
429 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
430         if !s.serviceAvailable() {
431                 log.Print(binapiErrorMsg)
432                 return errors.New("server does not support 'invoke' at this time, try again later")
433         }
434         log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
435
436         ch, err := s.binapiConn.NewAPIChannel()
437         if err != nil {
438                 return err
439         }
440         defer ch.Close()
441         ch.SetReplyTimeout(req.Timeout)
442
443         if req.IsMulti {
444                 multi := ch.SendMultiRequest(req.Msg)
445                 for {
446                         // create new message in response of type ReplyMsg
447                         msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
448
449                         stop, err := multi.ReceiveReply(msg)
450                         if err != nil {
451                                 return err
452                         } else if stop {
453                                 break
454                         }
455
456                         resp.Msgs = append(resp.Msgs, msg)
457                 }
458         } else {
459                 // create new message in response of type ReplyMsg
460                 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
461
462                 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
463                 if err != nil {
464                         return err
465                 }
466         }
467
468         return nil
469 }
470
471 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
472         if !s.serviceAvailable() {
473                 log.Print(binapiErrorMsg)
474                 return errors.New("server does not support 'compatibility check' at this time, try again later")
475         }
476         log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
477
478         ch, err := s.binapiConn.NewAPIChannel()
479         if err != nil {
480                 return err
481         }
482         defer ch.Close()
483
484         resp.CompatibleMsgs = make(map[string][]string)
485         resp.IncompatibleMsgs = make(map[string][]string)
486
487         for path, messages := range api.GetRegisteredMessages() {
488                 resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
489                 resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
490
491                 for _, msg := range req.MsgNameCrcs {
492                         val, ok := messages[msg]
493                         if !ok {
494                                 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
495                                 continue
496                         }
497                         if err = ch.CheckCompatiblity(val); err != nil {
498                                 resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
499                         } else {
500                                 resp.CompatibleMsgs[path] = append(resp.CompatibleMsgs[path], msg)
501                         }
502                 }
503         }
504
505         compatible := false
506         for path, incompatibleMsgs := range resp.IncompatibleMsgs {
507                 if len(incompatibleMsgs) == 0 {
508                         compatible = true
509                 } else {
510                         log.Debugf("messages are incompatible for path %s", path)
511                 }
512         }
513         if !compatible {
514                 return errors.New("compatibility check failed")
515         }
516
517         return nil
518 }