-#!/usr/bin/env python
+#!/usr/bin/env python3
import unittest
+import os
from socket import AF_INET, AF_INET6, inet_pton
from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor, find_nbr
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
- VppIpTable, DpoProto
+ VppIpTable, DpoProto, FibPathType
+from vpp_papi import VppEnum
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6
+
+NUM_PKTS = 67
+
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
class ARPTestCase(VppTestCase):
""" ARP Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(ARPTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ARPTestCase, cls).tearDownClass()
+
def setUp(self):
super(ARPTestCase, self).setUp()
#
self.pg1.generate_remote_hosts(11)
+ #
+ # watch for:
+ # - all neighbour events
+ # - all neighbor events on pg1
+ # - neighbor events for host[1] on pg1
+ #
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid())
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index)
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index,
+ ip=self.pg1.remote_hosts[1].ip4)
+
+ self.logger.info(self.vapi.cli("sh ip neighbor-watcher"))
+
#
# Send IP traffic to one of these unresolved hosts.
# expect the generation of an ARP request
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4)
dyn_arp.add_vpp_config()
+ self.assertTrue(dyn_arp.query_vpp_config())
+
+ # this matches all of the listnerers
+ es = [self.vapi.wait_for_event(1, "ip_neighbor_event")
+ for i in range(3)]
+ for e in es:
+ self.assertEqual(str(e.neighbor.ip_address),
+ self.pg1.remote_hosts[1].ip4)
#
# now we expect IP traffic forwarded
self.pg1.remote_hosts[2].ip4,
is_static=1)
static_arp.add_vpp_config()
+ es = [self.vapi.wait_for_event(1, "ip_neighbor_event")
+ for i in range(2)]
+ for e in es:
+ self.assertEqual(str(e.neighbor.ip_address),
+ self.pg1.remote_hosts[2].ip4)
static_p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4,
self.pg0.remote_ip4,
self.pg1._remote_hosts[2].ip4)
+ #
+ # remove all the listeners
+ #
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid())
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index)
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index,
+ ip=self.pg1.remote_hosts[1].ip4)
+
#
# flap the link. dynamic ARPs get flush, statics don't
#
self.pg1.local_ip4,
self.pg1._remote_hosts[1].ip4)
+ self.assertFalse(dyn_arp.query_vpp_config())
+ self.assertTrue(static_arp.query_vpp_config())
#
# Send an ARP request from one of the so-far unlearned remote hosts
#
#
self.pg2.set_unnumbered(self.pg1.sw_if_index)
+ #
+ # test the unnumbered dump both by all interfaces and just the enabled
+ # one
+ #
unnum = self.vapi.ip_unnumbered_dump()
+ self.assertTrue(len(unnum))
+ self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
+ unnum = self.vapi.ip_unnumbered_dump(self.pg2.sw_if_index)
+ self.assertTrue(len(unnum))
self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
self.pg1._remote_hosts[9].ip4)
#
- # Add a hierachy of routes for a host in the sub-net.
+ # Add a hierarchy of routes for a host in the sub-net.
# Should still get an ARP resp since the cover is attached
#
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_mac) /
#
# 2 - don't respond to ARP request from an address not within the
# interface's sub-net
- # 2b - to a prxied address
- # 2c - not within a differents interface's sub-net
+ # 2b - to a proxied address
+ # 2c - not within a different interface's sub-net
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
#
# cleanup
#
- dyn_arp.remove_vpp_config()
static_arp.remove_vpp_config()
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
#
# Configure Proxy ARP for the subnet on PG0addresses on pg0
#
- self.vapi.proxy_arp_add_del(self.pg0._local_ip4n_subnet,
- self.pg0._local_ip4n_bcast)
+ self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+ self.pg0._local_ip4_bcast)
# Make pg2 un-numbered to pg0
#
# cleanup
#
self.pg2.set_proxy_arp(0)
- self.vapi.proxy_arp_add_del(self.pg0._local_ip4n_subnet,
- self.pg0._local_ip4n_bcast,
+ self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+ self.pg0._local_ip4_bcast,
is_add=0)
def test_proxy_arp(self):
self.pg1.generate_remote_hosts(2)
#
- # Proxy ARP rewquest packets for each interface
+ # Proxy ARP request packets for each interface
#
arp_req_pg0 = (Ether(src=self.pg0.remote_mac,
dst="ff:ff:ff:ff:ff:ff") /
self.pg2.generate_remote_hosts(2)
#
- # Add a reoute with out going label via an ARP unresolved next-hop
+ # Add a route with out going label via an ARP unresolved next-hop
#
ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
[VppRoutePath(self.pg2.remote_hosts[1].ip4,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
UDP(sport=1234, dport=1234) /
Raw())
- self.pg0.add_stream(p0)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx1 = self.pg1.get_capture(1)
+ rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_arp_req(rx1[0],
self.pg1.local_mac,
hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_ip4))
- self.pg1.add_stream(p1)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
#
# IP packet destined for pg1 remote host arrives on pg0 again.
# VPP should have an ARP entry for that address now and the packet
# should be sent out pg1.
#
- self.pg0.add_stream(p0)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx1 = self.pg1.get_capture(1)
+ rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_ip(rx1[0],
self.pg1.local_mac,
#
# remove the duplicate on pg1
- # packet stream shoud generate ARPs out of pg1
+ # packet stream should generate ARPs out of pg1
#
arp_pg1.remove_vpp_config()
# clean-up
#
self.pg2.unconfig_ip4()
+ static_arp.remove_vpp_config()
self.pg2.set_table_ip4(0)
def test_arp_incomplete(self):
#
# change the interface's MAC
#
- mac = [chr(0x00), chr(0x00), chr(0x00),
- chr(0x33), chr(0x33), chr(0x33)]
+ mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00),
+ scapy.compat.chb(0x00), scapy.compat.chb(0x33),
+ scapy.compat.chb(0x33), scapy.compat.chb(0x33)]
mac_string = ''.join(mac)
self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
self.pg1.remote_hosts[1].ip4)
#
- # set the mac address on the inteface that does not have a
+ # set the mac address on the interface that does not have a
# configured subnet and thus no glean
#
self.vapi.sw_interface_set_mac_address(self.pg2.sw_if_index,
mac=self.pg1.remote_hosts[3].mac))
#
- # GARPs (requets nor replies) for host we don't know yet
+ # GARPs (request nor replies) for host we don't know yet
# don't result in new neighbour entries
#
p1 = (Ether(dst="ff:ff:ff:ff:ff:ff",
# how many we get is going to be dependent on the time for packet
# processing but it should be small
#
- self.assertTrue(len(rx) < 64)
+ self.assertLess(len(rx), 64)
#
# IPv6/ND
ip_10_1 = VppIpRoute(self, "10::1", 128,
[VppRoutePath(self.pg0.remote_hosts[1].ip6,
self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ proto=DpoProto.DPO_PROTO_IP6)])
ip_10_1.add_vpp_config()
p1 = (Ether(dst=self.pg1.local_mac,
# how many we get is going to be dependent on the time for packet
# processing but it should be small
#
- self.assertTrue(len(rx) < 64)
+ self.assertLess(len(rx), 64)
+
+ def test_arp_forus(self):
+ """ ARP for for-us """
+
+ #
+ # Test that VPP responds with ARP requests to addresses that
+ # are connected and local routes.
+ # Use one of the 'remote' addresses in the subnet as a local address
+ # The intention of this route is that it then acts like a secondary
+ # address added to an interface
+ #
+ self.pg0.generate_remote_hosts(2)
+
+ forus = VppIpRoute(
+ self, self.pg0.remote_hosts[1].ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ self.pg0.sw_if_index,
+ type=FibPathType.FIB_PATH_TYPE_LOCAL)])
+ forus.add_vpp_config()
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwdst=self.pg0.local_mac,
+ hwsrc=self.pg0.remote_mac,
+ pdst=self.pg0.remote_hosts[1].ip4,
+ psrc=self.pg0.remote_ip4))
+
+ rx = self.send_and_expect(self.pg0, [p], self.pg0)
+
+ self.verify_arp_resp(rx[0],
+ self.pg0.local_mac,
+ self.pg0.remote_mac,
+ self.pg0.remote_hosts[1].ip4,
+ self.pg0.remote_ip4)
+
+ def test_arp_table_swap(self):
+ #
+ # Generate some hosts on the LAN
+ #
+ N_NBRS = 4
+ self.pg1.generate_remote_hosts(N_NBRS)
+
+ for n in range(N_NBRS):
+ # a route thru each neighbour
+ VppIpRoute(self, "10.0.0.%d" % n, 32,
+ [VppRoutePath(self.pg1.remote_hosts[n].ip4,
+ self.pg1.sw_if_index)]).add_vpp_config()
+
+ # resolve each neighbour
+ p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ ARP(op="is-at", hwdst=self.pg1.local_mac,
+ hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_hosts[n].ip4))
+
+ self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
+
+ self.logger.info(self.vapi.cli("sh ip neighbors"))
+
+ #
+ # swap the table pg1 is in
+ #
+ table = VppIpTable(self, 100).add_vpp_config()
+
+ self.pg1.unconfig_ip4()
+ self.pg1.set_table_ip4(100)
+ self.pg1.config_ip4()
+
+ #
+ # all neighbours are cleared
+ #
+ for n in range(N_NBRS):
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[n].ip4))
+
+ #
+ # packets to all neighbours generate ARP requests
+ #
+ for n in range(N_NBRS):
+ # a route thru each neighbour
+ VppIpRoute(self, "10.0.0.%d" % n, 32,
+ [VppRoutePath(self.pg1.remote_hosts[n].ip4,
+ self.pg1.sw_if_index)],
+ table_id=100).add_vpp_config()
+
+ p = (Ether(src=self.pg1.remote_hosts[n].mac,
+ dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_hosts[n].ip4,
+ dst="10.0.0.%d" % n) /
+ Raw(b'0x5' * 100))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg1)
+ for rx in rxs:
+ self.verify_arp_req(rx,
+ self.pg1.local_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[n].ip4)
+
+ self.pg1.unconfig_ip4()
+ self.pg1.set_table_ip4(0)
class NeighborStatsTestCase(VppTestCase):
- """ ARP Test Case """
+ """ ARP/ND Counters """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborStatsTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborStatsTestCase, cls).tearDownClass()
def setUp(self):
super(NeighborStatsTestCase, self).setUp()
UDP(sport=1234, dport=1234) /
Raw())
- rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
- rx = self.send_and_expect(self.pg0, p2 * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
+ rx = self.send_and_expect(self.pg0, p2 * NUM_PKTS, self.pg1)
- self.assertEqual(65, arp1.get_stats()['packets'])
- self.assertEqual(65, arp2.get_stats()['packets'])
+ self.assertEqual(NUM_PKTS, arp1.get_stats()['packets'])
+ self.assertEqual(NUM_PKTS, arp2.get_stats()['packets'])
- rx = self.send_and_expect(self.pg0, p1 * 65, self.pg1)
- self.assertEqual(130, arp1.get_stats()['packets'])
+ rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
+ self.assertEqual(NUM_PKTS*2, arp1.get_stats()['packets'])
def test_nd_stats(self):
""" ND Counters """
nd1 = VppNeighbor(self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[1].mac,
- self.pg0.remote_hosts[1].ip6,
- af=AF_INET6)
+ self.pg0.remote_hosts[1].ip6)
nd1.add_vpp_config()
nd2 = VppNeighbor(self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[2].mac,
- self.pg0.remote_hosts[2].ip6,
- af=AF_INET6)
+ self.pg0.remote_hosts[2].ip6)
nd2.add_vpp_config()
p1 = (Ether(dst=self.pg1.local_mac,
self.assertEqual(16, nd1.get_stats()['packets'])
self.assertEqual(16, nd2.get_stats()['packets'])
- rx = self.send_and_expect(self.pg1, p1 * 65, self.pg0)
- self.assertEqual(81, nd1.get_stats()['packets'])
+ rx = self.send_and_expect(self.pg1, p1 * NUM_PKTS, self.pg0)
+ self.assertEqual(NUM_PKTS+16, nd1.get_stats()['packets'])
+
+
+class NeighborAgeTestCase(VppTestCase):
+ """ ARP/ND Aging """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborAgeTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborAgeTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborAgeTestCase, self).setUp()
+
+ self.create_pg_interfaces(range(1))
+
+ # pg0 configured with ip4 and 6 addresses used for input
+ # pg1 configured with ip4 and 6 addresses used for output
+ # pg2 is unnumbered to pg0
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborAgeTestCase, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def wait_for_no_nbr(self, intf, address,
+ n_tries=50, s_time=1):
+ while (n_tries):
+ if not find_nbr(self, intf, address):
+ return True
+ n_tries = n_tries - 1
+ self.sleep(s_time)
+
+ return False
+
+ def verify_arp_req(self, rx, smac, sip, dip):
+ ether = rx[Ether]
+ self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
+ self.assertEqual(ether.src, smac)
+
+ arp = rx[ARP]
+ self.assertEqual(arp.hwtype, 1)
+ self.assertEqual(arp.ptype, 0x800)
+ self.assertEqual(arp.hwlen, 6)
+ self.assertEqual(arp.plen, 4)
+ self.assertEqual(arp.op, arp_opts["who-has"])
+ self.assertEqual(arp.hwsrc, smac)
+ self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
+ self.assertEqual(arp.psrc, sip)
+ self.assertEqual(arp.pdst, dip)
+
+ def test_age(self):
+ """ Aging/Recycle """
+
+ self.vapi.cli("set logging unthrottle 0")
+ self.vapi.cli("set logging size %d" % 0xffff)
+
+ self.pg0.generate_remote_hosts(201)
+
+ vaf = VppEnum.vl_api_address_family_t
+
+ #
+ # start listening on all interfaces
+ #
+ self.pg_enable_capture(self.pg_interfaces)
+
+ #
+ # Set the neighbor configuration:
+ # limi = 200
+ # age = 0 seconds
+ # recycle = false
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=False)
+
+ self.vapi.cli("sh ip neighbor-config")
+
+ # add the 198 neighbours that should pass (-1 for one created in setup)
+ for ii in range(200):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+
+ # one more neighbor over the limit should fail
+ with self.vapi.assert_negative_api_retval():
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].mac,
+ self.pg0.remote_hosts[200].ip4).add_vpp_config()
+
+ #
+ # change the config to allow recycling the old neighbors
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=True)
+
+ # now new additions are allowed
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].mac,
+ self.pg0.remote_hosts[200].ip4).add_vpp_config()
+
+ # add the first neighbor we configured has been re-used
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[0].ip4))
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].ip4))
+
+ #
+ # change the config to age old neighbors
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=2,
+ recycle=True)
+
+ self.vapi.cli("sh ip4 neighbor-sorted")
+
+ #
+ # expect probes from all these ARP entries as they age
+ # 3 probes for each neighbor 3*200 = 600
+ rxs = self.pg0.get_capture(600, timeout=8)
+
+ for ii in range(3):
+ for jj in range(200):
+ rx = rxs[ii*200 + jj]
+ # rx.show()
+
+ #
+ # 3 probes sent then 1 more second to see if a reply comes, before
+ # they age out
+ #
+ for jj in range(1, 201):
+ self.wait_for_no_nbr(self.pg0.sw_if_index,
+ self.pg0.remote_hosts[jj].ip4)
+
+ self.assertFalse(self.vapi.ip_neighbor_dump(sw_if_index=0xffffffff,
+ af=vaf.ADDRESS_IP4))
+
+ #
+ # load up some neighbours again with 2s aging enabled
+ # they should be removed after 10s (2s age + 4s for probes + gap)
+ #
+ for ii in range(10):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+ self.sleep(10)
+ self.assertFalse(self.vapi.ip_neighbor_dump(sw_if_index=0xffffffff,
+ af=vaf.ADDRESS_IP4))
+
+ #
+ # check if we can set age and recycle with empty neighbor list
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=1000,
+ recycle=True)
+
+ #
+ # load up some neighbours again, then disable the aging
+ # they should still be there in 10 seconds time
+ #
+ for ii in range(10):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=False)
+
+ self.sleep(10)
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[0].ip4))
if __name__ == '__main__':