Add IRB test
[vpp.git] / test / test_ip4_irb.py
1 #!/usr/bin/env python
2 import unittest
3 from random import choice, randint
4
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether
7 from scapy.layers.inet import IP, UDP
8 from logging import *
9
10 from framework import VppTestCase, VppTestRunner
11
12 """ IRB Test Case
13
14 config
15     L2 MAC learning enabled in l2bd
16     2 routed interfaces untagged, bvi
17     2 bridged interfaces in l2bd with bvi
18 test
19     sending ip4 eth pkts between routed interfaces
20         2 routed interfaces
21         2 bridged interfaces
22     64B, 512B, 1518B, 9018B (ether_size)
23     burst of pkts per interface
24         257pkts per burst
25         routed pkts hitting different FIB entries
26         bridged pkts hitting different MAC entries
27 verify
28     all packets received correctly
29 """
30
31
class TestIpIrb(VppTestCase):
    """ IRB Test Case """

    @classmethod
    def setUpClass(cls):
        """Build the topology shared by every test in this class.

        Creates pg0..pg2 and loop0, brings them up, places loop0 (as BVI),
        pg0 and pg1 into bridge domain ``cls.bd_id``, configures IPv4 on
        loop0 and pg2, and splits the hosts generated on loop0 between
        pg0 and pg1.
        """
        super(TestIpIrb, cls).setUpClass()

        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]  # packet sizes
        cls.bd_id = 10
        cls.remote_hosts_count = 250

        # create 3 pg interfaces, 1 loopback interface
        cls.create_pg_interfaces(range(3))
        cls.create_loopback_interfaces(range(1))

        cls.interfaces = list(cls.pg_interfaces)
        cls.interfaces.extend(cls.lo_interfaces)

        for i in cls.interfaces:
            i.admin_up()

        # Create BD with MAC learning enabled and put interfaces to this BD
        cls.vapi.sw_interface_set_l2_bridge(
            cls.loop0.sw_if_index, bd_id=cls.bd_id, bvi=1)
        cls.vapi.sw_interface_set_l2_bridge(
            cls.pg0.sw_if_index, bd_id=cls.bd_id)
        cls.vapi.sw_interface_set_l2_bridge(
            cls.pg1.sw_if_index, bd_id=cls.bd_id)

        cls.loop0.config_ip4()
        cls.pg2.config_ip4()

        # configure MAC address binding to IPv4 neighbors on loop0
        cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
        cls.loop0.configure_extend_ipv4_mac_binding()
        # configure MAC address on pg2
        cls.pg2.resolve_arp()

        # one half of hosts are behind pg0 second behind pg1
        half = cls.remote_hosts_count // 2
        cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
        cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]

    def tearDown(self):
        """Log L2 / bridge-domain / ARP CLI output unless VPP has died."""
        super(TestIpIrb, self).tearDown()
        if not self.vpp_dead:
            # `info` is the logging function brought in by the module-level
            # `from logging import *`.
            info(self.vapi.cli("show l2patch"))
            info(self.vapi.cli("show l2fib verbose"))
            info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
            info(self.vapi.cli("show ip arp"))

    def create_stream(self, src_ip_if, dst_ip_if, packet_sizes, count=257):
        """Build a burst of UDP packets sent from behind a routed interface.

        Each packet is addressed (at IP level) to a randomly chosen host
        from ``dst_ip_if.remote_hosts`` and (at Ethernet level) to
        ``src_ip_if.local_mac``.

        :param src_ip_if: interface whose remote MAC/IPv4 are used as source.
        :param dst_ip_if: interface providing ``remote_hosts`` to pick from.
        :param packet_sizes: sizes cycled through; each size is used for two
            consecutive packets before moving to the next.
        :param count: number of packets to generate (default 257).
        :returns: list of scapy packets.
        """
        pkts = []
        for i in range(count):
            remote_dst_host = choice(dst_ip_if.remote_hosts)
            # Named pkt_info so the logging `info` function is not shadowed.
            pkt_info = self.create_packet_info(
                src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=src_ip_if.local_mac, src=src_ip_if.remote_mac) /
                 IP(src=src_ip_if.remote_ip4,
                    dst=remote_dst_host.ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            # Keep the un-padded packet for later comparison in verify_*.
            pkt_info.data = p.copy()
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def create_stream_l2_to_ip(self, src_l2_if, src_ip_if, dst_ip_if,
                               packet_sizes, count=257):
        """Build a burst of UDP packets sent from hosts behind a bridged port.

        The source MAC/IPv4 of each packet come from a randomly chosen host
        in ``src_l2_if.remote_hosts``; the destination MAC is
        ``src_ip_if.local_mac`` and the destination IPv4 is
        ``dst_ip_if.remote_ip4``.

        :param src_l2_if: interface providing ``remote_hosts`` as senders.
        :param src_ip_if: interface whose local MAC is the Ethernet
            destination and whose sw_if_index is recorded as packet source.
        :param dst_ip_if: interface whose remote IPv4 is the IP destination.
        :param packet_sizes: sizes cycled through; each size is used for two
            consecutive packets before moving to the next.
        :param count: number of packets to generate (default 257).
        :returns: list of scapy packets.
        """
        pkts = []
        for i in range(count):
            pkt_info = self.create_packet_info(
                src_ip_if.sw_if_index, dst_ip_if.sw_if_index)
            payload = self.info_to_payload(pkt_info)

            host = choice(src_l2_if.remote_hosts)

            p = (Ether(src=host.mac,
                       dst=src_ip_if.local_mac) /
                 IP(src=host.ip4,
                    dst=dst_ip_if.remote_ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))

            # Keep the un-padded packet for later comparison in verify_*.
            pkt_info.data = p.copy()
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)

            pkts.append(p)
        return pkts

    def _new_last_info(self):
        """Return a per-interface cursor dict with every entry set to None."""
        return dict((i.sw_if_index, None) for i in self.interfaces)

    def _next_expected_info(self, payload_info, dst_sw_if_index, last_info):
        """Check the packet's destination and fetch the next expected info.

        Advances ``last_info`` for the packet's source interface and fails
        the test if no further packet info is expected for it.
        """
        self.assertEqual(payload_info.dst, dst_sw_if_index)

        next_info = self.get_next_packet_info_for_interface2(
            payload_info.src, dst_sw_if_index,
            last_info[payload_info.src])
        last_info[payload_info.src] = next_info
        # Must hold before next_info is dereferenced by the caller.
        self.assertIsNotNone(next_info)
        return next_info

    def verify_capture_l2_to_ip(self, dst_ip_if, src_ip_if, capture):
        """Verify packets that entered via bridged ports and left routed.

        Checks Ethernet addresses against ``dst_ip_if``, that the IP source
        is a known host of ``src_ip_if``, and that IP destination and UDP
        ports match the packet that was sent.

        :param dst_ip_if: interface the capture was taken on.
        :param src_ip_if: interface whose hosts originated the traffic.
        :param capture: iterable of captured scapy packets.
        """
        last_info = self._new_last_info()
        dst_ip_sw_if_index = dst_ip_if.sw_if_index

        for packet in capture:
            ip = packet[IP]
            udp = packet[IP][UDP]
            payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))

            # NOTE(review): unlike verify_capture, the packet index is not
            # compared here - presumably because several streams sharing one
            # source index may interleave; confirm before adding the check.
            next_info = self._next_expected_info(
                payload_info, dst_ip_sw_if_index, last_info)
            saved_packet = next_info.data

            # MAC: src, dst
            self.assertEqual(packet.src, dst_ip_if.local_mac)
            self.assertEqual(packet.dst, dst_ip_if.remote_mac)

            # IP: src, dst
            host = src_ip_if.host_by_ip4(ip.src)
            self.assertIsNotNone(host)
            self.assertEqual(ip.dst, saved_packet[IP].dst)
            self.assertEqual(ip.dst, dst_ip_if.remote_ip4)

            # UDP:
            self.assertEqual(udp.sport, saved_packet[UDP].sport)
            self.assertEqual(udp.dport, saved_packet[UDP].dport)

    def verify_capture(self, dst_ip_if, src_ip_if, capture):
        """Verify packets that entered routed and left via bridged ports.

        Checks packet ordering by index, the Ethernet source against
        ``dst_ip_if.local_mac``, that the Ethernet destination belongs to a
        known host of ``dst_ip_if`` whose IPv4 equals the IP destination,
        and that IP source and UDP ports match the packet that was sent.

        :param dst_ip_if: interface owning the destination hosts.
        :param src_ip_if: interface the traffic was sent from.
        :param capture: iterable of captured scapy packets.
        """
        last_info = self._new_last_info()
        dst_ip_sw_if_index = dst_ip_if.sw_if_index

        for packet in capture:
            ip = packet[IP]
            udp = packet[IP][UDP]
            payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
            packet_index = payload_info.index

            next_info = self._next_expected_info(
                payload_info, dst_ip_sw_if_index, last_info)
            self.assertEqual(packet_index, next_info.index)
            saved_packet = next_info.data

            # MAC: src, dst
            self.assertEqual(packet.src, dst_ip_if.local_mac)
            host = dst_ip_if.host_by_mac(packet.dst)
            # Fail with an assertion (not AttributeError on host.ip4 below)
            # when the destination MAC matches no known host.
            self.assertIsNotNone(host)

            # IP: src, dst
            self.assertEqual(ip.src, src_ip_if.remote_ip4)
            self.assertEqual(ip.dst, saved_packet[IP].dst)
            self.assertEqual(ip.dst, host.ip4)

            # UDP:
            self.assertEqual(udp.sport, saved_packet[UDP].sport)
            self.assertEqual(udp.dport, saved_packet[UDP].dport)

    def test_ip4_irb_1(self):
        """ IPv4 IRB test 1

        Test scenario:
            ip traffic from pg2 interface must end in both pg0 and pg1
            - arp entry present in loop0 interface for dst IP
            - no l2 entry configured, pg0 and pg1 are same
        """

        stream = self.create_stream(
            self.pg2, self.loop0, self.pg_if_packet_sizes)
        self.pg2.add_stream(stream)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rcvd1 = self.pg0.get_capture()
        rcvd2 = self.pg1.get_capture()

        self.verify_capture(self.loop0, self.pg2, rcvd1)
        self.verify_capture(self.loop0, self.pg2, rcvd2)

        # Both bridged ports are expected to see the same packets.
        self.assertListEqual(rcvd1.res, rcvd2.res)

    def test_ip4_irb_2(self):
        """ IPv4 IRB test 2

        Test scenario:
            ip traffic from pg0 and pg1 ends on pg2
        """

        stream1 = self.create_stream_l2_to_ip(
            self.pg0, self.loop0, self.pg2, self.pg_if_packet_sizes)
        stream2 = self.create_stream_l2_to_ip(
            self.pg1, self.loop0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream1)
        self.pg1.add_stream(stream2)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rcvd = self.pg2.get_capture()
        self.verify_capture_l2_to_ip(self.pg2, self.loop0, rcvd)

        # Every packet from both bridged ports must arrive on pg2.
        self.assertEqual(len(stream1) + len(stream2), len(rcvd.res))
244
245
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)