X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip6_vrf_multi_instance.py;h=73df30d77f29ccb428fb1533220db25c12f82554;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=d95e7927f98dd0c024de98d69ca651dde934c640;hpb=f90348bcb4afd0af2611cefc43b17ef3042b511c;p=vpp.git diff --git a/test/test_ip6_vrf_multi_instance.py b/test/test_ip6_vrf_multi_instance.py index d95e7927f98..73df30d77f2 100644 --- a/test/test_ip6_vrf_multi_instance.py +++ b/test/test_ip6_vrf_multi_instance.py @@ -69,8 +69,14 @@ import socket from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \ - RouterAlert, IPv6ExtHdrHopByHop +from scapy.layers.inet6 import ( + UDP, + IPv6, + ICMPv6ND_NS, + ICMPv6ND_RA, + RouterAlert, + IPv6ExtHdrHopByHop, +) from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType from scapy.pton_ntop import inet_ntop @@ -80,8 +86,8 @@ from vrf import VRFState def is_ipv6_misc_ext(p): - """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out - ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """ + """Is packet one of uninteresting IPv6 broadcasts (extended to filter out + ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)?""" if p.haslayer(ICMPv6ND_RA): if in6_ismaddr(p[IPv6].dst): return True @@ -96,7 +102,7 @@ def is_ipv6_misc_ext(p): class TestIP6VrfMultiInst(VppTestCase): - """ IP6 VRF Multi-instance Test Case """ + """IP6 VRF Multi-instance Test Case""" @classmethod def setUpClass(cls): @@ -114,8 +120,7 @@ class TestIP6VrfMultiInst(VppTestCase): try: # Create pg interfaces - cls.create_pg_interfaces( - range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf)) + cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf)) # Packet flows mapping pg0 -> pg1, pg2 etc. cls.flows = dict() @@ -124,7 +129,8 @@ class TestIP6VrfMultiInst(VppTestCase): pg_list = [ cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j] for j in range(cls.pg_ifs_per_vrf) - if (multiplicand * cls.pg_ifs_per_vrf + j) != i] + if (multiplicand * cls.pg_ifs_per_vrf + j) != i + ] cls.flows[cls.pg_interfaces[i]] = pg_list # Packet sizes - jumbo packet (9018 bytes) skipped @@ -153,7 +159,8 @@ class TestIP6VrfMultiInst(VppTestCase): set_id = i + 1 pg_list = [ cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j] - for j in range(cls.pg_ifs_per_vrf)] + for j in range(cls.pg_ifs_per_vrf) + ] cls.pg_if_sets[set_id] = pg_list except Exception: @@ -185,8 +192,9 @@ class TestIP6VrfMultiInst(VppTestCase): for i in range(self.pg_ifs_per_vrf): pg_if = self.pg_if_sets[if_set_id][i] pg_if.set_table_ip6(vrf_id) - self.logger.info("pg-interface %s added to IPv6 VRF ID %d" - % (pg_if.name, vrf_id)) + self.logger.info( + "pg-interface %s added to IPv6 VRF ID %d" % (pg_if.name, vrf_id) + ) if pg_if not in self.pg_in_vrf: self.pg_in_vrf.append(pg_if) if pg_if in self.pg_not_in_vrf: @@ -206,8 +214,9 @@ class TestIP6VrfMultiInst(VppTestCase): """ for i in range(count): vrf_id = i + start - self.vapi.ip_table_add_del(is_add=1, - table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_add_del( + is_add=1, table={"table_id": vrf_id, "is_ip6": 1} + ) self.logger.info("IPv6 VRF ID %d created" % vrf_id) if vrf_id not in self.vrf_list: self.vrf_list.append(vrf_id) @@ -217,8 +226,7 @@ class TestIP6VrfMultiInst(VppTestCase): self.logger.debug(self.vapi.ppcli("show ip6 fib")) self.logger.debug(self.vapi.ppcli("show ip6 neighbors")) - def create_vrf_by_id_and_assign_interfaces(self, set_id, - vrf_id=0xffffffff): + def create_vrf_by_id_and_assign_interfaces(self, set_id, vrf_id=0xFFFFFFFF): """ Create a FIB table / VRF by vrf_id, put 3 pg-ip6 interfaces to FIB table / VRF. @@ -226,8 +234,7 @@ class TestIP6VrfMultiInst(VppTestCase): :param int vrf_id: Required table ID / VRF ID. \ (Default value = 0xffffffff, ID will be selected automatically) """ - ret = self.vapi.ip_table_allocate(table={'table_id': vrf_id, - 'is_ip6': 1}) + ret = self.vapi.ip_table_allocate(table={"table_id": vrf_id, "is_ip6": 1}) vrf_id = ret.table.table_id self.logger.info("IPv6 VRF ID %d created" % vrf_id) if vrf_id not in self.vrf_list: @@ -248,7 +255,7 @@ class TestIP6VrfMultiInst(VppTestCase): """ if if_set_id is None: if_set_id = vrf_id - self.vapi.ip_table_flush(table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_flush(table={"table_id": vrf_id, "is_ip6": 1}) if vrf_id in self.vrf_list: self.vrf_list.remove(vrf_id) if vrf_id not in self.vrf_reset_list: @@ -270,8 +277,7 @@ class TestIP6VrfMultiInst(VppTestCase): self.vrf_list.remove(vrf_id) if vrf_id in self.vrf_reset_list: self.vrf_reset_list.remove(vrf_id) - self.vapi.ip_table_add_del(is_add=0, - table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id, "is_ip6": 1}) def create_stream(self, src_if, packet_sizes): """ @@ -288,16 +294,20 @@ class TestIP6VrfMultiInst(VppTestCase): src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) - p = (Ether(dst=src_if.local_mac, src=src_host.mac) / - IPv6(src=src_host.ip6, dst=dst_host.ip6) / - UDP(sport=1234, dport=1234) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_host.mac) + / IPv6(src=src_host.ip6, dst=dst_host.ip6) + / UDP(sport=1234, dport=1234) + / Raw(payload) + ) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) - self.logger.debug("Input stream created for port %s. Length: %u pkt(s)" - % (src_if.name, len(pkts))) + self.logger.debug( + "Input stream created for port %s. Length: %u pkt(s)" + % (src_if.name, len(pkts)) + ) return pkts def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes): @@ -320,16 +330,20 @@ class TestIP6VrfMultiInst(VppTestCase): src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) - p = (Ether(dst=src_if.local_mac, src=src_host.mac) / - IPv6(src=src_host.ip6, dst=dst_host.ip6) / - UDP(sport=1234, dport=1234) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_host.mac) + / IPv6(src=src_host.ip6, dst=dst_host.ip6) + / UDP(sport=1234, dport=1234) + / Raw(payload) + ) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) - self.logger.debug("Input stream created for port %s. Length: %u pkt(s)" - % (src_if.name, len(pkts))) + self.logger.debug( + "Input stream created for port %s. Length: %u pkt(s)" + % (src_if.name, len(pkts)) + ) return pkts def verify_capture(self, pg_if, capture): @@ -350,11 +364,13 @@ class TestIP6VrfMultiInst(VppTestCase): payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (pg_if.name, payload_info.src, packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" + % (pg_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertIsNotNone(next_info) self.assertEqual(packet_index, next_info.index) @@ -369,11 +385,13 @@ class TestIP6VrfMultiInst(VppTestCase): raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i, dst_sw_if_index, last_info[i.sw_if_index]) + i, dst_sw_if_index, last_info[i.sw_if_index] + ) self.assertIsNone( remaining_packet, - "Port %u: Packet expected from source %u didn't arrive" % - (dst_sw_if_index, i.sw_if_index)) + "Port %u: Packet expected from source %u didn't arrive" + % (dst_sw_if_index, i.sw_if_index), + ) def verify_vrf(self, vrf_id, if_set_id=None): """ @@ -437,8 +455,9 @@ class TestIP6VrfMultiInst(VppTestCase): capture = pg_if.get_capture(remark="interface is in VRF") self.verify_capture(pg_if, capture) elif pg_if in self.pg_not_in_vrf: - pg_if.assert_nothing_captured(remark="interface is not in VRF", - filter_out_fn=is_ipv6_misc_ext) + pg_if.assert_nothing_captured( + remark="interface is not in VRF", filter_out_fn=is_ipv6_misc_ext + ) self.logger.debug("No capture for interface %s" % pg_if.name) else: raise Exception("Unknown interface: %s" % pg_if.name) @@ -458,7 +477,8 @@ class TestIP6VrfMultiInst(VppTestCase): for vrf_id in self.vrf_list: for pg_if in self.pg_if_sets[vrf_id]: pkts = self.create_stream_crosswise_vrf( - pg_if, vrf_id, self.pg_if_packet_sizes) + pg_if, vrf_id, self.pg_if_packet_sizes + ) pg_if.add_stream(pkts) # Enable packet capture and start packet sending @@ -468,29 +488,27 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify # Verify outgoing packet streams per packet-generator interface for pg_if in self.pg_interfaces: - pg_if.assert_nothing_captured(remark="interface is in other VRF", - filter_out_fn=is_ipv6_misc_ext) + pg_if.assert_nothing_captured( + remark="interface is in other VRF", filter_out_fn=is_ipv6_misc_ext + ) self.logger.debug("No capture for interface %s" % pg_if.name) def test_ip6_vrf_01(self): - """ IP6 VRF Multi-instance test 1 - create 4 VRFs - """ + """IP6 VRF Multi-instance test 1 - create 4 VRFs""" # Config 1 # Create 4 VRFs self.create_vrf_and_assign_interfaces(4) # Verify 1 for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 1 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip6_vrf_02(self): - """ IP6 VRF Multi-instance test 2 - reset 2 VRFs - """ + """IP6 VRF Multi-instance test 2 - reset 2 VRFs""" # Config 2 # Delete 2 VRFs self.reset_vrf_and_remove_from_vrf_list(1) @@ -498,11 +516,9 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 2 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 2 self.run_verify_test() @@ -513,8 +529,7 @@ class TestIP6VrfMultiInst(VppTestCase): # self.reset_vrf_and_remove_from_vrf_list(vrf_id) def test_ip6_vrf_03(self): - """ IP6 VRF Multi-instance 3 - add 2 VRFs - """ + """IP6 VRF Multi-instance 3 - add 2 VRFs""" # Config 3 # Add 1 of reset VRFs and 1 new VRF self.create_vrf_and_assign_interfaces(1) @@ -522,11 +537,9 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 3 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 3 self.run_verify_test() @@ -537,8 +550,7 @@ class TestIP6VrfMultiInst(VppTestCase): # self.reset_vrf_and_remove_from_vrf_list(vrf_id) def test_ip6_vrf_04(self): - """ IP6 VRF Multi-instance test 4 - reset 4 VRFs - """ + """IP6 VRF Multi-instance test 4 - reset 4 VRFs""" # Config 4 # Reset all VRFs (i.e. no VRF except VRF=0 configured) for i in range(len(self.vrf_list)): @@ -547,20 +559,20 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 4 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Test 4 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip6_vrf_05(self): - """ IP6 VRF Multi-instance test 5 - auto allocate vrf id - """ + """IP6 VRF Multi-instance test 5 - auto allocate vrf id""" # Config 5 # Create several VRFs # Set vrf_id manually first @@ -571,11 +583,11 @@ class TestIP6VrfMultiInst(VppTestCase): ] # Verify 5 - self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, - VRFState) + self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, VRFState) for i, vrf in enumerate(auto_vrf_id): - self.assert_equal(self.verify_vrf(vrf, i+2), - VRFState.configured, VRFState) + self.assert_equal( + self.verify_vrf(vrf, i + 2), VRFState.configured, VRFState + ) # Test 5 self.run_verify_test() @@ -584,18 +596,19 @@ class TestIP6VrfMultiInst(VppTestCase): # Reset VRFs self.reset_vrf_and_remove_from_vrf_list(10, 1) for i, vrf in enumerate(auto_vrf_id): - self.reset_vrf_and_remove_from_vrf_list(vrf, i+2) + self.reset_vrf_and_remove_from_vrf_list(vrf, i + 2) # Verify 5.1 self.assert_equal(self.verify_vrf(10, 1), VRFState.reset, VRFState) for i, vrf in enumerate(auto_vrf_id): - self.assert_equal(self.verify_vrf(vrf, i+2), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf, i + 2), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Cleanup our extra created VRFs for vrf in auto_vrf_id: @@ -604,14 +617,12 @@ class TestIP6VrfMultiInst(VppTestCase): self.delete_vrf(10) def test_ip6_vrf_06(self): - """ IP6 VRF Multi-instance test 6 - recreate 4 VRFs - """ + """IP6 VRF Multi-instance test 6 - recreate 4 VRFs""" # Reconfigure all the VRFs self.create_vrf_and_assign_interfaces(4) # Verify for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test self.run_verify_test() self.run_crosswise_vrf_test() @@ -620,16 +631,17 @@ class TestIP6VrfMultiInst(VppTestCase): self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0]) # Verify for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Test self.run_verify_test() self.run_crosswise_vrf_test() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)