Stats API: added GetMemory()
[govpp.git] / proxy / server.go
1 //  Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package proxy
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.fd.io/govpp.git/adapter"
26         "git.fd.io/govpp.git/api"
27         "git.fd.io/govpp.git/core"
28 )
29
30 const (
31         binapiErrorMsg = `
32 ------------------------------------------------------------
33  received binapi request while VPP connection is down!
34   - is VPP running ?
35   - have you called Connect on the binapi RPC ?
36 ------------------------------------------------------------
37 `
38         statsErrorMsg = `
39 ------------------------------------------------------------
40  received stats request while stats connection is down!
41   - is VPP running ?
42   - is the correct socket name configured ?
43   - have you called Connect on the stats RPC ?
44 ------------------------------------------------------------
45 `
46 )
47
48 type StatsRequest struct {
49         StatsType string
50 }
51
52 type StatsResponse struct {
53         SysStats   *api.SystemStats
54         NodeStats  *api.NodeStats
55         IfaceStats *api.InterfaceStats
56         ErrStats   *api.ErrorStats
57         BufStats   *api.BufferStats
58         MemStats   *api.MemoryStats
59 }
60
61 // StatsRPC is a RPC server for proxying client request to api.StatsProvider.
62 type StatsRPC struct {
63         statsConn *core.StatsConnection
64         stats     adapter.StatsAPI
65
66         done chan struct{}
67         // non-zero if the RPC service is available
68         available uint32
69         // non-zero if connected to stats file.
70         isConnected uint32
71         // synchronizes access to statsConn.
72         mu sync.Mutex
73 }
74
75 // NewStatsRPC returns new StatsRPC to be used as RPC server
76 // proxying request to given api.StatsProvider.
77 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
78         rpc := new(StatsRPC)
79         if err := rpc.connect(stats); err != nil {
80                 return nil, err
81         }
82         return rpc, nil
83 }
84
85 func (s *StatsRPC) watchConnection() {
86         heartbeatTicker := time.NewTicker(10 * time.Second).C
87         atomic.StoreUint32(&s.available, 1)
88         log.Debugln("enabling statsRPC service")
89
90         count := 0
91         prev := new(api.SystemStats)
92
93         s.mu.Lock()
94         if err := s.statsConn.GetSystemStats(prev); err != nil {
95                 atomic.StoreUint32(&s.available, 0)
96                 log.Warnf("disabling statsRPC service, reason: %v", err)
97         }
98         s.mu.Unlock()
99
100         for {
101                 select {
102                 case <-heartbeatTicker:
103                         // If disconnect was called exit.
104                         if atomic.LoadUint32(&s.isConnected) == 0 {
105                                 atomic.StoreUint32(&s.available, 0)
106                                 return
107                         }
108
109                         curr := new(api.SystemStats)
110
111                         s.mu.Lock()
112                         if err := s.statsConn.GetSystemStats(curr); err != nil {
113                                 atomic.StoreUint32(&s.available, 0)
114                                 log.Warnf("disabling statsRPC service, reason: %v", err)
115                         }
116                         s.mu.Unlock()
117
118                         if curr.Heartbeat <= prev.Heartbeat {
119                                 count++
120                                 // vpp might have crashed/reset... try reconnecting
121                                 if count == 5 {
122                                         count = 0
123                                         atomic.StoreUint32(&s.available, 0)
124                                         log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
125                                         s.statsConn.Disconnect()
126                                         for {
127                                                 var err error
128                                                 s.statsConn, err = core.ConnectStats(s.stats)
129                                                 if err == nil {
130                                                         atomic.StoreUint32(&s.available, 1)
131                                                         log.Debugln("enabling statsRPC service")
132                                                         break
133                                                 }
134                                                 time.Sleep(5 * time.Second)
135                                         }
136                                 }
137                         } else {
138                                 count = 0
139                         }
140
141                         prev = curr
142                 case <-s.done:
143                         return
144                 }
145         }
146 }
147
148 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
149         if atomic.LoadUint32(&s.isConnected) == 1 {
150                 return errors.New("connection already exists")
151         }
152         s.stats = stats
153         var err error
154         s.statsConn, err = core.ConnectStats(s.stats)
155         if err != nil {
156                 return err
157         }
158         s.done = make(chan struct{})
159         atomic.StoreUint32(&s.isConnected, 1)
160
161         go s.watchConnection()
162         return nil
163 }
164
165 func (s *StatsRPC) disconnect() {
166         if atomic.LoadUint32(&s.isConnected) == 1 {
167                 atomic.StoreUint32(&s.isConnected, 0)
168                 close(s.done)
169                 s.statsConn.Disconnect()
170                 s.statsConn = nil
171         }
172 }
173
174 func (s *StatsRPC) serviceAvailable() bool {
175         return atomic.LoadUint32(&s.available) == 1
176 }
177
178 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
179         if !s.serviceAvailable() {
180                 log.Println(statsErrorMsg)
181                 return errors.New("server does not support 'get stats' at this time, try again later")
182         }
183         log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
184
185         s.mu.Lock()
186         defer s.mu.Unlock()
187
188         switch req.StatsType {
189         case "system":
190                 resp.SysStats = new(api.SystemStats)
191                 return s.statsConn.GetSystemStats(resp.SysStats)
192         case "node":
193                 resp.NodeStats = new(api.NodeStats)
194                 return s.statsConn.GetNodeStats(resp.NodeStats)
195         case "interface":
196                 resp.IfaceStats = new(api.InterfaceStats)
197                 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
198         case "error":
199                 resp.ErrStats = new(api.ErrorStats)
200                 return s.statsConn.GetErrorStats(resp.ErrStats)
201         case "buffer":
202                 resp.BufStats = new(api.BufferStats)
203                 return s.statsConn.GetBufferStats(resp.BufStats)
204         case "memory":
205                 resp.MemStats = new(api.MemoryStats)
206                 return s.statsConn.GetMemoryStats(resp.MemStats)
207         default:
208                 return fmt.Errorf("unknown stats type: %s", req.StatsType)
209         }
210 }
211
212 type BinapiRequest struct {
213         Msg      api.Message
214         IsMulti  bool
215         ReplyMsg api.Message
216         Timeout  time.Duration
217 }
218
219 type BinapiResponse struct {
220         Msg  api.Message
221         Msgs []api.Message
222 }
223
224 type BinapiCompatibilityRequest struct {
225         MsgNameCrcs []string
226 }
227
228 type BinapiCompatibilityResponse struct {
229         CompatibleMsgs   []string
230         IncompatibleMsgs []string
231 }
232
233 // BinapiRPC is a RPC server for proxying client request to api.Channel.
234 type BinapiRPC struct {
235         binapiConn *core.Connection
236         binapi     adapter.VppAPI
237
238         events chan core.ConnectionEvent
239         done   chan struct{}
240         // non-zero if the RPC service is available
241         available uint32
242         // non-zero if connected to vpp.
243         isConnected uint32
244 }
245
246 // NewBinapiRPC returns new BinapiRPC to be used as RPC server
247 // proxying request to given api.Channel.
248 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
249         rpc := new(BinapiRPC)
250         if err := rpc.connect(binapi); err != nil {
251                 return nil, err
252         }
253         return rpc, nil
254 }
255
256 func (s *BinapiRPC) watchConnection() {
257         for {
258                 select {
259                 case e := <-s.events:
260                         // If disconnect was called exit.
261                         if atomic.LoadUint32(&s.isConnected) == 0 {
262                                 atomic.StoreUint32(&s.available, 0)
263                                 return
264                         }
265
266                         switch e.State {
267                         case core.Connected:
268                                 if !s.serviceAvailable() {
269                                         atomic.StoreUint32(&s.available, 1)
270                                         log.Debugln("enabling binapiRPC service")
271                                 }
272                         case core.Disconnected:
273                                 if s.serviceAvailable() {
274                                         atomic.StoreUint32(&s.available, 0)
275                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
276                                 }
277                         case core.Failed:
278                                 if s.serviceAvailable() {
279                                         atomic.StoreUint32(&s.available, 0)
280                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
281                                 }
282                                 // vpp might have crashed/reset... reconnect
283                                 s.binapiConn.Disconnect()
284
285                                 var err error
286                                 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
287                                 if err != nil {
288                                         log.Println(err)
289                                 }
290                         }
291                 case <-s.done:
292                         return
293                 }
294         }
295 }
296
297 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
298         if atomic.LoadUint32(&s.isConnected) == 1 {
299                 return errors.New("connection already exists")
300         }
301         s.binapi = binapi
302         var err error
303         s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
304         if err != nil {
305                 return err
306         }
307         s.done = make(chan struct{})
308         atomic.StoreUint32(&s.isConnected, 1)
309
310         go s.watchConnection()
311         return nil
312 }
313
314 func (s *BinapiRPC) disconnect() {
315         if atomic.LoadUint32(&s.isConnected) == 1 {
316                 atomic.StoreUint32(&s.isConnected, 0)
317                 close(s.done)
318                 s.binapiConn.Disconnect()
319                 s.binapiConn = nil
320         }
321 }
322
323 func (s *BinapiRPC) serviceAvailable() bool {
324         return atomic.LoadUint32(&s.available) == 1
325 }
326
327 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
328         if !s.serviceAvailable() {
329                 log.Println(binapiErrorMsg)
330                 return errors.New("server does not support 'invoke' at this time, try again later")
331         }
332         log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
333
334         ch, err := s.binapiConn.NewAPIChannel()
335         if err != nil {
336                 return err
337         }
338         defer ch.Close()
339         ch.SetReplyTimeout(req.Timeout)
340
341         if req.IsMulti {
342                 multi := ch.SendMultiRequest(req.Msg)
343                 for {
344                         // create new message in response of type ReplyMsg
345                         msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
346
347                         stop, err := multi.ReceiveReply(msg)
348                         if err != nil {
349                                 return err
350                         } else if stop {
351                                 break
352                         }
353
354                         resp.Msgs = append(resp.Msgs, msg)
355                 }
356         } else {
357                 // create new message in response of type ReplyMsg
358                 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
359
360                 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
361                 if err != nil {
362                         return err
363                 }
364         }
365
366         return nil
367 }
368
369 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
370         if !s.serviceAvailable() {
371                 log.Println(binapiErrorMsg)
372                 return errors.New("server does not support 'compatibility check' at this time, try again later")
373         }
374         log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
375
376         ch, err := s.binapiConn.NewAPIChannel()
377         if err != nil {
378                 return err
379         }
380         defer ch.Close()
381
382         resp.CompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
383         resp.IncompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
384
385         for _, msg := range req.MsgNameCrcs {
386                 val, ok := api.GetRegisteredMessages()[msg]
387                 if !ok {
388                         resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
389                         continue
390                 }
391
392                 if err = ch.CheckCompatiblity(val); err != nil {
393                         resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
394                 } else {
395                         resp.CompatibleMsgs = append(resp.CompatibleMsgs, msg)
396                 }
397         }
398
399         if len(resp.IncompatibleMsgs) > 0 {
400                 return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)
401         }
402
403         return nil
404 }