7 "github.com/fsnotify/fsnotify"
16 "github.com/lunixbochs/struc"
17 logger "github.com/sirupsen/logrus"
19 "git.fd.io/govpp.git/adapter"
20 "git.fd.io/govpp.git/codec"
21 "git.fd.io/govpp.git/examples/bin_api/memclnt"
25 // DefaultSocketName is the default VPP API socket file name.
26 DefaultSocketName = "/run/vpp-api.sock"
28 sockCreateMsgId = 15 // hard-coded id for sockclnt_create message
29 govppClientName = "govppsock" // client name used for socket registration
33 ConnectTimeout = time.Second * 3
34 DisconnectTimeout = time.Second
36 Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
37 DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
39 Log = logger.New() // global logger
42 // init initializes global logger, which logs debug level messages to stdout.
46 Log.Level = logger.DebugLevel
50 type vppClient struct {
54 cb adapter.MsgCallback
56 msgTable map[string]uint16
63 func NewVppClient(sockAddr string) *vppClient {
65 sockAddr = DefaultSocketName
73 func nilCallback(msgID uint16, data []byte) {
74 Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
77 // WaitReady checks whether the socket file exists and, if necessary, waits for it to be created.
78 func (c *vppClient) WaitReady() error {
79 // verify file existence
80 if _, err := os.Stat(c.sockAddr); err == nil {
82 } else if os.IsExist(err) {
86 // if not, watch for it
87 watcher, err := fsnotify.NewWatcher()
92 if err := watcher.Close(); err != nil {
93 Log.Errorf("failed to close file watcher: %v", err)
96 path := filepath.Dir(c.sockAddr)
97 if err := watcher.Add(path); err != nil {
102 ev := <-watcher.Events
104 if (ev.Op & fsnotify.Create) == fsnotify.Create {
112 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
113 Log.Debug("SetMsgCallback")
117 func (c *vppClient) Connect() error {
118 Log.Debugf("Connecting to: %v", c.sockAddr)
120 if err := c.connect(c.sockAddr); err != nil {
124 if err := c.open(); err != nil {
128 c.quit = make(chan struct{})
135 func (c *vppClient) connect(sockAddr string) error {
136 addr, err := net.ResolveUnixAddr("unixpacket", sockAddr)
138 Log.Debugln("ResolveUnixAddr error:", err)
142 conn, err := net.DialUnix("unixpacket", nil, addr)
144 Log.Debugln("Dial error:", err)
149 c.reader = bufio.NewReader(c.conn)
151 Log.Debugf("Connected to socket: %v", addr)
156 func (c *vppClient) open() error {
157 msgCodec := new(codec.MsgCodec)
159 req := &memclnt.SockclntCreate{
160 Name: []byte(govppClientName),
162 msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
164 Log.Debugln("Encode error:", err)
170 if err := c.write(msg); err != nil {
171 Log.Debugln("Write error: ", err)
175 readDeadline := time.Now().Add(ConnectTimeout)
176 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
179 msgReply, err := c.read()
181 Log.Println("Read error:", err)
184 // reset read deadline
185 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
189 //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply)
191 reply := new(memclnt.SockclntCreateReply)
192 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
193 Log.Println("Decode error:", err)
197 Log.Debugf("SockclntCreateReply: Response=%v Index=%v Count=%v",
198 reply.Response, reply.Index, reply.Count)
200 c.clientIndex = reply.Index
201 c.msgTable = make(map[string]uint16, reply.Count)
202 for _, x := range reply.MessageTable {
203 name := string(bytes.TrimSuffix(bytes.Split(x.Name, []byte{0x00})[0], []byte{0x13}))
204 c.msgTable[name] = x.Index
205 if strings.HasPrefix(name, "sockclnt_delete_") {
206 c.sockDelMsgId = x.Index
209 Log.Debugf(" - %4d: %q", x.Index, name)
216 func (c *vppClient) Disconnect() error {
220 Log.Debugf("Disconnecting..")
224 // force readerLoop to time out
225 if err := c.conn.SetReadDeadline(time.Now()); err != nil {
229 // wait for readerLoop to return
232 if err := c.close(); err != nil {
236 if err := c.conn.Close(); err != nil {
237 Log.Debugln("Close socket conn failed:", err)
244 func (c *vppClient) close() error {
245 msgCodec := new(codec.MsgCodec)
247 req := &memclnt.SockclntDelete{
248 Index: c.clientIndex,
250 msg, err := msgCodec.EncodeMsg(req, c.sockDelMsgId)
252 Log.Debugln("Encode error:", err)
258 Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
259 if err := c.write(msg); err != nil {
260 Log.Debugln("Write error: ", err)
264 readDeadline := time.Now().Add(DisconnectTimeout)
265 if err := c.conn.SetReadDeadline(readDeadline); err != nil {
268 msgReply, err := c.read()
270 Log.Debugln("Read error:", err)
271 if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
272 // we accept timeout for reply
277 // reset read deadline
278 if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
282 reply := new(memclnt.SockclntDeleteReply)
283 if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
284 Log.Debugln("Decode error:", err)
288 Log.Debugf("SockclntDeleteReply: Response=%v", reply.Response)
293 func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
294 msg := msgName + "_" + msgCrc
295 msgID, ok := c.msgTable[msg]
297 return 0, fmt.Errorf("unknown message: %q", msg)
302 type reqHeader struct {
308 func (c *vppClient) SendMsg(context uint32, data []byte) error {
310 ClientIndex: c.clientIndex,
313 buf := new(bytes.Buffer)
314 if err := struc.Pack(buf, h); err != nil {
317 copy(data[2:], buf.Bytes())
319 Log.Debugf("SendMsg (%d) context=%v client=%d: data: % 02X", len(data), context, c.clientIndex, data)
321 if err := c.write(data); err != nil {
322 Log.Debugln("write error: ", err)
329 func (c *vppClient) write(msg []byte) error {
331 Data_len: uint32(len(msg)),
333 buf := new(bytes.Buffer)
334 if err := struc.Pack(buf, h); err != nil {
337 header := buf.Bytes()
339 // we lock to prevent mixing multiple message sends
341 defer c.writeMu.Unlock()
343 var w io.Writer = c.conn
345 if n, err := w.Write(header); err != nil {
348 Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
350 if n, err := w.Write(msg); err != nil {
353 Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg)
359 type msgHeader struct {
364 func (c *vppClient) readerLoop() {
369 Log.Debugf("reader quit")
376 if isClosedError(err) {
379 Log.Debugf("READ FAILED: %v", err)
383 if err := struc.Unpack(bytes.NewReader(msg), h); err != nil {
384 Log.Debugf("unpacking header failed: %v", err)
388 Log.Debugf("recvMsg (%d) msgID=%d context=%v", len(msg), h.MsgID, h.Context)
393 type msgheader struct {
394 Q int `struc:"uint64"`
395 Data_len uint32 `struc:"uint32"`
396 Gc_mark_timestamp uint32 `struc:"uint32"`
400 func (c *vppClient) read() ([]byte, error) {
401 Log.Debug("reading next msg..")
403 header := make([]byte, 16)
405 n, err := io.ReadAtLeast(c.reader, header, 16)
409 Log.Debugln("zero bytes header")
413 Log.Debug("invalid header data (%d): % 0X", n, header[:n])
414 return nil, fmt.Errorf("invalid header (expected 16 bytes, got %d)", n)
416 Log.Debugf(" - read header %d bytes: % 0X", n, header)
419 if err := struc.Unpack(bytes.NewReader(header[:]), h); err != nil {
422 Log.Debugf(" - decoded header: %+v", h)
424 msgLen := int(h.Data_len)
425 msg := make([]byte, msgLen)
427 n, err = c.reader.Read(msg)
431 Log.Debugf(" - read msg %d bytes (%d buffered)", n, c.reader.Buffered())
435 Log.Debugf("continue read for another %d bytes", remain)
440 nbytes, err := c.reader.Read(view)
443 } else if nbytes == 0 {
444 return nil, fmt.Errorf("zero nbytes")
448 Log.Debugf("another data received: %d bytes (remain: %d)", nbytes, remain)
457 func isClosedError(err error) bool {
461 return strings.HasSuffix(err.Error(), "use of closed network connection")