tests: Use errno value rather than a specific int
[vpp.git] / test / test_l2xc.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9
10 from framework import VppTestCase
11 from asfframework import VppTestRunner
12 from util import Host, ppp
13
14
class TestL2xc(VppTestCase):
    """L2XC Test Case"""

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        Four packet-generator interfaces are created and cross-connected in
        pairs (pg0 <-> pg1, pg2 <-> pg3). If any configuration step raises,
        the base-class tearDownClass is invoked before re-raising.

        :var int hosts_nr: Number of hosts to be created.
        :var int dl_pkts_per_burst: Number of packets in burst for dual-loop
            test.
        :var int sl_pkts_per_burst: Number of packets in burst for single-loop
            test.
        """
        super(TestL2xc, cls).setUpClass()

        # Test variables
        cls.hosts_nr = 10
        cls.dl_pkts_per_burst = 257
        cls.sl_pkts_per_burst = 2

        try:
            # create 4 pg interfaces
            cls.create_pg_interfaces(range(4))

            # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
            cls.flows = {
                cls.pg0: [cls.pg1],
                cls.pg1: [cls.pg0],
                cls.pg2: [cls.pg3],
                cls.pg3: [cls.pg2],
            }

            # packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            cls.interfaces = list(cls.pg_interfaces)

            # Create bi-directional cross-connects between pg0 and pg1
            cls.vapi.sw_interface_set_l2_xconnect(
                cls.pg0.sw_if_index, cls.pg1.sw_if_index, enable=1
            )
            cls.vapi.sw_interface_set_l2_xconnect(
                cls.pg1.sw_if_index, cls.pg0.sw_if_index, enable=1
            )

            # Create bi-directional cross-connects between pg2 and pg3
            cls.vapi.sw_interface_set_l2_xconnect(
                cls.pg2.sw_if_index, cls.pg3.sw_if_index, enable=1
            )
            cls.vapi.sw_interface_set_l2_xconnect(
                cls.pg3.sw_if_index, cls.pg2.sw_if_index, enable=1
            )

            # mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = {}

            # Create host MAC and IPv4 lists
            cls.create_host_lists(cls.hosts_nr)

            # setup all interfaces
            for i in cls.interfaces:
                i.admin_up()

        except Exception:
            # Release whatever the base class set up before propagating.
            super(TestL2xc, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super(TestL2xc, cls).tearDownClass()

    def setUp(self):
        """Run the base-class per-test setup and reset the packet infos."""
        super(TestL2xc, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Run the base-class per-test teardown.

        NOTE(review): debug output is presumably produced by the base class
        calling show_commands_at_teardown -- confirm against the framework.
        """
        super(TestL2xc, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the "show l2patch" CLI command."""
        self.logger.info(self.vapi.ppcli("show l2patch"))

    @classmethod
    def create_host_lists(cls, count):
        """
        Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        The result is stored in ``cls.hosts_by_pg_idx``, keyed by the
        sw_if_index of each packet-generator interface.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        for pg_if in cls.pg_interfaces:
            sw_if_index = pg_if.sw_if_index
            cls.hosts_by_pg_idx[sw_if_index] = [
                Host(
                    "00:00:00:ff:%02x:%02x" % (sw_if_index, j),
                    # Decimal, not hex: a hex digit (sw_if_index >= 10) would
                    # produce an invalid IPv4 octet such as "10a".
                    "172.17.1%02u.%u" % (sw_if_index, j),
                )
                for j in range(count)
            ]

    def create_stream(self, src_if, packet_sizes, packets_per_burst):
        """
        Create input packet stream for defined interface.

        Each packet is Ether/IP/UDP/Raw with randomly chosen source and
        destination hosts, and is extended to a size picked at random from
        ``packet_sizes``. A copy of the packet taken before the extension is
        saved in the packet info for later verification.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :param int packets_per_burst: Number of packets in burst.
        :return: Stream of packets.
        """
        # These lookups do not change between packets, so do them once.
        dst_if = self.flows[src_if][0]
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]

        pkts = []
        for _ in range(packets_per_burst):
            dst_host = random.choice(dst_hosts)
            src_host = random.choice(src_hosts)
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            p = (
                Ether(dst=dst_host.mac, src=src_host.mac)
                / IP(src=src_host.ip4, dst=dst_host.ip4)
                / UDP(sport=1234, dport=1234)
                / Raw(payload)
            )
            # Save the packet before it is padded to its final size.
            pkt_info.data = p.copy()
            size = random.choice(packet_sizes)
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        For every captured packet the payload info must name ``pg_if`` as the
        destination, its index must equal that of the next packet info
        returned for the same source/destination pair, and its IPv4 addresses
        and UDP ports must equal those of the saved packet. After the capture
        is consumed, no further packet info may remain for any source
        interface towards ``pg_if``.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        # Last matched packet info, tracked per source sw_if_index.
        last_info = {i.sw_if_index: None for i in self.interfaces}
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on port %s: src=%u (id=%u)"
                    % (pg_if.name, payload_info.src, packet_index)
                )
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                # Dump the offending packet to the log, then let the
                # failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.interfaces:
            # Pass the source as an sw_if_index, exactly as in the per-packet
            # lookup above; passing the interface object itself would not
            # match an integer source index and would make this check a no-op.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't"
                " arrive" % (dst_sw_if_index, i.sw_if_index),
            )

    def run_l2xc_test(self, pkts_per_burst):
        """L2XC test

        Add a stream of ``pkts_per_burst`` packets to every interface, start
        the packet generator and verify the capture on every interface.

        :param int pkts_per_burst: Number of packets to send per interface.
        """

        # Create incoming packet streams for packet-generator interfaces
        for i in self.interfaces:
            pkts = self.create_stream(i, self.pg_if_packet_sizes, pkts_per_burst)
            i.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for i in self.pg_interfaces:
            capture = i.get_capture()
            self.logger.info("Verifying capture on interface %s" % i.name)
            self.verify_capture(i, capture)

    def test_l2xc_sl(self):
        """L2XC single-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 2 packets per interface
        """

        self.run_l2xc_test(self.sl_pkts_per_burst)

    def test_l2xc_dl(self):
        """L2XC dual-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 257 packets per interface
        """

        self.run_l2xc_test(self.dl_pkts_per_burst)
240
241
# When this file is executed as a script (rather than imported), run its
# tests through unittest using the VppTestRunner imported from asfframework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)