1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
30 "github.com/fsnotify/fsnotify"
31 "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/codec"
38 // DefaultSocketName is default VPP API socket file path.
39 DefaultSocketName = "/run/vpp/api.sock"
40 // DefaultClientName is used for identifying client in socket registration
41 DefaultClientName = "govppsock"
46 // DefaultConnectTimeout is default timeout for connecting
47 DefaultConnectTimeout = time.Second * 3
48 // DefaultDisconnectTimeout is default timeout for disconnecting
49 DefaultDisconnectTimeout = time.Millisecond * 100
50 // MaxWaitReady defines maximum duration of waiting for socket file
51 MaxWaitReady = time.Second * 10
// Debug switches and the logger used throughout this file.
//
// debug is true when the DEBUG_GOVPP environment variable contains
// "socketclient".
55 debug = strings.Contains(os.Getenv("DEBUG_GOVPP"), "socketclient")
// debugMsgIds is true when the DEBUG_GOVPP environment variable contains
// "msgtable".
56 debugMsgIds = strings.Contains(os.Getenv("DEBUG_GOVPP"), "msgtable")
// log is derived from logger and tags every record with the field
// logger="govpp/socketclient".
// NOTE(review): the declaration of logger is not visible in this view.
59 log = logger.WithField("logger", "govpp/socketclient")
62 // init initializes global logger
// The visible statements raise the logger level to debug and log that fact.
// NOTE(review): the func header and any guarding condition are not visible in
// this view — presumably this is gated on the debug switch; confirm.
65 logger.Level = logrus.DebugLevel
66 log.Debug("govpp: debug level enabled for socketclient")
// socketMissing is the help text that printMissingSocketMsg writes to stderr
// when the socket file cannot be found. It is a format string: its single %s
// verb receives the socket path.
70 const socketMissing = `
71 ------------------------------------------------------------
72 No socket file found at: %s
73 VPP binary API socket file is missing!
75 - is VPP running with socket for binapi enabled?
76 - is the correct socket name configured?
78 To enable it add following section to your VPP config:
82 ------------------------------------------------------------
// warnOnce ensures the missing-socket help text is printed at most once per
// process: Connect runs printMissingSocketMsg through warnOnce.Do.
85 var warnOnce sync.Once
// printMissingSocketMsg writes the socketMissing help text to standard error,
// filling in the socket path the client is configured with (c.sockAddr).
// The result of Fprintf is not checked.
87 func (c *socketClient) printMissingSocketMsg() {
88 fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
// socketClient holds the state of one client connection to the VPP binary API
// socket. Instances are created by NewVppClient.
// NOTE(review): only some of the fields are visible in this view.
91 type socketClient struct {
// connectTimeout bounds the wait for the sockclnt_create reply in open.
99 connectTimeout time.Duration
// disconnectTimeout bounds the wait for the sockclnt_delete reply in close.
100 disconnectTimeout time.Duration
// msgCallback is invoked by readerLoop with the ID and raw bytes of every
// received message.
102 msgCallback adapter.MsgCallback
// msgTable maps the message names received in open to their message IDs;
// GetMsgID looks entries up under the key msgName + "_" + msgCrc.
104 msgTable map[string]uint16
// headerPool supplies the 16-byte header buffers used by writeMsg and readMsg.
108 headerPool *sync.Pool
// NewVppClient returns a socketClient preconfigured with the default client
// name, the default connect and disconnect timeouts, a pool of 16-byte header
// buffers, and a message callback that only logs the dropped message.
//
// sockAddr is the path of the VPP API socket file.
// NOTE(review): the condition guarding the DefaultSocketName assignment is not
// visible in this view — presumably it applies when sockAddr is empty; confirm.
114 func NewVppClient(sockAddr string) *socketClient {
116 sockAddr = DefaultSocketName
118 return &socketClient{
120 clientName: DefaultClientName,
121 connectTimeout: DefaultConnectTimeout,
122 disconnectTimeout: DefaultDisconnectTimeout,
// Each buffer handed out by the pool is a fresh 16-byte slice, the size of the
// header that readMsgHeader expects.
123 headerPool: &sync.Pool{New: func() interface{} {
124 return make([]byte, 16)
// Placeholder callback used until SetMsgCallback is called.
126 msgCallback: func(msgID uint16, data []byte) {
127 log.Debugf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
132 // SetClientName sets a client name used for identification.
// NewVppClient presets the name to DefaultClientName.
// NOTE(review): the method body is not visible in this view.
133 func (c *socketClient) SetClientName(name string) {
137 // SetConnectTimeout sets timeout used during connecting.
// NewVppClient presets DefaultConnectTimeout; open passes c.connectTimeout to
// readMsgTimeout while waiting for the sockclnt_create reply.
// NOTE(review): the method body is not visible in this view — presumably it
// stores t in c.connectTimeout; confirm.
138 func (c *socketClient) SetConnectTimeout(t time.Duration) {
142 // SetDisconnectTimeout sets timeout used during disconnecting.
// The value replaces c.disconnectTimeout, which close passes to readMsgTimeout
// while waiting for the sockclnt_delete reply. NewVppClient presets it to
// DefaultDisconnectTimeout.
143 func (c *socketClient) SetDisconnectTimeout(t time.Duration) {
144 c.disconnectTimeout = t
// SetMsgCallback registers cb as the handler for received messages; readerLoop
// calls c.msgCallback with the message ID and the raw message bytes.
// NOTE(review): only the debug log line of the body is visible in this view —
// presumably cb is stored in c.msgCallback; confirm.
147 func (c *socketClient) SetMsgCallback(cb adapter.MsgCallback) {
148 log.Debug("SetMsgCallback")
// legacySocketName is the fallback socket path that checkLegacySocket switches
// to when the configured socket file does not exist but this one does.
152 const legacySocketName = "/run/vpp-api.sock"
// checkLegacySocket decides whether the client should fall back to the legacy
// socket path.
//
// It returns false when the configured socket file exists, or when os.Stat on
// it fails with anything other than "does not exist". When the configured file
// is missing and legacySocketName exists, c.sockAddr is switched to
// legacySocketName.
// NOTE(review): the remaining return statements are not visible in this view —
// presumably true is returned only after switching to the legacy path; confirm.
154 func (c *socketClient) checkLegacySocket() bool {
// Already configured with the legacy path: there is nothing to fall back to.
155 if c.sockAddr == legacySocketName {
158 log.Debugf("checking legacy socket: %s", legacySocketName)
159 // check if socket exists
160 if _, err := os.Stat(c.sockAddr); err == nil {
161 return false // socket exists
162 } else if !os.IsNotExist(err) {
163 return false // some other error occurred
165 // check if legacy socket exists
166 if _, err := os.Stat(legacySocketName); err == nil {
167 // legacy socket exists, update sockAddr
168 c.sockAddr = legacySocketName
171 // no socket found
175 // WaitReady checks socket file existence and waits for it if necessary
//
// It returns nil immediately when the socket file already exists, and returns
// the os.Stat error when that error is anything other than "does not exist".
// Otherwise it watches the parent directory of c.sockAddr with fsnotify until
// a Create event for exactly c.sockAddr arrives, giving up with a timeout
// error after MaxWaitReady.
// NOTE(review): several branches (legacy-socket results, watcher error
// handling, the enclosing loop/select) are not visible in this view.
176 func (c *socketClient) WaitReady() error {
177 // check if socket already exists
178 if _, err := os.Stat(c.sockAddr); err == nil {
179 return nil // socket exists, we are ready
180 } else if !os.IsNotExist(err) {
181 return err // some other error occurred
// May switch c.sockAddr to the legacy path as a side effect.
184 if c.checkLegacySocket() {
188 // socket does not exist, watch for it
189 watcher, err := fsnotify.NewWatcher()
// A failure to close the watcher is only logged.
194 if err := watcher.Close(); err != nil {
195 log.Debugf("failed to close file watcher: %v", err)
199 // start directory watcher
// The directory is watched rather than the file, because the file does not
// exist yet.
200 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
// NOTE(review): no timeout.Stop() call is visible in this view — confirm the
// timer is released on the paths that return before it fires.
204 timeout := time.NewTimer(MaxWaitReady)
// Last chance before reporting the timeout: the legacy socket may have appeared.
208 if c.checkLegacySocket() {
211 return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
213 case e := <-watcher.Errors:
216 case ev := <-watcher.Events:
217 log.Debugf("watcher event: %+v", ev)
// Only a Create event whose name equals the configured path counts; the
// comparison is an exact string match.
218 if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
219 // socket created, we are ready
// Connect establishes the client session. The visible steps are:
//  1. checkLegacySocket — its result is ignored here; it is called for its
//     side effect of possibly switching c.sockAddr to the legacy path,
//  2. verify that the socket file exists (the missing-socket help text is
//     printed once per process if it does not),
//  3. connect — dial the unix socket,
//  4. open — register the client and fetch the message table,
//  5. create the quit channel.
//
// It returns an error when the socket file is missing or cannot be inspected.
// NOTE(review): the error paths of connect/open and whatever follows the quit
// channel creation are not visible in this view.
226 func (c *socketClient) Connect() error {
227 c.checkLegacySocket()
229 // check if socket exists
230 if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
231 warnOnce.Do(c.printMissingSocketMsg)
232 return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr)
233 } else if err != nil {
234 return fmt.Errorf("VPP API socket error: %v", err)
237 if err := c.connect(c.sockAddr); err != nil {
241 if err := c.open(); err != nil {
246 c.quit = make(chan struct{})
// Disconnect tears the session down. The visible order is: shut down the read
// side of the socket (CloseRead), wait for readerLoop, send sockclnt_delete
// via close, then close the socket via disconnect.
//
// Failures of CloseRead and of close are written to the debug log.
// NOTE(review): the guard at the top, the wait itself and the return
// statements are not visible in this view.
253 func (c *socketClient) Disconnect() error {
257 log.Debugf("Disconnecting..")
261 if err := c.conn.CloseRead(); err != nil {
262 log.Debugf("closing readMsg failed: %v", err)
265 // wait for readerLoop to return
268 if err := c.close(); err != nil {
269 log.Debugf("closing failed: %v", err)
272 if err := c.disconnect(); err != nil {
// defaultBufferSize is the size in bytes of the buffered reader and writer that
// connect wraps around the socket. writeMsg passes the writer's size to
// writeMsgData as the chunk size.
279 const defaultBufferSize = 4096
// connect dials the unix socket at sockAddr and wraps the connection in a
// buffered reader and writer of defaultBufferSize bytes each.
//
// When the first dial fails with an error whose text contains
// "wrong type for socket", the dial is retried with the "unixpacket" network.
// NOTE(review): the error checks, returns and the assignment of c.conn are not
// visible in this view.
281 func (c *socketClient) connect(sockAddr string) error {
282 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
// NOTE(review): this logs c.sockAddr rather than the sockAddr parameter; the
// only visible caller (Connect) passes c.sockAddr, so the two agree there.
284 log.Debugf("Connecting to: %v", c.sockAddr)
286 conn, err := net.DialUnix("unix", nil, addr)
288 // we try different type of socket for backwards compatibility with VPP<=19.04
// The socket type mismatch is detected by matching on the error text.
289 if strings.Contains(err.Error(), "wrong type for socket") {
290 addr.Net = "unixpacket"
291 log.Debugf("%s, retrying connect with type unixpacket", err)
292 conn, err = net.DialUnix("unixpacket", nil, addr)
295 log.Debugf("Connecting to socket %s failed: %s", addr, err)
301 log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
303 c.reader = bufio.NewReaderSize(c.conn, defaultBufferSize)
304 c.writer = bufio.NewWriterSize(c.conn, defaultBufferSize)
// disconnect closes the underlying socket connection. A failure of Close is
// written to the debug log.
// NOTE(review): the return statements are not visible in this view.
309 func (c *socketClient) disconnect() error {
310 log.Debugf("Closing socket")
311 if err := c.conn.Close(); err != nil {
312 log.Debugln("Closing socket failed:", err)
// Constants used by the session handshake.
//
// sockCreateMsgId is the message ID that open uses to encode the
// sockclnt_create request.
319 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
// createMsgContext is written by open into byte 5 of the encoded
// sockclnt_create request.
320 createMsgContext = byte(123)
// deleteMsgContext is written by close into byte 5 of the encoded
// sockclnt_delete request.
321 deleteMsgContext = byte(124)
// open performs the sockclnt_create handshake: it encodes and sends the
// request, waits up to c.connectTimeout for the reply, and on success stores
// the client index and the message table received from the peer.
//
// It returns an error when the reply carries a non-zero Response.
// NOTE(review): the fields of the request and the returns on the encode,
// write, read and decode error paths are not visible in this view.
324 func (c *socketClient) open() error {
325 var msgCodec = codec.DefaultCodec
327 // Request socket client create
328 req := &SockclntCreate{
331 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
333 log.Debugln("Encode error:", err)
// Byte 5 of the encoded request is overwritten with the fixed context value.
337 msg[5] = createMsgContext
339 if err := c.writeMsg(msg); err != nil {
340 log.Debugln("Write error: ", err)
// nil buffer: readMsgData allocates one sized to the reply.
343 msgReply, err := c.readMsgTimeout(nil, c.connectTimeout)
// NOTE(review): read and decode failures use Println here, while the other
// error paths in this function use Debugln.
345 log.Println("Read error:", err)
349 reply := new(SockclntCreateReply)
350 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
351 log.Println("Decoding sockclnt_create_reply failed:", err)
353 } else if reply.Response != 0 {
354 return fmt.Errorf("sockclnt_create_reply: response error (%d)", reply.Response)
357 log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
358 reply.Response, reply.Index, reply.Count)
360 c.clientIndex = reply.Index
// Build the name -> message ID table, pre-sized to the announced count.
361 c.msgTable = make(map[string]uint16, reply.Count)
362 for _, x := range reply.MessageTable {
// The name is cut at the first NUL byte, then a trailing \x13 byte is removed.
363 msgName := strings.Split(x.Name, "\x00")[0]
364 name := strings.TrimSuffix(msgName, "\x13")
365 c.msgTable[name] = x.Index
// Remember the ID of the sockclnt_delete message; close needs it to encode
// its request.
366 if strings.HasPrefix(name, "sockclnt_delete_") {
367 c.sockDelMsgId = x.Index
370 log.Debugf(" - %4d: %q", x.Index, name)
// close performs the sockclnt_delete handshake: it encodes a delete request
// for c.clientIndex using the message ID learned in open, sends it, and waits
// up to c.disconnectTimeout for the reply.
//
// It returns an error when the reply carries a non-zero Response.
// NOTE(review): the returns on the encode, write, read and decode error paths
// are not visible in this view.
377 func (c *socketClient) close() error {
378 var msgCodec = codec.DefaultCodec
380 req := &SockclntDelete{
381 Index: c.clientIndex,
383 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
385 log.Debugln("Encode error:", err)
// Byte 5 of the encoded request is overwritten with the fixed context value.
389 msg[5] = deleteMsgContext
391 log.Debugf("sending socklntDel (%d bytes): % 0X", len(msg), msg)
393 if err := c.writeMsg(msg); err != nil {
394 log.Debugln("Write error: ", err)
// nil buffer: readMsgData allocates one sized to the reply.
398 msgReply, err := c.readMsgTimeout(nil, c.disconnectTimeout)
// A read that merely timed out is distinguished from other read errors.
400 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
401 // we accept timeout for reply
404 log.Debugln("Read error:", err)
408 reply := new(SockclntDeleteReply)
409 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
410 log.Debugln("Decoding sockclnt_delete_reply failed:", err)
412 } else if reply.Response != 0 {
413 return fmt.Errorf("sockclnt_delete_reply: response error (%d)", reply.Response)
// GetMsgID looks up the ID of the message identified by msgName and msgCrc in
// the table received during open. The lookup key is msgName + "_" + msgCrc.
//
// When the key is absent it returns 0 and an *adapter.UnknownMsgError.
// NOTE(review): the fields set on the error are not visible in this view.
419 func (c *socketClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
420 if msgID, ok := c.msgTable[msgName+"_"+msgCrc]; ok {
423 return 0, &adapter.UnknownMsgError{
// SendMsg sends an already encoded request. Before writing, it stamps the
// client index and the given context into the request header, which modifies
// the caller's data slice in place.
//
// It returns an error for data that is too short to hold the request header
// (the message states a minimum of 10 bytes).
// NOTE(review): the length check itself, the condition around the debug dump
// and the return statements are not visible in this view.
429 func (c *socketClient) SendMsg(context uint32, data []byte) error {
431 return fmt.Errorf("invalid message data, length must be at least 10 bytes")
433 setMsgRequestHeader(data, c.clientIndex, context)
436 log.Debugf("sendMsg (%d) context=%v client=%d: % 02X", len(data), context, c.clientIndex, data)
439 if err := c.writeMsg(data); err != nil {
440 log.Debugln("writeMsg error: ", err)
// setMsgRequestHeader stamps the client index and the context into the header
// of an already encoded request, modifying data in place.
//
// Request header layout (multi-byte fields are big-endian):
//
//	bytes 0-1  message ID    (left untouched, it is already set)
//	bytes 2-5  client index
//	bytes 6-9  context
//
// The slice expressions panic when data cannot hold these 10 bytes.
func setMsgRequestHeader(data []byte, clientIndex, context uint32) {
	const (
		clientIndexStart = 2
		contextStart     = clientIndexStart + 4
		headerEnd        = contextStart + 4
	)

	order := binary.BigEndian
	order.PutUint32(data[clientIndexStart:contextStart], clientIndex)
	order.PutUint32(data[contextStart:headerEnd], context)
}
// writeMsg sends one message: a header carrying the message length, then the
// message bytes in chunks of the buffered writer's size, then a flush of the
// writer.
// NOTE(review): the lock acquisition and the error returns are not visible in
// this view.
463 func (c *socketClient) writeMsg(msg []byte) error {
464 // we lock to prevent mixing multiple message writes
466 defer c.writeMu.Unlock()
// NOTE(review): the pool is shared with readMsg, and writeMsgHeader only
// rewrites bytes 8:12, so the remaining header bytes go out with whatever the
// buffer last held — confirm the peer ignores them.
468 header := c.headerPool.Get().([]byte)
469 err := writeMsgHeader(c.writer, header, len(msg))
473 c.headerPool.Put(header)
475 if err := writeMsgData(c.writer, msg, c.writer.Size()); err != nil {
// Nothing reaches the socket until the buffered writer is flushed.
479 if err := c.writer.Flush(); err != nil {
483 log.Debugf(" -- writeMsg done")
// writeMsgHeader stores dataLen as a big-endian uint32 in bytes 8:12 of header
// and writes the whole header to w. The other header bytes are sent as they
// are.
//
// dataLen is converted to uint32 without a range check.
// NOTE(review): the error handling after Write is not visible in this view.
488 func writeMsgHeader(w io.Writer, header []byte, dataLen int) error {
489 binary.BigEndian.PutUint32(header[8:12], uint32(dataLen))
491 n, err := w.Write(header)
496 log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
// writeMsgData writes msg to w in consecutive chunks of at most writerSize
// bytes.
//
// NOTE(review): the loop runs len(msg)/writerSize + 1 times, so when len(msg)
// is an exact multiple of writerSize (including an empty msg) the final
// iteration starts at len(msg) and has nothing left to send. A writerSize of 0
// would make the loop condition divide by zero.
// NOTE(review): the clamping of x to len(msg) and the error returns are not
// visible in this view.
502 func writeMsgData(w io.Writer, msg []byte, writerSize int) error {
503 for i := 0; i <= len(msg)/writerSize; i++ {
// x is the exclusive end offset of chunk i before any clamping.
504 x := i*writerSize + writerSize
509 log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
511 n, err := w.Write(msg[i*writerSize : x])
516 log.Debugf(" - data sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
522 func (c *socketClient) readerLoop() {
524 defer log.Debugf("reader loop done")
535 msg, err := c.readMsg(buf[:])
537 if isClosedError(err) {
540 log.Debugf("readMsg error: %v", err)
544 msgID, context := getMsgReplyHeader(msg)
546 log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), msgID, context)
549 c.msgCallback(msgID, msg)
// getMsgReplyHeader extracts the message ID and the context from the header at
// the start of a reply.
//
// Reply header layout (both fields are big-endian):
//
//	bytes 0-1  message ID
//	bytes 2-5  context
//
// The slice expressions panic when msg cannot hold these 6 bytes.
func getMsgReplyHeader(msg []byte) (msgID uint16, context uint32) {
	order := binary.BigEndian
	idField, contextField := msg[0:2], msg[2:6]
	return order.Uint16(idField), order.Uint32(contextField)
}
// readMsgTimeout reads one message like readMsg, but with a read deadline of
// now + timeout set on the connection for the duration of the read. After the
// read the deadline is cleared again by setting the zero time.
//
// buf is passed through to readMsg.
// NOTE(review): the error returns are not visible in this view, so it cannot
// be confirmed here that the deadline is also cleared when the read fails.
568 func (c *socketClient) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) {
570 readDeadline := time.Now().Add(timeout)
571 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
576 msgReply, err := c.readMsg(buf)
581 // reset read deadline
582 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
// readMsg reads one message from the buffered reader: first the header, which
// yields the payload length, then that many bytes of payload.
//
// buf is handed to readMsgData, which allocates a new slice only when buf is
// nil or shorter than the payload.
// NOTE(review): the error returns are not visible in this view.
589 func (c *socketClient) readMsg(buf []byte) ([]byte, error) {
590 log.Debug("reading msg..")
// The header buffer is borrowed from the pool only for the header read.
592 header := c.headerPool.Get().([]byte)
593 msgLen, err := readMsgHeader(c.reader, header)
597 c.headerPool.Put(header)
599 msg, err := readMsgData(c.reader, buf, msgLen)
601 log.Debugf(" -- readMsg done (buffered: %d)", c.reader.Buffered())
// readMsgHeader reads a message header from r into header and returns the
// payload length, which is stored as a big-endian uint32 in header bytes 8:12.
//
// At least 16 bytes are requested from r; an incomplete header is reported as
// an "invalid header" error.
// NOTE(review): the conditions selecting the error branches are not visible in
// this view.
606 func readMsgHeader(r io.Reader, header []byte) (int, error) {
607 n, err := io.ReadAtLeast(r, header, 16)
612 log.Debugln("zero bytes header")
615 log.Debugf("invalid header (%d bytes): % 0X", n, header[:n])
616 return 0, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
619 dataLen := binary.BigEndian.Uint32(header[8:12])
621 return int(dataLen), nil
// readMsgData reads a payload of dataLen bytes from r. A new slice is
// allocated when buf is nil or shorter than dataLen. After the first Read, any
// remaining bytes are fetched with further Read calls; a follow-up Read that
// returns zero bytes without an error is reported as the error "zero nbytes".
//
// NOTE(review): dataLen comes from the peer's header and no upper bound is
// visible here before the allocation.
// NOTE(review): the declaration of msg, the use of buf when it is large
// enough, the loop around the follow-up reads and the returns are not visible
// in this view.
624 func readMsgData(r io.Reader, buf []byte, dataLen int) ([]byte, error) {
626 if buf == nil || len(buf) < dataLen {
627 msg = make([]byte, dataLen)
// A single Read may deliver fewer than dataLen bytes.
632 n, err := r.Read(msg)
637 log.Debugf(" - read data (%d bytes): % 0X", n, msg[:n])
641 remain := dataLen - n
642 log.Debugf("continue reading remaining %d bytes", remain)
646 nbytes, err := r.Read(view)
649 } else if nbytes == 0 {
650 return nil, fmt.Errorf("zero nbytes")
654 log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
// isClosedError reports whether err indicates that the connection is gone:
// either err is (or wraps) io.EOF, or its text ends with
// "use of closed network connection".
//
// A nil error is reported as false instead of panicking on err.Error().
func isClosedError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, io.EOF) {
		return true
	}
	// Matched by text, so it also works on errors that wrap the close failure
	// in additional context.
	return strings.HasSuffix(err.Error(), "use of closed network connection")
}