"""IP6 VRF Multi-instance Test Case HLD:

**NOTES:**
    - a higher number of pg-ip6 interfaces causes problems => only 15 \
    pg-ip6 interfaces in 5 VRFs are tested
    - jumbo packets in a configuration with 15 pg-ip6 interfaces lead to \
    problems too, so jumbo packets are not sent

**config 1**
    - add 15 pg-ip6 interfaces
    - configure 5 hosts per pg-ip6 interface
    - create 4 VRFs
    - add 3 pg-ip6 interfaces per VRF

**test 1**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 1**
    - check VRF data by parsing output of ip6_fib_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in VRF
    - no packet received in case of pg-ip6 interfaces not in VRF

**config 2**
    - reset 2 VRFs

**test 2**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 2**
    - check VRF data by parsing output of ip6_fib_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in VRF
    - no packet received in case of pg-ip6 interfaces not in VRF

**config 3**
    - add 1 of the reset VRFs and 1 new VRF

**test 3**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 3**
    - check VRF data by parsing output of ip6_fib_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in VRF
    - no packet received in case of pg-ip6 interfaces not in VRF

**config 4**
    - reset all VRFs (i.e. no VRF except VRF=0 created)

**test 4**
    - send IP6 packets between all pg-ip6 interfaces in all VRF groups

**verify 4**
    - check VRF data by parsing output of ip6_fib_dump API command
    - all packets received correctly in case of pg-ip6 interfaces in VRF
    - no packet received in case of pg-ip6 interfaces not in VRF
"""

import random
import socket
import unittest

from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
    RouterAlert, IPv6ExtHdrHopByHop
from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
from scapy.pton_ntop import inet_ntop
from scapy.data import IPV6_ADDR_UNICAST

from framework import VppTestCase, VppTestRunner
# NOTE(review): ppp() is called by the test case but no import for it is
# visible in this listing - confirm it is imported from the project's helpers.

# VRF status constants, as reported by TestIP6VrfMultiInst.verify_vrf().
# Only their distinctness matters to the checks in this file.
VRF_NOT_CONFIGURED = 0  # no FIB entry refers to the table at all
VRF_CONFIGURED = 1      # table has at least one unicast FIB entry
VRF_RESET = 2           # table still exists but holds no unicast entry


def is_ipv6_misc_ext(p):
    """ Is packet one of the uninteresting IPv6 control packets?

    Used as a ``filter_out_fn`` when asserting that nothing was captured, so
    that background IPv6 chatter does not count as received traffic.

    A packet is considered uninteresting when it is:
      - an ICMPv6 Router Advertisement sent to a multicast address,
      - an ICMPv6 Neighbor Solicitation sent to a link-local
        solicited-node multicast address, or
      - a packet whose Hop-by-Hop extension header carries a Router Alert
        option.

    :param p: Packet (scapy) to be classified.
    :return: True if the packet should be ignored, otherwise False.
    """
    if p.haslayer(ICMPv6ND_RA) and in6_ismaddr(p[IPv6].dst):
        return True
    if p.haslayer(ICMPv6ND_NS) and in6_isllsnmaddr(p[IPv6].dst):
        return True
    if p.haslayer(IPv6ExtHdrHopByHop):
        hbh_options = p[IPv6ExtHdrHopByHop].options
        return any(isinstance(o, RouterAlert) for o in hbh_options)
    return False


class TestIP6VrfMultiInst(VppTestCase):
    """ IP6 VRF Multi-instance Test Case """

    @classmethod
    def setUpClass(cls):
        """
        Perform standard class setup (defined by class method setUpClass in
        class VppTestCase) before running the test case, set test case related
        variables and configure VPP.

        The bookkeeping lists created here are class attributes and are
        shared (and mutated) by all test methods, so the tests build on each
        other's state.
        """
        super(TestIP6VrfMultiInst, cls).setUpClass()

        # Test variables (15 pg-ip6 interfaces = 5 VRFs x 3 interfaces)
        cls.hosts_per_pg = 5
        cls.nr_of_vrfs = 5
        cls.pg_ifs_per_vrf = 3

        try:
            # Create pg interfaces
            cls.create_pg_interfaces(
                range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))

            # Packet flows mapping pg0 -> pg1, pg2 etc.: every interface
            # sends to the other interfaces of its own group only.
            cls.flows = dict()
            for i, src_if in enumerate(cls.pg_interfaces):
                # Floor division: the result is used as a list index and
                # must stay an int on Python 3 as well.
                group_start = (i // cls.pg_ifs_per_vrf) * cls.pg_ifs_per_vrf
                group_end = group_start + cls.pg_ifs_per_vrf
                cls.flows[src_if] = [
                    cls.pg_interfaces[k]
                    for k in range(group_start, group_end)
                    if k != i]

            # Packet sizes - jumbo packet (9018 bytes) skipped
            cls.pg_if_packet_sizes = [64, 512, 1518]

            # Set up all interfaces
            for pg_if in cls.pg_interfaces:
                # NOTE(review): admin_up() restored from a dropped line -
                # confirm against the original file.
                pg_if.admin_up()
                pg_if.generate_remote_hosts(cls.hosts_per_pg)

            # Create list of VRFs
            cls.vrf_list = list()

            # Create list of reset VRFs
            cls.vrf_reset_list = list()

            # Create list of pg_interfaces in VRFs
            cls.pg_in_vrf = list()

            # Create list of pg_interfaces not in VRFs
            cls.pg_not_in_vrf = list(cls.pg_interfaces)

            # Create mapping of pg_interfaces to VRF IDs (VRF IDs start at 1)
            cls.pg_if_by_vrf_id = dict()
            for i in range(cls.nr_of_vrfs):
                vrf_id = i + 1
                cls.pg_if_by_vrf_id[vrf_id] = [
                    cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
                    for j in range(cls.pg_ifs_per_vrf)]

        except Exception:
            # Class setup failed half-way: release what the base class set up.
            super(TestIP6VrfMultiInst, cls).tearDownClass()
            raise

    def setUp(self):
        """
        Clear trace and packet infos before running each test.
        """
        super(TestIP6VrfMultiInst, self).setUp()
        self.reset_packet_infos()

    def tearDown(self):
        """
        Show various debug prints after each test.
        """
        super(TestIP6VrfMultiInst, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.ppcli("show ip6 fib"))
            self.logger.info(self.vapi.ppcli("show ip6 neighbors"))

    def create_vrf_and_assign_interfaces(self, count, start=1):
        """
        Create required number of FIB tables / VRFs, put the pg-ip6
        interfaces mapped to each VRF ID into that FIB table / VRF and update
        the VRF / interface bookkeeping lists.

        :param int count: Number of FIB tables / VRFs to be created.
        :param int start: Starting number of the FIB table / VRF ID. \
        (Default value = 1)
        """
        for vrf_id in range(start, start + count):
            vrf_ifs = self.pg_if_by_vrf_id[vrf_id]

            # The table is created as a side effect of adding a route to it.
            first_if = vrf_ifs[0]
            dest_addr = first_if.remote_hosts[0].ip6n
            # NOTE(review): prefix length restored from a dropped line;
            # assumed to be /64 - confirm.
            dest_addr_len = 64
            self.vapi.ip_add_del_route(
                dest_addr, dest_addr_len, first_if.local_ip6n, is_ipv6=1,
                table_id=vrf_id, create_vrf_if_needed=1, is_multipath=1)
            self.logger.info("IPv6 VRF ID %d created" % vrf_id)
            if vrf_id not in self.vrf_list:
                self.vrf_list.append(vrf_id)
            if vrf_id in self.vrf_reset_list:
                self.vrf_reset_list.remove(vrf_id)

            for pg_if in vrf_ifs:
                pg_if.set_table_ip6(vrf_id)
                self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
                                 % (pg_if.name, vrf_id))
                if pg_if not in self.pg_in_vrf:
                    self.pg_in_vrf.append(pg_if)
                if pg_if in self.pg_not_in_vrf:
                    self.pg_not_in_vrf.remove(pg_if)
                # NOTE(review): config_ip6() restored from a dropped line -
                # confirm call and placement against the original file.
                pg_if.config_ip6()
                pg_if.disable_ipv6_ra()
                pg_if.configure_ipv6_neighbors(vrf_id)
        self.logger.debug(self.vapi.ppcli("show ip6 fib"))
        self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))

    def reset_vrf(self, vrf_id):
        """
        Reset required FIB table / VRF and move it, together with its
        pg-ip6 interfaces, to the "reset" / "not in VRF" bookkeeping lists.

        :param int vrf_id: The FIB table / VRF ID to be reset.
        """
        self.vapi.reset_fib(vrf_id, is_ipv6=1)
        if vrf_id in self.vrf_list:
            self.vrf_list.remove(vrf_id)
        if vrf_id not in self.vrf_reset_list:
            self.vrf_reset_list.append(vrf_id)
        for pg_if in self.pg_if_by_vrf_id[vrf_id]:
            if pg_if in self.pg_in_vrf:
                self.pg_in_vrf.remove(pg_if)
            if pg_if not in self.pg_not_in_vrf:
                self.pg_not_in_vrf.append(pg_if)
        self.logger.info("IPv6 VRF ID %d reset" % vrf_id)
        self.logger.debug(self.vapi.ppcli("show ip6 fib"))
        self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))

    def create_stream(self, src_if, packet_sizes):
        """
        Create input packet stream for defined interface using hosts list.

        One packet is created for every remote host of every destination
        interface in the flow of src_if; the source host and the packet size
        are picked at random.

        :param object src_if: Interface to create packet stream for.
        :param list packet_sizes: List of required packet sizes.
        :return: Stream of packets.
        """
        pkts = []
        src_hosts = src_if.remote_hosts
        for dst_if in self.flows[src_if]:
            for dst_host in dst_if.remote_hosts:
                src_host = random.choice(src_hosts)
                pkt_info = self.create_packet_info(src_if, dst_if)
                payload = self.info_to_payload(pkt_info)
                p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
                     IPv6(src=src_host.ip6, dst=dst_host.ip6) /
                     UDP(sport=1234, dport=1234) /
                     Raw(payload))
                # Keep the unpadded packet for later comparison.
                pkt_info.data = p.copy()
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
                pkts.append(p)
        self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
                          % (src_if.name, len(pkts)))
        return pkts

    def verify_capture(self, pg_if, capture):
        """
        Verify captured input packet stream for defined interface.

        Every captured packet must be the next expected packet from its
        source interface, and no expected packet may be left over afterwards.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        """
        # Last matched packet info per source interface index.
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                payload_info = self.payload_to_info(str(packet[Raw]))
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IPv6].src)
                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        for i in self.pg_interfaces:
            # Pass the interface *index*, as in the per-packet lookup above;
            # passing the interface object never matches a packet info, which
            # would make this leftover check vacuous.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertIsNone(
                remaining_packet,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))

    def verify_vrf(self, vrf_id):
        """
        Check the state of the FIB table / VRF ID by parsing the output of
        the ip6_fib_dump API command.

        :param int vrf_id: The FIB table / VRF ID to be verified.
        :return: VRF_CONFIGURED if the table holds at least one unicast
            entry, VRF_RESET if the table is present without any unicast
            entry, otherwise VRF_NOT_CONFIGURED.
        """
        ip6_fib_dump = self.vapi.ip6_fib_dump()
        vrf_exist = False
        vrf_count = 0
        for ip6_fib_details in ip6_fib_dump:
            # NOTE(review): positional fields - [2] is compared with the
            # table ID and [4] is decoded as a packed IPv6 address; confirm
            # against the ip6_fib_details message layout.
            if ip6_fib_details[2] != vrf_id:
                continue
            vrf_exist = True
            addr = inet_ntop(socket.AF_INET6, ip6_fib_details[4])
            if in6_getAddrType(addr) == IPV6_ADDR_UNICAST:
                vrf_count += 1
        if vrf_count:
            self.logger.info("IPv6 VRF ID %d is configured" % vrf_id)
            return VRF_CONFIGURED
        if vrf_exist:
            self.logger.info("IPv6 VRF ID %d has been reset" % vrf_id)
            return VRF_RESET
        self.logger.info("IPv6 VRF ID %d is not configured" % vrf_id)
        return VRF_NOT_CONFIGURED

    def run_verify_test(self):
        """
        Create packet streams for all pg-ip6 interfaces, send all prepared
        packet streams and verify that:
            - all packets are received correctly on all pg-ip6 interfaces
              assigned to a VRF
            - no packet is received on any pg-ip6 interface not assigned to
              a VRF

        :raise Exception: If an interface is tracked neither as being in a
            VRF nor as being outside of all VRFs.
        """
        # Test
        # Create incoming packet streams for packet-generator interfaces
        for pg_if in self.pg_interfaces:
            pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
            pg_if.add_stream(pkts)

        # Enable packet capture and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify
        # Verify outgoing packet streams per packet-generator interface
        for pg_if in self.pg_interfaces:
            if pg_if in self.pg_in_vrf:
                capture = pg_if.get_capture(remark="interface is in VRF")
                self.verify_capture(pg_if, capture)
            elif pg_if in self.pg_not_in_vrf:
                pg_if.assert_nothing_captured(remark="interface is not in VRF",
                                              filter_out_fn=is_ipv6_misc_ext)
                self.logger.debug("No capture for interface %s" % pg_if.name)
            else:
                raise Exception("Unknown interface: %s" % pg_if.name)

    def test_ip6_vrf_01(self):
        """ IP6 VRF Multi-instance test 1 - create 4 VRFs
        """
        # Config 1
        # Create 4 VRFs
        self.create_vrf_and_assign_interfaces(4)

        # Verify 1
        for vrf_id in self.vrf_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)

        # Test 1
        self.run_verify_test()

    def test_ip6_vrf_02(self):
        """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
        """
        # Config 2
        # Reset 2 VRFs
        # NOTE(review): the IDs were on dropped lines; 1 and 2 are assumed
        # (test 3 re-adds VRF 1 as "1 of reset VRFs") - confirm.
        self.reset_vrf(1)
        self.reset_vrf(2)

        # Verify 2
        for vrf_id in self.vrf_reset_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
        for vrf_id in self.vrf_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)

        # Test 2
        self.run_verify_test()

        # Reset routes learned from ICMPv6 Neighbor Discovery
        for vrf_id in self.vrf_reset_list:
            self.reset_vrf(vrf_id)

    def test_ip6_vrf_03(self):
        """ IP6 VRF Multi-instance test 3 - add 2 VRFs
        """
        # Config 3
        # Add 1 of reset VRFs and 1 new VRF
        self.create_vrf_and_assign_interfaces(1)
        self.create_vrf_and_assign_interfaces(1, start=5)

        # Verify 3
        for vrf_id in self.vrf_reset_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
        for vrf_id in self.vrf_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)

        # Test 3
        self.run_verify_test()

        # Reset routes learned from ICMPv6 Neighbor Discovery
        for vrf_id in self.vrf_reset_list:
            self.reset_vrf(vrf_id)

    def test_ip6_vrf_04(self):
        """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
        """
        # Config 4
        # Reset all VRFs (i.e. no VRF except VRF=0 created).
        # Iterate over a copy: reset_vrf() removes the ID from vrf_list.
        for vrf_id in list(self.vrf_list):
            self.reset_vrf(vrf_id)

        # Verify 4
        for vrf_id in self.vrf_reset_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_RESET)
        for vrf_id in self.vrf_list:
            self.assertEqual(self.verify_vrf(vrf_id), VRF_CONFIGURED)

        # Test 4
        self.run_verify_test()


# Allow running this test module directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)