15 "github.com/fsnotify/fsnotify"
17 "github.com/lunixbochs/struc"
18 logger "github.com/sirupsen/logrus"
20 "git.fd.io/govpp.git/adapter"
21 "git.fd.io/govpp.git/codec"
22 "git.fd.io/govpp.git/examples/bin_api/memclnt"
26 // DefaultSocketName is default VPP API socket file name
27 DefaultSocketName = "/run/vpp-api.sock"
29 sockCreateMsgId = 15 // hard-coded id for sockclnt_create message
30 govppClientName = "govppsock" // client name used for socket registration
34 ConnectTimeout = time.Second * 3
35 DisconnectTimeout = time.Second
// MaxWaitReady defines the maximum duration to wait for the socket file before timing out
39 MaxWaitReady = time.Second * 15
41 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
42 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
44 Log = logger.New() // global logger
47 // init initializes global logger, which logs debug level messages to stdout.
51 Log.Level = logger.DebugLevel
55 type vppClient struct {
59 cb adapter.MsgCallback
61 msgTable map[string]uint16
68 func NewVppClient(sockAddr string) *vppClient {
70 sockAddr = DefaultSocketName
78 func nilCallback(msgID uint16, data []byte) {
79 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
82 // WaitReady checks socket file existence and waits for it if necessary
83 func (c *vppClient) WaitReady() error {
84 // check if file at the path already exists
85 if _, err := os.Stat(c.sockAddr); err == nil {
87 } else if os.IsExist(err) {
91 // if not, watch for it
92 watcher, err := fsnotify.NewWatcher()
97 if err := watcher.Close(); err != nil {
98 Log.Errorf("failed to close file watcher: %v", err)
102 // start watching directory
103 if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
109 case <-time.After(MaxWaitReady):
110 return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
111 case e := <-watcher.Errors:
113 case ev := <-watcher.Events:
114 Log.Debugf("watcher event: %+v", ev)
115 if ev.Name == c.sockAddr {
116 if (ev.Op & fsnotify.Create) == fsnotify.Create {
117 // socket was created, we are ready
125 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
126 Log.Debug("SetMsgCallback")
130 func (c *vppClient) Connect() error {
131 Log.Debugf("Connecting to: %v", c.sockAddr)
133 if err := c.connect(c.sockAddr); err != nil {
137 if err := c.open(); err != nil {
141 c.quit = make(chan struct{})
148 func (c *vppClient) connect(sockAddr string) error {
149 addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
151 Log.Debugln("ResolveUnixAddr error:", err)
155 conn, err := net.DialUnix("unixpacket", nil, addr)
157 Log.Debugln("Dial error:", err)
162 c.reader = bufio.NewReader(c.conn)
164 Log.Debugf("Connected to socket: %v", addr)
169 func (c *vppClient) open() error {
170 msgCodec := new(codec.MsgCodec)
172 req := &memclnt.SockclntCreate{
173 Name: []byte(govppClientName),
175 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
177 Log.Debugln("Encode error:", err)
183 if err := c.write(msg); err != nil {
184 Log.Debugln("Write error: ", err)
188 readDeadline := time.Now().Add(ConnectTimeout)
189 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
192 msgReply, err := c.read()
194 Log.Println("Read error:", err)
197 // reset read deadline
198 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
202 //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply)
204 reply := new(memclnt.SockclntCreateReply)
205 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
206 Log.Println("Decode error:", err)
210 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
211 reply.Response, reply.Index, reply.Count)
213 c.clientIndex = reply.Index
214 c.msgTable = make(map[string]uint16, reply.Count)
215 for _, x := range reply.MessageTable {
216 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
217 c.msgTable[name] = x.Index
218 if strings.HasPrefix(name, "sockclnt_delete_") {
219 c.sockDelMsgId = x.Index
222 Log.Debugf(" - %4d: %q", x.Index, name)
229 func (c *vppClient) Disconnect() error {
233 Log.Debugf("Disconnecting..")
237 // force readerLoop to timeout
238 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
242 // wait for readerLoop to return
245 if err := c.close(); err != nil {
249 if err := c.conn.Close(); err != nil {
250 Log.Debugln("Close socket conn failed:", err)
257 func (c *vppClient) close() error {
258 msgCodec := new(codec.MsgCodec)
260 req := &memclnt.SockclntDelete{
261 Index: c.clientIndex,
263 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
265 Log.Debugln("Encode error:", err)
271 Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
272 if err := c.write(msg); err != nil {
273 Log.Debugln("Write error: ", err)
277 readDeadline := time.Now().Add(DisconnectTimeout)
278 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
281 msgReply, err := c.read()
283 Log.Debugln("Read error:", err)
284 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
285 // we accept timeout for reply
290 // reset read deadline
291 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
295 reply := new(memclnt.SockclntDeleteReply)
296 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
297 Log.Debugln("Decode error:", err)
301 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
306 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
307 msg := msgName + "_" + msgCrc
308 msgID, ok := c.msgTable[msg]
310 return 0, fmt.Errorf("unknown message: %q", msg)
315 type reqHeader struct {
321 func (c *vppClient) SendMsg(context uint32, data []byte) error {
323 ClientIndex: c.clientIndex,
326 buf := new(bytes.Buffer)
327 if err := struc.Pack(buf, h); err != nil {
330 copy(data[2:], buf.Bytes())
332 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
334 if err := c.write(data); err != nil {
335 Log.Debugln("write error: ", err)
342 func (c *vppClient) write(msg []byte) error {
344 Data_len: uint32(len(msg)),
346 buf := new(bytes.Buffer)
347 if err := struc.Pack(buf, h); err != nil {
350 header := buf.Bytes()
352 // we lock to prevent mixing multiple message sends
354 defer c.writeMu.Unlock()
356 var w io.Writer = c.conn
358 if n, err := w.Write(header); err != nil {
361 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
363 if n, err := w.Write(msg); err != nil {
366 Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg)
372 type msgHeader struct {
377 func (c *vppClient) readerLoop() {
382 Log.Debugf("reader quit")
389 if isClosedError(err) {
392 Log.Debugf("READ FAILED: %v", err)
396 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
397 Log.Debugf("unpacking header failed: %v", err)
401 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
406 type msgheader struct {
407 Q int `struc:"uint64"`
408 Data_len uint32 `struc:"uint32"`
409 Gc_mark_timestamp uint32 `struc:"uint32"`
413 func (c *vppClient) read() ([]byte, error) {
414 Log.Debug("reading next msg..")
416 header := make([]byte, 16)
418 n, err := io.ReadAtLeast(c.reader, header, 16)
422 Log.Debugln("zero bytes header")
426 Log.Debug("invalid header data (%d): % 0X", n, header[:n])
427 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
429 Log.Debugf(" - read header %d bytes: % 0X", n, header)
432 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
435 Log.Debugf(" - decoded header: %+v", h)
437 msgLen := int(h.Data_len)
438 msg := make([]byte, msgLen)
440 n, err = c.reader.Read(msg)
444 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
448 Log.Debugf("continue read for another %d bytes", remain)
453 nbytes, err := c.reader.Read(view)
456 } else if nbytes == 0 {
457 return nil, fmt.Errorf("zero nbytes")
461 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
470 func isClosedError(err error) bool {
474 return strings.HasSuffix(err.Error(), "use of closed network connection")