tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_bufmon.py
1 from asfframework import VppTestRunner
2 from framework import VppTestCase
3 import unittest
4 from config import config
5 from scapy.layers.l2 import Ether
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from random import randint
9 from util import ppp
10
11
@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
class TestBufmon(VppTestCase):
    """bufmon plugin test"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and bring each one up with IPv4.

        If any setup step raises, the class teardown is invoked before the
        exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.resolve_arp()
                pg_if.admin_up()
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove the IPv4 config from every pg interface and set it down."""
        for pg_if in cls.pg_interfaces:
            pg_if.unconfig_ip4()
            pg_if.admin_down()
        super().tearDownClass()

    # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
    def create_stream(self, src_if, dst_if, count):
        """Build ``count`` Ether/IP/UDP/Raw packets from src_if towards dst_if.

        Each packet carries a payload derived from a freshly created packet
        info object, and a copy of the packet is stored on that info object
        (``info.data``) so it can be compared against the capture later.

        :param src_if: interface the packets will be injected on
        :param dst_if: interface whose remote IPv4 address is the destination
        :param count: number of packets to generate
        :returns: list of the generated packets
        """
        stream = []
        for _ in range(count):
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            ether = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            ip = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            # UDP source port is randomised per packet; destination is fixed
            udp = UDP(sport=randint(49152, 65535), dport=5678)
            pkt = ether / ip / udp / Raw(payload)
            info.data = pkt.copy()
            stream.append(pkt)
        return stream

    def verify_capture(self, src_if, dst_if, capture):
        """Check every captured packet against the saved packet infos.

        For each packet the payload is decoded back into a packet info, its
        source/destination interface indexes and its index are compared with
        the next saved info for this interface pair, and the IP source
        address and UDP source port are compared with the saved packet.
        After the loop, no further saved packet info may remain.

        :param src_if: interface the packets were sent from
        :param dst_if: interface the packets were captured on
        :param capture: iterable of captured packets
        """
        last_info = None
        for rx in capture:
            try:
                rx_ip = rx[IP]
                rx_udp = rx[UDP]
                # decode the payload back into a packet info object
                rx_info = self.payload_to_info(rx[Raw])
                # the interface indexes encoded in the payload must match
                self.assert_equal(
                    rx_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    rx_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                last_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, last_info
                )
                # there must still be a saved packet to compare against
                self.assertIsNotNone(last_info)
                self.assert_equal(rx_info.index, last_info.index, "packet info index")
                sent = last_info.data  # the packet saved by create_stream()
                # compare selected header fields with the saved packet
                self.assert_equal(rx_ip.src, sent[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(rx_udp.sport, sent[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", rx))
                raise
        # any saved info left over means a sent packet was never captured
        leftover = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, last_info
        )
        message = "Interface %s: Packet expected from interface %s didn't arrive" % (
            dst_if.name,
            src_if.name,
        )
        self.assertIsNone(leftover, message)

    def test_bufmon(self):
        # switch tracing on and confirm the status output reflects it
        self.vapi.cli("set buffer traces on")

        status = self.vapi.cli("show buffer traces status")
        self.assertIn("buffers tracing is on", status)

        # send 5 packets pg0 -> pg1 and validate what pg1 captured
        stream = self.create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(stream)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

        received = self.pg1.get_capture()
        self.pg0.assert_nothing_captured()
        self.verify_capture(self.pg0, self.pg1, received)

        # every one of these node names must show up in the verbose output
        expected_nodes = (
            "pg-input",
            "ip4-input",
            "ip4-rewrite",
            "ip4-lookup",
            "ethernet-input",
            "pg1-tx",
            "pg1-output",
        )
        verbose_traces = self.vapi.cli("show buffer traces verbose")
        for node_name in expected_nodes:
            self.assertIn(node_name, verbose_traces)

        # not verbose, skips nodes w/o buffered buffers
        brief_traces = self.vapi.cli("show buffer traces")
        self.assertNotIn("pg-input", brief_traces)

        # after clearing, even the verbose output must not list pg-input
        self.vapi.cli("clear buffer traces")
        cleared_traces = self.vapi.cli("show buffer traces verbose")
        self.assertNotIn("pg-input", cleared_traces)
128
129
# When executed directly as a script, run this module's tests through
# unittest using the VppTestRunner imported from asfframework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)