import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6
# Send the ARP request with an originating address that
# is VPP's own address
#
- self.pg2.add_stream(arp_req_from_me)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx = self.pg2.get_capture(1)
+ rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2)
self.verify_arp_resp(rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg2.sw_if_index,
self.pg0.local_ip4))
+ #
+ # setup a punt redirect so packets from the uplink go to the tap
+ #
+ self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+ self.pg2.sw_if_index,
+ self.pg0.local_ip4)
+
+ p_tcp = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac,) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4) /
+ TCP(sport=80, dport=80) /
+ Raw())
+ rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
+
+ # there's no ARP entry so this is an ARP req
+ self.assertTrue(rx[0].haslayer(ARP))
+
+ # and ARP entry for VPP's pg0 address on the host interface
+ n1 = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_mac,
+ self.pg0.local_ip4,
+ is_no_fib_entry=True).add_vpp_config()
+ # now the packets shold forward
+ rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
+ self.assertFalse(rx[0].haslayer(ARP))
+ self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac)
+
+ #
+ # flush the neighbor cache on the uplink
+ #
+ af = VppEnum.vl_api_address_family_t
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
+
+ # ensure we can still resolve the ARPs on the uplink
+ self.pg0.resolve_arp()
+
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_ip4))
+
#
# cleanup
#
static_arp.remove_vpp_config()
self.pg2.set_table_ip4(0)
+ def test_arp_static_replace_dynamic_same_mac(self):
+ """ ARP Static can replace Dynamic (same mac) """
+ self.pg2.generate_remote_hosts(1)
+
+ dyn_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4)
+ static_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1)
+
+ #
+ # Add a dynamic ARP entry
+ #
+ dyn_arp.add_vpp_config()
+
+ #
+ # We should find the dynamic nbr
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # Add a static ARP entry with the same mac
+ #
+ static_arp.add_vpp_config()
+
+ #
+ # We should now find the static nbr with the same mac
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # clean-up
+ #
+ static_arp.remove_vpp_config()
+
+ def test_arp_static_replace_dynamic_diff_mac(self):
+ """ ARP Static can replace Dynamic (diff mac) """
+ self.pg2.generate_remote_hosts(2)
+
+ dyn_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].mac,
+ self.pg2.remote_hosts[0].ip4)
+ static_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[1].mac,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1)
+
+ #
+ # Add a dynamic ARP entry
+ #
+ dyn_arp.add_vpp_config()
+
+ #
+ # We should find the dynamic nbr
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0,
+ mac=self.pg2.remote_hosts[0].mac))
+
+ #
+ # Add a static ARP entry with a changed mac
+ #
+ static_arp.add_vpp_config()
+
+ #
+ # We should now find the static nbr with a changed mac
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=0))
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[0].ip4,
+ is_static=1,
+ mac=self.pg2.remote_hosts[1].mac))
+
+ #
+ # clean-up
+ #
+ static_arp.remove_vpp_config()
+
def test_arp_incomplete(self):
""" ARP Incomplete"""
self.pg1.generate_remote_hosts(3)
# Generate some hosts on the LAN
#
self.pg1.generate_remote_hosts(4)
+ self.pg2.generate_remote_hosts(4)
#
# And an ARP entry
self.pg1.sw_if_index,
self.pg1.remote_hosts[2].ip4))
+ #
+ # IP address in different subnets are not learnt
+ #
+ self.pg2.configure_ipv4_neighbors()
+
+ for op in ["is-at", "who-has"]:
+ p1 = [(Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg2.remote_hosts[1].mac) /
+ ARP(op=op,
+ hwdst=self.pg2.local_mac,
+ hwsrc=self.pg2.remote_hosts[1].mac,
+ pdst=self.pg2.remote_hosts[1].ip4,
+ psrc=self.pg2.remote_hosts[1].ip4)),
+ (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg2.remote_hosts[1].mac) /
+ ARP(op=op,
+ hwdst="ff:ff:ff:ff:ff:ff",
+ hwsrc=self.pg2.remote_hosts[1].mac,
+ pdst=self.pg2.remote_hosts[1].ip4,
+ psrc=self.pg2.remote_hosts[1].ip4))]
+
+ self.send_and_assert_no_replies(self.pg1, p1)
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg2.remote_hosts[1].ip4))
+
+ # they are all dropped because the subnet's don't match
+ self.assertEqual(4, self.statistics.get_err_counter(
+ "/err/arp-reply/IP4 destination address not local to subnet"))
+
def test_arp_incomplete(self):
""" Incomplete Entries """
i.remote_hosts[h].ip6))
+class NeighborFlush(VppTestCase):
+ """ Neighbor Flush """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborFlush, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborFlush, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborFlush, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborFlush, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_flush(self):
+ """ Neighbour Flush """
+
+ e = VppEnum
+ nf = e.vl_api_ip_neighbor_flags_t
+ af = e.vl_api_address_family_t
+ N_HOSTS = 16
+ static = [False, True]
+ self.pg0.generate_remote_hosts(N_HOSTS)
+ self.pg1.generate_remote_hosts(N_HOSTS)
+
+ for s in static:
+ # a few v4 and v6 dynamic neoghbors
+ for n in range(N_HOSTS):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].mac,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s).add_vpp_config()
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].mac,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s).add_vpp_config()
+
+ # flush the interfaces individually
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
+
+ # check we haven't flushed that which we shouldn't
+ for n in range(N_HOSTS):
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s))
+
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index)
+
+ for n in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4))
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6))
+
+ # add the nieghbours back
+ for n in range(N_HOSTS):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].mac,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s).add_vpp_config()
+ VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].mac,
+ self.pg1.remote_hosts[n].ip6,
+ is_static=s).add_vpp_config()
+
+ self.logger.info(self.vapi.cli("sh ip neighbor"))
+
+ # flush both interfaces at the same time
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xffffffff)
+
+ # check we haven't flushed that which we shouldn't
+ for n in range(N_HOSTS):
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4,
+ is_static=s))
+
+ self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xffffffff)
+
+ for n in range(N_HOSTS):
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[n].ip4))
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip6))
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)