1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
29 "github.com/fsnotify/fsnotify"
30 "github.com/lunixbochs/struc"
31 logger "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/codec"
35 "git.fd.io/govpp.git/examples/binapi/memclnt"
39 // DefaultSocketName is default VPP API socket file path.
40 DefaultSocketName = adapter.DefaultBinapiSocket
41 legacySocketName = "/run/vpp-api.sock"
45 // DefaultConnectTimeout is default timeout for connecting
46 DefaultConnectTimeout = time.Second * 3
47 // DefaultDisconnectTimeout is default timeout for discconnecting
48 DefaultDisconnectTimeout = time.Millisecond * 100
49 // MaxWaitReady defines maximum duration before waiting for socket file
51 MaxWaitReady = time.Second * 10
52 // ClientName is used for identifying client in socket registration
53 ClientName = "govppsock"
57 // Debug is global variable that determines debug mode
58 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
59 // DebugMsgIds is global variable that determines debug mode for msg ids
60 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
62 // Log is global logger
66 // init initializes global logger, which logs debug level messages to stdout.
70 Log.Level = logger.DebugLevel
71 Log.Debug("govpp/socketclient: enabled debug mode")
75 const socketMissing = `
76 ------------------------------------------------------------
77 No socket file found at: %s
78 VPP binary API socket file is missing!
80 - is VPP running with socket for binapi enabled?
81 - is the correct socket name configured?
83 To enable it add following section to your VPP config:
87 ------------------------------------------------------------
90 var warnOnce sync.Once
92 func (c *vppClient) printMissingSocketMsg() {
93 fmt.Fprintf(os.Stderr, socketMissing, c.sockAddr)
96 type vppClient struct {
103 connectTimeout time.Duration
104 disconnectTimeout time.Duration
106 cb adapter.MsgCallback
108 msgTable map[string]uint16
116 func NewVppClient(sockAddr string) *vppClient {
118 sockAddr = DefaultSocketName
122 connectTimeout: DefaultConnectTimeout,
123 disconnectTimeout: DefaultDisconnectTimeout,
124 cb: func(msgID uint16, data []byte) {
125 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
130 // SetConnectTimeout sets timeout used during connecting.
131 func (c *vppClient) SetConnectTimeout(t time.Duration) {
135 // SetDisconnectTimeout sets timeout used during disconnecting.
136 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
137 c.disconnectTimeout = t
140 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
141 Log.Debug("SetMsgCallback")
145 func (c *vppClient) checkLegacySocket() bool {
146 if c.sockAddr == legacySocketName {
149 Log.Debugf("checking legacy socket: %s", legacySocketName)
150 // check if socket exists
151 if _, err := os.Stat(c.sockAddr); err == nil {
152 return false // socket exists
153 } else if !os.IsNotExist(err) {
154 return false // some other error occurred
156 // check if legacy socket exists
157 if _, err := os.Stat(legacySocketName); err == nil {
158 // legacy socket exists, update sockAddr
159 c.sockAddr = legacySocketName
162 // no socket socket found
166 // WaitReady checks socket file existence and waits for it if necessary
167 func (c *vppClient) WaitReady() error {
168 // check if socket already exists
169 if _, err := os.Stat(c.sockAddr); err == nil {
170 return nil // socket exists, we are ready
171 } else if !os.IsNotExist(err) {
172 return err // some other error occurred
175 if c.checkLegacySocket() {
179 // socket does not exist, watch for it
180 watcher, err := fsnotify.NewWatcher()
185 if err := watcher.Close(); err != nil {
186 Log.Warnf("failed to close file watcher: %v", err)
190 // start directory watcher
191 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
195 timeout := time.NewTimer(MaxWaitReady)
199 if c.checkLegacySocket() {
202 return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.sockAddr)
204 case e := <-watcher.Errors:
207 case ev := <-watcher.Events:
208 Log.Debugf("watcher event: %+v", ev)
209 if ev.Name == c.sockAddr && (ev.Op&fsnotify.Create) == fsnotify.Create {
210 // socket created, we are ready
217 func (c *vppClient) Connect() error {
218 c.checkLegacySocket()
220 // check if socket exists
221 if _, err := os.Stat(c.sockAddr); os.IsNotExist(err) {
222 warnOnce.Do(c.printMissingSocketMsg)
223 return fmt.Errorf("VPP API socket file %s does not exist", c.sockAddr)
224 } else if err != nil {
225 return fmt.Errorf("VPP API socket error: %v", err)
228 if err := c.connect(c.sockAddr); err != nil {
232 if err := c.open(); err != nil {
237 c.quit = make(chan struct{})
244 func (c *vppClient) Disconnect() error {
248 Log.Debugf("Disconnecting..")
252 if err := c.conn.CloseRead(); err != nil {
253 Log.Debugf("closing read failed: %v", err)
256 // wait for readerLoop to return
259 if err := c.close(); err != nil {
260 Log.Debugf("closing failed: %v", err)
263 if err := c.disconnect(); err != nil {
270 func (c *vppClient) connect(sockAddr string) error {
271 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
273 Log.Debugf("Connecting to: %v", c.sockAddr)
275 conn, err := net.DialUnix("unix", nil, addr)
277 // we try different type of socket for backwards compatbility with VPP<=19.04
278 if strings.Contains(err.Error(), "wrong type for socket") {
279 addr.Net = "unixpacket"
280 Log.Debugf("%s, retrying connect with type unixpacket", err)
281 conn, err = net.DialUnix("unixpacket", nil, addr)
284 Log.Debugf("Connecting to socket %s failed: %s", addr, err)
290 Log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
292 c.reader = bufio.NewReader(c.conn)
293 c.writer = bufio.NewWriter(c.conn)
298 func (c *vppClient) disconnect() error {
299 Log.Debugf("Closing socket")
300 if err := c.conn.Close(); err != nil {
301 Log.Debugln("Closing socket failed:", err)
308 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
309 createMsgContext = byte(123)
310 deleteMsgContext = byte(124)
313 func (c *vppClient) open() error {
314 msgCodec := new(codec.MsgCodec)
316 req := &memclnt.SockclntCreate{
317 Name: []byte(ClientName),
319 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
321 Log.Debugln("Encode error:", err)
325 msg[5] = createMsgContext
327 if err := c.write(msg); err != nil {
328 Log.Debugln("Write error: ", err)
332 readDeadline := time.Now().Add(c.connectTimeout)
333 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
336 msgReply, err := c.read()
338 Log.Println("Read error:", err)
341 // reset read deadline
342 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
346 reply := new(memclnt.SockclntCreateReply)
347 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
348 Log.Println("Decode error:", err)
352 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
353 reply.Response, reply.Index, reply.Count)
355 c.clientIndex = reply.Index
356 c.msgTable = make(map[string]uint16, reply.Count)
357 for _, x := range reply.MessageTable {
358 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
359 c.msgTable[name] = x.Index
360 if strings.HasPrefix(name, "sockclnt_delete_") {
361 c.sockDelMsgId = x.Index
364 Log.Debugf(" - %4d: %q", x.Index, name)
371 func (c *vppClient) close() error {
372 msgCodec := new(codec.MsgCodec)
374 req := &memclnt.SockclntDelete{
375 Index: c.clientIndex,
377 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
379 Log.Debugln("Encode error:", err)
383 msg[5] = deleteMsgContext
385 Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
386 if err := c.write(msg); err != nil {
387 Log.Debugln("Write error: ", err)
391 readDeadline := time.Now().Add(c.disconnectTimeout)
392 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
395 msgReply, err := c.read()
397 Log.Debugln("Read error:", err)
398 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
399 // we accept timeout for reply
404 // reset read deadline
405 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
409 reply := new(memclnt.SockclntDeleteReply)
410 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
411 Log.Debugln("Decode error:", err)
415 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
420 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
421 msg := msgName + "_" + msgCrc
422 msgID, ok := c.msgTable[msg]
424 return 0, fmt.Errorf("unknown message: %q", msg)
429 type reqHeader struct {
435 func (c *vppClient) SendMsg(context uint32, data []byte) error {
437 ClientIndex: c.clientIndex,
440 buf := new(bytes.Buffer)
441 if err := struc.Pack(buf, h); err != nil {
444 copy(data[2:], buf.Bytes())
446 Log.Debugf("sendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
448 if err := c.write(data); err != nil {
449 Log.Debugln("write error: ", err)
456 func (c *vppClient) write(msg []byte) error {
458 DataLen: uint32(len(msg)),
460 buf := new(bytes.Buffer)
461 if err := struc.Pack(buf, h); err != nil {
464 header := buf.Bytes()
466 // we lock to prevent mixing multiple message sends
468 defer c.writeMu.Unlock()
470 if n, err := c.writer.Write(header); err != nil {
473 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
476 writerSize := c.writer.Size()
477 for i := 0; i <= len(msg)/writerSize; i++ {
478 x := i*writerSize + writerSize
482 Log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
483 if n, err := c.writer.Write(msg[i*writerSize : x]); err != nil {
486 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
489 if err := c.writer.Flush(); err != nil {
493 Log.Debugf(" -- write done")
498 type msgHeader struct {
503 func (c *vppClient) readerLoop() {
505 defer Log.Debugf("reader quit")
516 if isClosedError(err) {
519 Log.Debugf("read failed: %v", err)
524 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
525 Log.Debugf("unpacking header failed: %v", err)
529 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
534 type msgheader struct {
535 Q int `struc:"uint64"`
536 DataLen uint32 `struc:"uint32"`
537 GcMarkTimestamp uint32 `struc:"uint32"`
540 func (c *vppClient) read() ([]byte, error) {
541 Log.Debug(" reading next msg..")
543 header := make([]byte, 16)
545 n, err := io.ReadAtLeast(c.reader, header, 16)
550 Log.Debugln("zero bytes header")
553 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
554 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
556 Log.Debugf(" read header %d bytes: % 0X", n, header)
559 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
562 Log.Debugf(" - decoded header: %+v", h)
564 msgLen := int(h.DataLen)
565 msg := make([]byte, msgLen)
567 n, err = c.reader.Read(msg)
571 Log.Debugf(" - read msg %d bytes (%d buffered) % 0X", n, c.reader.Buffered(), msg[:n])
575 Log.Debugf("continue read for another %d bytes", remain)
579 nbytes, err := c.reader.Read(view)
582 } else if nbytes == 0 {
583 return nil, fmt.Errorf("zero nbytes")
587 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
593 Log.Debugf(" -- read done (buffered: %d)", c.reader.Buffered())
598 func isClosedError(err error) bool {
602 return strings.HasSuffix(err.Error(), "use of closed network connection")