50a00774d63ea24ea8362bef0553ab82006d30a7
[govpp.git] / proxy / server.go
1 //  Copyright (c) 2019 Cisco and/or its affiliates.
2 //
3 //  Licensed under the Apache License, Version 2.0 (the "License");
4 //  you may not use this file except in compliance with the License.
5 //  You may obtain a copy of the License at:
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 //  Unless required by applicable law or agreed to in writing, software
10 //  distributed under the License is distributed on an "AS IS" BASIS,
11 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 //  See the License for the specific language governing permissions and
13 //  limitations under the License.
14
15 package proxy
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.fd.io/govpp.git/adapter"
26         "git.fd.io/govpp.git/api"
27         "git.fd.io/govpp.git/core"
28 )
29
30 const (
31         binapiErrorMsg = `
32 ------------------------------------------------------------
33  received binapi request while VPP connection is down!
34   - is VPP running ?
35   - have you called Connect on the binapi RPC ?
36 ------------------------------------------------------------
37 `
38         statsErrorMsg = `
39 ------------------------------------------------------------
40  received stats request while stats connection is down!
41   - is VPP running ?
42   - is the correct socket name configured ?
43   - have you called Connect on the stats RPC ?
44 ------------------------------------------------------------
45 `
46 )
47
48 type StatsRequest struct {
49         StatsType string
50 }
51
52 type StatsResponse struct {
53         SysStats   *api.SystemStats
54         NodeStats  *api.NodeStats
55         IfaceStats *api.InterfaceStats
56         ErrStats   *api.ErrorStats
57         BufStats   *api.BufferStats
58 }
59
60 // StatsRPC is a RPC server for proxying client request to api.StatsProvider.
61 type StatsRPC struct {
62         statsConn *core.StatsConnection
63         stats     adapter.StatsAPI
64
65         done chan struct{}
66         // non-zero if the RPC service is available
67         available uint32
68         // non-zero if connected to stats file.
69         isConnected uint32
70         // synchronizes access to statsConn.
71         mu sync.Mutex
72 }
73
74 // NewStatsRPC returns new StatsRPC to be used as RPC server
75 // proxying request to given api.StatsProvider.
76 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
77         rpc := new(StatsRPC)
78         if err := rpc.connect(stats); err != nil {
79                 return nil, err
80         }
81         return rpc, nil
82 }
83
84 func (s *StatsRPC) watchConnection() {
85         heartbeatTicker := time.NewTicker(10 * time.Second).C
86         atomic.StoreUint32(&s.available, 1)
87         log.Debugln("enabling statsRPC service")
88
89         count := 0
90         prev := new(api.SystemStats)
91
92         s.mu.Lock()
93         if err := s.statsConn.GetSystemStats(prev); err != nil {
94                 atomic.StoreUint32(&s.available, 0)
95                 log.Warnf("disabling statsRPC service, reason: %v", err)
96         }
97         s.mu.Unlock()
98
99         for {
100                 select {
101                 case <-heartbeatTicker:
102                         // If disconnect was called exit.
103                         if atomic.LoadUint32(&s.isConnected) == 0 {
104                                 atomic.StoreUint32(&s.available, 0)
105                                 return
106                         }
107
108                         curr := new(api.SystemStats)
109
110                         s.mu.Lock()
111                         if err := s.statsConn.GetSystemStats(curr); err != nil {
112                                 atomic.StoreUint32(&s.available, 0)
113                                 log.Warnf("disabling statsRPC service, reason: %v", err)
114                         }
115                         s.mu.Unlock()
116
117                         if curr.Heartbeat <= prev.Heartbeat {
118                                 count++
119                                 // vpp might have crashed/reset... try reconnecting
120                                 if count == 5 {
121                                         count = 0
122                                         atomic.StoreUint32(&s.available, 0)
123                                         log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
124                                         s.statsConn.Disconnect()
125                                         for {
126                                                 var err error
127                                                 s.statsConn, err = core.ConnectStats(s.stats)
128                                                 if err == nil {
129                                                         atomic.StoreUint32(&s.available, 1)
130                                                         log.Debugln("enabling statsRPC service")
131                                                         break
132                                                 }
133                                                 time.Sleep(5 * time.Second)
134                                         }
135                                 }
136                         } else {
137                                 count = 0
138                         }
139
140                         prev = curr
141                 case <-s.done:
142                         return
143                 }
144         }
145 }
146
147 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
148         if atomic.LoadUint32(&s.isConnected) == 1 {
149                 return errors.New("connection already exists")
150         }
151         s.stats = stats
152         var err error
153         s.statsConn, err = core.ConnectStats(s.stats)
154         if err != nil {
155                 return err
156         }
157         s.done = make(chan struct{})
158         atomic.StoreUint32(&s.isConnected, 1)
159
160         go s.watchConnection()
161         return nil
162 }
163
164 func (s *StatsRPC) disconnect() {
165         if atomic.LoadUint32(&s.isConnected) == 1 {
166                 atomic.StoreUint32(&s.isConnected, 0)
167                 close(s.done)
168                 s.statsConn.Disconnect()
169                 s.statsConn = nil
170         }
171 }
172
173 func (s *StatsRPC) serviceAvailable() bool {
174         return atomic.LoadUint32(&s.available) == 1
175 }
176
177 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
178         if !s.serviceAvailable() {
179                 log.Println(statsErrorMsg)
180                 return errors.New("server does not support 'get stats' at this time, try again later")
181         }
182         log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
183
184         s.mu.Lock()
185         defer s.mu.Unlock()
186
187         switch req.StatsType {
188         case "system":
189                 resp.SysStats = new(api.SystemStats)
190                 return s.statsConn.GetSystemStats(resp.SysStats)
191         case "node":
192                 resp.NodeStats = new(api.NodeStats)
193                 return s.statsConn.GetNodeStats(resp.NodeStats)
194         case "interface":
195                 resp.IfaceStats = new(api.InterfaceStats)
196                 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
197         case "error":
198                 resp.ErrStats = new(api.ErrorStats)
199                 return s.statsConn.GetErrorStats(resp.ErrStats)
200         case "buffer":
201                 resp.BufStats = new(api.BufferStats)
202                 return s.statsConn.GetBufferStats(resp.BufStats)
203         default:
204                 return fmt.Errorf("unknown stats type: %s", req.StatsType)
205         }
206 }
207
208 type BinapiRequest struct {
209         Msg      api.Message
210         IsMulti  bool
211         ReplyMsg api.Message
212         Timeout  time.Duration
213 }
214
215 type BinapiResponse struct {
216         Msg  api.Message
217         Msgs []api.Message
218 }
219
220 type BinapiCompatibilityRequest struct {
221         MsgNameCrcs []string
222 }
223
224 type BinapiCompatibilityResponse struct {
225         CompatibleMsgs   []string
226         IncompatibleMsgs []string
227 }
228
229 // BinapiRPC is a RPC server for proxying client request to api.Channel.
230 type BinapiRPC struct {
231         binapiConn *core.Connection
232         binapi     adapter.VppAPI
233
234         events chan core.ConnectionEvent
235         done   chan struct{}
236         // non-zero if the RPC service is available
237         available uint32
238         // non-zero if connected to vpp.
239         isConnected uint32
240 }
241
242 // NewBinapiRPC returns new BinapiRPC to be used as RPC server
243 // proxying request to given api.Channel.
244 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
245         rpc := new(BinapiRPC)
246         if err := rpc.connect(binapi); err != nil {
247                 return nil, err
248         }
249         return rpc, nil
250 }
251
252 func (s *BinapiRPC) watchConnection() {
253         for {
254                 select {
255                 case e := <-s.events:
256                         // If disconnect was called exit.
257                         if atomic.LoadUint32(&s.isConnected) == 0 {
258                                 atomic.StoreUint32(&s.available, 0)
259                                 return
260                         }
261
262                         switch e.State {
263                         case core.Connected:
264                                 if !s.serviceAvailable() {
265                                         atomic.StoreUint32(&s.available, 1)
266                                         log.Debugln("enabling binapiRPC service")
267                                 }
268                         case core.Disconnected:
269                                 if s.serviceAvailable() {
270                                         atomic.StoreUint32(&s.available, 0)
271                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
272                                 }
273                         case core.Failed:
274                                 if s.serviceAvailable() {
275                                         atomic.StoreUint32(&s.available, 0)
276                                         log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
277                                 }
278                                 // vpp might have crashed/reset... reconnect
279                                 s.binapiConn.Disconnect()
280
281                                 var err error
282                                 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
283                                 if err != nil {
284                                         log.Println(err)
285                                 }
286                         }
287                 case <-s.done:
288                         return
289                 }
290         }
291 }
292
293 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
294         if atomic.LoadUint32(&s.isConnected) == 1 {
295                 return errors.New("connection already exists")
296         }
297         s.binapi = binapi
298         var err error
299         s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
300         if err != nil {
301                 return err
302         }
303         s.done = make(chan struct{})
304         atomic.StoreUint32(&s.isConnected, 1)
305
306         go s.watchConnection()
307         return nil
308 }
309
310 func (s *BinapiRPC) disconnect() {
311         if atomic.LoadUint32(&s.isConnected) == 1 {
312                 atomic.StoreUint32(&s.isConnected, 0)
313                 close(s.done)
314                 s.binapiConn.Disconnect()
315                 s.binapiConn = nil
316         }
317 }
318
319 func (s *BinapiRPC) serviceAvailable() bool {
320         return atomic.LoadUint32(&s.available) == 1
321 }
322
323 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
324         if !s.serviceAvailable() {
325                 log.Println(binapiErrorMsg)
326                 return errors.New("server does not support 'invoke' at this time, try again later")
327         }
328         log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
329
330         ch, err := s.binapiConn.NewAPIChannel()
331         if err != nil {
332                 return err
333         }
334         defer ch.Close()
335         ch.SetReplyTimeout(req.Timeout)
336
337         if req.IsMulti {
338                 multi := ch.SendMultiRequest(req.Msg)
339                 for {
340                         // create new message in response of type ReplyMsg
341                         msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
342
343                         stop, err := multi.ReceiveReply(msg)
344                         if err != nil {
345                                 return err
346                         } else if stop {
347                                 break
348                         }
349
350                         resp.Msgs = append(resp.Msgs, msg)
351                 }
352         } else {
353                 // create new message in response of type ReplyMsg
354                 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
355
356                 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
357                 if err != nil {
358                         return err
359                 }
360         }
361
362         return nil
363 }
364
365 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
366         if !s.serviceAvailable() {
367                 log.Println(binapiErrorMsg)
368                 return errors.New("server does not support 'compatibility check' at this time, try again later")
369         }
370         log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
371
372         ch, err := s.binapiConn.NewAPIChannel()
373         if err != nil {
374                 return err
375         }
376         defer ch.Close()
377
378         resp.CompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
379         resp.IncompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
380
381         for _, msg := range req.MsgNameCrcs {
382                 val, ok := api.GetRegisteredMessages()[msg]
383                 if !ok {
384                         resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
385                         continue
386                 }
387
388                 if err = ch.CheckCompatiblity(val); err != nil {
389                         resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
390                 } else {
391                         resp.CompatibleMsgs = append(resp.CompatibleMsgs, msg)
392                 }
393         }
394
395         if len(resp.IncompatibleMsgs) > 0 {
396                 return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)
397         }
398
399         return nil
400 }