X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=proxy%2Fserver.go;h=e395468ac2d8ec71e8a3d3b0bc16759ca0db567f;hb=d0b973030fe07dc7875da72f5ebe42d8bd9544b1;hp=df623562b923965542c562611e91d00ea730f500;hpb=454aff1907d87c79ffb11e151e25953241c7f1be;p=govpp.git diff --git a/proxy/server.go b/proxy/server.go index df62356..e395468 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -1,11 +1,48 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package proxy import ( + "errors" "fmt" - "log" "reflect" + "sync" + "sync/atomic" + "time" + "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/core" +) + +const ( + binapiErrorMsg = ` +------------------------------------------------------------ + received binapi request while VPP connection is down! + - is VPP running ? + - have you called Connect on the binapi RPC ? +------------------------------------------------------------ +` + statsErrorMsg = ` +------------------------------------------------------------ + received stats request while stats connection is down! + - is VPP running ? + - is the correct socket name configured ? + - have you called Connect on the stats RPC ? +------------------------------------------------------------ +` ) type StatsRequest struct { @@ -18,38 +55,155 @@ type StatsResponse struct { IfaceStats *api.InterfaceStats ErrStats *api.ErrorStats BufStats *api.BufferStats + MemStats *api.MemoryStats } // StatsRPC is a RPC server for proxying client request to api.StatsProvider. type StatsRPC struct { - stats api.StatsProvider + statsConn *core.StatsConnection + stats adapter.StatsAPI + + done chan struct{} + // non-zero if the RPC service is available + available uint32 + // non-zero if connected to stats file. + isConnected uint32 + // synchronizes access to statsConn. + mu sync.Mutex } // NewStatsRPC returns new StatsRPC to be used as RPC server // proxying request to given api.StatsProvider. -func NewStatsRPC(stats api.StatsProvider) *StatsRPC { - return &StatsRPC{stats: stats} +func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) { + rpc := new(StatsRPC) + if err := rpc.connect(stats); err != nil { + return nil, err + } + return rpc, nil +} + +func (s *StatsRPC) watchConnection() { + heartbeatTicker := time.NewTicker(10 * time.Second).C + atomic.StoreUint32(&s.available, 1) + log.Debugln("enabling statsRPC service") + + count := 0 + prev := new(api.SystemStats) + + s.mu.Lock() + if err := s.statsConn.GetSystemStats(prev); err != nil { + atomic.StoreUint32(&s.available, 0) + log.Warnf("disabling statsRPC service, reason: %v", err) + } + s.mu.Unlock() + + for { + select { + case <-heartbeatTicker: + // If disconnect was called exit. + if atomic.LoadUint32(&s.isConnected) == 0 { + atomic.StoreUint32(&s.available, 0) + return + } + + curr := new(api.SystemStats) + + s.mu.Lock() + if err := s.statsConn.GetSystemStats(curr); err != nil { + atomic.StoreUint32(&s.available, 0) + log.Warnf("disabling statsRPC service, reason: %v", err) + } + s.mu.Unlock() + + if curr.Heartbeat <= prev.Heartbeat { + count++ + // vpp might have crashed/reset... try reconnecting + if count == 5 { + count = 0 + atomic.StoreUint32(&s.available, 0) + log.Warnln("disabling statsRPC service, reason: vpp might have crashed/reset...") + s.statsConn.Disconnect() + for { + var err error + s.statsConn, err = core.ConnectStats(s.stats) + if err == nil { + atomic.StoreUint32(&s.available, 1) + log.Debugln("enabling statsRPC service") + break + } + time.Sleep(5 * time.Second) + } + } + } else { + count = 0 + } + + prev = curr + case <-s.done: + return + } + } +} + +func (s *StatsRPC) connect(stats adapter.StatsAPI) error { + if atomic.LoadUint32(&s.isConnected) == 1 { + return errors.New("connection already exists") + } + s.stats = stats + var err error + s.statsConn, err = core.ConnectStats(s.stats) + if err != nil { + return err + } + s.done = make(chan struct{}) + atomic.StoreUint32(&s.isConnected, 1) + + go s.watchConnection() + return nil +} + +func (s *StatsRPC) disconnect() { + if atomic.LoadUint32(&s.isConnected) == 1 { + atomic.StoreUint32(&s.isConnected, 0) + close(s.done) + s.statsConn.Disconnect() + s.statsConn = nil + } +} + +func (s *StatsRPC) serviceAvailable() bool { + return atomic.LoadUint32(&s.available) == 1 } func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error { - log.Printf("StatsRPC.GetStats - REQ: %+v", req) + if !s.serviceAvailable() { + log.Println(statsErrorMsg) + return errors.New("server does not support 'get stats' at this time, try again later") + } + log.Debugf("StatsRPC.GetStats - REQ: %+v", req) + + s.mu.Lock() + defer s.mu.Unlock() switch req.StatsType { case "system": resp.SysStats = new(api.SystemStats) - return s.stats.GetSystemStats(resp.SysStats) + return s.statsConn.GetSystemStats(resp.SysStats) case "node": resp.NodeStats = new(api.NodeStats) - return s.stats.GetNodeStats(resp.NodeStats) + return s.statsConn.GetNodeStats(resp.NodeStats) case "interface": resp.IfaceStats = new(api.InterfaceStats) - return s.stats.GetInterfaceStats(resp.IfaceStats) + return s.statsConn.GetInterfaceStats(resp.IfaceStats) case "error": resp.ErrStats = new(api.ErrorStats) - return s.stats.GetErrorStats(resp.ErrStats) + return s.statsConn.GetErrorStats(resp.ErrStats) case "buffer": resp.BufStats = new(api.BufferStats) - return s.stats.GetBufferStats(resp.BufStats) + return s.statsConn.GetBufferStats(resp.BufStats) + case "memory": + resp.MemStats = new(api.MemoryStats) + return s.statsConn.GetMemoryStats(resp.MemStats) default: return fmt.Errorf("unknown stats type: %s", req.StatsType) } @@ -59,6 +213,7 @@ type BinapiRequest struct { Msg api.Message IsMulti bool ReplyMsg api.Message + Timeout time.Duration } type BinapiResponse struct { @@ -66,22 +221,125 @@ type BinapiResponse struct { Msgs []api.Message } +type BinapiCompatibilityRequest struct { + MsgNameCrcs []string +} + +type BinapiCompatibilityResponse struct { + CompatibleMsgs map[string][]string + IncompatibleMsgs map[string][]string +} + // BinapiRPC is a RPC server for proxying client request to api.Channel. type BinapiRPC struct { - binapi api.Channel + binapiConn *core.Connection + binapi adapter.VppAPI + + events chan core.ConnectionEvent + done chan struct{} + // non-zero if the RPC service is available + available uint32 + // non-zero if connected to vpp. + isConnected uint32 } // NewBinapiRPC returns new BinapiRPC to be used as RPC server // proxying request to given api.Channel. -func NewBinapiRPC(binapi api.Channel) *BinapiRPC { - return &BinapiRPC{binapi: binapi} +func NewBinapiRPC(binapi adapter.VppAPI) (*BinapiRPC, error) { + rpc := new(BinapiRPC) + if err := rpc.connect(binapi); err != nil { + return nil, err + } + return rpc, nil +} + +func (s *BinapiRPC) watchConnection() { + for { + select { + case e := <-s.events: + // If disconnect was called exit. + if atomic.LoadUint32(&s.isConnected) == 0 { + atomic.StoreUint32(&s.available, 0) + return + } + + switch e.State { + case core.Connected: + if !s.serviceAvailable() { + atomic.StoreUint32(&s.available, 1) + log.Debugln("enabling binapiRPC service") + } + case core.Disconnected: + if s.serviceAvailable() { + atomic.StoreUint32(&s.available, 0) + log.Warnf("disabling binapiRPC, reason: %v\n", e.Error) + } + case core.Failed: + if s.serviceAvailable() { + atomic.StoreUint32(&s.available, 0) + log.Warnf("disabling binapiRPC, reason: %v\n", e.Error) + } + // vpp might have crashed/reset... reconnect + s.binapiConn.Disconnect() + + var err error + s.binapiConn, s.events, err = core.AsyncConnect(s.binapi, 3, 5*time.Second) + if err != nil { + log.Println(err) + } + } + case <-s.done: + return + } + } +} + +func (s *BinapiRPC) connect(binapi adapter.VppAPI) error { + if atomic.LoadUint32(&s.isConnected) == 1 { + return errors.New("connection already exists") + } + s.binapi = binapi + var err error + s.binapiConn, s.events, err = core.AsyncConnect(binapi, 3, time.Second) + if err != nil { + return err + } + s.done = make(chan struct{}) + atomic.StoreUint32(&s.isConnected, 1) + + go s.watchConnection() + return nil +} + +func (s *BinapiRPC) disconnect() { + if atomic.LoadUint32(&s.isConnected) == 1 { + atomic.StoreUint32(&s.isConnected, 0) + close(s.done) + s.binapiConn.Disconnect() + s.binapiConn = nil + } +} + +func (s *BinapiRPC) serviceAvailable() bool { + return atomic.LoadUint32(&s.available) == 1 } func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { - log.Printf("BinapiRPC.Invoke - REQ: %#v", req) + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support 'invoke' at this time, try again later") + } + log.Debugf("BinapiRPC.Invoke - REQ: %#v", req) + + ch, err := s.binapiConn.NewAPIChannel() + if err != nil { + return err + } + defer ch.Close() + ch.SetReplyTimeout(req.Timeout) if req.IsMulti { - multi := s.binapi.SendMultiRequest(req.Msg) + multi := ch.SendMultiRequest(req.Msg) for { // create new message in response of type ReplyMsg msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message) @@ -99,7 +357,7 @@ func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { // create new message in response of type ReplyMsg resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message) - err := s.binapi.SendRequest(req.Msg).ReceiveReply(resp.Msg) + err := ch.SendRequest(req.Msg).ReceiveReply(resp.Msg) if err != nil { return err } @@ -107,3 +365,48 @@ func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { return nil } + +func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support 'compatibility check' at this time, try again later") + } + log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req) + + ch, err := s.binapiConn.NewAPIChannel() + if err != nil { + return err + } + defer ch.Close() + + resp.CompatibleMsgs = make(map[string][]string) + resp.IncompatibleMsgs = make(map[string][]string) + + for path, messages := range api.GetRegisteredMessages() { + if resp.IncompatibleMsgs[path] == nil { + resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + } + if resp.CompatibleMsgs[path] == nil { + resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + } + for _, msg := range req.MsgNameCrcs { + val, ok := messages[msg] + if !ok { + resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg) + continue + } + if err = ch.CheckCompatiblity(val); err != nil { + resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg) + } else { + resp.CompatibleMsgs[path] = append(resp.CompatibleMsgs[path], msg) + } + } + } + for _, messages := range resp.IncompatibleMsgs { + if len(messages) > 0 { + return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs) + } + } + + return nil +}