2 """IP6 VRF Multi-instance Test Case HLD:
5 - higher number of pg-ip6 interfaces causes problems => only 15 pg-ip6 \
6 interfaces in 5 VRFs are tested
7 - jumbo packets in configuration with 15 pg-ip6 interfaces leads to \
11 - add 15 pg-ip6 interfaces
12 - configure 5 hosts per pg-ip6 interface
14 - add 3 pg-ip6 interfaces per VRF
17 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
20 - check VRF data by parsing output of ip_route_dump API command
21 - all packets received correctly in case of pg-ip6 interfaces in the same
23 - no packet received in case of pg-ip6 interfaces not in VRF
24 - no packet received in case of pg-ip6 interfaces in different VRFs
30 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
33 - check VRF data by parsing output of ip_route_dump API command
34 - all packets received correctly in case of pg-ip6 interfaces in the same
36 - no packet received in case of pg-ip6 interfaces not in VRF
37 - no packet received in case of pg-ip6 interfaces in different VRFs
40 - add 1 of reset VRFs and 1 new VRF
43 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
46 - check VRF data by parsing output of ip_route_dump API command
47 - all packets received correctly in case of pg-ip6 interfaces in the same
49 - no packet received in case of pg-ip6 interfaces not in VRF
50 - no packet received in case of pg-ip6 interfaces in different VRFs
53 - reset all VRFs (i.e. no VRF except VRF=0 created)
56 - send IP6 packets between all pg-ip6 interfaces in all VRF groups
59 - check VRF data by parsing output of ip_route_dump API command
60 - all packets received correctly in case of pg-ip6 interfaces in the same
62 - no packet received in case of pg-ip6 interfaces not in VRF
63 - no packet received in case of pg-ip6 interfaces in different VRFs
70 from scapy.packet import Raw
71 from scapy.layers.l2 import Ether
72 from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
73 RouterAlert, IPv6ExtHdrHopByHop
74 from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
75 from scapy.pton_ntop import inet_ntop
77 from framework import VppTestCase, VppTestRunner
79 from vrf import VRFState
82 def is_ipv6_misc_ext(p):
83 """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out
84 ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """
85 if p.haslayer(ICMPv6ND_RA):
86 if in6_ismaddr(p[IPv6].dst):
88 if p.haslayer(ICMPv6ND_NS):
89 if in6_isllsnmaddr(p[IPv6].dst):
91 if p.haslayer(IPv6ExtHdrHopByHop):
92 for o in p[IPv6ExtHdrHopByHop].options:
93 if isinstance(o, RouterAlert):
98 class TestIP6VrfMultiInst(VppTestCase):
99 """ IP6 VRF Multi-instance Test Case """
104 Perform standard class setup (defined by class method setUpClass in
105 class VppTestCase) before running the test case, set test case related
106 variables and configure VPP.
108 super(TestIP6VrfMultiInst, cls).setUpClass()
113 cls.pg_ifs_per_vrf = 3
116 # Create pg interfaces
117 cls.create_pg_interfaces(
118 range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
120 # Packet flows mapping pg0 -> pg1, pg2 etc.
122 for i in range(len(cls.pg_interfaces)):
123 multiplicand = i // cls.pg_ifs_per_vrf
125 cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
126 for j in range(cls.pg_ifs_per_vrf)
127 if (multiplicand * cls.pg_ifs_per_vrf + j) != i]
128 cls.flows[cls.pg_interfaces[i]] = pg_list
130 # Packet sizes - jumbo packet (9018 bytes) skipped
131 cls.pg_if_packet_sizes = [64, 512, 1518]
133 # Set up all interfaces
134 for pg_if in cls.pg_interfaces:
136 pg_if.generate_remote_hosts(cls.hosts_per_pg)
138 # Create list of VRFs
139 cls.vrf_list = list()
141 # Create list of reset VRFs
142 cls.vrf_reset_list = list()
144 # Create list of pg_interfaces in VRFs
145 cls.pg_in_vrf = list()
147 # Create list of pg_interfaces not in VRFs
148 cls.pg_not_in_vrf = [pg_if for pg_if in cls.pg_interfaces]
150 # Create mapping of pg_interfaces to VRF IDs
151 cls.pg_if_by_vrf_id = dict()
152 for i in range(cls.nr_of_vrfs):
155 cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
156 for j in range(cls.pg_ifs_per_vrf)]
157 cls.pg_if_by_vrf_id[vrf_id] = pg_list
160 super(TestIP6VrfMultiInst, cls).tearDownClass()
164 def tearDownClass(cls):
165 super(TestIP6VrfMultiInst, cls).tearDownClass()
169 Clear trace and packet infos before running each test.
171 super(TestIP6VrfMultiInst, self).setUp()
172 self.reset_packet_infos()
176 Show various debug prints after each test.
178 super(TestIP6VrfMultiInst, self).tearDown()
180 def show_commands_at_teardown(self):
181 self.logger.info(self.vapi.ppcli("show ip6 fib"))
182 self.logger.info(self.vapi.ppcli("show ip6 neighbors"))
184 def create_vrf_and_assign_interfaces(self, count, start=1):
186 Create required number of FIB tables / VRFs, put 3 pg-ip6 interfaces
187 to every FIB table / VRF.
189 :param int count: Number of FIB tables / VRFs to be created.
190 :param int start: Starting number of the FIB table / VRF ID. \
193 for i in range(count):
195 pg_if = self.pg_if_by_vrf_id[vrf_id][0]
196 dest_addr = pg_if.local_ip6n
198 self.vapi.ip_table_add_del(is_ipv6=1, is_add=1, table_id=vrf_id)
199 self.logger.info("IPv6 VRF ID %d created" % vrf_id)
200 if vrf_id not in self.vrf_list:
201 self.vrf_list.append(vrf_id)
202 if vrf_id in self.vrf_reset_list:
203 self.vrf_reset_list.remove(vrf_id)
204 for j in range(self.pg_ifs_per_vrf):
205 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
206 pg_if.set_table_ip6(vrf_id)
207 self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
208 % (pg_if.name, vrf_id))
209 if pg_if not in self.pg_in_vrf:
210 self.pg_in_vrf.append(pg_if)
211 if pg_if in self.pg_not_in_vrf:
212 self.pg_not_in_vrf.remove(pg_if)
214 pg_if.disable_ipv6_ra()
215 pg_if.configure_ipv6_neighbors()
216 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
217 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
219 def reset_vrf_and_remove_from_vrf_list(self, vrf_id):
221 Reset required FIB table / VRF and remove it from VRF list.
223 :param int vrf_id: The FIB table / VRF ID to be reset.
225 # self.vapi.reset_vrf(vrf_id, is_ipv6=1)
226 self.vapi.reset_fib(vrf_id, is_ipv6=1)
227 if vrf_id in self.vrf_list:
228 self.vrf_list.remove(vrf_id)
229 if vrf_id not in self.vrf_reset_list:
230 self.vrf_reset_list.append(vrf_id)
231 for j in range(self.pg_ifs_per_vrf):
232 pg_if = self.pg_if_by_vrf_id[vrf_id][j]
234 if pg_if in self.pg_in_vrf:
235 self.pg_in_vrf.remove(pg_if)
236 if pg_if not in self.pg_not_in_vrf:
237 self.pg_not_in_vrf.append(pg_if)
238 self.logger.info("IPv6 VRF ID %d reset finished" % vrf_id)
239 self.logger.debug(self.vapi.ppcli("show ip6 fib"))
240 self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
241 self.vapi.ip_table_add_del(is_ipv6=1, is_add=0, table_id=vrf_id)
243 def create_stream(self, src_if, packet_sizes):
245 Create input packet stream for defined interface using hosts list.
247 :param object src_if: Interface to create packet stream for.
248 :param list packet_sizes: List of required packet sizes.
249 :return: Stream of packets.
252 src_hosts = src_if.remote_hosts
253 for dst_if in self.flows[src_if]:
254 for dst_host in dst_if.remote_hosts:
255 src_host = random.choice(src_hosts)
256 pkt_info = self.create_packet_info(src_if, dst_if)
257 payload = self.info_to_payload(pkt_info)
258 p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
259 IPv6(src=src_host.ip6, dst=dst_host.ip6) /
260 UDP(sport=1234, dport=1234) /
262 pkt_info.data = p.copy()
263 size = random.choice(packet_sizes)
264 self.extend_packet(p, size)
266 self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
267 % (src_if.name, len(pkts)))
270 def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
272 Create input packet stream for negative test for leaking across
273 different VRFs for defined interface using hosts list.
275 :param object src_if: Interface to create packet stream for.
276 :param int vrf_id: The FIB table / VRF ID where src_if is assigned.
277 :param list packet_sizes: List of required packet sizes.
278 :return: Stream of packets.
281 src_hosts = src_if.remote_hosts
282 vrf_lst = list(self.vrf_list)
283 vrf_lst.remove(vrf_id)
285 for dst_if in self.pg_if_by_vrf_id[vrf]:
286 for dst_host in dst_if.remote_hosts:
287 src_host = random.choice(src_hosts)
288 pkt_info = self.create_packet_info(src_if, dst_if)
289 payload = self.info_to_payload(pkt_info)
290 p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
291 IPv6(src=src_host.ip6, dst=dst_host.ip6) /
292 UDP(sport=1234, dport=1234) /
294 pkt_info.data = p.copy()
295 size = random.choice(packet_sizes)
296 self.extend_packet(p, size)
298 self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
299 % (src_if.name, len(pkts)))
302 def verify_capture(self, pg_if, capture):
304 Verify captured input packet stream for defined interface.
306 :param object pg_if: Interface to verify captured packet stream for.
307 :param list capture: Captured packet stream.
310 for i in self.pg_interfaces:
311 last_info[i.sw_if_index] = None
312 dst_sw_if_index = pg_if.sw_if_index
313 for packet in capture:
317 payload_info = self.payload_to_info(packet[Raw])
318 packet_index = payload_info.index
319 self.assertEqual(payload_info.dst, dst_sw_if_index)
320 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
321 (pg_if.name, payload_info.src, packet_index))
322 next_info = self.get_next_packet_info_for_interface2(
323 payload_info.src, dst_sw_if_index,
324 last_info[payload_info.src])
325 last_info[payload_info.src] = next_info
326 self.assertIsNotNone(next_info)
327 self.assertEqual(packet_index, next_info.index)
328 saved_packet = next_info.data
329 # Check standard fields
330 self.assertEqual(ip.src, saved_packet[IPv6].src)
331 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
332 self.assertEqual(udp.sport, saved_packet[UDP].sport)
333 self.assertEqual(udp.dport, saved_packet[UDP].dport)
335 self.logger.error(ppp("Unexpected or invalid packet:", packet))
337 for i in self.pg_interfaces:
338 remaining_packet = self.get_next_packet_info_for_interface2(
339 i, dst_sw_if_index, last_info[i.sw_if_index])
342 "Port %u: Packet expected from source %u didn't arrive" %
343 (dst_sw_if_index, i.sw_if_index))
345 def verify_vrf(self, vrf_id):
347 Check if the FIB table / VRF ID is configured.
349 :param int vrf_id: The FIB table / VRF ID to be verified.
350 :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
352 ip6_fib_dump = self.vapi.ip_route_dump(vrf_id, True)
353 vrf_exist = len(ip6_fib_dump)
355 for ip6_fib_details in ip6_fib_dump:
356 addr = ip6_fib_details.route.prefix.network_address
358 for pg_if in self.pg_if_by_vrf_id[vrf_id]:
361 for host in pg_if.remote_hosts:
362 if str(addr) == host.ip6:
366 if not vrf_exist and vrf_count == 0:
367 self.logger.info("IPv6 VRF ID %d is not configured" % vrf_id)
368 return VRFState.not_configured
369 elif vrf_exist and vrf_count == 0:
370 self.logger.info("IPv6 VRF ID %d has been reset" % vrf_id)
371 return VRFState.reset
373 self.logger.info("IPv6 VRF ID %d is configured" % vrf_id)
374 return VRFState.configured
376 def run_verify_test(self):
378 Create packet streams for all configured pg interfaces, send all \
379 prepared packet streams and verify that:
380 - all packets received correctly on all pg-ip6 interfaces assigned
382 - no packet received on all pg-ip6 interfaces not assigned to VRFs
384 :raise RuntimeError: If no packet captured on pg-ip6 interface assigned
385 to VRF or if any packet is captured on pg-ip6 interface not
389 # Create incoming packet streams for packet-generator interfaces
390 for pg_if in self.pg_interfaces:
391 pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
392 pg_if.add_stream(pkts)
394 # Enable packet capture and start packet sending
395 self.pg_enable_capture(self.pg_interfaces)
399 # Verify outgoing packet streams per packet-generator interface
400 for pg_if in self.pg_interfaces:
401 if pg_if in self.pg_in_vrf:
402 capture = pg_if.get_capture(remark="interface is in VRF")
403 self.verify_capture(pg_if, capture)
404 elif pg_if in self.pg_not_in_vrf:
405 pg_if.assert_nothing_captured(remark="interface is not in VRF",
406 filter_out_fn=is_ipv6_misc_ext)
407 self.logger.debug("No capture for interface %s" % pg_if.name)
409 raise Exception("Unknown interface: %s" % pg_if.name)
411 def run_crosswise_vrf_test(self):
413 Create packet streams for every pg-ip6 interface in VRF towards all
414 pg-ip6 interfaces in other VRFs, send all prepared packet streams and \
416 - no packet received on all configured pg-ip6 interfaces
418 :raise RuntimeError: If any packet is captured on any pg-ip6 interface.
421 # Create incoming packet streams for packet-generator interfaces
422 for vrf_id in self.vrf_list:
423 for pg_if in self.pg_if_by_vrf_id[vrf_id]:
424 pkts = self.create_stream_crosswise_vrf(
425 pg_if, vrf_id, self.pg_if_packet_sizes)
426 pg_if.add_stream(pkts)
428 # Enable packet capture and start packet sending
429 self.pg_enable_capture(self.pg_interfaces)
433 # Verify outgoing packet streams per packet-generator interface
434 for pg_if in self.pg_interfaces:
435 pg_if.assert_nothing_captured(remark="interface is in other VRF",
436 filter_out_fn=is_ipv6_misc_ext)
437 self.logger.debug("No capture for interface %s" % pg_if.name)
439 def test_ip6_vrf_01(self):
440 """ IP6 VRF Multi-instance test 1 - create 4 VRFs
444 self.create_vrf_and_assign_interfaces(4)
447 for vrf_id in self.vrf_list:
448 self.assert_equal(self.verify_vrf(vrf_id),
449 VRFState.configured, VRFState)
452 self.run_verify_test()
453 self.run_crosswise_vrf_test()
455 def test_ip6_vrf_02(self):
456 """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
460 self.reset_vrf_and_remove_from_vrf_list(1)
461 self.reset_vrf_and_remove_from_vrf_list(2)
464 for vrf_id in self.vrf_reset_list:
465 self.assert_equal(self.verify_vrf(vrf_id),
466 VRFState.reset, VRFState)
467 for vrf_id in self.vrf_list:
468 self.assert_equal(self.verify_vrf(vrf_id),
469 VRFState.configured, VRFState)
472 self.run_verify_test()
473 self.run_crosswise_vrf_test()
475 # Reset routes learned from ICMPv6 Neighbor Discovery
476 for vrf_id in self.vrf_reset_list:
477 self.reset_vrf_and_remove_from_vrf_list(vrf_id)
479 def test_ip6_vrf_03(self):
480 """ IP6 VRF Multi-instance 3 - add 2 VRFs
483 # Add 1 of reset VRFs and 1 new VRF
484 self.create_vrf_and_assign_interfaces(1)
485 self.create_vrf_and_assign_interfaces(1, start=5)
488 for vrf_id in self.vrf_reset_list:
489 self.assert_equal(self.verify_vrf(vrf_id),
490 VRFState.reset, VRFState)
491 for vrf_id in self.vrf_list:
492 self.assert_equal(self.verify_vrf(vrf_id),
493 VRFState.configured, VRFState)
496 self.run_verify_test()
497 self.run_crosswise_vrf_test()
499 # Reset routes learned from ICMPv6 Neighbor Discovery
500 for vrf_id in self.vrf_reset_list:
501 self.reset_vrf_and_remove_from_vrf_list(vrf_id)
503 def test_ip6_vrf_04(self):
504 """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
507 # Reset all VRFs (i.e. no VRF except VRF=0 configured)
508 for i in range(len(self.vrf_list)):
509 self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0])
512 for vrf_id in self.vrf_reset_list:
513 self.assert_equal(self.verify_vrf(vrf_id),
514 VRFState.reset, VRFState)
515 vrf_list_length = len(self.vrf_list)
518 "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
521 self.run_verify_test()
522 self.run_crosswise_vrf_test()
525 if __name__ == '__main__':
526 unittest.main(testRunner=VppTestRunner)