X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_neighbor.py;h=1045f4ba1e9bb66cc03e74e5a44b6ccc1f073956;hb=3c70c05e1f330ed0034319252c3dcd565cf13d6b;hp=7157839127b0ff544e60312e0fe893bd23668f4d;hpb=1284f8c71da8ec35cba04351cf62cba7bdd7f847;p=vpp.git diff --git a/test/test_neighbor.py b/test/test_neighbor.py index 7157839127b..1045f4ba1e9 100644 --- a/test/test_neighbor.py +++ b/test/test_neighbor.py @@ -13,7 +13,7 @@ from vpp_papi import VppEnum import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q -from scapy.layers.inet import IP, UDP +from scapy.layers.inet import IP, UDP, TCP from scapy.layers.inet6 import IPv6 from scapy.contrib.mpls import MPLS from scapy.layers.inet6 import IPv6 @@ -777,11 +777,7 @@ class ARPTestCase(VppTestCase): # Send the ARP request with an originating address that # is VPP's own address # - self.pg2.add_stream(arp_req_from_me) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - - rx = self.pg2.get_capture(1) + rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2) self.verify_arp_resp(rx[0], self.pg2.local_mac, self.pg2.remote_mac, @@ -795,6 +791,48 @@ class ARPTestCase(VppTestCase): self.pg2.sw_if_index, self.pg0.local_ip4)) + # + # setup a punt redirect so packets from the uplink go to the tap + # + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg2.sw_if_index, + self.pg0.local_ip4) + + p_tcp = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac,) / + IP(src=self.pg0.remote_ip4, + dst=self.pg0.local_ip4) / + TCP(sport=80, dport=80) / + Raw()) + rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) + + # there's no ARP entry so this is an ARP req + self.assertTrue(rx[0].haslayer(ARP)) + + # and ARP entry for VPP's pg0 address on the host interface + n1 = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_mac, + self.pg0.local_ip4, + is_no_fib_entry=True).add_vpp_config() + # now the packets shold forward + rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2) + self.assertFalse(rx[0].haslayer(ARP)) + self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac) + + # + # flush the neighbor cache on the uplink + # + af = VppEnum.vl_api_address_family_t + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) + + # ensure we can still resolve the ARPs on the uplink + self.pg0.resolve_arp() + + self.assertTrue(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_ip4)) + # # cleanup # @@ -1209,6 +1247,116 @@ class ARPTestCase(VppTestCase): static_arp.remove_vpp_config() self.pg2.set_table_ip4(0) + def test_arp_static_replace_dynamic_same_mac(self): + """ ARP Static can replace Dynamic (same mac) """ + self.pg2.generate_remote_hosts(1) + + dyn_arp = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].mac, + self.pg2.remote_hosts[0].ip4) + static_arp = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].mac, + self.pg2.remote_hosts[0].ip4, + is_static=1) + + # + # Add a dynamic ARP entry + # + dyn_arp.add_vpp_config() + + # + # We should find the dynamic nbr + # + self.assertFalse(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=1)) + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=0, + mac=self.pg2.remote_hosts[0].mac)) + + # + # Add a static ARP entry with the same mac + # + static_arp.add_vpp_config() + + # + # We should now find the static nbr with the same mac + # + self.assertFalse(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=0)) + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=1, + mac=self.pg2.remote_hosts[0].mac)) + + # + # clean-up + # + static_arp.remove_vpp_config() + + def test_arp_static_replace_dynamic_diff_mac(self): + """ ARP Static can replace Dynamic (diff mac) """ + self.pg2.generate_remote_hosts(2) + + dyn_arp = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].mac, + self.pg2.remote_hosts[0].ip4) + static_arp = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[1].mac, + self.pg2.remote_hosts[0].ip4, + is_static=1) + + # + # Add a dynamic ARP entry + # + dyn_arp.add_vpp_config() + + # + # We should find the dynamic nbr + # + self.assertFalse(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=1)) + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=0, + mac=self.pg2.remote_hosts[0].mac)) + + # + # Add a static ARP entry with a changed mac + # + static_arp.add_vpp_config() + + # + # We should now find the static nbr with a changed mac + # + self.assertFalse(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=0)) + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[0].ip4, + is_static=1, + mac=self.pg2.remote_hosts[1].mac)) + + # + # clean-up + # + static_arp.remove_vpp_config() + def test_arp_incomplete(self): """ ARP Incomplete""" self.pg1.generate_remote_hosts(3) @@ -1849,5 +1997,234 @@ class NeighborAgeTestCase(VppTestCase): self.pg0.remote_hosts[0].ip4)) +class NeighborReplaceTestCase(VppTestCase): + """ ARP/ND Replacement """ + + @classmethod + def setUpClass(cls): + super(NeighborReplaceTestCase, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(NeighborReplaceTestCase, cls).tearDownClass() + + def setUp(self): + super(NeighborReplaceTestCase, self).setUp() + + self.create_pg_interfaces(range(4)) + + # pg0 configured with ip4 and 6 addresses used for input + # pg1 configured with ip4 and 6 addresses used for output + # pg2 is unnumbered to pg0 + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.config_ip6() + i.resolve_arp() + i.resolve_ndp() + + def tearDown(self): + super(NeighborReplaceTestCase, self).tearDown() + + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + + def test_replace(self): + """ replace """ + + N_HOSTS = 16 + + for i in self.pg_interfaces: + i.generate_remote_hosts(N_HOSTS) + i.configure_ipv4_neighbors() + i.configure_ipv6_neighbors() + + # replace them all + self.vapi.ip_neighbor_replace_begin() + self.vapi.ip_neighbor_replace_end() + + for i in self.pg_interfaces: + for h in range(N_HOSTS): + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[h].ip4)) + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[h].ip6)) + + # + # and them all back via the API + # + for i in self.pg_interfaces: + for h in range(N_HOSTS): + VppNeighbor(self, + i.sw_if_index, + i.remote_hosts[h].mac, + i.remote_hosts[h].ip4).add_vpp_config() + VppNeighbor(self, + i.sw_if_index, + i.remote_hosts[h].mac, + i.remote_hosts[h].ip6).add_vpp_config() + + # + # begin the replacement again, this time touch some + # the neighbours on pg1 so they are not deleted + # + self.vapi.ip_neighbor_replace_begin() + + # update from the API all neighbours on pg1 + for h in range(N_HOSTS): + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[h].mac, + self.pg1.remote_hosts[h].ip4).add_vpp_config() + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[h].mac, + self.pg1.remote_hosts[h].ip6).add_vpp_config() + + # update from the data-plane all neighbours on pg3 + self.pg3.configure_ipv4_neighbors() + self.pg3.configure_ipv6_neighbors() + + # complete the replacement + self.logger.info(self.vapi.cli("sh ip neighbors")) + self.vapi.ip_neighbor_replace_end() + + for i in self.pg_interfaces: + if i == self.pg1 or i == self.pg3: + # neighbours on pg1 and pg3 are still present + for h in range(N_HOSTS): + self.assertTrue(find_nbr(self, + i.sw_if_index, + i.remote_hosts[h].ip4)) + self.assertTrue(find_nbr(self, + i.sw_if_index, + i.remote_hosts[h].ip6)) + else: + # all other neighbours are toast + for h in range(N_HOSTS): + self.assertFalse(find_nbr(self, + i.sw_if_index, + i.remote_hosts[h].ip4)) + self.assertFalse(find_nbr(self, + i.sw_if_index, + i.remote_hosts[h].ip6)) + + +class NeighborFlush(VppTestCase): + """ Neighbor Flush """ + + @classmethod + def setUpClass(cls): + super(NeighborFlush, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(NeighborFlush, cls).tearDownClass() + + def setUp(self): + super(NeighborFlush, self).setUp() + + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.config_ip6() + i.resolve_arp() + i.resolve_ndp() + + def tearDown(self): + super(NeighborFlush, self).tearDown() + + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + + def test_flush(self): + """ Neighbour Flush """ + + e = VppEnum + nf = e.vl_api_ip_neighbor_flags_t + af = e.vl_api_address_family_t + N_HOSTS = 16 + static = [False, True] + self.pg0.generate_remote_hosts(N_HOSTS) + self.pg1.generate_remote_hosts(N_HOSTS) + + for s in static: + # a few v4 and v6 dynamic neoghbors + for n in range(N_HOSTS): + VppNeighbor(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].mac, + self.pg0.remote_hosts[n].ip4, + is_static=s).add_vpp_config() + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].mac, + self.pg1.remote_hosts[n].ip6, + is_static=s).add_vpp_config() + + # flush the interfaces individually + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index) + + # check we haven't flushed that which we shouldn't + for n in range(N_HOSTS): + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6, + is_static=s)) + + self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index) + + for n in range(N_HOSTS): + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4)) + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6)) + + # add the nieghbours back + for n in range(N_HOSTS): + VppNeighbor(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].mac, + self.pg0.remote_hosts[n].ip4, + is_static=s).add_vpp_config() + VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].mac, + self.pg1.remote_hosts[n].ip6, + is_static=s).add_vpp_config() + + self.logger.info(self.vapi.cli("sh ip neighbor")) + + # flush both interfaces at the same time + self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xffffffff) + + # check we haven't flushed that which we shouldn't + for n in range(N_HOSTS): + self.assertTrue(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4, + is_static=s)) + + self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xffffffff) + + for n in range(N_HOSTS): + self.assertFalse(find_nbr(self, + self.pg0.sw_if_index, + self.pg0.remote_hosts[n].ip4)) + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[n].ip6)) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)