1 // Copyright (c) 2019 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
30 "github.com/fsnotify/fsnotify"
31 "github.com/sirupsen/logrus"
33 "git.fd.io/govpp.git/adapter"
34 "git.fd.io/govpp.git/binapi/memclnt"
35 "git.fd.io/govpp.git/codec"
39 // DefaultSocketName is default VPP API socket file path.
40 DefaultSocketName = "/run/vpp/api.sock"
41 // DefaultClientName is used for identifying client in socket registration
42 DefaultClientName = "govppsock"
47 // DefaultConnectTimeout is default timeout for connecting
48 DefaultConnectTimeout = time.Second * 3
49 // DefaultDisconnectTimeout is default timeout for discconnecting
50 DefaultDisconnectTimeout = time.Millisecond * 100
51 // MaxWaitReady defines maximum duration of waiting for socket file
52 MaxWaitReady = time.Second * 10
56 debug = strings.Contains(os.Getenv("DEBUG_GOVPP"), "socketclient")
57 debugMsgIds = strings.Contains(os.Getenv("DEBUG_GOVPP"), "msgtable")
59 log logrus.FieldLogger
62 // SetLogger sets global logger.
63 func SetLogger(logger logrus.FieldLogger) {
68 logger := logrus.New()
70 logger.Level = logrus.DebugLevel
71 logger.Debug("govpp: debug level enabled for socketclient")
73 log = logger.WithField("logger", "govpp/socketclient")
84 connectTimeout time.Duration
85 disconnectTimeout time.Duration
87 msgCallback adapter.MsgCallback
89 msgTable map[string]uint16
99 // NewVppClient returns a new Client using socket.
100 // If socket is empty string DefaultSocketName is used.
101 func NewVppClient(socket string) *Client {
103 socket = DefaultSocketName
107 clientName: DefaultClientName,
108 connectTimeout: DefaultConnectTimeout,
109 disconnectTimeout: DefaultDisconnectTimeout,
110 headerPool: &sync.Pool{New: func() interface{} {
111 return make([]byte, 16)
113 msgCallback: func(msgID uint16, data []byte) {
114 log.Debugf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
119 // SetClientName sets a client name used for identification.
120 func (c *Client) SetClientName(name string) {
124 // SetConnectTimeout sets timeout used during connecting.
125 func (c *Client) SetConnectTimeout(t time.Duration) {
129 // SetDisconnectTimeout sets timeout used during disconnecting.
130 func (c *Client) SetDisconnectTimeout(t time.Duration) {
131 c.disconnectTimeout = t
134 func (c *Client) SetMsgCallback(cb adapter.MsgCallback) {
135 log.Debug("SetMsgCallback")
139 // WaitReady checks socket file existence and waits for it if necessary
140 func (c *Client) WaitReady() error {
141 // check if socket already exists
142 if _, err := os.Stat(c.socketPath); err == nil {
143 return nil // socket exists, we are ready
144 } else if !os.IsNotExist(err) {
145 return err // some other error occurred
148 // socket does not exist, watch for it
149 watcher, err := fsnotify.NewWatcher()
154 if err := watcher.Close(); err != nil {
155 log.Debugf("failed to close file watcher: %v", err)
159 // start directory watcher
160 if err := watcher.Add(filepath.Dir(c.socketPath)); err != nil {
164 timeout := time.NewTimer(MaxWaitReady)
168 return fmt.Errorf("timeout waiting (%s) for socket file: %s", MaxWaitReady, c.socketPath)
170 case e := <-watcher.Errors:
173 case ev := <-watcher.Events:
174 log.Debugf("watcher event: %+v", ev)
175 if ev.Name == c.socketPath && (ev.Op&fsnotify.Create) == fsnotify.Create {
176 // socket created, we are ready
183 func (c *Client) Connect() error {
184 // check if socket exists
185 if _, err := os.Stat(c.socketPath); os.IsNotExist(err) {
186 return fmt.Errorf("VPP API socket file %s does not exist", c.socketPath)
187 } else if err != nil {
188 return fmt.Errorf("VPP API socket error: %v", err)
191 if err := c.connect(c.socketPath); err != nil {
195 if err := c.open(); err != nil {
200 c.quit = make(chan struct{})
207 func (c *Client) Disconnect() error {
211 log.Debugf("Disconnecting..")
215 if err := c.conn.CloseRead(); err != nil {
216 log.Debugf("closing readMsg failed: %v", err)
219 // wait for readerLoop to return
222 // Don't bother sending a vl_api_sockclnt_delete_t message,
223 // just close the socket.
224 if err := c.disconnect(); err != nil {
231 const defaultBufferSize = 4096
233 func (c *Client) connect(sockAddr string) error {
234 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
236 log.Debugf("Connecting to: %v", c.socketPath)
238 conn, err := net.DialUnix("unix", nil, addr)
240 // we try different type of socket for backwards compatbility with VPP<=19.04
241 if strings.Contains(err.Error(), "wrong type for socket") {
242 addr.Net = "unixpacket"
243 log.Debugf("%s, retrying connect with type unixpacket", err)
244 conn, err = net.DialUnix("unixpacket", nil, addr)
247 log.Debugf("Connecting to socket %s failed: %s", addr, err)
253 log.Debugf("Connected to socket (local addr: %v)", c.conn.LocalAddr().(*net.UnixAddr))
255 c.reader = bufio.NewReaderSize(c.conn, defaultBufferSize)
256 c.writer = bufio.NewWriterSize(c.conn, defaultBufferSize)
261 func (c *Client) disconnect() error {
262 log.Debugf("Closing socket")
263 if err := c.conn.Close(); err != nil {
264 log.Debugln("Closing socket failed:", err)
271 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
272 createMsgContext = byte(123)
273 deleteMsgContext = byte(124)
276 func (c *Client) open() error {
277 var msgCodec = codec.DefaultCodec
279 // Request socket client create
280 req := &memclnt.SockclntCreate{
283 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
285 log.Debugln("Encode error:", err)
289 msg[5] = createMsgContext
291 if err := c.writeMsg(msg); err != nil {
292 log.Debugln("Write error: ", err)
295 msgReply, err := c.readMsgTimeout(nil, c.connectTimeout)
297 log.Println("Read error:", err)
301 reply := new(memclnt.SockclntCreateReply)
302 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
303 log.Println("Decoding sockclnt_create_reply failed:", err)
305 } else if reply.Response != 0 {
306 return fmt.Errorf("sockclnt_create_reply: response error (%d)", reply.Response)
309 log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
310 reply.Response, reply.Index, reply.Count)
312 c.clientIndex = reply.Index
313 c.msgTable = make(map[string]uint16, reply.Count)
314 for _, x := range reply.MessageTable {
315 msgName := strings.Split(x.Name, "\x00")[0]
316 name := strings.TrimSuffix(msgName, "\x13")
317 c.msgTable[name] = x.Index
318 if strings.HasPrefix(name, "sockclnt_delete_") {
319 c.sockDelMsgId = x.Index
322 log.Debugf(" - %4d: %q", x.Index, name)
329 func (c *Client) GetMsgID(msgName string, msgCrc string) (uint16, error) {
330 if msgID, ok := c.msgTable[msgName+"_"+msgCrc]; ok {
333 return 0, &adapter.UnknownMsgError{
339 func (c *Client) SendMsg(context uint32, data []byte) error {
341 return fmt.Errorf("invalid message data, length must be at least 10 bytes")
343 setMsgRequestHeader(data, c.clientIndex, context)
346 log.Debugf("sendMsg (%d) context=%v client=%d: % 02X", len(data), context, c.clientIndex, data)
349 if err := c.writeMsg(data); err != nil {
350 log.Debugln("writeMsg error: ", err)
357 // setMsgRequestHeader sets client index and context in the message request header
359 // Message request has following structure:
361 // type msgRequestHeader struct {
363 // ClientIndex uint32
367 func setMsgRequestHeader(data []byte, clientIndex, context uint32) {
368 // message ID is already set
369 binary.BigEndian.PutUint32(data[2:6], clientIndex)
370 binary.BigEndian.PutUint32(data[6:10], context)
373 func (c *Client) writeMsg(msg []byte) error {
374 // we lock to prevent mixing multiple message writes
376 defer c.writeMu.Unlock()
378 header := c.headerPool.Get().([]byte)
379 err := writeMsgHeader(c.writer, header, len(msg))
383 c.headerPool.Put(header)
385 if err := writeMsgData(c.writer, msg, c.writer.Size()); err != nil {
389 if err := c.writer.Flush(); err != nil {
393 log.Debugf(" -- writeMsg done")
398 func writeMsgHeader(w io.Writer, header []byte, dataLen int) error {
399 binary.BigEndian.PutUint32(header[8:12], uint32(dataLen))
401 n, err := w.Write(header)
406 log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
412 func writeMsgData(w io.Writer, msg []byte, writerSize int) error {
413 for i := 0; i <= len(msg)/writerSize; i++ {
414 x := i*writerSize + writerSize
419 log.Debugf(" - x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/writerSize)
421 n, err := w.Write(msg[i*writerSize : x])
426 log.Debugf(" - data sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
432 func (c *Client) readerLoop() {
434 defer log.Debugf("reader loop done")
445 msg, err := c.readMsg(buf[:])
447 if isClosedError(err) {
450 log.Debugf("readMsg error: %v", err)
454 msgID, context := getMsgReplyHeader(msg)
456 log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), msgID, context)
459 c.msgCallback(msgID, msg)
463 // getMsgReplyHeader gets message ID and context from the message reply header
465 // Message reply has following structure:
467 // type msgReplyHeader struct {
472 func getMsgReplyHeader(msg []byte) (msgID uint16, context uint32) {
473 msgID = binary.BigEndian.Uint16(msg[0:2])
474 context = binary.BigEndian.Uint32(msg[2:6])
478 func (c *Client) readMsgTimeout(buf []byte, timeout time.Duration) ([]byte, error) {
480 readDeadline := time.Now().Add(timeout)
481 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
486 msgReply, err := c.readMsg(buf)
491 // reset read deadline
492 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
499 func (c *Client) readMsg(buf []byte) ([]byte, error) {
500 log.Debug("reading msg..")
502 header := c.headerPool.Get().([]byte)
503 msgLen, err := readMsgHeader(c.reader, header)
507 c.headerPool.Put(header)
509 msg, err := readMsgData(c.reader, buf, msgLen)
511 log.Debugf(" -- readMsg done (buffered: %d)", c.reader.Buffered())
516 func readMsgHeader(r io.Reader, header []byte) (int, error) {
517 n, err := io.ReadAtLeast(r, header, 16)
522 log.Debugln("zero bytes header")
525 log.Debugf("invalid header (%d bytes): % 0X", n, header[:n])
526 return 0, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
529 dataLen := binary.BigEndian.Uint32(header[8:12])
531 return int(dataLen), nil
534 func readMsgData(r io.Reader, buf []byte, dataLen int) ([]byte, error) {
536 if buf == nil || len(buf) < dataLen {
537 msg = make([]byte, dataLen)
542 n, err := r.Read(msg)
547 log.Debugf(" - read data (%d bytes): % 0X", n, msg[:n])
551 remain := dataLen - n
552 log.Debugf("continue reading remaining %d bytes", remain)
556 nbytes, err := r.Read(view)
559 } else if nbytes == 0 {
560 return nil, fmt.Errorf("zero nbytes")
564 log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
573 func isClosedError(err error) bool {
574 if errors.Is(err, io.EOF) {
577 return strings.HasSuffix(err.Error(), "use of closed network connection")