15 "github.com/fsnotify/fsnotify"
16 "github.com/lunixbochs/struc"
17 logger "github.com/sirupsen/logrus"
19 "git.fd.io/govpp.git/adapter"
20 "git.fd.io/govpp.git/codec"
21 "git.fd.io/govpp.git/examples/bin_api/memclnt"
25 // DefaultSocketName is default VPP API socket file name
26 DefaultSocketName = "/run/vpp-api.sock"
30 // DefaultConnectTimeout is default timeout for connecting
31 DefaultConnectTimeout = time.Second * 3
32 // DefaultDisconnectTimeout is default timeout for discconnecting
33 DefaultDisconnectTimeout = time.Second
34 // MaxWaitReady defines maximum duration before waiting for socket file
36 MaxWaitReady = time.Second * 15
37 // ClientName is used for identifying client in socket registration
38 ClientName = "govppsock"
42 // Debug is global variable that determines debug mode
43 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
44 // DebugMsgIds is global variable that determines debug mode for msg ids
45 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
47 Log = logger.New() // global logger
50 // init initializes global logger, which logs debug level messages to stdout.
54 Log.Level = logger.DebugLevel
58 type vppClient struct {
64 connectTimeout time.Duration
65 disconnectTimeout time.Duration
67 cb adapter.MsgCallback
69 msgTable map[string]uint16
77 func NewVppClient(sockAddr string) *vppClient {
79 sockAddr = DefaultSocketName
83 connectTimeout: DefaultConnectTimeout,
84 disconnectTimeout: DefaultDisconnectTimeout,
85 cb: func(msgID uint16, data []byte) {
86 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
91 // SetConnectTimeout sets timeout used during connecting.
92 func (c *vppClient) SetConnectTimeout(t time.Duration) {
96 // SetDisconnectTimeout sets timeout used during disconnecting.
97 func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
98 c.disconnectTimeout = t
101 // WaitReady checks socket file existence and waits for it if necessary
102 func (c *vppClient) WaitReady() error {
103 // check if file at the path already exists
104 if _, err := os.Stat(c.sockAddr); err == nil {
106 } else if os.IsExist(err) {
110 // if not, watch for it
111 watcher, err := fsnotify.NewWatcher()
116 if err := watcher.Close(); err != nil {
117 Log.Errorf("failed to close file watcher: %v", err)
121 // start watching directory
122 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
128 case <-time.After(MaxWaitReady):
129 return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
130 case e := <-watcher.Errors:
132 case ev := <-watcher.Events:
133 Log.Debugf("watcher event: %+v", ev)
134 if ev.Name == c.sockAddr {
135 if (ev.Op & fsnotify.Create) == fsnotify.Create {
136 // socket was created, we are ready
144 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
145 Log.Debug("SetMsgCallback")
149 func (c *vppClient) Connect() error {
150 Log.Debugf("Connecting to: %v", c.sockAddr)
152 if err := c.connect(c.sockAddr); err != nil {
156 if err := c.open(); err != nil {
160 c.quit = make(chan struct{})
167 func (c *vppClient) connect(sockAddr string) error {
168 addr := &net.UnixAddr{Name: sockAddr, Net: "unix"}
170 conn, err := net.DialUnix("unix", nil, addr)
172 // we try different type of socket for backwards compatbility with VPP<=19.04
173 if strings.Contains(err.Error(), "wrong type for socket") {
174 addr.Net = "unixpacket"
175 Log.Debugf("%s, retrying connect with type unixpacket", err)
176 conn, err = net.DialUnix("unixpacket", nil, addr)
179 Log.Debugf("Connecting to socket %s failed: %s", addr, err)
185 c.reader = bufio.NewReader(c.conn)
186 c.writer = bufio.NewWriter(c.conn)
188 Log.Debugf("Connected to socket: %v", addr)
194 sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
195 createMsgContext = byte(123)
196 deleteMsgContext = byte(124)
199 func (c *vppClient) open() error {
200 msgCodec := new(codec.MsgCodec)
202 req := &memclnt.SockclntCreate{
203 Name: []byte(ClientName),
205 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
207 Log.Debugln("Encode error:", err)
211 msg[5] = createMsgContext
213 if err := c.write(msg); err != nil {
214 Log.Debugln("Write error: ", err)
218 readDeadline := time.Now().Add(c.connectTimeout)
219 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
222 msgReply, err := c.read()
224 Log.Println("Read error:", err)
227 // reset read deadline
228 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
232 reply := new(memclnt.SockclntCreateReply)
233 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
234 Log.Println("Decode error:", err)
238 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
239 reply.Response, reply.Index, reply.Count)
241 c.clientIndex = reply.Index
242 c.msgTable = make(map[string]uint16, reply.Count)
243 for _, x := range reply.MessageTable {
244 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
245 c.msgTable[name] = x.Index
246 if strings.HasPrefix(name, "sockclnt_delete_") {
247 c.sockDelMsgId = x.Index
250 Log.Debugf(" - %4d: %q", x.Index, name)
257 func (c *vppClient) Disconnect() error {
261 Log.Debugf("Disconnecting..")
265 // force readerLoop to timeout
266 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
270 // wait for readerLoop to return
273 if err := c.close(); err != nil {
277 if err := c.conn.Close(); err != nil {
278 Log.Debugln("Closing socket failed:", err)
285 func (c *vppClient) close() error {
286 msgCodec := new(codec.MsgCodec)
288 req := &memclnt.SockclntDelete{
289 Index: c.clientIndex,
291 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
293 Log.Debugln("Encode error:", err)
297 msg[5] = deleteMsgContext
299 Log.Debugf("sending socklntDel (%d byes): % 0X", len(msg), msg)
300 if err := c.write(msg); err != nil {
301 Log.Debugln("Write error: ", err)
305 readDeadline := time.Now().Add(c.disconnectTimeout)
306 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
309 msgReply, err := c.read()
311 Log.Debugln("Read error:", err)
312 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
313 // we accept timeout for reply
318 // reset read deadline
319 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
323 reply := new(memclnt.SockclntDeleteReply)
324 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
325 Log.Debugln("Decode error:", err)
329 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
334 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
335 msg := msgName + "_" + msgCrc
336 msgID, ok := c.msgTable[msg]
338 return 0, fmt.Errorf("unknown message: %q", msg)
343 type reqHeader struct {
349 func (c *vppClient) SendMsg(context uint32, data []byte) error {
351 ClientIndex: c.clientIndex,
354 buf := new(bytes.Buffer)
355 if err := struc.Pack(buf, h); err != nil {
358 copy(data[2:], buf.Bytes())
360 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
362 if err := c.write(data); err != nil {
363 Log.Debugln("write error: ", err)
370 func (c *vppClient) write(msg []byte) error {
372 DataLen: uint32(len(msg)),
374 buf := new(bytes.Buffer)
375 if err := struc.Pack(buf, h); err != nil {
378 header := buf.Bytes()
380 // we lock to prevent mixing multiple message sends
382 defer c.writeMu.Unlock()
384 if n, err := c.writer.Write(header); err != nil {
387 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
390 if err := c.writer.Flush(); err != nil {
394 for i := 0; i <= len(msg)/c.writer.Size(); i++ {
395 x := i*c.writer.Size() + c.writer.Size()
399 Log.Debugf("x=%v i=%v len=%v mod=%v", x, i, len(msg), len(msg)/c.writer.Size())
400 if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
403 Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
405 if err := c.writer.Flush(); err != nil {
414 type msgHeader struct {
419 func (c *vppClient) readerLoop() {
421 defer Log.Debugf("reader quit")
431 if isClosedError(err) {
434 Log.Debugf("read failed: %v", err)
438 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
439 Log.Debugf("unpacking header failed: %v", err)
443 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
448 type msgheader struct {
449 Q int `struc:"uint64"`
450 DataLen uint32 `struc:"uint32"`
451 GcMarkTimestamp uint32 `struc:"uint32"`
454 func (c *vppClient) read() ([]byte, error) {
455 Log.Debug("reading next msg..")
457 header := make([]byte, 16)
459 n, err := io.ReadAtLeast(c.reader, header, 16)
463 Log.Debugln("zero bytes header")
467 Log.Debugf("invalid header data (%d): % 0X", n, header[:n])
468 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
470 Log.Debugf(" - read header %d bytes: % 0X", n, header)
473 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
476 Log.Debugf(" - decoded header: %+v", h)
478 msgLen := int(h.DataLen)
479 msg := make([]byte, msgLen)
481 n, err = c.reader.Read(msg)
485 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
489 Log.Debugf("continue read for another %d bytes", remain)
493 nbytes, err := c.reader.Read(view)
496 } else if nbytes == 0 {
497 return nil, fmt.Errorf("zero nbytes")
501 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
510 func isClosedError(err error) bool {
514 return strings.HasSuffix(err.Error(), "use of closed network connection")