import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6
# Send the ARP request with an originating address that
# is VPP's own address
#
- self.pg2.add_stream(arp_req_from_me)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx = self.pg2.get_capture(1)
+ rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2)
self.verify_arp_resp(rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg2.sw_if_index,
self.pg0.local_ip4))
+ #
+ # setup a punt redirect so packets from the uplink go to the tap
+ #
+ self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+ self.pg2.sw_if_index,
+ self.pg0.local_ip4)
+
+ p_tcp = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac,) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4) /
+ TCP(sport=80, dport=80) /
+ Raw())
+ rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
+
+ # there's no ARP entry so this is an ARP req
+ self.assertTrue(rx[0].haslayer(ARP))
+
+ # and ARP entry for VPP's pg0 address on the host interface
+ n1 = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_mac,
+ self.pg0.local_ip4,
+ is_no_fib_entry=True).add_vpp_config()
+ # now the packets shold forward
+ rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
+ self.assertFalse(rx[0].haslayer(ARP))
+ self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac)
+
+ #
+ # flush the neighbor cache on the uplink
+ #
+ af = VppEnum.vl_api_address_family_t
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
+
+ # ensure we can still resolve the ARPs on the uplink
+ self.pg0.resolve_arp()
+
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_ip4))
+
#
# cleanup
#
static_arp.remove_vpp_config()
self.pg2.set_table_ip4(0)
+ def test_arp_static_replace_dynamic_same_mac(self):
+ """ ARP Static can replace Dynamic (same mac) """
+ self.pg2.generate_remote_hosts(1)
+
+ dyn_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4)
+ static_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1)
+
+ #
+ # Add a dynamic ARP entry
+ #
+ dyn_arp.add_vpp_config()
+
+ #
+ # We should find the dynamic nbr
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # Add a static ARP entry with the same mac
+ #
+ static_arp.add_vpp_config()
+
+ #
+ # We should now find the static nbr with the same mac
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # clean-up
+ #
+ static_arp.remove_vpp_config()
+
+ def test_arp_static_replace_dynamic_diff_mac(self):
+ """ ARP Static can replace Dynamic (diff mac) """
+ self.pg2.generate_remote_hosts(2)
+
+ dyn_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4)
+ static_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[1].mac,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1)
+
+ #
+ # Add a dynamic ARP entry
+ #
+ dyn_arp.add_vpp_config()
+
+ #
+ # We should find the dynamic nbr
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # Add a static ARP entry with a changed mac
+ #
+ static_arp.add_vpp_config()
+
+ #
+ # We should now find the static nbr with a changed mac
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1,
+ mac=self.pg2.remote_hosts[1].mac))
+
+ #
+ # clean-up
+ #
+ static_arp.remove_vpp_config()
+
def test_arp_incomplete(self):
""" ARP Incomplete"""
self.pg1.generate_remote_hosts(3)
#
# Set the neighbor configuration:
# limi = 200
- # age = 2 seconds
+ # age = 0 seconds
# recycle = false
#
self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
self.assertFalse(self.vapi.ip_neighbor_dump(sw_if_index=0xffffffff,
af=vaf.ADDRESS_IP4))
+ #
+ # load up some neighbours again with 2s aging enabled
+ # they should be removed after 10s (2s age + 4s for probes + gap)
+ #
+ for ii in range(10):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+ self.sleep(10)
+ self.assertFalse(self.vapi.ip_neighbor_dump(sw_if_index=0xffffffff,
+ af=vaf.ADDRESS_IP4))
+
+ #
+ # check if we can set age and recycle with empty neighbor list
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=1000,
+ recycle=True)
+
#
# load up some neighbours again, then disable the aging
# they should still be there in 10 seconds time
self.pg0.remote_hosts[0].ip4))
+class NeighborReplaceTestCase(VppTestCase):
+ """ ARP/ND Replacement """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborReplaceTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborReplaceTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborReplaceTestCase, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ # pg0 configured with ip4 and 6 addresses used for input
+ # pg1 configured with ip4 and 6 addresses used for output
+ # pg2 is unnumbered to pg0
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborReplaceTestCase, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_replace(self):
+ """ replace """
+
+ N_HOSTS = 16
+
+ for i in self.pg_interfaces:
+ i.generate_remote_hosts(N_HOSTS)
+ i.configure_ipv4_neighbors()
+ i.configure_ipv6_neighbors()
+
+ # replace them all
+ self.vapi.ip_neighbor_replace_begin()
+ self.vapi.ip_neighbor_replace_end()
+
+ for i in self.pg_interfaces:
+ for h in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[h].ip4))
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[h].ip6))
+
+ #
+ # and them all back via the API
+ #
+ for i in self.pg_interfaces:
+ for h in range(N_HOSTS):
+ VppNeighbor(self,
+ i.sw_if_index,
+ i.remote_hosts[h].mac,
+ i.remote_hosts[h].ip4).add_vpp_config()
+ VppNeighbor(self,
+ i.sw_if_index,
+ i.remote_hosts[h].mac,
+ i.remote_hosts[h].ip6).add_vpp_config()
+
+ #
+ # begin the replacement again, this time touch some
+ # the neighbours on pg1 so they are not deleted
+ #
+ self.vapi.ip_neighbor_replace_begin()
+
+ # update from the API all neighbours on pg1
+ for h in range(N_HOSTS):
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[h].mac,
+ self.pg1.remote_hosts[h].ip4).add_vpp_config()
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[h].mac,
+ self.pg1.remote_hosts[h].ip6).add_vpp_config()
+
+ # update from the data-plane all neighbours on pg3
+ self.pg3.configure_ipv4_neighbors()
+ self.pg3.configure_ipv6_neighbors()
+
+ # complete the replacement
+ self.logger.info(self.vapi.cli("sh ip neighbors"))
+ self.vapi.ip_neighbor_replace_end()
+
+ for i in self.pg_interfaces:
+ if i == self.pg1 or i == self.pg3:
+ # neighbours on pg1 and pg3 are still present
+ for h in range(N_HOSTS):
+ self.assertTrue(find_nbr(self,
+ i.sw_if_index,
+ i.remote_hosts[h].ip4))
+ self.assertTrue(find_nbr(self,
+ i.sw_if_index,
+ i.remote_hosts[h].ip6))
+ else:
+ # all other neighbours are toast
+ for h in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ i.sw_if_index,
+ i.remote_hosts[h].ip4))
+ self.assertFalse(find_nbr(self,
+ i.sw_if_index,
+ i.remote_hosts[h].ip6))
+
+
+class NeighborFlush(VppTestCase):
+ """ Neighbor Flush """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborFlush, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborFlush, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborFlush, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborFlush, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_flush(self):
+ """ Neighbour Flush """
+
+ e = VppEnum
+ nf = e.vl_api_ip_neighbor_flags_t
+ af = e.vl_api_address_family_t
+ N_HOSTS = 16
+ static = [False, True]
+ self.pg0.generate_remote_hosts(N_HOSTS)
+ self.pg1.generate_remote_hosts(N_HOSTS)
+
+ for s in static:
+ # a few v4 and v6 dynamic neoghbors
+ for n in range(N_HOSTS):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].mac,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s).add_vpp_config()
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].mac,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s).add_vpp_config()
+
+ # flush the interfaces individually
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
+
+ # check we haven't flushed that which we shouldn't
+ for n in range(N_HOSTS):
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s))
+
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index)
+
+ for n in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4))
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6))
+
+ # add the nieghbours back
+ for n in range(N_HOSTS):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].mac,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s).add_vpp_config()
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].mac,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s).add_vpp_config()
+
+ self.logger.info(self.vapi.cli("sh ip neighbor"))
+
+ # flush both interfaces at the same time
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xffffffff)
+
+ # check we haven't flushed that which we shouldn't
+ for n in range(N_HOSTS):
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s))
+
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xffffffff)
+
+ for n in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4))
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6))
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)