ping: Simple binary API for running ping based on events
[vpp.git] / test / test_ping.py
1 import socket
2
3 from scapy.layers.inet import IP, UDP, ICMP
4 from scapy.layers.inet6 import IPv6
5 from scapy.layers.l2 import Ether, GRE
6 from scapy.packet import Raw
7
8 from framework import VppTestCase
9 from util import ppp
10 from vpp_ip_route import VppIpInterfaceAddress, VppIpRoute, VppRoutePath
11 from vpp_neighbor import VppNeighbor
12
13 """ TestPing is a subclass of  VPPTestCase classes.
14
15 Basic test for sanity check of the ping.
16
17 """
18
19
20 class TestPing(VppTestCase):
21     """Ping Test Case"""
22
23     @classmethod
24     def setUpClass(cls):
25         super(TestPing, cls).setUpClass()
26         try:
27             cls.create_pg_interfaces(range(2))
28             cls.interfaces = list(cls.pg_interfaces)
29
30             for i in cls.interfaces:
31                 i.admin_up()
32                 i.config_ip4()
33                 i.config_ip6()
34                 i.disable_ipv6_ra()
35                 i.resolve_arp()
36                 i.resolve_ndp()
37         except Exception:
38             super(TestPing, cls).tearDownClass()
39             raise
40
41     @classmethod
42     def tearDownClass(cls):
43         super(TestPing, cls).tearDownClass()
44
45     def tearDown(self):
46         super(TestPing, self).tearDown()
47
48     def show_commands_at_teardown(self):
49         self.logger.info(self.vapi.cli("show hardware"))
50
51     def verify_ping_request(self, p, src, dst, seq):
52         ip = p[IP]
53         self.assertEqual(ip.version, 4)
54         self.assertEqual(ip.flags, 0)
55         self.assertEqual(ip.src, src)
56         self.assertEqual(ip.dst, dst)
57         self.assertEqual(ip.proto, 1)
58         self.assertEqual(len(ip.options), 0)
59         self.assertGreaterEqual(ip.ttl, 254)
60         icmp = p[ICMP]
61         self.assertEqual(icmp.type, 8)
62         self.assertEqual(icmp.code, 0)
63         self.assertEqual(icmp.seq, seq)
64         return icmp
65
66     def test_ping_basic(self):
67         """basic ping test"""
68         try:
69             self.pg_enable_capture(self.pg_interfaces)
70             self.pg_start()
71             self.logger.info(self.vapi.cli("show ip4 neighbors"))
72             self.logger.info(self.vapi.cli("show ip6 neighbors"))
73
74             remote_ip4 = self.pg1.remote_ip4
75             ping_cmd = "ping " + remote_ip4 + " interval 0.01 repeat 10"
76             ret = self.vapi.cli(ping_cmd)
77             self.logger.info(ret)
78             out = self.pg1.get_capture(10)
79             icmp_id = None
80             icmp_seq = 1
81             for p in out:
82                 icmp = self.verify_ping_request(
83                     p, self.pg1.local_ip4, self.pg1.remote_ip4, icmp_seq
84                 )
85                 icmp_seq = icmp_seq + 1
86                 if icmp_id is None:
87                     icmp_id = icmp.id
88                 else:
89                     self.assertEqual(icmp.id, icmp_id)
90         finally:
91             self.vapi.cli("show error")
92
93     def test_ping_burst(self):
94         """burst ping test"""
95         try:
96             self.pg_enable_capture(self.pg_interfaces)
97             self.pg_start()
98             self.logger.info(self.vapi.cli("show ip neighbors"))
99
100             remote_ip4 = self.pg1.remote_ip4
101             ping_cmd = "ping " + remote_ip4 + " interval 0.01 burst 3"
102             ret = self.vapi.cli(ping_cmd)
103             self.logger.info(ret)
104             out = self.pg1.get_capture(3 * 5)
105             icmp_id = None
106             icmp_seq = 1
107             count = 0
108             for p in out:
109                 icmp = self.verify_ping_request(
110                     p, self.pg1.local_ip4, self.pg1.remote_ip4, icmp_seq
111                 )
112                 count = count + 1
113                 if count >= 3:
114                     icmp_seq = icmp_seq + 1
115                     count = 0
116                 if icmp_id is None:
117                     icmp_id = icmp.id
118                 else:
119                     self.assertEqual(icmp.id, icmp_id)
120         finally:
121             self.vapi.cli("show error")
122
123     def test_ping_src(self):
124         """ping with source address set"""
125
126         self.pg_enable_capture(self.pg_interfaces)
127         self.pg_start()
128         self.logger.info(self.vapi.cli("show ip4 neighbors"))
129         self.logger.info(self.vapi.cli("show ip6 neighbors"))
130
131         nbr_addr = "10.0.0.2"
132         VppIpInterfaceAddress(self, self.pg1, "10.0.0.1", 24).add_vpp_config()
133         VppNeighbor(
134             self, self.pg1.sw_if_index, "00:11:22:33:44:55", nbr_addr
135         ).add_vpp_config()
136
137         ping_cmd = "ping %s interval 0.01 repeat 3" % self.pg1.remote_ip4
138         ret = self.vapi.cli(ping_cmd)
139         out = self.pg1.get_capture(3)
140         icmp_seq = 1
141         for p in out:
142             icmp = self.verify_ping_request(
143                 p, self.pg1.local_ip4, self.pg1.remote_ip4, icmp_seq
144             )
145             icmp_seq = icmp_seq + 1
146
147         self.pg_enable_capture(self.pg_interfaces)
148         self.pg_start()
149         ping_cmd = "ping %s interval 0.01 repeat 3" % nbr_addr
150         ret = self.vapi.cli(ping_cmd)
151         out = self.pg1.get_capture(3)
152         icmp_seq = 1
153         for p in out:
154             icmp = self.verify_ping_request(p, "10.0.0.1", nbr_addr, icmp_seq)
155             icmp_seq = icmp_seq + 1
156
157     def test_ping_fib_routed_dst(self):
158         """ping destination routed according to FIB table"""
159
160         try:
161             self.pg1.generate_remote_hosts(1)
162             self.pg_enable_capture(self.pg_interfaces)
163             self.pg_start()
164             routed_dst = "10.0.2.0"
165             self.logger.info(self.vapi.cli("show ip4 neighbors"))
166             VppIpRoute(
167                 self,
168                 routed_dst,
169                 24,
170                 [VppRoutePath(self.pg1.remote_hosts[0].ip4, self.pg1.sw_if_index)],
171             ).add_vpp_config()
172             ping_cmd = "ping %s interval 0.01 repeat 3" % routed_dst
173             ret = self.vapi.cli(ping_cmd)
174             self.logger.info(ret)
175             out = self.pg1.get_capture(3)
176             icmp_seq = 1
177             for p in out:
178                 self.verify_ping_request(p, self.pg1.local_ip4, routed_dst, icmp_seq)
179                 icmp_seq = icmp_seq + 1
180         finally:
181             self.vapi.cli("show error")
182
183     def test_ping_api(self):
184         """ping api"""
185
186         try:
187             self.pg_enable_capture(self.pg_interfaces)
188             self.pg_start()
189
190             ret = self.vapi.want_ping_finished_events(
191                 address=self.pg1.remote_ip4,
192                 repeat=4,
193                 interval=0.2,
194             )
195             self.logger.info(ret)
196             timeout = 1
197
198             ev = self.vapi.wait_for_event(timeout, "ping_finished_event")
199             self.logger.info(ev)
200             self.assertEqual(ev.request_count, 4)
201
202             out = self.pg1.get_capture(4)
203             icmp_id = None
204             icmp_seq = 1
205             for p in out:
206                 icmp = self.verify_ping_request(
207                     p, self.pg1.local_ip4, self.pg1.remote_ip4, icmp_seq
208                 )
209                 icmp_seq = icmp_seq + 1
210                 if icmp_id is None:
211                     icmp_id = icmp.id
212                 else:
213                     self.assertEqual(icmp.id, icmp_id)
214         finally:
215             self.vapi.cli("show error")