fib: Source Address Selection
[vpp.git] / test / test_ping.py
1 import socket
2
3 from scapy.layers.inet import IP, UDP, ICMP
4 from scapy.layers.inet6 import IPv6
5 from scapy.layers.l2 import Ether, GRE
6 from scapy.packet import Raw
7
8 from framework import VppTestCase
9 from util import ppp
10 from vpp_ip_route import VppIpInterfaceAddress
11 from vpp_neighbor import VppNeighbor
12
13 """ TestPing is a subclass of VppTestCase.
14
15 Basic sanity check of the ping command.
16
17 """
18
19
class TestPing(VppTestCase):
    """ Ping Test Case """

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and bring them up with IPv4/IPv6 config.

        If any step of the interface setup raises, the parent class teardown
        is invoked before the exception is re-raised.
        """
        super(TestPing, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.config_ip6()
                i.disable_ipv6_ra()
                i.resolve_arp()
                i.resolve_ndp()
        except Exception:
            super(TestPing, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestPing, cls).tearDownClass()

    def tearDown(self):
        """Delegate per-test teardown to the parent class."""
        super(TestPing, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the "show hardware" CLI command."""
        self.logger.info(self.vapi.cli("show hardware"))

    def verify_ping_request(self, p, src, dst, seq):
        """Assert that packet ``p`` is the expected IPv4 ICMP echo request.

        :param p: captured packet containing IP and ICMP layers
        :param src: expected IPv4 source address
        :param dst: expected IPv4 destination address
        :param seq: expected ICMP sequence number
        :returns: the ICMP layer of ``p``, so callers can inspect further
            fields such as the identifier
        """
        ip = p[IP]
        self.assertEqual(ip.version, 4)
        self.assertEqual(ip.flags, 0)
        self.assertEqual(ip.src, src)
        self.assertEqual(ip.dst, dst)
        self.assertEqual(ip.proto, 1)  # IP protocol 1 == ICMP
        self.assertEqual(len(ip.options), 0)
        self.assertGreaterEqual(ip.ttl, 254)
        icmp = p[ICMP]
        self.assertEqual(icmp.type, 8)  # ICMP type 8 == echo request
        self.assertEqual(icmp.code, 0)
        self.assertEqual(icmp.seq, seq)
        return icmp

    def _verify_ping_capture(self, capture, src, dst, burst=1):
        """Verify a whole capture produced by a single ping command.

        Every packet is checked with verify_ping_request(). Sequence numbers
        are expected to start at 1 and to advance after every ``burst``
        packets, and all packets must carry the same ICMP identifier.

        :param capture: iterable of captured packets, in transmit order
        :param src: expected IPv4 source address of every packet
        :param dst: expected IPv4 destination address of every packet
        :param burst: number of consecutive packets sharing one sequence
            number
        """
        icmp_id = None
        for index, pkt in enumerate(capture):
            icmp = self.verify_ping_request(pkt, src, dst,
                                            1 + index // burst)
            if icmp_id is None:
                # The first packet establishes the identifier for the run.
                icmp_id = icmp.id
            else:
                self.assertEqual(icmp.id, icmp_id)

    def test_ping_basic(self):
        """ basic ping test """
        repeat = 10
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip4 neighbors"))
            self.logger.info(self.vapi.cli("show ip6 neighbors"))

            remote_ip4 = self.pg1.remote_ip4
            ping_cmd = "ping %s interval 0.01 repeat %d" % (remote_ip4,
                                                            repeat)
            self.logger.info(self.vapi.cli(ping_cmd))
            out = self.pg1.get_capture(repeat)
            self._verify_ping_capture(out, self.pg1.local_ip4, remote_ip4)
        finally:
            # Log the counters; calling the CLI without logging its output
            # would leave nothing to inspect after a failure.
            self.logger.info(self.vapi.cli("show error"))

    def test_ping_burst(self):
        """ burst ping test """
        burst = 3
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip neighbors"))

            remote_ip4 = self.pg1.remote_ip4
            ping_cmd = "ping %s interval 0.01 burst %d" % (remote_ip4, burst)
            self.logger.info(self.vapi.cli(ping_cmd))
            # No "repeat" is given on the command line; the 5 is presumably
            # the CLI's default repeat count -- TODO confirm.
            out = self.pg1.get_capture(burst * 5)
            self._verify_ping_capture(out, self.pg1.local_ip4, remote_ip4,
                                      burst=burst)
        finally:
            self.logger.info(self.vapi.cli("show error"))

    def test_ping_src(self):
        """ ping with source address set """
        repeat = 3
        try:
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.logger.info(self.vapi.cli("show ip4 neighbors"))
            self.logger.info(self.vapi.cli("show ip6 neighbors"))

            # Add a second subnet on pg1 together with a neighbor in it.
            local_addr = "10.0.0.1"
            nbr_addr = "10.0.0.2"
            VppIpInterfaceAddress(self, self.pg1,
                                  local_addr, 24).add_vpp_config()
            VppNeighbor(self, self.pg1.sw_if_index,
                        "00:11:22:33:44:55",
                        nbr_addr).add_vpp_config()

            # Pinging the original peer must still use the interface's
            # original local address as source.
            ping_cmd = "ping %s interval 0.01 repeat %d" % (
                self.pg1.remote_ip4, repeat)
            self.logger.info(self.vapi.cli(ping_cmd))
            out = self.pg1.get_capture(repeat)
            self._verify_ping_capture(out, self.pg1.local_ip4,
                                      self.pg1.remote_ip4)

            # Pinging the new neighbor must be sourced from the address that
            # was added in the neighbor's subnet.
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            ping_cmd = "ping %s interval 0.01 repeat %d" % (nbr_addr, repeat)
            self.logger.info(self.vapi.cli(ping_cmd))
            out = self.pg1.get_capture(repeat)
            self._verify_ping_capture(out, local_addr, nbr_addr)
        finally:
            self.logger.info(self.vapi.cli("show error"))