tests: replace pycodestyle with black
[vpp.git] / test / test_l2xc.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9
10 from framework import VppTestCase, VppTestRunner
11 from util import Host, ppp
12
13
class TestL2xc(VppTestCase):
    """L2XC Test Case"""

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        Four packet-generator interfaces are created and cross-connected in
        two bi-directional pairs (pg0 <-> pg1, pg2 <-> pg3). If any step of
        the configuration raises, the parent class tear-down is invoked
        before the exception is re-raised.

        :var int hosts_nr: Number of hosts to be created.
        :var int dl_pkts_per_burst: Number of packets in burst for dual-loop
            test.
        :var int sl_pkts_per_burst: Number of packets in burst for single-loop
            test.
        """
        super(TestL2xc, cls).setUpClass()

        # Test variables
        cls.hosts_nr = 10
        cls.dl_pkts_per_burst = 257
        cls.sl_pkts_per_burst = 2

        try:
            # create 4 pg interfaces
            cls.create_pg_interfaces(range(4))

            # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
            cls.flows = {
                cls.pg0: [cls.pg1],
                cls.pg1: [cls.pg0],
                cls.pg2: [cls.pg3],
                cls.pg3: [cls.pg2],
            }

            # packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            cls.interfaces = list(cls.pg_interfaces)

            # Create bi-directional cross-connects between pg0 and pg1, then
            # between pg2 and pg3. Each direction needs its own API call.
            xconnect_pairs = (
                (cls.pg0, cls.pg1),
                (cls.pg1, cls.pg0),
                (cls.pg2, cls.pg3),
                (cls.pg3, cls.pg2),
            )
            for rx_if, tx_if in xconnect_pairs:
                cls.vapi.sw_interface_set_l2_xconnect(
                    rx_if.sw_if_index, tx_if.sw_if_index, enable=1
                )

            # mapping between packet-generator index and lists of test hosts;
            # must exist before create_host_lists() fills it in
            cls.hosts_by_pg_idx = {}

            # Create host MAC and IPv4 lists
            cls.create_host_lists(cls.hosts_nr)

            # setup all interfaces
            for pg_if in cls.interfaces:
                pg_if.admin_up()

        except Exception:
            # Undo whatever the parent setUpClass() established so a failed
            # configuration does not leak into subsequent test classes.
            super(TestL2xc, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level clean-up to the parent test case."""
        super(TestL2xc, cls).tearDownClass()

    def setUp(self):
        """Run the parent per-test setup and reset the packet infos."""
        super(TestL2xc, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Show various debug prints after each test.
        """
        super(TestL2xc, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the ``show l2patch`` CLI command."""
        self.logger.info(self.vapi.ppcli("show l2patch"))

    @classmethod
    def create_host_lists(cls, count):
        """
        Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        The hosts are stored in ``cls.hosts_by_pg_idx``, keyed by the
        ``sw_if_index`` of the owning interface; that dictionary must already
        exist when this method is called.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        for pg_if in cls.pg_interfaces:
            if_index = pg_if.sw_if_index
            # Both addresses embed the interface index and the host number,
            # so every host gets a distinct MAC/IPv4 pair.
            cls.hosts_by_pg_idx[if_index] = [
                Host(
                    "00:00:00:ff:%02x:%02x" % (if_index, j),
                    "172.17.1%02x.%u" % (if_index, j),
                )
                for j in range(count)
            ]

    def create_stream(self, src_if, packet_sizes, packets_per_burst):
        """
        Create input packet stream for defined interface.

        Every packet is an Ether / IP / UDP / Raw frame between a randomly
        chosen source host on ``src_if`` and a randomly chosen destination
        host on the interface ``src_if`` is mapped to in ``self.flows``. The
        un-padded packet is saved in the packet info so that
        :meth:`verify_capture` can compare against it.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :param int packets_per_burst: Number of packets in burst.
        :return: Stream of packets.
        """
        pkts = []
        if packets_per_burst <= 0:
            # Nothing to generate; also avoids touching self.flows, exactly
            # as the per-iteration lookup would have.
            return pkts

        # The destination interface depends only on src_if, so resolve it
        # once rather than on every iteration.
        dst_if = self.flows[src_if][0]
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]

        for _ in range(packets_per_burst):
            dst_host = random.choice(dst_hosts)
            src_host = random.choice(src_hosts)
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            p = (
                Ether(dst=dst_host.mac, src=src_host.mac)
                / IP(src=src_host.ip4, dst=dst_host.ip4)
                / UDP(sport=1234, dport=1234)
                / Raw(payload)
            )
            # Keep a copy taken before the packet is extended to its final
            # size; verification only compares header fields.
            pkt_info.data = p.copy()
            size = random.choice(packet_sizes)
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        For each captured packet the payload is decoded back to its packet
        info, which must name ``pg_if`` as destination and must be the next
        expected packet from its source. IP addresses and UDP ports are then
        compared with the saved copy of the sent packet. Finally, no expected
        packet destined to ``pg_if`` may remain un-received from any source.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        # Last verified packet info per source interface index.
        last_info = {i.sw_if_index: None for i in self.interfaces}
        dst_sw_if_index = pg_if.sw_if_index

        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)"
                    % (pg_if.name, payload_info.src, packet_index)
                )
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                # Dump the offending packet before letting the failure
                # propagate, so the log shows what was actually received.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        for i in self.interfaces:
            # Pass the integer sw_if_index as the source, matching the call
            # in the loop above and the keys of last_info.
            # NOTE(review): the interface object itself was passed here
            # before; presumably it never matched a packet info's source, so
            # this check could not fail - confirm against the framework.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't"
                " arrive" % (dst_sw_if_index, i.sw_if_index),
            )

    def run_l2xc_test(self, pkts_per_burst):
        """L2XC test

        Send a burst of packets into every interface and verify what is
        captured on every packet-generator interface.

        :param int pkts_per_burst: Number of packets to send per interface.
        """

        # Create incoming packet streams for packet-generator interfaces
        for i in self.interfaces:
            pkts = self.create_stream(i, self.pg_if_packet_sizes, pkts_per_burst)
            i.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for i in self.pg_interfaces:
            capture = i.get_capture()
            self.logger.info("Verifying capture on interface %s" % i.name)
            self.verify_capture(i, capture)

    def test_l2xc_sl(self):
        """L2XC single-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 2 packets per interface
        """

        self.run_l2xc_test(self.sl_pkts_per_burst)

    def test_l2xc_dl(self):
        """L2XC dual-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 257 packets per interface
        """

        self.run_l2xc_test(self.dl_pkts_per_burst)
239
240
# Allow running this module directly as a script: hand test discovery and
# execution to unittest, using the project's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)