baa93740ac8422c2fe9340c560119dab31f3aefb
[vpp.git] / test / test_bufmon.py
1 from framework import VppTestCase, VppTestRunner
2 import unittest
3 from config import config
4 from scapy.layers.l2 import Ether
5 from scapy.packet import Raw
6 from scapy.layers.inet import IP, UDP
7 from random import randint
8 from util import ppp
9
10
# Skipped entirely when "bufmon" is listed in config.excluded_plugins.
@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
class TestBufmon(VppTestCase):
    """bufmon plugin test"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and configure IPv4 on each of them.

        If any setup step raises, the class teardown is run before the
        exception is re-raised.
        """
        super(TestBufmon, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.resolve_arp()
                pg_if.admin_up()
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the IPv4 configuration and bring every pg interface down."""
        for pg_if in cls.pg_interfaces:
            pg_if.unconfig_ip4()
            pg_if.admin_down()
        super(TestBufmon, cls).tearDownClass()

    # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
    def create_stream(self, src_if, dst_if, count):
        """Build ``count`` UDP packets to be injected on ``src_if``.

        Every packet carries the payload generated from a fresh packet-info
        record; a copy of the packet is stored on that record (``data``) so
        it can be compared with the capture later.

        Returns the list of generated packets.
        """
        stream = []
        for _ in range(count):
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            l2 = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            l3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            # The UDP source port is randomised per packet.
            l4 = UDP(sport=randint(1000, 2000), dport=5678)
            pkt = l2 / l3 / l4 / Raw(payload)
            pkt_info.data = pkt.copy()
            stream.append(pkt)
        return stream

    def verify_capture(self, src_if, dst_if, capture):
        """Check that ``capture`` matches the packets saved for this flow.

        Each captured packet is compared with the next saved packet info
        for the ``src_if`` -> ``dst_if`` pair: interface indexes, packet
        index, IP source address and UDP source port.  Once the capture is
        exhausted, no saved packet info may remain for that pair.
        """
        src_index = src_if.sw_if_index
        dst_index = dst_if.sw_if_index
        last_info = None
        for received in capture:
            try:
                rx_ip = received[IP]
                rx_udp = received[UDP]
                # Decode the payload back into a packet info object.
                rx_info = self.payload_to_info(received[Raw])
                # The payload must reference the interfaces under test.
                self.assert_equal(rx_info.src, src_index, "source sw_if_index")
                self.assert_equal(rx_info.dst, dst_index, "destination sw_if_index")
                last_info = self.get_next_packet_info_for_interface2(
                    src_index, dst_index, last_info
                )
                # There must still be a saved packet to compare against.
                self.assertIsNotNone(last_info)
                self.assert_equal(rx_info.index, last_info.index, "packet info index")
                sent = last_info.data  # packet stored by create_stream()
                # Compare the received headers with the saved packet.
                self.assert_equal(rx_ip.src, sent[IP].src, "IP source address")
                self.assert_equal(rx_udp.sport, sent[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", received))
                raise
        leftover = self.get_next_packet_info_for_interface2(
            src_index, dst_index, last_info
        )
        missing_msg = (
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name)
        )
        self.assertIsNone(leftover, missing_msg)

    def test_bufmon(self):
        # Turn buffer tracing on and confirm the status output reports it.
        self.vapi.cli("set buffer traces on")
        status = self.vapi.cli("show buffer traces status")
        self.assertIn("buffers tracing is on", status)

        # Send five packets from pg0 towards pg1.
        stream = self.create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(stream)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

        # Everything must come out of pg1 and nothing out of pg0.
        received = self.pg1.get_capture()
        self.pg0.assert_nothing_captured()
        self.verify_capture(self.pg0, self.pg1, received)

        # Node names that must all appear in the verbose trace output.
        expected_nodes = (
            "pg-input",
            "ip4-input",
            "ip4-rewrite",
            "ip4-lookup",
            "ethernet-input",
            "pg1-tx",
            "pg1-output",
        )
        verbose = self.vapi.cli("show buffer traces verbose")
        for node in expected_nodes:
            self.assertIn(node, verbose)

        # not verbose, skips nodes w/o buffered buffers
        brief = self.vapi.cli("show buffer traces")
        self.assertNotIn("pg-input", brief)

        # After clearing, even the verbose output must not list pg-input.
        self.vapi.cli("clear buffer traces")
        cleared = self.vapi.cli("show buffer traces verbose")
        self.assertNotIn("pg-input", cleared)
127
128
# When this module is executed directly as a script, run its tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)