7e5c92f63c67a036ab87d643491c4ac333d759f4
[vpp.git] / test / test_ping.py
1 import socket
2
3 from scapy.layers.inet import IP, UDP, ICMP
4 from scapy.layers.inet6 import IPv6
5 from scapy.layers.l2 import Ether, GRE
6 from scapy.packet import Raw
7
8 from framework import VppTestCase
9 from util import ppp
10
11 """ TestPing is a subclass of  VPPTestCase classes.
12
13 Basic test for sanity check of the ping.
14
15 """
16
17
18 class TestPing(VppTestCase):
19     """ Ping Test Case """
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestPing, cls).setUpClass()
24         try:
25             cls.create_pg_interfaces(range(2))
26             cls.interfaces = list(cls.pg_interfaces)
27
28             for i in cls.interfaces:
29                 i.admin_up()
30                 i.config_ip4()
31                 i.config_ip6()
32                 i.disable_ipv6_ra()
33                 i.resolve_arp()
34                 i.resolve_ndp()
35         except Exception:
36             super(TestPing, cls).tearDownClass()
37             raise
38
39     @classmethod
40     def tearDownClass(cls):
41         super(TestPing, cls).tearDownClass()
42
43     def tearDown(self):
44         super(TestPing, self).tearDown()
45
46     def show_commands_at_teardown(self):
47         self.logger.info(self.vapi.cli("show hardware"))
48
49     def test_ping_basic(self):
50         """ basic ping test """
51         try:
52             self.pg_enable_capture(self.pg_interfaces)
53             self.pg_start()
54             self.logger.info(self.vapi.cli("show ip arp"))
55             self.logger.info(self.vapi.cli("show ip6 neighbors"))
56
57             remote_ip4 = self.pg1.remote_ip4
58             ping_cmd = "ping " + remote_ip4 + " interval 0.01 repeat 10"
59             ret = self.vapi.cli(ping_cmd)
60             self.logger.info(ret)
61             out = self.pg1.get_capture(10)
62             icmp_id = None
63             icmp_seq = 1
64             for p in out:
65                 ip = p[IP]
66                 self.assertEqual(ip.version, 4)
67                 self.assertEqual(ip.flags, 0)
68                 self.assertEqual(ip.src, self.pg1.local_ip4)
69                 self.assertEqual(ip.dst, self.pg1.remote_ip4)
70                 self.assertEqual(ip.proto, 1)
71                 self.assertEqual(len(ip.options), 0)
72                 self.assertGreaterEqual(ip.ttl, 254)
73                 icmp = p[ICMP]
74                 self.assertEqual(icmp.type, 8)
75                 self.assertEqual(icmp.code, 0)
76                 self.assertEqual(icmp.seq, icmp_seq)
77                 icmp_seq = icmp_seq + 1
78                 if icmp_id is None:
79                     icmp_id = icmp.id
80                 else:
81                     self.assertEqual(icmp.id, icmp_id)
82         finally:
83             self.vapi.cli("show error")
84
85     def test_ping_burst(self):
86         """ burst ping test """
87         try:
88             self.pg_enable_capture(self.pg_interfaces)
89             self.pg_start()
90             self.logger.info(self.vapi.cli("show ip arp"))
91             self.logger.info(self.vapi.cli("show ip6 neighbors"))
92
93             remote_ip4 = self.pg1.remote_ip4
94             ping_cmd = "ping " + remote_ip4 + " interval 0.01 burst 3"
95             ret = self.vapi.cli(ping_cmd)
96             self.logger.info(ret)
97             out = self.pg1.get_capture(3*5)
98             icmp_id = None
99             icmp_seq = 1
100             count = 0
101             for p in out:
102                 ip = p[IP]
103                 self.assertEqual(ip.version, 4)
104                 self.assertEqual(ip.flags, 0)
105                 self.assertEqual(ip.src, self.pg1.local_ip4)
106                 self.assertEqual(ip.dst, self.pg1.remote_ip4)
107                 self.assertEqual(ip.proto, 1)
108                 self.assertEqual(len(ip.options), 0)
109                 self.assertGreaterEqual(ip.ttl, 254)
110                 icmp = p[ICMP]
111                 self.assertEqual(icmp.type, 8)
112                 self.assertEqual(icmp.code, 0)
113                 self.assertEqual(icmp.seq, icmp_seq)
114                 count = count + 1
115                 if count >= 3:
116                     icmp_seq = icmp_seq + 1
117                     count = 0
118                 if icmp_id is None:
119                     icmp_id = icmp.id
120                 else:
121                     self.assertEqual(icmp.id, icmp_id)
122         finally:
123             self.vapi.cli("show error")