tests: refactor. Replace literal constant w/ named constant.
[vpp.git] / test / test_punt.py
1 #!/usr/bin/env python
2 import binascii
3 import random
4 import socket
5 import os
6 import threading
7 import struct
8 from struct import unpack, unpack_from
9
10 try:
11     import unittest2 as unittest
12 except ImportError:
13     import unittest
14
15 from util import ppp, ppc
16 from re import compile
17 import scapy.compat
18 from scapy.packet import Raw
19 from scapy.layers.l2 import Ether
20 from scapy.layers.inet import IP, UDP, ICMP
21 import scapy.layers.inet6 as inet6
22 from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
23 import six
24 from framework import VppTestCase, VppTestRunner
25
26 from vpp_ip import DpoProto
27 from vpp_ip_route import VppIpRoute, VppRoutePath
28
29 NUM_PKTS = 67
30
31
32 # Format MAC Address
33 def get_mac_addr(bytes_addr):
34     return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr)
35
36
37 # Format IP Address
38 def ipv4(bytes_addr):
39     return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr)
40
41
42 # Unpack Ethernet Frame
43 def ethernet_frame(data):
44     dest_mac, src_mac, proto = struct.unpack('! 6s 6s H', data[:14])
45     return dest_mac, src_mac, socket.htons(proto), data[14:]
46
47
48 # Unpack IPv4 Packets
49 def ipv4_packet(data):
50     proto, src, target = struct.unpack('! 8x 1x B 2x 4s 4s', data[:20])
51     return proto, src, target, data[20:]
52
53
54 # Unpack IPv6 Packets
55 def ipv6_packet(data):
56     nh, src, target = struct.unpack('! 6x B 1x 16s 16s', data[:40])
57     return nh, src, target, data[40:]
58
59
60 # Unpacks any UDP Packet
61 def udp_seg(data):
62     src_port, dest_port, size = struct.unpack('! H H 2x H', data[:8])
63     return src_port, dest_port, size, data[8:]
64
65
66 # Unpacks any TCP Packet
67 def tcp_seg(data):
68     src_port, dest_port, seq, flag = struct.unpack('! H H L 4x H', data[:14])
69     return src_port, dest_port, seq, data[((flag >> 12) * 4):]
70
71
72 def receivePackets(sock, counters):
73     # Wait for some packets on socket
74     while True:
75         data = sock.recv(65536)
76
77         # punt socket metadata
78         # packet_desc = data[0:8]
79
80         # Ethernet
81         _, _, eth_proto, data = ethernet_frame(data[8:])
82         # Ipv4
83         if eth_proto == 8:
84             proto, _, _, data = ipv4_packet(data)
85             # TCP
86             if proto == 6:
87                 _, dst_port, _, data = udp_seg(data)
88             # UDP
89             elif proto == 17:
90                 _, dst_port, _, data = udp_seg(data)
91                 counters[dst_port] = 0
92         # Ipv6
93         elif eth_proto == 0xdd86:
94             nh, _, _, data = ipv6_packet(data)
95             # TCP
96             if nh == 6:
97                 _, dst_port, _, data = udp_seg(data)
98             # UDP
99             elif nh == 17:
100                 _, dst_port, _, data = udp_seg(data)
101                 counters[dst_port] = 0
102
103
104 class serverSocketThread(threading.Thread):
105     """ Socket server thread"""
106
107     def __init__(self, threadID, sockName, counters):
108         threading.Thread.__init__(self)
109         self.threadID = threadID
110         self.sockName = sockName
111         self.sock = None
112         self.counters = counters
113
114     def run(self):
115         self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
116         try:
117             os.unlink(self.sockName)
118         except:
119             pass
120         self.sock.bind(self.sockName)
121
122         receivePackets(self.sock, self.counters)
123
124
125 class TestPuntSocket(VppTestCase):
126     """ Punt Socket """
127
128     ports = [1111, 2222, 3333, 4444]
129     sock_servers = list()
130     portsCheck = dict()
131     nr_packets = 256
132
133     @classmethod
134     def setUpClass(cls):
135         super(TestPuntSocket, cls).setUpClass()
136
137     @classmethod
138     def tearDownClass(cls):
139         super(TestPuntSocket, cls).tearDownClass()
140
141     @classmethod
142     def setUpConstants(cls):
143         cls.extra_vpp_punt_config = [
144             "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
145         super(TestPuntSocket, cls).setUpConstants()
146
147     def setUp(self):
148         super(TestPuntSocket, self).setUp()
149         random.seed()
150
151         self.create_pg_interfaces(range(2))
152         for i in self.pg_interfaces:
153             i.admin_up()
154
155     def tearDown(self):
156         del self.sock_servers[:]
157         super(TestPuntSocket, self).tearDown()
158
159     def socket_client_create(self, sock_name, id=None):
160         thread = serverSocketThread(id, sock_name, self.portsCheck)
161         self.sock_servers.append(thread)
162         thread.start()
163
164     def socket_client_close(self):
165         for thread in self.sock_servers:
166             thread.sock.close()
167
168
169 class TestIP4PuntSocket(TestPuntSocket):
170     """ Punt Socket for IPv4 """
171
172     @classmethod
173     def setUpClass(cls):
174         super(TestIP4PuntSocket, cls).setUpClass()
175
176     @classmethod
177     def tearDownClass(cls):
178         super(TestIP4PuntSocket, cls).tearDownClass()
179
180     def setUp(self):
181         super(TestIP4PuntSocket, self).setUp()
182
183         for i in self.pg_interfaces:
184             i.config_ip4()
185             i.resolve_arp()
186
187     def tearDown(self):
188         super(TestIP4PuntSocket, self).tearDown()
189         for i in self.pg_interfaces:
190             i.unconfig_ip4()
191             i.admin_down()
192
193     def test_punt_socket_dump(self):
194         """ Punt socket registration/deregistration"""
195
196         punts = self.vapi.punt_socket_dump(is_ip6=0)
197         self.assertEqual(len(punts), 0)
198
199         #
200         # configure a punt socket
201         #
202         self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
203                                        six.ensure_binary(self.tempdir))
204         self.vapi.punt_socket_register(2222, b"%s/socket_punt_2222" %
205                                        six.ensure_binary(self.tempdir))
206         punts = self.vapi.punt_socket_dump(is_ip6=0)
207         self.assertEqual(len(punts), 2)
208         self.assertEqual(punts[0].punt.l4_port, 1111)
209         self.assertEqual(punts[1].punt.l4_port, 2222)
210
211         #
212         # deregister a punt socket
213         #
214         self.vapi.punt_socket_deregister(1111)
215         punts = self.vapi.punt_socket_dump(is_ip6=0)
216         self.assertEqual(len(punts), 1)
217
218         #
219         # configure a punt socket again
220         #
221         self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
222                                        six.ensure_binary(self.tempdir))
223         self.vapi.punt_socket_register(3333, b"%s/socket_punt_3333" %
224                                        six.ensure_binary(self.tempdir))
225         punts = self.vapi.punt_socket_dump(is_ip6=0)
226         self.assertEqual(len(punts), 3)
227
228         #
229         # deregister all punt socket
230         #
231         self.vapi.punt_socket_deregister(1111)
232         self.vapi.punt_socket_deregister(2222)
233         self.vapi.punt_socket_deregister(3333)
234         punts = self.vapi.punt_socket_dump(is_ip6=0)
235         self.assertEqual(len(punts), 0)
236
237     def test_punt_socket_traffic_single_port_single_socket(self):
238         """ Punt socket traffic single port single socket"""
239
240         port = self.ports[0]
241
242         p = (Ether(src=self.pg0.remote_mac,
243                    dst=self.pg0.local_mac) /
244              IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
245              UDP(sport=9876, dport=port) /
246              Raw('\xa5' * 100))
247
248         pkts = p * self.nr_packets
249         self.portsCheck[port] = self.nr_packets
250
251         punts = self.vapi.punt_socket_dump(is_ip6=0)
252         self.assertEqual(len(punts), 0)
253
254         #
255         # expect ICMP - port unreachable for all packets
256         #
257         self.vapi.cli("clear trace")
258         self.pg0.add_stream(pkts)
259         self.pg_enable_capture(self.pg_interfaces)
260         self.pg_start()
261         # FIXME - when punt socket deregister is implemented
262         # rx = self.pg0.get_capture(self.nr_packets)
263         # for p in rx:
264         #     self.assertEqual(int(p[IP].proto), 1)   # ICMP
265         #     self.assertEqual(int(p[ICMP].code), 3)  # unreachable
266
267         #
268         # configure a punt socket
269         #
270         self.socket_client_create(b"%s/socket_%d" % (
271             six.ensure_binary(self.tempdir), port))
272         self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
273             six.ensure_binary(self.tempdir), port))
274         punts = self.vapi.punt_socket_dump(is_ip6=0)
275         self.assertEqual(len(punts), 1)
276
277         self.logger.debug("Sending %s packets to port %d",
278                           str(self.portsCheck[port]), port)
279         #
280         # expect punt socket and no packets on pg0
281         #
282         self.vapi.cli("clear errors")
283         self.vapi.cli("clear trace")
284         self.pg0.add_stream(pkts)
285         self.pg_enable_capture(self.pg_interfaces)
286         self.pg_start()
287         self.pg0.get_capture(0)
288         self.logger.info(self.vapi.cli("show trace"))
289         self.socket_client_close()
290         self.assertEqual(self.portsCheck[port], 0)
291
292         #
293         # remove punt socket. expect ICMP - port unreachable for all packets
294         #
295         self.vapi.punt_socket_deregister(port)
296         punts = self.vapi.punt_socket_dump(is_ip6=0)
297         self.assertEqual(len(punts), 0)
298         self.pg0.add_stream(pkts)
299         self.pg_enable_capture(self.pg_interfaces)
300         self.pg_start()
301         # FIXME - when punt socket deregister is implemented
302         # self.pg0.get_capture(nr_packets)
303
304     def test_punt_socket_traffic_multi_port_multi_sockets(self):
305         """ Punt socket traffic multi ports and multi sockets"""
306
307         for p in self.ports:
308             self.portsCheck[p] = 0
309
310         #
311         # create stream with random packets count per given ports
312         #
313         pkts = list()
314         for _ in range(0, self.nr_packets):
315             # choose port from port list
316             p = random.choice(self.ports)
317             pkts.append((
318                 Ether(src=self.pg0.remote_mac,
319                       dst=self.pg0.local_mac) /
320                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
321                 UDP(sport=9876, dport=p) /
322                 Raw('\xa5' * 100)))
323             self.portsCheck[p] += 1
324         #
325         # no punt socket
326         #
327         punts = self.vapi.punt_socket_dump(is_ip6=0)
328         self.assertEqual(len(punts), 0)
329
330         #
331         # configure a punt socket
332         #
333         for p in self.ports:
334             self.socket_client_create(b"%s/socket_%d" % (
335                 six.ensure_binary(self.tempdir), p))
336             self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
337                 six.ensure_binary(self.tempdir),  p))
338         punts = self.vapi.punt_socket_dump(is_ip6=0)
339         self.assertEqual(len(punts), len(self.ports))
340
341         for p in self.ports:
342             self.logger.debug("Sending %s packets to port %d",
343                               str(self.portsCheck[p]), p)
344
345         #
346         # expect punt socket and no packets on pg0
347         #
348         self.vapi.cli("clear errors")
349         self.vapi.cli("clear trace")
350         self.pg0.add_stream(pkts)
351         self.pg_enable_capture(self.pg_interfaces)
352         self.pg_start()
353         self.pg0.get_capture(0)
354         self.logger.info(self.vapi.cli("show trace"))
355         self.socket_client_close()
356
357         for p in self.ports:
358             self.assertEqual(self.portsCheck[p], 0)
359             self.vapi.punt_socket_deregister(p)
360         punts = self.vapi.punt_socket_dump(is_ip6=0)
361         self.assertEqual(len(punts), 0)
362
363     def test_punt_socket_traffic_multi_ports_single_socket(self):
364         """ Punt socket traffic multi ports and single socket"""
365
366         for p in self.ports:
367             self.portsCheck[p] = 0
368
369         #
370         # create stream with random packets count per given ports
371         #
372         pkts = list()
373         for _ in range(0, self.nr_packets):
374             # choose port from port list
375             p = random.choice(self.ports)
376             pkts.append((
377                 Ether(src=self.pg0.remote_mac,
378                       dst=self.pg0.local_mac) /
379                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
380                 UDP(sport=9876, dport=p) /
381                 Raw('\xa5' * 100)))
382             self.portsCheck[p] += 1
383
384         #
385         # no punt socket
386         #
387         punts = self.vapi.punt_socket_dump(is_ip6=0)
388         self.assertEqual(len(punts), 0)
389
390         # configure a punt socket
391         #
392         self.socket_client_create(b"%s/socket_multi" %
393                                   six.ensure_binary(self.tempdir))
394         for p in self.ports:
395             self.vapi.punt_socket_register(p,
396                                            b"%s/socket_multi" %
397                                            six.ensure_binary(self.tempdir))
398         punts = self.vapi.punt_socket_dump(is_ip6=0)
399         self.assertEqual(len(punts), len(self.ports))
400
401         for p in self.ports:
402             self.logger.debug("Sending %s packets to port %d",
403                               str(self.portsCheck[p]), p)
404         #
405         # expect punt socket and no packets on pg0
406         #
407         self.vapi.cli("clear errors")
408         self.vapi.cli("clear trace")
409         self.pg0.add_stream(pkts)
410         self.pg_enable_capture(self.pg_interfaces)
411         self.pg_start()
412         self.pg0.get_capture(0)
413         self.logger.info(self.vapi.cli("show trace"))
414         self.socket_client_close()
415
416         for p in self.ports:
417             self.assertEqual(self.portsCheck[p], 0)
418             self.vapi.punt_socket_deregister(p)
419         punts = self.vapi.punt_socket_dump(is_ip6=0)
420         self.assertEqual(len(punts), 0)
421
422
423 class TestIP6PuntSocket(TestPuntSocket):
424     """ Punt Socket for IPv6"""
425
426     @classmethod
427     def setUpClass(cls):
428         super(TestIP6PuntSocket, cls).setUpClass()
429
430     @classmethod
431     def tearDownClass(cls):
432         super(TestIP6PuntSocket, cls).tearDownClass()
433
434     def setUp(self):
435         super(TestIP6PuntSocket, self).setUp()
436
437         for i in self.pg_interfaces:
438             i.config_ip6()
439             i.resolve_ndp()
440
441     def tearDown(self):
442         super(TestIP6PuntSocket, self).tearDown()
443         for i in self.pg_interfaces:
444             i.unconfig_ip6()
445             i.admin_down()
446
447     def test_punt_socket_dump(self):
448         """ Punt socket registration """
449
450         punts = self.vapi.punt_socket_dump(is_ip6=1)
451         self.assertEqual(len(punts), 0)
452
453         #
454         # configure a punt socket
455         #
456         self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
457                                        six.ensure_binary(self.tempdir),
458                                        is_ip4=0)
459         self.vapi.punt_socket_register(2222, b"%s/socket_2222" %
460                                        six.ensure_binary(self.tempdir),
461                                        is_ip4=0)
462         punts = self.vapi.punt_socket_dump(is_ip6=1)
463         self.assertEqual(len(punts), 2)
464         self.assertEqual(punts[0].punt.l4_port, 1111)
465         self.assertEqual(punts[1].punt.l4_port, 2222)
466
467         #
468         # deregister a punt socket
469         #
470         self.vapi.punt_socket_deregister(1111, is_ip4=0)
471         punts = self.vapi.punt_socket_dump(is_ip6=1)
472         self.assertEqual(len(punts), 1)
473
474         #
475         # configure a punt socket again
476         #
477         self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
478                                        six.ensure_binary(self.tempdir),
479                                        is_ip4=0)
480         punts = self.vapi.punt_socket_dump(is_ip6=1)
481         self.assertEqual(len(punts), 2)
482
483         #
484         # deregister all punt socket
485         #
486         self.vapi.punt_socket_deregister(1111, is_ip4=0)
487         self.vapi.punt_socket_deregister(2222, is_ip4=0)
488         self.vapi.punt_socket_deregister(3333, is_ip4=0)
489         punts = self.vapi.punt_socket_dump(is_ip6=1)
490         self.assertEqual(len(punts), 0)
491
492     def test_punt_socket_traffic_single_port_single_socket(self):
493         """ Punt socket traffic single port single socket"""
494
495         port = self.ports[0]
496
497         p = (Ether(src=self.pg0.remote_mac,
498                    dst=self.pg0.local_mac) /
499              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
500              inet6.UDP(sport=9876, dport=port) /
501              Raw('\xa5' * 100))
502
503         pkts = p * self.nr_packets
504         self.portsCheck[port] = self.nr_packets
505
506         punts = self.vapi.punt_socket_dump(is_ip6=1)
507         self.assertEqual(len(punts), 0)
508
509         #
510         # expect ICMPv6 - destination unreachable for all packets
511         #
512         self.vapi.cli("clear trace")
513         self.pg0.add_stream(pkts)
514         self.pg_enable_capture(self.pg_interfaces)
515         self.pg_start()
516         # FIXME - when punt socket deregister is implemented
517         # rx = self.pg0.get_capture(self.nr_packets)
518         # for p in rx:
519         #     self.assertEqual(int(p[IPv6].nh), 58)                # ICMPv6
520         #     self.assertEqual(int(p[ICMPv6DestUnreach].code),4)  # unreachable
521
522         #
523         # configure a punt socket
524         #
525         self.socket_client_create(b"%s/socket_%d" % (
526             six.ensure_binary(self.tempdir), port))
527         self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
528             six.ensure_binary(self.tempdir), port), is_ip4=0)
529         punts = self.vapi.punt_socket_dump(is_ip6=1)
530         self.assertEqual(len(punts), 1)
531
532         self.logger.debug("Sending %s packets to port %d",
533                           str(self.portsCheck[port]), port)
534         #
535         # expect punt socket and no packets on pg0
536         #
537         self.vapi.cli("clear errors")
538         self.vapi.cli("clear trace")
539         self.pg0.add_stream(pkts)
540         self.pg_enable_capture(self.pg_interfaces)
541         self.pg_start()
542         self.pg0.get_capture(0)
543         self.logger.info(self.vapi.cli("show trace"))
544         self.socket_client_close()
545         self.assertEqual(self.portsCheck[port], 0)
546
547         #
548         # remove punt socket. expect ICMP - dest. unreachable for all packets
549         #
550         self.vapi.punt_socket_deregister(port, is_ip4=0)
551         punts = self.vapi.punt_socket_dump(is_ip6=1)
552         self.assertEqual(len(punts), 0)
553         self.pg0.add_stream(pkts)
554         self.pg_enable_capture(self.pg_interfaces)
555         self.pg_start()
556         # FIXME - when punt socket deregister is implemented
557         # self.pg0.get_capture(nr_packets)
558
559     def test_punt_socket_traffic_multi_port_multi_sockets(self):
560         """ Punt socket traffic multi ports and multi sockets"""
561
562         for p in self.ports:
563             self.portsCheck[p] = 0
564
565         #
566         # create stream with random packets count per given ports
567         #
568         pkts = list()
569         for _ in range(0, self.nr_packets):
570             # choose port from port list
571             p = random.choice(self.ports)
572             pkts.append((
573                 Ether(src=self.pg0.remote_mac,
574                       dst=self.pg0.local_mac) /
575                 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
576                 inet6.UDP(sport=9876, dport=p) /
577                 Raw('\xa5' * 100)))
578             self.portsCheck[p] += 1
579         #
580         # no punt socket
581         #
582         punts = self.vapi.punt_socket_dump(is_ip6=1)
583         self.assertEqual(len(punts), 0)
584
585         #
586         # configure a punt socket
587         #
588         for p in self.ports:
589             self.socket_client_create(b"%s/socket_%d" % (
590                 six.ensure_binary(self.tempdir), p))
591             self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
592                 six.ensure_binary(self.tempdir), p), is_ip4=0)
593         punts = self.vapi.punt_socket_dump(is_ip6=1)
594         self.assertEqual(len(punts), len(self.ports))
595
596         for p in self.ports:
597             self.logger.debug("Sending %s packets to port %d",
598                               str(self.portsCheck[p]), p)
599
600         #
601         # expect punt socket and no packets on pg0
602         #
603         self.vapi.cli("clear errors")
604         self.vapi.cli("clear trace")
605         self.pg0.add_stream(pkts)
606         self.pg_enable_capture(self.pg_interfaces)
607         self.pg_start()
608         self.pg0.get_capture(0)
609         self.logger.info(self.vapi.cli("show trace"))
610         self.socket_client_close()
611
612         for p in self.ports:
613             self.assertEqual(self.portsCheck[p], 0)
614             self.vapi.punt_socket_deregister(p, is_ip4=0)
615         punts = self.vapi.punt_socket_dump(is_ip6=1)
616         self.assertEqual(len(punts), 0)
617
618     def test_punt_socket_traffic_multi_ports_single_socket(self):
619         """ Punt socket traffic multi ports and single socket"""
620
621         for p in self.ports:
622             self.portsCheck[p] = 0
623
624         #
625         # create stream with random packets count per given ports
626         #
627         pkts = list()
628         for _ in range(0, self.nr_packets):
629             # choose port from port list
630             p = random.choice(self.ports)
631             pkts.append((
632                 Ether(src=self.pg0.remote_mac,
633                       dst=self.pg0.local_mac) /
634                 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
635                 inet6.UDP(sport=9876, dport=p) /
636                 Raw('\xa5' * 100)))
637             self.portsCheck[p] += 1
638
639         #
640         # no punt socket
641         #
642         punts = self.vapi.punt_socket_dump(is_ip6=1)
643         self.assertEqual(len(punts), 0)
644
645         #
646         # configure a punt socket
647         #
648         self.socket_client_create(b"%s/socket_multi" %
649                                   six.ensure_binary(self.tempdir))
650         for p in self.ports:
651             self.vapi.punt_socket_register(p,
652                                            b"%s/socket_multi" %
653                                            six.ensure_binary(self.tempdir),
654                                            is_ip4=0)
655         punts = self.vapi.punt_socket_dump(is_ip6=1)
656         self.assertEqual(len(punts), len(self.ports))
657
658         for p in self.ports:
659             self.logger.debug("Send %s packets to port %d",
660                               str(self.portsCheck[p]), p)
661         #
662         # expect punt socket and no packets on pg0
663         #
664         self.vapi.cli("clear errors")
665         self.vapi.cli("clear trace")
666         self.pg0.add_stream(pkts)
667         self.pg_enable_capture(self.pg_interfaces)
668         self.pg_start()
669         self.pg0.get_capture(0)
670         self.logger.info(self.vapi.cli("show trace"))
671         self.socket_client_close()
672
673         for p in self.ports:
674             self.assertEqual(self.portsCheck[p], 0)
675             self.vapi.punt_socket_deregister(p, is_ip4=0)
676         punts = self.vapi.punt_socket_dump(is_ip6=1)
677         self.assertEqual(len(punts), 0)
678
679
680 class TestPunt(VppTestCase):
681     """ Punt Test Case """
682
683     @classmethod
684     def setUpClass(cls):
685         super(TestPunt, cls).setUpClass()
686
687     @classmethod
688     def tearDownClass(cls):
689         super(TestPunt, cls).tearDownClass()
690
691     def setUp(self):
692         super(TestPunt, self).setUp()
693
694         self.create_pg_interfaces(range(4))
695
696         for i in self.pg_interfaces:
697             i.admin_up()
698             i.config_ip4()
699             i.resolve_arp()
700             i.config_ip6()
701             i.resolve_ndp()
702
703     def tearDown(self):
704         for i in self.pg_interfaces:
705             i.unconfig_ip4()
706             i.unconfig_ip6()
707             i.ip6_disable()
708             i.admin_down()
709         super(TestPunt, self).tearDown()
710
711     def test_punt(self):
712         """ Excpetion Path testing """
713
714         #
715         # Using the test CLI we will hook in a exception path to
716         # send ACL deny packets out of pg0 and pg1.
717         # the ACL is src,dst = 1.1.1.1,1.1.1.2
718         #
719         ip_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
720                                 [VppRoutePath(self.pg3.remote_ip4,
721                                               self.pg3.sw_if_index)])
722         ip_1_1_1_2.add_vpp_config()
723         ip_1_2 = VppIpRoute(self, "1::2", 128,
724                             [VppRoutePath(self.pg3.remote_ip6,
725                                           self.pg3.sw_if_index,
726                                           proto=DpoProto.DPO_PROTO_IP6)],
727                             is_ip6=1)
728         ip_1_2.add_vpp_config()
729
730         p4 = (Ether(src=self.pg2.remote_mac,
731                     dst=self.pg2.local_mac) /
732               IP(src="1.1.1.1", dst="1.1.1.2") /
733               UDP(sport=1234, dport=1234) /
734               Raw('\xa5' * 100))
735         p6 = (Ether(src=self.pg2.remote_mac,
736                     dst=self.pg2.local_mac) /
737               IPv6(src="1::1", dst="1::2") /
738               UDP(sport=1234, dport=1234) /
739               Raw('\xa5' * 100))
740         self.send_and_expect(self.pg2, p4*1, self.pg3)
741         self.send_and_expect(self.pg2, p6*1, self.pg3)
742
743         #
744         # apply the punting features
745         #
746         self.vapi.cli("test punt pg2")
747
748         #
749         # pkts now dropped
750         #
751         self.send_and_assert_no_replies(self.pg2, p4*NUM_PKTS)
752         self.send_and_assert_no_replies(self.pg2, p6*NUM_PKTS)
753
754         #
755         # Check state:
756         #  1 - node error counters
757         #  2 - per-reason counters
758         #    2, 3 are the index of the assigned punt reason
759         #
760         stats = self.statistics.get_counter(
761             "/err/punt-dispatch/No registrations")
762         self.assertEqual(stats, 2*NUM_PKTS)
763
764         stats = self.statistics.get_counter("/net/punt")
765         self.assertEqual(stats[0][7]['packets'], NUM_PKTS)
766         self.assertEqual(stats[0][8]['packets'], NUM_PKTS)
767
768         #
769         # use the test CLI to test a client that punts exception
770         # packets out of pg0
771         #
772         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
773         self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)
774
775         rx4s = self.send_and_expect(self.pg2, p4*NUM_PKTS, self.pg0)
776         rx6s = self.send_and_expect(self.pg2, p6*NUM_PKTS, self.pg0)
777
778         #
779         # check the packets come out IP unmodified but destined to pg0 host
780         #
781         for rx in rx4s:
782             self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
783             self.assertEqual(rx[Ether].src, self.pg0.local_mac)
784             self.assertEqual(p4[IP].dst, rx[IP].dst)
785             self.assertEqual(p4[IP].ttl, rx[IP].ttl)
786         for rx in rx6s:
787             self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
788             self.assertEqual(rx[Ether].src, self.pg0.local_mac)
789             self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
790             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
791
792         stats = self.statistics.get_counter("/net/punt")
793         self.assertEqual(stats[0][7]['packets'], 2*NUM_PKTS)
794         self.assertEqual(stats[0][8]['packets'], 2*NUM_PKTS)
795
796         #
797         # add another registration for the same reason to send packets
798         # out of pg1
799         #
800         self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip4)
801         self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip6)
802
803         self.vapi.cli("clear trace")
804         self.pg2.add_stream(p4 * NUM_PKTS)
805         self.pg_enable_capture(self.pg_interfaces)
806         self.pg_start()
807
808         rxd = self.pg0.get_capture(NUM_PKTS)
809         for rx in rxd:
810             self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
811             self.assertEqual(rx[Ether].src, self.pg0.local_mac)
812             self.assertEqual(p4[IP].dst, rx[IP].dst)
813             self.assertEqual(p4[IP].ttl, rx[IP].ttl)
814         rxd = self.pg1.get_capture(NUM_PKTS)
815         for rx in rxd:
816             self.assertEqual(rx[Ether].dst, self.pg1.remote_mac)
817             self.assertEqual(rx[Ether].src, self.pg1.local_mac)
818             self.assertEqual(p4[IP].dst, rx[IP].dst)
819             self.assertEqual(p4[IP].ttl, rx[IP].ttl)
820
821         self.vapi.cli("clear trace")
822         self.pg2.add_stream(p6 * NUM_PKTS)
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825
826         rxd = self.pg0.get_capture(NUM_PKTS)
827         for rx in rxd:
828             self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
829             self.assertEqual(rx[Ether].src, self.pg0.local_mac)
830             self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
831             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
832         rxd = self.pg1.get_capture(NUM_PKTS)
833         for rx in rxd:
834             self.assertEqual(rx[Ether].dst, self.pg1.remote_mac)
835             self.assertEqual(rx[Ether].src, self.pg1.local_mac)
836             self.assertEqual(p6[IPv6].dst, rx[IPv6].dst)
837             self.assertEqual(p6[IPv6].hlim, rx[IPv6].hlim)
838
839         stats = self.statistics.get_counter("/net/punt")
840         self.assertEqual(stats[0][7]['packets'], 3*NUM_PKTS)
841         self.assertEqual(stats[0][8]['packets'], 3*NUM_PKTS)
842
843         self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
844         self.logger.info(self.vapi.cli("show punt client"))
845         self.logger.info(self.vapi.cli("show punt reason"))
846         self.logger.info(self.vapi.cli("show punt stats"))
847         self.logger.info(self.vapi.cli("show punt db"))
848
849         self.vapi.cli("test punt clear")
850
851
852 if __name__ == '__main__':
853     unittest.main(testRunner=VppTestRunner)