1e786e48b4df8b8abe0c7475a0bfe31f4c0ec5fa
[vpp.git] / test / test_l2xc.py
1 #!/usr/bin/env python
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9
10 from framework import VppTestCase, VppTestRunner
11 from util import Host, ppp
12
13
class TestL2xc(VppTestCase):
    """ L2XC Test Case """

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        :var int hosts_nr: Number of hosts to be created per interface.
        :var int dl_pkts_per_burst: Number of packets in burst for dual-loop
            test.
        :var int sl_pkts_per_burst: Number of packets in burst for single-loop
            test.
        :var dict flows: Maps every source pg interface to the list of
            interfaces its packets are expected on.
        :var list pg_if_packet_sizes: Packet sizes streams are built from.
        :var list interfaces: Copy of the list of created pg interfaces.
        :var dict hosts_by_pg_idx: Maps sw_if_index of a pg interface to the
            list of test hosts created for it.
        """
        super(TestL2xc, cls).setUpClass()

        # Test variables
        cls.hosts_nr = 10
        cls.dl_pkts_per_burst = 257
        cls.sl_pkts_per_burst = 2

        try:
            # create 4 pg interfaces
            cls.create_pg_interfaces(range(4))

            # cross-connected pairs as (source, destination); each pair is
            # listed in both directions so that the xconnect is
            # bi-directional (pg0 <-> pg1, pg2 <-> pg3)
            xc_pairs = [(cls.pg0, cls.pg1), (cls.pg1, cls.pg0),
                        (cls.pg2, cls.pg3), (cls.pg3, cls.pg2)]

            # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
            cls.flows = {src_if: [dst_if] for src_if, dst_if in xc_pairs}

            # packet sizes
            cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

            cls.interfaces = list(cls.pg_interfaces)

            # Create the cross-connects, one API call per direction
            for src_if, dst_if in xc_pairs:
                cls.vapi.sw_interface_set_l2_xconnect(
                    src_if.sw_if_index, dst_if.sw_if_index, enable=1)

            # mapping between packet-generator index and lists of test hosts
            cls.hosts_by_pg_idx = dict()

            # Create host MAC and IPv4 lists
            cls.create_host_lists(cls.hosts_nr)

            # setup all interfaces
            for pg_if in cls.interfaces:
                pg_if.admin_up()

        except Exception:
            # setUpClass of the parent already succeeded, so undo it before
            # propagating the configuration error
            super(TestL2xc, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """
        Perform standard class teardown defined by the parent class.
        """
        super(TestL2xc, cls).tearDownClass()

    def setUp(self):
        """
        Perform standard test setup and forget packet infos recorded by a
        previously run test.
        """
        super(TestL2xc, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Perform standard test teardown defined by the parent class.
        """
        super(TestL2xc, self).tearDown()

    def show_commands_at_teardown(self):
        """
        Log the output of the "show l2patch" CLI command for debugging.
        """
        self.logger.info(self.vapi.ppcli("show l2patch"))

    @classmethod
    def create_host_lists(cls, count):
        """
        Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        Host ``j`` of an interface gets MAC ``00:00:00:ff:<idx>:<j>`` (both
        parts in hex) and IPv4 ``172.17.1<idx>.<j>`` (both parts in decimal,
        ``<idx>`` zero-padded to two digits), where ``<idx>`` is the
        sw_if_index of the interface.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        """
        for pg_if in cls.pg_interfaces:
            if_index = pg_if.sw_if_index
            # The IPv4 octet must be decimal: a hex rendering would put the
            # letters a-f into the address for sw_if_index >= 10.
            cls.hosts_by_pg_idx[if_index] = [
                Host("00:00:00:ff:%02x:%02x" % (if_index, j),
                     "172.17.1%02u.%u" % (if_index, j))
                for j in range(count)]

    def create_stream(self, src_if, packet_sizes, packets_per_burst):
        """
        Create input packet stream for defined interface.

        Every packet is an Ether / IP / UDP / Raw packet between a randomly
        chosen host of the source interface and a randomly chosen host of the
        destination interface, extended to a randomly chosen size.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :param int packets_per_burst: Number of packets in burst.
        :return: Stream of packets.
        """
        # The destination and both host lists do not change per packet
        dst_if = self.flows[src_if][0]
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]

        pkts = []
        for _ in range(packets_per_burst):
            dst_host = random.choice(dst_hosts)
            src_host = random.choice(src_hosts)
            pkt_info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=dst_host.mac, src=src_host.mac) /
                 IP(src=src_host.ip4, dst=dst_host.ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            # keep the packet as built, before it is extended below, for
            # the comparison done in verify_capture
            pkt_info.data = p.copy()
            size = random.choice(packet_sizes)
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        Every captured packet has to be addressed to ``pg_if``, has to be the
        next recorded packet from its source interface and has to carry the
        IP addresses and UDP ports of the packet that was sent. After the
        whole capture is processed, no recorded packet destined to ``pg_if``
        may be left unseen.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        dst_sw_if_index = pg_if.sw_if_index
        # last verified packet info per source sw_if_index (None = none yet)
        last_info = {i.sw_if_index: None for i in self.interfaces}
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                payload_info = self.payload_to_info(packet[Raw])
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                # dump the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.interfaces:
            # Look the source up by its sw_if_index, the same kind of value
            # as payload_info.src in the call above and as the keys of
            # last_info; the interface object itself is not a source index.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertIsNone(remaining_packet,
                              "Port %u: Packet expected from source %u didn't"
                              " arrive" % (dst_sw_if_index, i.sw_if_index))

    def run_l2xc_test(self, pkts_per_burst):
        """ L2XC test

        Send a burst of packets on every interface and verify what is
        captured on every interface.

        :param int pkts_per_burst: Number of packets to send per interface.
        """

        # Create incoming packet streams for packet-generator interfaces
        for src_if in self.interfaces:
            pkts = self.create_stream(src_if, self.pg_if_packet_sizes,
                                      pkts_per_burst)
            src_if.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for dst_if in self.pg_interfaces:
            capture = dst_if.get_capture()
            self.logger.info("Verifying capture on interface %s" % dst_if.name)
            self.verify_capture(dst_if, capture)

    def test_l2xc_sl(self):
        """ L2XC single-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 2 packets per interface
        """

        self.run_l2xc_test(self.sl_pkts_per_burst)

    def test_l2xc_dl(self):
        """ L2XC dual-loop test

        Test scenario:
            1. config
                2 pairs of 2 interfaces, l2xconnected

            2. sending l2 eth packets between 4 interfaces
                64B, 512B, 1518B, 9018B (ether_size)
                burst of 257 packets per interface
        """

        self.run_l2xc_test(self.dl_pkts_per_burst)
228
229
# Run the tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)