Update test documentation.
[vpp.git] / test / test_ip4_irb.py
1 #!/usr/bin/env python
2 """IRB Test Case HLD:
3
4 **config**
5     - L2 MAC learning enabled in l2bd
6     - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
7     - 2 bridged interfaces in l2bd with bvi
8
9 **test**
10     - sending ip4 eth pkts between routed interfaces
11         - 2 routed interfaces
12         - 2 bridged interfaces
13
    - 64B, 512B, 1518B, 9018B (ether_size)
15
16     - burst of pkts per interface
17         - 257pkts per burst
18         - routed pkts hitting different FIB entries
19         - bridged pkts hitting different MAC entries
20
21 **verify**
22     - all packets received correctly
23
24 """
25
26 import unittest
27 from random import choice
28
29 from scapy.packet import Raw
30 from scapy.layers.l2 import Ether
31 from scapy.layers.inet import IP, UDP
32
33 from framework import VppTestCase, VppTestRunner
34
35
36 class TestIpIrb(VppTestCase):
37     """IRB Test Case"""
38
39     @classmethod
40     def setUpClass(cls):
41         """
42         #. Create BD with MAC learning enabled and put interfaces to this BD.
43         #. Configure IPv4 addresses on loopback interface and routed interface.
44         #. Configure MAC address binding to IPv4 neighbors on loop0.
45         #. Configure MAC address on pg2.
46         #. Loopback BVI interface has remote hosts, one half of hosts are
47            behind pg0 second behind pg1.
48         """
49         super(TestIpIrb, cls).setUpClass()
50
51         cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
52         cls.bd_id = 10
53         cls.remote_hosts_count = 250
54
55         # create 3 pg interfaces, 1 loopback interface
56         cls.create_pg_interfaces(range(3))
57         cls.create_loopback_interfaces(range(1))
58
59         cls.interfaces = list(cls.pg_interfaces)
60         cls.interfaces.extend(cls.lo_interfaces)
61
62         for i in cls.interfaces:
63             i.admin_up()
64
65         # Create BD with MAC learning enabled and put interfaces to this BD
66         cls.vapi.sw_interface_set_l2_bridge(
67             cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1)
68         cls.vapi.sw_interface_set_l2_bridge(
69             cls.pg0.sw_if_index, bd_id=cls.bd_id)
70         cls.vapi.sw_interface_set_l2_bridge(
71             cls.pg1.sw_if_index, bd_id=cls.bd_id)
72
73         # Configure IPv4 addresses on loopback interface and routed interface
74         cls.loop0.config_ip4()
75         cls.pg2.config_ip4()
76
77         # Configure MAC address binding to IPv4 neighbors on loop0
78         cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
79         cls.loop0.configure_ipv4_neighbors()
80         # configure MAC address on pg2
81         cls.pg2.resolve_arp()
82
83         # Loopback BVI interface has remote hosts, one half of hosts are behind
84         # pg0 second behind pg1
85         half = cls.remote_hosts_count // 2
86         cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
87         cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
88
89     def tearDown(self):
90         """Run standard test teardown and log ``show l2patch``,
91         ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
92         ``show ip arp``.
93         """
94         super(TestIpIrb, self).tearDown()
95         if not self.vpp_dead:
96             self.logger.info(self.vapi.cli("show l2patch"))
97             self.logger.info(self.vapi.cli("show l2fib verbose"))
98             self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
99                                            self.bd_id))
100             self.logger.info(self.vapi.cli("show ip arp"))
101
102     def create_stream(self, src_ip_if, dst_ip_if, packet_sizes):
103         pkts = []
104         for i in range(0, 257):
105             remote_dst_host = choice(dst_ip_if.remote_hosts)
106             info = self.create_packet_info(
107                 src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
108             payload = self.info_to_payload(info)
109             p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) /
110                  IP(src=src_ip_if.remote_ip4,
111                     dst=remote_dst_host.ip4) /
112                  UDP(sport=1234, dport=1234) /
113                  Raw(payload))
114             info.data = p.copy()
115             size = packet_sizes[(i // 2) % len(packet_sizes)]
116             self.extend_packet(p, size)
117             pkts.append(p)
118         return pkts
119
120     def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if,
121                                packet_sizes):
122         pkts = []
123         for i in range(0, 257):
124             info = self.create_packet_info(
125                 src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
126             payload = self.info_to_payload(info)
127
128             host = choice(src_l2_if.remote_hosts)
129
130             p = (Ether(src=host.mac,
131                        dst = src_ip_if.local_mac) /
132                  IP(src=host.ip4,
133                     dst=dst_ip_if.remote_ip4) /
134                  UDP(sport=1234, dport=1234) /
135                  Raw(payload))
136
137             info.data = p.copy()
138             size = packet_sizes[(i // 2) % len(packet_sizes)]
139             self.extend_packet(p, size)
140
141             pkts.append(p)
142         return pkts
143
144     def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture):
145         last_info = dict()
146         for i in self.interfaces:
147             last_info[i.sw_if_index] = None
148
149         dst_ip_sw_if_index = dst_ip_if.sw_if_index
150
151         for packet in capture:
152             ip = packet[IP]
153             udp = packet[IP][UDP]
154             payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
155             packet_index = payload_info.index
156
157             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
158
159             next_info = self.get_next_packet_info_for_interface2(
160                 payload_info.src, dst_ip_sw_if_index,
161                 last_info[payload_info.src])
162             last_info[payload_info.src] = next_info
163             self.assertTrue(next_info is not None)
164             saved_packet = next_info.data
165             self.assertTrue(next_info is not None)
166
167             # MAC: src, dst
168             self.assertEqual(packet.src, dst_ip_if.local_mac)
169             self.assertEqual(packet.dst, dst_ip_if.remote_mac)
170
171             # IP: src, dst
172             host = src_ip_if.host_by_ip4(ip.src)
173             self.assertIsNotNone(host)
174             self.assertEqual(ip.dst, saved_packet[IP].dst)
175             self.assertEqual(ip.dst, dst_ip_if.remote_ip4)
176
177             # UDP:
178             self.assertEqual(udp.sport, saved_packet[UDP].sport)
179             self.assertEqual(udp.dport, saved_packet[UDP].dport)
180
181     def verify_capture(self, dst_ip_if, src_ip_if, capture):
182         last_info = dict()
183         for i in self.interfaces:
184             last_info[i.sw_if_index] = None
185
186         dst_ip_sw_if_index = dst_ip_if.sw_if_index
187
188         for packet in capture:
189             ip = packet[IP]
190             udp = packet[IP][UDP]
191             payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
192             packet_index = payload_info.index
193
194             self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
195
196             next_info = self.get_next_packet_info_for_interface2(
197                 payload_info.src, dst_ip_sw_if_index,
198                 last_info[payload_info.src])
199             last_info[payload_info.src] = next_info
200             self.assertTrue(next_info is not None)
201             self.assertEqual(packet_index, next_info.index)
202             saved_packet = next_info.data
203             self.assertTrue(next_info is not None)
204
205             # MAC: src, dst
206             self.assertEqual(packet.src, dst_ip_if.local_mac)
207             host = dst_ip_if.host_by_mac(packet.dst)
208
209             # IP: src, dst
210             self.assertEqual(ip.src, src_ip_if.remote_ip4)
211             self.assertEqual(ip.dst, saved_packet[IP].dst)
212             self.assertEqual(ip.dst, host.ip4)
213
214             # UDP:
215             self.assertEqual(udp.sport, saved_packet[UDP].sport)
216             self.assertEqual(udp.dport, saved_packet[UDP].dport)
217
218     def test_ip4_irb_1(self):
219         """ IPv4 IRB test 1
220
221         Test scenario:
222             - ip traffic from pg2 interface must ends in both pg0 and pg1
223             - arp entry present in loop0 interface for destination IP
224             - no l2 entree configured, pg0 and pg1 are same
225         """
226
227         stream = self.create_stream(
228             self.pg2, self.loop0, self.pg_if_packet_sizes)
229         self.pg2.add_stream(stream)
230
231         self.pg_enable_capture(self.pg_interfaces)
232         self.pg_start()
233
234         rcvd1 = self.pg0.get_capture()
235         rcvd2 = self.pg1.get_capture()
236
237         self.verify_capture(self.loop0, self.pg2, rcvd1)
238         self.verify_capture(self.loop0, self.pg2, rcvd2)
239
240         self.assertListEqual(rcvd1.res, rcvd2.res)
241
242     def test_ip4_irb_2(self):
243         """ IPv4 IRB test 2
244
245         Test scenario:
246             - ip traffic from pg0 and pg1 ends on pg2
247         """
248
249         stream1 = self.create_stream_l2_to_ip(
250             self.pg0, self.loop0, self.pg2, self.pg_if_packet_sizes)
251         stream2 = self.create_stream_l2_to_ip(
252             self.pg1, self.loop0, self.pg2, self.pg_if_packet_sizes)
253         self.pg0.add_stream(stream1)
254         self.pg1.add_stream(stream2)
255
256         self.pg_enable_capture(self.pg_interfaces)
257         self.pg_start()
258
259         rcvd = self.pg2.get_capture()
260         self.verify_capture_l2_to_ip(self.pg2, self.loop0, rcvd)
261
262         self.assertEqual(len(stream1) + len(stream2), len(rcvd.res))
263
264
# Run the tests of this module with the VPP test runner when the file is
# executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)