ip: Replace Sematics for Interface IP addresses
[vpp.git] / test / test_ip4_irb.py
1 #!/usr/bin/env python3
2 """IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
14     - 64B, 512B, 1518B, 9200B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import unittest
27 from random import choice
28
29 from scapy.packet import Raw
30 from scapy.layers.l2 import Ether
31 from scapy.layers.inet import IP, UDP
32
33 from framework import VppTestCase, VppTestRunner
34 from vpp_papi import MACAddress
35 from vpp_l2 import L2_PORT_TYPE
36
37
38 class TestIpIrb(VppTestCase):
39     """IRB Test Case"""
40
41     @classmethod
42     def setUpClass(cls):
43         """
44         #. Create BD with MAC learning enabled and put interfaces to this BD.
45         #. Configure IPv4 addresses on BVI interface and routed interface.
46         #. Configure MAC address binding to IPv4 neighbors on bvi0.
47         #. Configure MAC address on pg2.
48         #. BVI interface has remote hosts, one half of hosts are
49            behind pg0 second behind pg1.
50         """
51         super(TestIpIrb, cls).setUpClass()
52
53         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
54         cls.bd_id = 10
55         cls.remote_hosts_count = 250
56
57         # create 3 pg interfaces, 1 BVI interface
58         cls.create_pg_interfaces(range(3))
59         cls.create_bvi_interfaces(1)
60
61         cls.interfaces = list(cls.pg_interfaces)
62         cls.interfaces.extend(cls.bvi_interfaces)
63
64         for i in cls.interfaces:
65             i.admin_up()
66
67         # Create BD with MAC learning enabled and put interfaces to this BD
68         cls.vapi.sw_interface_set_l2_bridge(
69             rx_sw_if_index=cls.bvi0.sw_if_index, bd_id=cls.bd_id,
70             port_type=L2_PORT_TYPE.BVI)
71         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
72                                             bd_id=cls.bd_id)
73         cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
74                                             bd_id=cls.bd_id)
75
76         # Configure IPv4 addresses on BVI interface and routed interface
77         cls.bvi0.config_ip4()
78         cls.pg2.config_ip4()
79
80         # Configure MAC address binding to IPv4 neighbors on bvi0
81         cls.bvi0.generate_remote_hosts(cls.remote_hosts_count)
82         cls.bvi0.configure_ipv4_neighbors()
83         # configure MAC address on pg2
84         cls.pg2.resolve_arp()
85
86         # BVI interface has remote hosts, one half of hosts are behind
87         # pg0 second behind pg1
88         half = cls.remote_hosts_count // 2
89         cls.pg0.remote_hosts = cls.bvi0.remote_hosts[:half]
90         cls.pg1.remote_hosts = cls.bvi0.remote_hosts[half:]
91
92     @classmethod
93     def tearDownClass(cls):
94         super(TestIpIrb, cls).tearDownClass()
95
96     def tearDown(self):
97         """Run standard test teardown and log ``show l2patch``,
98         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
99         ``show ip neighbors``.
100         """
101         super(TestIpIrb, self).tearDown()
102
103     def show_commands_at_teardown(self):
104         self.logger.info(self.vapi.cli("show l2patch"))
105         self.logger.info(self.vapi.cli("show l2fib verbose"))
106         self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
107                                        self.bd_id))
108         self.logger.info(self.vapi.cli("show ip neighbors"))
109
110     def create_stream(self, src_ip_if, dst_ip_if, packet_sizes):
111         pkts = []
112         for i in range(0, 257):
113             remote_dst_host = choice(dst_ip_if.remote_hosts)
114             info = self.create_packet_info(src_ip_if, dst_ip_if)
115             payload = self.info_to_payload(info)
116             p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) /
117                  IP(src=src_ip_if.remote_ip4,
118                     dst=remote_dst_host.ip4) /
119                  UDP(sport=1234, dport=1234) /
120                  Raw(payload))
121             info.data = p.copy()
122             size = packet_sizes[(i // 2) % len(packet_sizes)]
123             self.extend_packet(p, size)
124             pkts.append(p)
125         return pkts
126
127     def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if,
128                                packet_sizes):
129         pkts = []
130         for i in range(0, 257):
131             info = self.create_packet_info(src_ip_if, dst_ip_if)
132             payload = self.info_to_payload(info)
133
134             host = choice(src_l2_if.remote_hosts)
135
136             p = (Ether(src=host.mac,
137                        dst=src_ip_if.local_mac) /
138                  IP(src=host.ip4,
139                     dst=dst_ip_if.remote_ip4) /
140                  UDP(sport=1234, dport=1234) /
141                  Raw(payload))
142
143             info.data = p.copy()
144             size = packet_sizes[(i // 2) % len(packet_sizes)]
145             self.extend_packet(p, size)
146
147             pkts.append(p)
148         return pkts
149
150     def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture):
151         last_info = dict()
152         for i in self.interfaces:
153             last_info[i.sw_if_index] = None
154
155         dst_ip_sw_if_index = dst_ip_if.sw_if_index
156
157         for packet in capture:
158             ip = packet[IP]
159             udp = packet[IP][UDP]
160             payload_info = self.payload_to_info(packet[IP][UDP][Raw])
161
162             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
163
164             next_info = self.get_next_packet_info_for_interface2(
165                 payload_info.src, dst_ip_sw_if_index,
166                 last_info[payload_info.src])
167             last_info[payload_info.src] = next_info
168             self.assertTrue(next_info is not None)
169             saved_packet = next_info.data
170             self.assertTrue(next_info is not None)
171
172             # MAC: src, dst
173             self.assertEqual(packet.src, dst_ip_if.local_mac)
174             self.assertEqual(packet.dst, dst_ip_if.remote_mac)
175
176             # IP: src, dst
177             host = src_ip_if.host_by_ip4(ip.src)
178             self.assertIsNotNone(host)
179             self.assertEqual(ip.dst, saved_packet[IP].dst)
180             self.assertEqual(ip.dst, dst_ip_if.remote_ip4)
181
182             # UDP:
183             self.assertEqual(udp.sport, saved_packet[UDP].sport)
184             self.assertEqual(udp.dport, saved_packet[UDP].dport)
185
186     def verify_capture(self, dst_ip_if, src_ip_if, capture):
187         last_info = dict()
188         for i in self.interfaces:
189             last_info[i.sw_if_index] = None
190
191         dst_ip_sw_if_index = dst_ip_if.sw_if_index
192
193         for packet in capture:
194             ip = packet[IP]
195             udp = packet[IP][UDP]
196             payload_info = self.payload_to_info(packet[IP][UDP][Raw])
197             packet_index = payload_info.index
198
199             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
200
201             next_info = self.get_next_packet_info_for_interface2(
202                 payload_info.src, dst_ip_sw_if_index,
203                 last_info[payload_info.src])
204             last_info[payload_info.src] = next_info
205             self.assertTrue(next_info is not None)
206             self.assertEqual(packet_index, next_info.index)
207             saved_packet = next_info.data
208             self.assertTrue(next_info is not None)
209
210             # MAC: src, dst
211             self.assertEqual(packet.src, dst_ip_if.local_mac)
212             host = dst_ip_if.host_by_mac(packet.dst)
213
214             # IP: src, dst
215             self.assertEqual(ip.src, src_ip_if.remote_ip4)
216             self.assertEqual(ip.dst, saved_packet[IP].dst)
217             self.assertEqual(ip.dst, host.ip4)
218
219             # UDP:
220             self.assertEqual(udp.sport, saved_packet[UDP].sport)
221             self.assertEqual(udp.dport, saved_packet[UDP].dport)
222
223     def test_ip4_irb_1(self):
224         """ IPv4 IRB test 1
225
226         Test scenario:
227             - ip traffic from pg2 interface must ends in both pg0 and pg1
228             - arp entry present in bvi0 interface for destination IP
229             - no l2 entry configured, pg0 and pg1 are same
230         """
231
232         stream = self.create_stream(
233             self.pg2, self.bvi0, self.pg_if_packet_sizes)
234         self.pg2.add_stream(stream)
235
236         self.pg_enable_capture(self.pg_interfaces)
237         self.pg_start()
238
239         packet_count = self.get_packet_count_for_if_idx(self.bvi0.sw_if_index)
240
241         rcvd1 = self.pg0.get_capture(packet_count)
242         rcvd2 = self.pg1.get_capture(packet_count)
243
244         self.verify_capture(self.bvi0, self.pg2, rcvd1)
245         self.verify_capture(self.bvi0, self.pg2, rcvd2)
246
247         self.assertListEqual(rcvd1.res, rcvd2.res)
248
249     def send_and_verify_l2_to_ip(self):
250         stream1 = self.create_stream_l2_to_ip(
251             self.pg0, self.bvi0, self.pg2, self.pg_if_packet_sizes)
252         stream2 = self.create_stream_l2_to_ip(
253             self.pg1, self.bvi0, self.pg2, self.pg_if_packet_sizes)
254         self.vapi.cli("clear trace")
255         self.pg0.add_stream(stream1)
256         self.pg1.add_stream(stream2)
257
258         self.pg_enable_capture(self.pg_interfaces)
259         self.pg_start()
260
261         rcvd = self.pg2.get_capture(514)
262         self.verify_capture_l2_to_ip(self.pg2, self.bvi0, rcvd)
263
264     def test_ip4_irb_2(self):
265         """ IPv4 IRB test 2
266
267         Test scenario:
268             - ip traffic from pg0 and pg1 ends on pg2
269         """
270         self.send_and_verify_l2_to_ip()
271
272         # change the BVI's mac and resed traffic
273         self.bvi0.set_mac(MACAddress("00:00:00:11:11:33"))
274
275         self.send_and_verify_l2_to_ip()
276         # check it wasn't flooded
277         self.pg1.assert_nothing_captured(remark="UU Flood")
278
279
280 if __name__ == '__main__':
281     unittest.main(testRunner=VppTestRunner)