Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_ip4_irb.py
1 #!/usr/bin/env python
2 """IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9018B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import unittest
27 from random import choice
28
29 from scapy.packet import Raw
30 from scapy.layers.l2 import Ether
31 from scapy.layers.inet import IP, UDP
32
33 from framework import VppTestCase, VppTestRunner
34 from vpp_papi import MACAddress
35 from vpp_l2 import L2_PORT_TYPE
36
37
class TestIpIrb(VppTestCase):
    """IRB Test Case"""

    @classmethod
    def setUpClass(cls):
        """
        #. Create BD with MAC learning enabled and put interfaces to this BD.
        #. Configure IPv4 addresses on BVI interface and routed interface.
        #. Configure MAC address binding to IPv4 neighbors on bvi0.
        #. Configure MAC address on pg2.
        #. BVI interface has remote hosts, one half of hosts are
           behind pg0 second behind pg1.
        """
        super(TestIpIrb, cls).setUpClass()

        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
        cls.bd_id = 10
        cls.remote_hosts_count = 250
        # number of packets in every generated stream; the expected capture
        # counts are derived from the streams, so this is the only place
        # the burst size is defined
        cls.pkts_per_burst = 257

        # create 3 pg interfaces, 1 BVI interface
        cls.create_pg_interfaces(range(3))
        cls.create_bvi_interfaces(1)

        cls.interfaces = list(cls.pg_interfaces)
        cls.interfaces.extend(cls.bvi_interfaces)

        for i in cls.interfaces:
            i.admin_up()

        # Create BD with MAC learning enabled and put interfaces to this BD
        cls.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=cls.bvi0.sw_if_index, bd_id=cls.bd_id,
            port_type=L2_PORT_TYPE.BVI)
        cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
                                            bd_id=cls.bd_id)
        cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
                                            bd_id=cls.bd_id)

        # Configure IPv4 addresses on BVI interface and routed interface
        cls.bvi0.config_ip4()
        cls.pg2.config_ip4()

        # Configure MAC address binding to IPv4 neighbors on bvi0
        cls.bvi0.generate_remote_hosts(cls.remote_hosts_count)
        cls.bvi0.configure_ipv4_neighbors()
        # configure MAC address on pg2
        cls.pg2.resolve_arp()

        # BVI interface has remote hosts, one half of hosts are behind
        # pg0 second behind pg1
        half = cls.remote_hosts_count // 2
        cls.pg0.remote_hosts = cls.bvi0.remote_hosts[:half]
        cls.pg1.remote_hosts = cls.bvi0.remote_hosts[half:]

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base test case."""
        super(TestIpIrb, cls).tearDownClass()

    def tearDown(self):
        """Run standard test teardown and log ``show l2patch``,
        ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
        ``show ip arp``.
        """
        super(TestIpIrb, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show l2patch"))
            self.logger.info(self.vapi.cli("show l2fib verbose"))
            self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
                                           self.bd_id))
            self.logger.info(self.vapi.cli("show ip arp"))

    def _record_and_pad(self, info, packet, index, packet_sizes):
        """Store a copy of ``packet`` in ``info`` and pad it to its size.

        The copy is taken before padding, so ``info.data`` holds the
        un-extended packet. The size is picked from ``packet_sizes`` and
        advances every second packet, wrapping around at the end of the list.
        """
        info.data = packet.copy()
        size = packet_sizes[(index // 2) % len(packet_sizes)]
        self.extend_packet(packet, size)

    def create_stream(self, src_ip_if, dst_ip_if, packet_sizes):
        """Build one burst of IPv4/UDP packets sent from a routed interface.

        Every packet is addressed (L2) to ``src_ip_if.local_mac`` and (L3) to
        a randomly chosen remote host of ``dst_ip_if``.

        :param src_ip_if: interface the packets are sent from; its remote
            MAC/IPv4 are used as the packet source.
        :param dst_ip_if: interface whose ``remote_hosts`` are the possible
            IPv4 destinations.
        :param packet_sizes: list of sizes the packets are extended to.
        :returns: list of ``pkts_per_burst`` packets.
        """
        pkts = []
        for i in range(self.pkts_per_burst):
            remote_dst_host = choice(dst_ip_if.remote_hosts)
            info = self.create_packet_info(src_ip_if, dst_ip_if)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) /
                 IP(src=src_ip_if.remote_ip4,
                    dst=remote_dst_host.ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            self._record_and_pad(info, p, i, packet_sizes)
            pkts.append(p)
        return pkts

    def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if,
                               packet_sizes):
        """Build one burst of IPv4/UDP packets sent from a bridged interface.

        Every packet uses the MAC and IPv4 address of a randomly chosen
        remote host of ``src_l2_if`` as its source, ``src_ip_if.local_mac``
        as its L2 destination and ``dst_ip_if.remote_ip4`` as its L3
        destination.

        :param src_l2_if: interface whose ``remote_hosts`` provide the
            packet sources.
        :param src_ip_if: interface recorded as the source in the packet
            info; its local MAC is the L2 destination.
        :param dst_ip_if: interface recorded as the destination in the
            packet info; its remote IPv4 is the L3 destination.
        :param packet_sizes: list of sizes the packets are extended to.
        :returns: list of ``pkts_per_burst`` packets.
        """
        pkts = []
        for i in range(self.pkts_per_burst):
            info = self.create_packet_info(src_ip_if, dst_ip_if)
            payload = self.info_to_payload(info)

            host = choice(src_l2_if.remote_hosts)

            p = (Ether(src=host.mac,
                       dst=src_ip_if.local_mac) /
                 IP(src=host.ip4,
                    dst=dst_ip_if.remote_ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))

            self._record_and_pad(info, p, i, packet_sizes)
            pkts.append(p)
        return pkts

    def _match_packet_info(self, packet, dst_sw_if_index, last_info):
        """Decode the payload of ``packet`` and look up its packet info.

        Asserts that the payload names ``dst_sw_if_index`` as destination and
        that a following packet info exists for the payload's source.
        ``last_info`` (keyed by source sw_if_index) is updated in place.

        :returns: tuple ``(payload_info, next_info)``.
        """
        payload_info = self.payload_to_info(packet[IP][UDP][Raw])
        self.assertEqual(payload_info.dst, dst_sw_if_index)

        next_info = self.get_next_packet_info_for_interface2(
            payload_info.src, dst_sw_if_index,
            last_info[payload_info.src])
        last_info[payload_info.src] = next_info
        # must hold before the caller dereferences next_info
        self.assertIsNotNone(next_info)
        return payload_info, next_info

    def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture):
        """Verify packets captured on the routed interface ``dst_ip_if``.

        For every captured packet check that the Ethernet header goes from
        ``dst_ip_if.local_mac`` to ``dst_ip_if.remote_mac``, that the IPv4
        source is a known host of ``src_ip_if``, that the IPv4 destination
        is ``dst_ip_if.remote_ip4`` and that addresses/ports match the
        recorded packet.

        :param dst_ip_if: interface the capture was taken on.
        :param src_ip_if: interface owning the hosts that sent the packets.
        :param capture: iterable of captured packets.
        """
        last_info = {i.sw_if_index: None for i in self.interfaces}
        dst_ip_sw_if_index = dst_ip_if.sw_if_index

        for packet in capture:
            ip = packet[IP]
            udp = packet[IP][UDP]
            _, next_info = self._match_packet_info(
                packet, dst_ip_sw_if_index, last_info)
            saved_packet = next_info.data

            # MAC: src, dst
            self.assertEqual(packet.src, dst_ip_if.local_mac)
            self.assertEqual(packet.dst, dst_ip_if.remote_mac)

            # IP: src, dst
            host = src_ip_if.host_by_ip4(ip.src)
            self.assertIsNotNone(host)
            self.assertEqual(ip.dst, saved_packet[IP].dst)
            self.assertEqual(ip.dst, dst_ip_if.remote_ip4)

            # UDP:
            self.assertEqual(udp.sport, saved_packet[UDP].sport)
            self.assertEqual(udp.dport, saved_packet[UDP].dport)

    def verify_capture(self, dst_ip_if, src_ip_if, capture):
        """Verify packets that were routed from ``src_ip_if`` to hosts of
        ``dst_ip_if``.

        For every captured packet check that its payload index matches the
        next recorded packet, that the Ethernet source is
        ``dst_ip_if.local_mac``, that the Ethernet destination belongs to a
        host whose IPv4 address equals the packet's IPv4 destination, that
        the IPv4 source is ``src_ip_if.remote_ip4`` and that the UDP ports
        match the recorded packet.

        :param dst_ip_if: interface whose hosts are the destinations.
        :param src_ip_if: interface the packets were sent from.
        :param capture: iterable of captured packets.
        """
        last_info = {i.sw_if_index: None for i in self.interfaces}
        dst_ip_sw_if_index = dst_ip_if.sw_if_index

        for packet in capture:
            ip = packet[IP]
            udp = packet[IP][UDP]
            payload_info, next_info = self._match_packet_info(
                packet, dst_ip_sw_if_index, last_info)
            self.assertEqual(payload_info.index, next_info.index)
            saved_packet = next_info.data

            # MAC: src, dst
            self.assertEqual(packet.src, dst_ip_if.local_mac)
            host = dst_ip_if.host_by_mac(packet.dst)
            # report an unknown destination MAC as a test failure instead of
            # an attribute error on host.ip4 below
            self.assertIsNotNone(host)

            # IP: src, dst
            self.assertEqual(ip.src, src_ip_if.remote_ip4)
            self.assertEqual(ip.dst, saved_packet[IP].dst)
            self.assertEqual(ip.dst, host.ip4)

            # UDP:
            self.assertEqual(udp.sport, saved_packet[UDP].sport)
            self.assertEqual(udp.dport, saved_packet[UDP].dport)

    def test_ip4_irb_1(self):
        """ IPv4 IRB test 1

        Test scenario:
            - ip traffic from pg2 interface must end in both pg0 and pg1
            - arp entry present in bvi0 interface for destination IP
            - no l2 entry configured, pg0 and pg1 are same
        """

        stream = self.create_stream(
            self.pg2, self.bvi0, self.pg_if_packet_sizes)
        self.pg2.add_stream(stream)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        packet_count = self.get_packet_count_for_if_idx(self.bvi0.sw_if_index)

        rcvd1 = self.pg0.get_capture(packet_count)
        rcvd2 = self.pg1.get_capture(packet_count)

        self.verify_capture(self.bvi0, self.pg2, rcvd1)
        self.verify_capture(self.bvi0, self.pg2, rcvd2)

        # both bridged ports must have seen exactly the same packets
        self.assertListEqual(rcvd1.res, rcvd2.res)

    def send_and_verify_l2_to_ip(self):
        """Send one burst from each of pg0 and pg1 via bvi0 towards pg2 and
        verify that all of the packets are captured on pg2.
        """
        stream1 = self.create_stream_l2_to_ip(
            self.pg0, self.bvi0, self.pg2, self.pg_if_packet_sizes)
        stream2 = self.create_stream_l2_to_ip(
            self.pg1, self.bvi0, self.pg2, self.pg_if_packet_sizes)
        self.vapi.cli("clear trace")
        self.pg0.add_stream(stream1)
        self.pg1.add_stream(stream2)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # every packet of both bursts is expected on pg2
        rcvd = self.pg2.get_capture(len(stream1) + len(stream2))
        self.verify_capture_l2_to_ip(self.pg2, self.bvi0, rcvd)

    def test_ip4_irb_2(self):
        """ IPv4 IRB test 2

        Test scenario:
            - ip traffic from pg0 and pg1 ends on pg2
        """
        self.send_and_verify_l2_to_ip()

        # change the BVI's mac and resend traffic
        self.bvi0.set_mac(MACAddress("00:00:00:11:11:33"))

        self.send_and_verify_l2_to_ip()
        # check it wasn't flooded
        self.pg1.assert_nothing_captured(remark="UU Flood")
277
278
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(
        testRunner=VppTestRunner,
    )