tests: replace pycodestyle with black
[vpp.git] / test / test_l2bd.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether, Dot1Q
8 from scapy.layers.inet import IP, UDP
9
10 from framework import VppTestCase, VppTestRunner
11 from util import Host, ppp
12 from vpp_sub_interface import VppDot1QSubint, VppDot1ADSubint
13
14
class TestL2bd(VppTestCase):
    """L2BD Test Case"""

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        If any configuration step raises, the parent tearDownClass is invoked
        before the exception is re-raised.

        :var int bd_id: Bridge domain ID.
        :var int mac_entries_count: Number of MAC entries for bridge-domain to
            learn.
        :var int dot1q_tag: VLAN tag for dot1q sub-interface.
        :var int dot1ad_sub_id: SubID of dot1ad sub-interface.
        :var int dot1ad_outer_tag: VLAN S-tag for dot1ad sub-interface.
        :var int dot1ad_inner_tag: VLAN C-tag for dot1ad sub-interface.
        :var int sl_pkts_per_burst: Number of packets in burst for single-loop
            test.
        :var int dl_pkts_per_burst: Number of packets in burst for dual-loop
            test.
        """
        super(TestL2bd, cls).setUpClass()

        # Test variables
        cls.bd_id = 1
        cls.mac_entries_count = 100
        cls.dot1q_tag = 100
        cls.dot1ad_sub_id = 20
        cls.dot1ad_outer_tag = 200
        cls.dot1ad_inner_tag = 300
        cls.sl_pkts_per_burst = 2
        cls.dl_pkts_per_burst = 257

        try:
            # create 3 pg interfaces
            cls.create_pg_interfaces(range(3))

            # create 2 sub-interfaces: dot1q on pg1 and dot1ad on pg2
            cls.sub_interfaces = [
                VppDot1QSubint(cls, cls.pg1, cls.dot1q_tag),
                VppDot1ADSubint(
                    cls,
                    cls.pg2,
                    cls.dot1ad_sub_id,
                    cls.dot1ad_outer_tag,
                    cls.dot1ad_inner_tag,
                ),
            ]

            # packet flows mapping: every interface sends to the other two
            cls.flows = {
                cls.pg0: [cls.pg1, cls.pg2],
                cls.pg1: [cls.pg0, cls.pg2],
                cls.pg2: [cls.pg0, cls.pg1],
            }

            # packet sizes; the sub-interface variants add 4 bytes to the two
            # largest sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
            cls.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]

            cls.interfaces = list(cls.pg_interfaces)
            cls.interfaces.extend(cls.sub_interfaces)

            # Create BD with MAC learning enabled and put interfaces and
            # sub-interfaces to this BD. When a pg interface carries a
            # sub-interface, the sub-interface (not the parent) is bridged.
            for pg_if in cls.pg_interfaces:
                sw_if_index = (
                    pg_if.sub_if.sw_if_index
                    if hasattr(pg_if, "sub_if")
                    else pg_if.sw_if_index
                )
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=sw_if_index, bd_id=cls.bd_id
                )

            # setup all interfaces
            for i in cls.interfaces:
                i.admin_up()

            # mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = dict()

            # create test host entries and inject packets to learn MAC entries
            # in the bridge-domain
            cls.create_hosts_and_learn(cls.mac_entries_count)
            cls.logger.info(cls.vapi.ppcli("show l2fib"))

        except Exception:
            super(TestL2bd, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super(TestL2bd, cls).tearDownClass()

    def setUp(self):
        """
        Clear trace and packet infos before running each test.
        """
        super(TestL2bd, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Show various debug prints after each test.

        The CLI dumps are skipped when VPP is flagged as dead.
        """
        super(TestL2bd, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.ppcli("show l2fib verbose"))
            self.logger.info(
                self.vapi.ppcli("show bridge-domain %s detail" % self.bd_id)
            )

    @classmethod
    def create_hosts_and_learn(cls, count):
        """
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address. Create
        L2 MAC packet stream with host MAC addresses per interface to let
        the bridge domain learn these MAC addresses.

        Hosts are split evenly across the pg interfaces; the last interface
        additionally receives the remainder of the integer division.

        :param count: Integer number of hosts to create MAC/IPv4 addresses for.
        """
        n_int = len(cls.pg_interfaces)
        macs_per_if = count // n_int
        for i, pg_if in enumerate(cls.pg_interfaces):
            start_nr = macs_per_if * i
            # the last interface absorbs whatever the division left over
            end_nr = count if i == (n_int - 1) else macs_per_if * (i + 1)
            hosts = []
            cls.hosts_by_pg_idx[pg_if.sw_if_index] = hosts
            packets = []
            for j in range(start_nr, end_nr):
                # NOTE: the interface index is rendered in hex inside the third
                # IPv4 octet; this yields a valid address only for indexes 0-9.
                host = Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                )
                # broadcast frame sourced from the host MAC to be learned
                packet = Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac)
                hosts.append(host)
                if hasattr(pg_if, "sub_if"):
                    packet = pg_if.sub_if.add_dot1_layer(packet)
                packets.append(packet)
            pg_if.add_stream(packets)
        cls.logger.info("Sending broadcast eth frames for MAC learning")
        cls.pg_start()

    def create_stream(self, src_if, packet_sizes, packets_per_burst):
        """
        Create input packet stream for defined interface.

        Destinations alternate between the two interfaces listed for src_if
        in self.flows; source/destination hosts and the packet size are chosen
        at random for every packet.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :param int packets_per_burst: Number of packets in burst.
        :return: Stream of packets.
        """
        pkts = []
        for i in range(0, packets_per_burst):
            dst_if = self.flows[src_if][i % 2]
            dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
            src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            p = (
                Ether(dst=dst_host.mac, src=src_host.mac)
                / IP(src=src_host.ip4, dst=dst_host.ip4)
                / UDP(sport=1234, dport=1234)
                / Raw(payload)
            )
            # keep the untagged, unpadded copy as the reference for
            # verify_capture
            pkt_info.data = p.copy()
            if hasattr(src_if, "sub_if"):
                p = src_if.sub_if.add_dot1_layer(p)
            size = random.choice(packet_sizes)
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        Every captured packet is matched, in order per source interface,
        against the packet infos recorded by create_stream. After the capture
        is consumed, no packet info destined to pg_if may remain unmatched.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        # last matched packet info per source sw_if_index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            payload_info = self.payload_to_info(packet[Raw])
            src_sw_if_index = payload_info.src
            # find the sending interface (never the capturing one itself)
            src_if = None
            for ifc in self.pg_interfaces:
                if ifc != pg_if and ifc.sw_if_index == src_sw_if_index:
                    src_if = ifc
                    break
            if hasattr(src_if, "sub_if"):
                # Strip the VLAN tag(s) belonging to the source sub-interface
                # (the helper presumably also checks them - see
                # vpp_sub_interface)
                packet = src_if.sub_if.remove_dot1_layer(packet)
            self.assertTrue(Dot1Q not in packet)
            try:
                ip = packet[IP]
                udp = packet[UDP]
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)"
                    % (pg_if.name, payload_info.src, packet_index)
                )
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.pg_interfaces:
            # Look up by source sw_if_index, exactly as the per-packet lookup
            # above does. Passing the interface object itself would never
            # match a stored source index, so missing packets would go
            # unnoticed.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive"
                % (dst_sw_if_index, i.sw_if_index),
            )

    def run_l2bd_test(self, pkts_per_burst):
        """L2BD MAC learning test

        Build one burst per pg interface, send all bursts and verify the
        capture on every pg interface.

        :param int pkts_per_burst: Number of packets in each interface's burst.
        """

        # Create incoming packet streams for packet-generator interfaces
        for i in self.pg_interfaces:
            packet_sizes = (
                self.sub_if_packet_sizes
                if hasattr(i, "sub_if")
                else self.pg_if_packet_sizes
            )
            pkts = self.create_stream(i, packet_sizes, pkts_per_burst)
            i.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for i in self.pg_interfaces:
            capture = i.get_capture()
            self.logger.info("Verifying capture on interface %s" % i.name)
            self.verify_capture(i, capture)

    def test_l2bd_sl(self):
        """L2BD MAC learning single-loop test

        Test scenario:
            1.config
                MAC learning enabled
                learn 100 MAC entries
                3 interfaces: untagged, dot1q, dot1ad

            2.sending l2 eth pkts between 3 interface
                64B, 512B, 1518B, 9018B (ether_size; the two largest are
                4B bigger on interfaces with a sub-interface)
                burst of 2 pkts per interface
        """

        self.run_l2bd_test(self.sl_pkts_per_burst)

    def test_l2bd_dl(self):
        """L2BD MAC learning dual-loop test

        Test scenario:
           1.config
               MAC learning enabled
               learn 100 MAC entries
               3 interfaces: untagged, dot1q, dot1ad

           2.sending l2 eth pkts between 3 interface
               64B, 512B, 1518B, 9018B (ether_size; the two largest are
               4B bigger on interfaces with a sub-interface)
               burst of 257 pkts per interface
        """

        self.run_l2bd_test(self.dl_pkts_per_burst)
307
308
# When executed directly as a script, run this module's tests with the
# VPP-specific test runner imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)