1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 "git.fd.io/govpp.git/adapter"
26 "git.fd.io/govpp.git/api"
27 "git.fd.io/govpp.git/core"
32 ------------------------------------------------------------
33 received binapi request while VPP connection is down!
35 - have you called Connect on the binapi RPC ?
36 ------------------------------------------------------------
39 ------------------------------------------------------------
40 received stats request while stats connection is down!
42 - is the correct socket name configured ?
43 - have you called Connect on the stats RPC ?
44 ------------------------------------------------------------
48 type StatsRequest struct {
52 type StatsResponse struct {
53 SysStats *api.SystemStats
54 NodeStats *api.NodeStats
55 IfaceStats *api.InterfaceStats
56 ErrStats *api.ErrorStats
57 BufStats *api.BufferStats
60 // StatsRPC is a RPC server for proxying client request to api.StatsProvider.
61 type StatsRPC struct {
62 statsConn *core.StatsConnection
63 stats adapter.StatsAPI
66 // non-zero if the RPC service is available
68 // non-zero if connected to stats file.
70 // synchronizes access to statsConn.
74 // NewStatsRPC returns new StatsRPC to be used as RPC server
75 // proxying request to given api.StatsProvider.
76 func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
78 if err := rpc.connect(stats); err != nil {
84 func (s *StatsRPC) watchConnection() {
85 heartbeatTicker := time.NewTicker(10 * time.Second).C
86 atomic.StoreUint32(&s.available, 1)
87 log.Debugln("enabling statsRPC service")
90 prev := new(api.SystemStats)
93 if err := s.statsConn.GetSystemStats(prev); err != nil {
94 atomic.StoreUint32(&s.available, 0)
95 log.Warnf("disabling statsRPC service, reason: %v", err)
101 case <-heartbeatTicker:
102 // If disconnect was called exit.
103 if atomic.LoadUint32(&s.isConnected) == 0 {
104 atomic.StoreUint32(&s.available, 0)
108 curr := new(api.SystemStats)
111 if err := s.statsConn.GetSystemStats(curr); err != nil {
112 atomic.StoreUint32(&s.available, 0)
113 log.Warnf("disabling statsRPC service, reason: %v", err)
117 if curr.Heartbeat <= prev.Heartbeat {
119 // vpp might have crashed/reset... try reconnecting
122 atomic.StoreUint32(&s.available, 0)
123 log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...")
124 s.statsConn.Disconnect()
127 s.statsConn, err = core.ConnectStats(s.stats)
129 atomic.StoreUint32(&s.available, 1)
130 log.Debugln("enabling statsRPC service")
133 time.Sleep(5 * time.Second)
147 func (s *StatsRPC) connect(stats adapter.StatsAPI) error {
148 if atomic.LoadUint32(&s.isConnected) == 1 {
149 return errors.New("connection already exists")
153 s.statsConn, err = core.ConnectStats(s.stats)
157 s.done = make(chan struct{})
158 atomic.StoreUint32(&s.isConnected, 1)
160 go s.watchConnection()
164 func (s *StatsRPC) disconnect() {
165 if atomic.LoadUint32(&s.isConnected) == 1 {
166 atomic.StoreUint32(&s.isConnected, 0)
168 s.statsConn.Disconnect()
173 func (s *StatsRPC) serviceAvailable() bool {
174 return atomic.LoadUint32(&s.available) == 1
177 func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
178 if !s.serviceAvailable() {
179 log.Println(statsErrorMsg)
180 return errors.New("server does not support 'get stats' at this time, try again later")
182 log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
187 switch req.StatsType {
189 resp.SysStats = new(api.SystemStats)
190 return s.statsConn.GetSystemStats(resp.SysStats)
192 resp.NodeStats = new(api.NodeStats)
193 return s.statsConn.GetNodeStats(resp.NodeStats)
195 resp.IfaceStats = new(api.InterfaceStats)
196 return s.statsConn.GetInterfaceStats(resp.IfaceStats)
198 resp.ErrStats = new(api.ErrorStats)
199 return s.statsConn.GetErrorStats(resp.ErrStats)
201 resp.BufStats = new(api.BufferStats)
202 return s.statsConn.GetBufferStats(resp.BufStats)
204 return fmt.Errorf("unknown stats type: %s", req.StatsType)
208 type BinapiRequest struct {
212 Timeout time.Duration
215 type BinapiResponse struct {
220 type BinapiCompatibilityRequest struct {
224 type BinapiCompatibilityResponse struct {
225 CompatibleMsgs []string
226 IncompatibleMsgs []string
229 // BinapiRPC is a RPC server for proxying client request to api.Channel.
230 type BinapiRPC struct {
231 binapiConn *core.Connection
232 binapi adapter.VppAPI
234 events chan core.ConnectionEvent
236 // non-zero if the RPC service is available
238 // non-zero if connected to vpp.
242 // NewBinapiRPC returns new BinapiRPC to be used as RPC server
243 // proxying request to given api.Channel.
244 func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) {
245 rpc := new(BinapiRPC)
246 if err := rpc.connect(binapi); err != nil {
252 func (s *BinapiRPC) watchConnection() {
255 case e := <-s.events:
256 // If disconnect was called exit.
257 if atomic.LoadUint32(&s.isConnected) == 0 {
258 atomic.StoreUint32(&s.available, 0)
264 if !s.serviceAvailable() {
265 atomic.StoreUint32(&s.available, 1)
266 log.Debugln("enabling binapiRPC service")
268 case core.Disconnected:
269 if s.serviceAvailable() {
270 atomic.StoreUint32(&s.available, 0)
271 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
274 if s.serviceAvailable() {
275 atomic.StoreUint32(&s.available, 0)
276 log.Warnf("disabling binapiRPC, reason: %v\n", e.Error)
278 // vpp might have crashed/reset... reconnect
279 s.binapiConn.Disconnect()
282 s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second)
293 func (s *BinapiRPC) connect(binapi adapter.VppAPI) error {
294 if atomic.LoadUint32(&s.isConnected) == 1 {
295 return errors.New("connection already exists")
299 s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second)
303 s.done = make(chan struct{})
304 atomic.StoreUint32(&s.isConnected, 1)
306 go s.watchConnection()
310 func (s *BinapiRPC) disconnect() {
311 if atomic.LoadUint32(&s.isConnected) == 1 {
312 atomic.StoreUint32(&s.isConnected, 0)
314 s.binapiConn.Disconnect()
319 func (s *BinapiRPC) serviceAvailable() bool {
320 return atomic.LoadUint32(&s.available) == 1
323 func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
324 if !s.serviceAvailable() {
325 log.Println(binapiErrorMsg)
326 return errors.New("server does not support 'invoke' at this time, try again later")
328 log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
330 ch, err := s.binapiConn.NewAPIChannel()
335 ch.SetReplyTimeout(req.Timeout)
338 multi := ch.SendMultiRequest(req.Msg)
340 // create new message in response of type ReplyMsg
341 msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
343 stop, err := multi.ReceiveReply(msg)
350 resp.Msgs = append(resp.Msgs, msg)
353 // create new message in response of type ReplyMsg
354 resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
356 err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg)
365 func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
366 if !s.serviceAvailable() {
367 log.Println(binapiErrorMsg)
368 return errors.New("server does not support 'compatibility check' at this time, try again later")
370 log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
372 ch, err := s.binapiConn.NewAPIChannel()
378 resp.CompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
379 resp.IncompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
381 for _, msg := range req.MsgNameCrcs {
382 val, ok := api.GetRegisteredMessages()[msg]
384 resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
388 if err = ch.CheckCompatiblity(val); err != nil {
389 resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
391 resp.CompatibleMsgs = append(resp.CompatibleMsgs, msg)
395 if len(resp.IncompatibleMsgs) > 0 {
396 return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)