-#!/usr/bin/env python
+#!/usr/bin/env python3
import unittest
+import os
from socket import AF_INET, AF_INET6, inet_pton
from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor, find_nbr
-from vpp_ip_route import VppIpRoute, VppRoutePath, find_route
+from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \
+ VppIpTable, DpoProto, FibPathType
+from vpp_papi import VppEnum
+import scapy.compat
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, ARP
+from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6
+
+
+NUM_PKTS = 67
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
class ARPTestCase(VppTestCase):
""" ARP Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(ARPTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(ARPTestCase, cls).tearDownClass()
+
def setUp(self):
super(ARPTestCase, self).setUp()
self.pg1.config_ip6()
# pg3 in a different VRF
+ self.tbl = VppIpTable(self, 1)
+ self.tbl.add_vpp_config()
+
self.pg3.set_table_ip4(1)
self.pg3.config_ip4()
def tearDown(self):
- super(ARPTestCase, self).tearDown()
self.pg0.unconfig_ip4()
self.pg0.unconfig_ip6()
self.pg1.unconfig_ip6()
self.pg3.unconfig_ip4()
+ self.pg3.set_table_ip4(0)
for i in self.pg_interfaces:
i.admin_down()
+ super(ARPTestCase, self).tearDown()
+
def verify_arp_req(self, rx, smac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
self.assertEqual(arp.psrc, sip)
self.assertEqual(arp.pdst, dip)
+ def verify_arp_vrrp_resp(self, rx, smac, dmac, sip, dip):
+ ether = rx[Ether]
+ self.assertEqual(ether.dst, dmac)
+ self.assertEqual(ether.src, smac)
+
+ arp = rx[ARP]
+ self.assertEqual(arp.hwtype, 1)
+ self.assertEqual(arp.ptype, 0x800)
+ self.assertEqual(arp.hwlen, 6)
+ self.assertEqual(arp.plen, 4)
+ self.assertEqual(arp.op, arp_opts["is-at"])
+ self.assertNotEqual(arp.hwsrc, smac)
+ self.assertTrue("00:00:5e:00:01" in arp.hwsrc or
+ "00:00:5E:00:01" in arp.hwsrc)
+ self.assertEqual(arp.hwdst, dmac)
+ self.assertEqual(arp.psrc, sip)
+ self.assertEqual(arp.pdst, dip)
+
def verify_ip(self, rx, smac, dmac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
- def send_and_assert_no_replies(self, intf, pkts, remark):
- intf.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- for i in self.pg_interfaces:
- i.assert_nothing_captured(remark=remark)
-
def test_arp(self):
""" ARP """
#
# Generate some hosts on the LAN
#
- self.pg1.generate_remote_hosts(9)
+ self.pg1.generate_remote_hosts(11)
+
+ #
+ # watch for:
+ # - all neighbour events
+ # - all neighbor events on pg1
+ # - neighbor events for host[1] on pg1
+ #
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid())
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index)
+ self.vapi.want_ip_neighbor_events(enable=1,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index,
+ ip=self.pg1.remote_hosts[1].ip4)
+
+ self.logger.info(self.vapi.cli("sh ip neighbor-watcher"))
#
# Send IP traffic to one of these unresolved hosts.
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4)
dyn_arp.add_vpp_config()
+ self.assertTrue(dyn_arp.query_vpp_config())
+
+ # this matches all of the listnerers
+ es = [self.vapi.wait_for_event(1, "ip_neighbor_event")
+ for i in range(3)]
+ for e in es:
+ self.assertEqual(str(e.neighbor.ip_address),
+ self.pg1.remote_hosts[1].ip4)
#
# now we expect IP traffic forwarded
self.pg1.remote_hosts[2].ip4,
is_static=1)
static_arp.add_vpp_config()
+ es = [self.vapi.wait_for_event(1, "ip_neighbor_event")
+ for i in range(2)]
+ for e in es:
+ self.assertEqual(str(e.neighbor.ip_address),
+ self.pg1.remote_hosts[2].ip4)
static_p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4,
self.pg0.remote_ip4,
self.pg1._remote_hosts[2].ip4)
+ #
+ # remove all the listeners
+ #
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid())
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index)
+ self.vapi.want_ip_neighbor_events(enable=0,
+ pid=os.getpid(),
+ sw_if_index=self.pg1.sw_if_index,
+ ip=self.pg1.remote_hosts[1].ip4)
+
#
# flap the link. dynamic ARPs get flush, statics don't
#
self.pg1.local_ip4,
self.pg1._remote_hosts[1].ip4)
+ self.assertFalse(dyn_arp.query_vpp_config())
+ self.assertTrue(static_arp.query_vpp_config())
#
# Send an ARP request from one of the so-far unlearned remote hosts
#
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg2.remote_hosts[3].ip4))
+ pt = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ Dot1Q(vlan=0) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg2.remote_hosts[3].ip4))
self.send_and_assert_no_replies(self.pg2, p,
"interface not IP enabled")
#
self.pg2.set_unnumbered(self.pg1.sw_if_index)
+ #
+ # test the unnumbered dump both by all interfaces and just the enabled
+ # one
+ #
+ unnum = self.vapi.ip_unnumbered_dump()
+ self.assertTrue(len(unnum))
+ self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
+ unnum = self.vapi.ip_unnumbered_dump(self.pg2.sw_if_index)
+ self.assertTrue(len(unnum))
+ self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
+
#
# We should respond to ARP requests for the unnumbered to address
# once an attached route to the source is known
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg1.local_ip4,
+ self.pg2.remote_hosts[3].ip4)
+
+ self.pg2.add_stream(pt)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
rx = self.pg2.get_capture(1)
self.verify_arp_resp(rx[0],
self.pg2.local_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[8].ip4)
+ #
+ # Send an ARP request from one of the so-far unlearned remote hosts
+ # with a VLAN0 tag
+ #
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg1._remote_hosts[9].mac) /
+ Dot1Q(vlan=0) /
+ ARP(op="who-has",
+ hwsrc=self.pg1._remote_hosts[9].mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1._remote_hosts[9].ip4))
+
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg1.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg1.local_mac,
+ self.pg1._remote_hosts[9].mac,
+ self.pg1.local_ip4,
+ self.pg1._remote_hosts[9].ip4)
+
+ #
+ # Add a hierarchy of routes for a host in the sub-net.
+ # Should still get an ARP resp since the cover is attached
+ #
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg1.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_hosts[10].ip4))
+
+ r1 = VppIpRoute(self, self.pg1.remote_hosts[10].ip4, 30,
+ [VppRoutePath(self.pg1.remote_hosts[10].ip4,
+ self.pg1.sw_if_index)])
+ r1.add_vpp_config()
+
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg1.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg1.local_mac,
+ self.pg1.remote_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[10].ip4)
+
+ r2 = VppIpRoute(self, self.pg1.remote_hosts[10].ip4, 32,
+ [VppRoutePath(self.pg1.remote_hosts[10].ip4,
+ self.pg1.sw_if_index)])
+ r2.add_vpp_config()
+
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg1.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg1.local_mac,
+ self.pg1.remote_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[10].ip4)
+
+ #
+ # add an ARP entry that's not on the sub-net and so whose
+ # adj-fib fails the refinement check. then send an ARP request
+ # from that source
+ #
+ a1 = VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_mac,
+ "100.100.100.50")
+ a1.add_vpp_config()
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg0.remote_mac,
+ psrc="100.100.100.50",
+ pdst=self.pg0.remote_ip4))
+ self.send_and_assert_no_replies(self.pg0, p,
+ "ARP req for from failed adj-fib")
+
#
# ERROR Cases
# 1 - don't respond to ARP request for address not within the
# interface's sub-net
- # 1a - nor within the unnumbered subnet
+ # 1b - nor within the unnumbered subnet
+ # 1c - nor within the subnet of a different interface
+ #
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
psrc=self.pg0.remote_ip4))
self.send_and_assert_no_replies(self.pg0, p,
"ARP req for non-local destination")
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ "10.10.10.3"))
+
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg2.remote_mac,
self.pg0, p,
"ARP req for non-local destination - unnum")
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg0.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_ip4))
+ self.send_and_assert_no_replies(self.pg0, p,
+ "ARP req diff sub-net")
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg1.remote_ip4))
+
#
# 2 - don't respond to ARP request from an address not within the
# interface's sub-net
- #
+ # 2b - to a proxied address
+ # 2c - not within a different interface's sub-net
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
self.send_and_assert_no_replies(
self.pg0, p,
"ARP req for non-local source - unnum")
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg0.remote_mac,
+ psrc=self.pg1.remote_ip4,
+ pdst=self.pg0.local_ip4))
+ self.send_and_assert_no_replies(self.pg0, p,
+ "ARP req for non-local source 2c")
#
# 3 - don't respond to ARP request from an address that belongs to
#
# 4 - don't respond to ARP requests that has mac source different
# from ARP request HW source
- # the router
#
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
self.send_and_assert_no_replies(self.pg0, p,
"ARP req for non-local source")
+ #
+ # 5 - don't respond to ARP requests for address within the
+ # interface's sub-net but not the interface's address
+ #
+ self.pg0.generate_remote_hosts(2)
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg0.remote_mac,
+ psrc=self.pg0.remote_hosts[0].ip4,
+ pdst=self.pg0.remote_hosts[1].ip4))
+ self.send_and_assert_no_replies(self.pg0, p,
+ "ARP req for non-local destination")
+
#
# cleanup
#
- dyn_arp.remove_vpp_config()
static_arp.remove_vpp_config()
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
# need this to flush the adj-fibs
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
self.pg2.admin_down()
+ self.pg1.admin_down()
+
+ def test_proxy_mirror_arp(self):
+ """ Interface Mirror Proxy ARP """
+
+ #
+ # When VPP has an interface whose address is also applied to a TAP
+ # interface on the host, then VPP's TAP interface will be unnumbered
+ # to the 'real' interface and do proxy ARP from the host.
+ # the curious aspect of this setup is that ARP requests from the host
+ # will come from the VPP's own address.
+ #
+ self.pg0.generate_remote_hosts(2)
+
+ arp_req_from_me = (Ether(src=self.pg2.remote_mac,
+ dst="ff:ff:ff:ff:ff:ff") /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg0.remote_hosts[1].ip4,
+ psrc=self.pg0.local_ip4))
+
+ #
+ # Configure Proxy ARP for the subnet on PG0addresses on pg0
+ #
+ self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+ self.pg0._local_ip4_bcast)
+
+ # Make pg2 un-numbered to pg0
+ #
+ self.pg2.set_unnumbered(self.pg0.sw_if_index)
+
+ #
+ # Enable pg2 for proxy ARP
+ #
+ self.pg2.set_proxy_arp()
+
+ #
+ # Send the ARP request with an originating address that
+ # is VPP's own address
+ #
+ self.pg2.add_stream(arp_req_from_me)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg0.remote_hosts[1].ip4,
+ self.pg0.local_ip4)
+
+ #
+ # validate we have not learned an ARP entry as a result of this
+ #
+ self.assertFalse(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg0.local_ip4))
+
+ #
+ # cleanup
+ #
+ self.pg2.set_proxy_arp(0)
+ self.vapi.proxy_arp_add_del(self.pg0._local_ip4_subnet,
+ self.pg0._local_ip4_bcast,
+ is_add=0)
def test_proxy_arp(self):
""" Proxy ARP """
+ self.pg1.generate_remote_hosts(2)
+
#
- # Proxy ARP rewquest packets for each interface
+ # Proxy ARP request packets for each interface
#
- arp_req_pg2 = (Ether(src=self.pg2.remote_mac,
- dst="ff:ff:ff:ff:ff:ff") /
- ARP(op="who-has",
- hwsrc=self.pg2.remote_mac,
- pdst="10.10.10.3",
- psrc=self.pg1.remote_ip4))
arp_req_pg0 = (Ether(src=self.pg0.remote_mac,
dst="ff:ff:ff:ff:ff:ff") /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
pdst="10.10.10.3",
psrc=self.pg0.remote_ip4))
+ arp_req_pg0_tagged = (Ether(src=self.pg0.remote_mac,
+ dst="ff:ff:ff:ff:ff:ff") /
+ Dot1Q(vlan=0) /
+ ARP(op="who-has",
+ hwsrc=self.pg0.remote_mac,
+ pdst="10.10.10.3",
+ psrc=self.pg0.remote_ip4))
arp_req_pg1 = (Ether(src=self.pg1.remote_mac,
dst="ff:ff:ff:ff:ff:ff") /
ARP(op="who-has",
hwsrc=self.pg1.remote_mac,
pdst="10.10.10.3",
psrc=self.pg1.remote_ip4))
+ arp_req_pg2 = (Ether(src=self.pg2.remote_mac,
+ dst="ff:ff:ff:ff:ff:ff") /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst="10.10.10.3",
+ psrc=self.pg1.remote_hosts[1].ip4))
arp_req_pg3 = (Ether(src=self.pg3.remote_mac,
dst="ff:ff:ff:ff:ff:ff") /
ARP(op="who-has",
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ rx = self.pg0.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg0.local_mac,
+ self.pg0.remote_mac,
+ "10.10.10.3",
+ self.pg0.remote_ip4)
+
+ self.pg0.add_stream(arp_req_pg0_tagged)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
rx = self.pg0.get_capture(1)
self.verify_arp_resp(rx[0],
self.pg0.local_mac,
self.pg2.local_mac,
self.pg2.remote_mac,
"10.10.10.3",
- self.pg1.remote_ip4)
+ self.pg1.remote_hosts[1].ip4)
#
# A request for an address out of the configured range
self.pg2.generate_remote_hosts(2)
#
- # Add a reoute with out going label via an ARP unresolved next-hop
+ # Add a route with out going label via an ARP unresolved next-hop
#
ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
[VppRoutePath(self.pg2.remote_hosts[1].ip4,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
"10.0.0.1")
self.pg2.unconfig_ip4()
+ def test_arp_vrrp(self):
+ """ ARP reply with VRRP virtual src hw addr """
+
+ #
+ # IP packet destined for pg1 remote host arrives on pg0 resulting
+ # in an ARP request for the address of the remote host on pg1
+ #
+ p0 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
+
+ self.verify_arp_req(rx1[0],
+ self.pg1.local_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_ip4)
+
+ #
+ # ARP reply for address of pg1 remote host arrives on pg1 with
+ # the hw src addr set to a value in the VRRP IPv4 range of
+ # MAC addresses
+ #
+ p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ ARP(op="is-at", hwdst=self.pg1.local_mac,
+ hwsrc="00:00:5e:00:01:09", pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_ip4))
+
+ self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
+
+ #
+ # IP packet destined for pg1 remote host arrives on pg0 again.
+ # VPP should have an ARP entry for that address now and the packet
+ # should be sent out pg1.
+ #
+ rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
+
+ self.verify_ip(rx1[0],
+ self.pg1.local_mac,
+ "00:00:5e:00:01:09",
+ self.pg0.remote_ip4,
+ self.pg1.remote_ip4)
+
+ self.pg1.admin_down()
+ self.pg1.admin_up()
+
+ def test_arp_duplicates(self):
+ """ ARP Duplicates"""
+
+ #
+ # Generate some hosts on the LAN
+ #
+ self.pg1.generate_remote_hosts(3)
+
+ #
+ # Add host 1 on pg1 and pg2
+ #
+ arp_pg1 = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].mac,
+ self.pg1.remote_hosts[1].ip4)
+ arp_pg1.add_vpp_config()
+ arp_pg2 = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_mac,
+ self.pg1.remote_hosts[1].ip4)
+ arp_pg2.add_vpp_config()
+
+ #
+ # IP packet destined for pg1 remote host arrives on pg1 again.
+ #
+ p = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[1].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx1 = self.pg1.get_capture(1)
+
+ self.verify_ip(rx1[0],
+ self.pg1.local_mac,
+ self.pg1.remote_hosts[1].mac,
+ self.pg0.remote_ip4,
+ self.pg1.remote_hosts[1].ip4)
+
+ #
+ # remove the duplicate on pg1
+ # packet stream should generate ARPs out of pg1
+ #
+ arp_pg1.remove_vpp_config()
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx1 = self.pg1.get_capture(1)
+
+ self.verify_arp_req(rx1[0],
+ self.pg1.local_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[1].ip4)
+
+ #
+ # Add it back
+ #
+ arp_pg1.add_vpp_config()
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx1 = self.pg1.get_capture(1)
+
+ self.verify_ip(rx1[0],
+ self.pg1.local_mac,
+ self.pg1.remote_hosts[1].mac,
+ self.pg0.remote_ip4,
+ self.pg1.remote_hosts[1].ip4)
+
+ def test_arp_static(self):
+ """ ARP Static"""
+ self.pg2.generate_remote_hosts(3)
+
+ #
+ # Add a static ARP entry
+ #
+ static_arp = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[1].mac,
+ self.pg2.remote_hosts[1].ip4,
+ is_static=1)
+ static_arp.add_vpp_config()
+
+ #
+ # Add the connected prefix to the interface
+ #
+ self.pg2.config_ip4()
+
+ #
+ # We should now find the adj-fib
+ #
+ self.assertTrue(find_nbr(self,
+ self.pg2.sw_if_index,
+ self.pg2.remote_hosts[1].ip4,
+ is_static=1))
+ self.assertTrue(find_route(self,
+ self.pg2.remote_hosts[1].ip4,
+ 32))
+
+ #
+ # remove the connected
+ #
+ self.pg2.unconfig_ip4()
+
+ #
+ # put the interface into table 1
+ #
+ self.pg2.set_table_ip4(1)
+
+ #
+ # configure the same connected and expect to find the
+ # adj fib in the new table
+ #
+ self.pg2.config_ip4()
+ self.assertTrue(find_route(self,
+ self.pg2.remote_hosts[1].ip4,
+ 32,
+ table_id=1))
+
+ #
+ # clean-up
+ #
+ self.pg2.unconfig_ip4()
+ static_arp.remove_vpp_config()
+ self.pg2.set_table_ip4(0)
+
+ def test_arp_incomplete(self):
+ """ ARP Incomplete"""
+ self.pg1.generate_remote_hosts(3)
+
+ p0 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[1].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+ p1 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[2].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ #
+ # a packet to an unresolved destination generates an ARP request
+ #
+ rx = self.send_and_expect(self.pg0, [p0], self.pg1)
+ self.verify_arp_req(rx[0],
+ self.pg1.local_mac,
+ self.pg1.local_ip4,
+ self.pg1._remote_hosts[1].ip4)
+
+ #
+ # add a neighbour for remote host 1
+ #
+ static_arp = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].mac,
+ self.pg1.remote_hosts[1].ip4,
+ is_static=1)
+ static_arp.add_vpp_config()
+
+ #
+ # change the interface's MAC
+ #
+ mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00),
+ scapy.compat.chb(0x00), scapy.compat.chb(0x33),
+ scapy.compat.chb(0x33), scapy.compat.chb(0x33)]
+ mac_string = ''.join(mac)
+
+ self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
+ mac_string)
+
+ #
+ # now ARP requests come from the new source mac
+ #
+ rx = self.send_and_expect(self.pg0, [p1], self.pg1)
+ self.verify_arp_req(rx[0],
+ "00:00:00:33:33:33",
+ self.pg1.local_ip4,
+ self.pg1._remote_hosts[2].ip4)
+
+ #
+ # packets to the resolved host also have the new source mac
+ #
+ rx = self.send_and_expect(self.pg0, [p0], self.pg1)
+ self.verify_ip(rx[0],
+ "00:00:00:33:33:33",
+ self.pg1.remote_hosts[1].mac,
+ self.pg0.remote_ip4,
+ self.pg1.remote_hosts[1].ip4)
+
+ #
+ # set the mac address on the interface that does not have a
+ # configured subnet and thus no glean
+ #
+ self.vapi.sw_interface_set_mac_address(self.pg2.sw_if_index,
+ mac_string)
+
+ def test_garp(self):
+ """ GARP """
+
+ #
+ # Generate some hosts on the LAN
+ #
+ self.pg1.generate_remote_hosts(4)
+
+ #
+ # And an ARP entry
+ #
+ arp = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].mac,
+ self.pg1.remote_hosts[1].ip4)
+ arp.add_vpp_config()
+
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].ip4,
+ mac=self.pg1.remote_hosts[1].mac))
+
+ #
+ # Send a GARP (request) to swap the host 1's address to that of host 2
+ #
+ p1 = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg1.remote_hosts[2].mac) /
+ ARP(op="who-has",
+ hwdst=self.pg1.local_mac,
+ hwsrc=self.pg1.remote_hosts[2].mac,
+ pdst=self.pg1.remote_hosts[1].ip4,
+ psrc=self.pg1.remote_hosts[1].ip4))
+
+ self.pg1.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].ip4,
+ mac=self.pg1.remote_hosts[2].mac))
+
+ #
+ # Send a GARP (reply) to swap the host 1's address to that of host 3
+ #
+ p1 = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg1.remote_hosts[3].mac) /
+ ARP(op="is-at",
+ hwdst=self.pg1.local_mac,
+ hwsrc=self.pg1.remote_hosts[3].mac,
+ pdst=self.pg1.remote_hosts[1].ip4,
+ psrc=self.pg1.remote_hosts[1].ip4))
+
+ self.pg1.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].ip4,
+ mac=self.pg1.remote_hosts[3].mac))
+
+ #
+ # GARPs (request nor replies) for host we don't know yet
+ # don't result in new neighbour entries
+ #
+ p1 = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg1.remote_hosts[3].mac) /
+ ARP(op="who-has",
+ hwdst=self.pg1.local_mac,
+ hwsrc=self.pg1.remote_hosts[3].mac,
+ pdst=self.pg1.remote_hosts[2].ip4,
+ psrc=self.pg1.remote_hosts[2].ip4))
+
+ self.pg1.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[2].ip4))
+
+ p1 = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg1.remote_hosts[3].mac) /
+ ARP(op="is-at",
+ hwdst=self.pg1.local_mac,
+ hwsrc=self.pg1.remote_hosts[3].mac,
+ pdst=self.pg1.remote_hosts[2].ip4,
+ psrc=self.pg1.remote_hosts[2].ip4))
+
+ self.pg1.add_stream(p1)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.assertFalse(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[2].ip4))
+
+ def test_arp_incomplete(self):
+ """ Incomplete Entries """
+
+ #
+ # ensure that we throttle the ARP and ND requests
+ #
+ self.pg0.generate_remote_hosts(2)
+
+ #
+ # IPv4/ARP
+ #
+ ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(self.pg0.remote_hosts[1].ip4,
+ self.pg0.sw_if_index)])
+ ip_10_0_0_1.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst="10.0.0.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg1.add_stream(p1 * 257)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg0._get_capture(1)
+
+ #
+ # how many we get is going to be dependent on the time for packet
+ # processing but it should be small
+ #
+ self.assertLess(len(rx), 64)
+
+ #
+ # IPv6/ND
+ #
+ ip_10_1 = VppIpRoute(self, "10::1", 128,
+ [VppRoutePath(self.pg0.remote_hosts[1].ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)])
+ ip_10_1.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst="10::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg1.add_stream(p1 * 257)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg0._get_capture(1)
+
+ #
+ # how many we get is going to be dependent on the time for packet
+ # processing but it should be small
+ #
+ self.assertLess(len(rx), 64)
+
+ def test_arp_forus(self):
+ """ ARP for for-us """
+
+ #
+ # Test that VPP responds with ARP requests to addresses that
+ # are connected and local routes.
+ # Use one of the 'remote' addresses in the subnet as a local address
+ # The intention of this route is that it then acts like a secondary
+ # address added to an interface
+ #
+ self.pg0.generate_remote_hosts(2)
+
+ forus = VppIpRoute(
+ self, self.pg0.remote_hosts[1].ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ self.pg0.sw_if_index,
+ type=FibPathType.FIB_PATH_TYPE_LOCAL)])
+ forus.add_vpp_config()
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff",
+ src=self.pg0.remote_mac) /
+ ARP(op="who-has",
+ hwdst=self.pg0.local_mac,
+ hwsrc=self.pg0.remote_mac,
+ pdst=self.pg0.remote_hosts[1].ip4,
+ psrc=self.pg0.remote_ip4))
+
+ rx = self.send_and_expect(self.pg0, [p], self.pg0)
+
+ self.verify_arp_resp(rx[0],
+ self.pg0.local_mac,
+ self.pg0.remote_mac,
+ self.pg0.remote_hosts[1].ip4,
+ self.pg0.remote_ip4)
+
+
+class NeighborStatsTestCase(VppTestCase):
+ """ ARP/ND Counters """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborStatsTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborStatsTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborStatsTestCase, self).setUp()
+
+ self.create_pg_interfaces(range(2))
+
+ # pg0 configured with ip4 and 6 addresses used for input
+ # pg1 configured with ip4 and 6 addresses used for output
+ # pg2 is unnumbered to pg0
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborStatsTestCase, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_arp_stats(self):
+ """ ARP Counters """
+
+ self.vapi.cli("adj counters enable")
+ self.pg1.generate_remote_hosts(2)
+
+ arp1 = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[0].mac,
+ self.pg1.remote_hosts[0].ip4)
+ arp1.add_vpp_config()
+ arp2 = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[1].mac,
+ self.pg1.remote_hosts[1].ip4)
+ arp2.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[0].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+ p2 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_hosts[1].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
+ rx = self.send_and_expect(self.pg0, p2 * NUM_PKTS, self.pg1)
+
+ self.assertEqual(NUM_PKTS, arp1.get_stats()['packets'])
+ self.assertEqual(NUM_PKTS, arp2.get_stats()['packets'])
+
+ rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
+ self.assertEqual(NUM_PKTS*2, arp1.get_stats()['packets'])
+
+ def test_nd_stats(self):
+ """ ND Counters """
+
+ self.vapi.cli("adj counters enable")
+ self.pg0.generate_remote_hosts(3)
+
+ nd1 = VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[1].mac,
+ self.pg0.remote_hosts[1].ip6)
+ nd1.add_vpp_config()
+ nd2 = VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[2].mac,
+ self.pg0.remote_hosts[2].ip6)
+ nd2.add_vpp_config()
+
+ p1 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst=self.pg0.remote_hosts[1].ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+ p2 = (Ether(dst=self.pg1.local_mac,
+ src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6,
+ dst=self.pg0.remote_hosts[2].ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rx = self.send_and_expect(self.pg1, p1 * 16, self.pg0)
+ rx = self.send_and_expect(self.pg1, p2 * 16, self.pg0)
+
+ self.assertEqual(16, nd1.get_stats()['packets'])
+ self.assertEqual(16, nd2.get_stats()['packets'])
+
+ rx = self.send_and_expect(self.pg1, p1 * NUM_PKTS, self.pg0)
+ self.assertEqual(NUM_PKTS+16, nd1.get_stats()['packets'])
+
+
+class NeighborAgeTestCase(VppTestCase):
+ """ ARP/ND Aging """
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeighborAgeTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(NeighborAgeTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(NeighborAgeTestCase, self).setUp()
+
+ self.create_pg_interfaces(range(1))
+
+ # pg0 configured with ip4 and 6 addresses used for input
+ # pg1 configured with ip4 and 6 addresses used for output
+ # pg2 is unnumbered to pg0
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(NeighborAgeTestCase, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def wait_for_no_nbr(self, intf, address,
+ n_tries=50, s_time=1):
+ while (n_tries):
+ if not find_nbr(self, intf, address):
+ return True
+ n_tries = n_tries - 1
+ self.sleep(s_time)
+
+ return False
+
+ def verify_arp_req(self, rx, smac, sip, dip):
+ ether = rx[Ether]
+ self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
+ self.assertEqual(ether.src, smac)
+
+ arp = rx[ARP]
+ self.assertEqual(arp.hwtype, 1)
+ self.assertEqual(arp.ptype, 0x800)
+ self.assertEqual(arp.hwlen, 6)
+ self.assertEqual(arp.plen, 4)
+ self.assertEqual(arp.op, arp_opts["who-has"])
+ self.assertEqual(arp.hwsrc, smac)
+ self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
+ self.assertEqual(arp.psrc, sip)
+ self.assertEqual(arp.pdst, dip)
+
+ def test_age(self):
+ """ Aging/Recycle """
+
+ self.vapi.cli("set logging unthrottle 0")
+ self.vapi.cli("set logging size %d" % 0xffff)
+
+ self.pg0.generate_remote_hosts(201)
+
+ vaf = VppEnum.vl_api_address_family_t
+
+ #
+ # start listening on all interfaces
+ #
+ self.pg_enable_capture(self.pg_interfaces)
+
+ #
+ # Set the neighbor configuration:
+ # limi = 200
+ # age = 2 seconds
+ # recycle = false
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=False)
+
+ self.vapi.cli("sh ip neighbor-config")
+
+ # add the 198 neighbours that should pass (-1 for one created in setup)
+ for ii in range(200):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+
+ # one more neighbor over the limit should fail
+ with self.vapi.assert_negative_api_retval():
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].mac,
+ self.pg0.remote_hosts[200].ip4).add_vpp_config()
+
+ #
+ # change the config to allow recycling the old neighbors
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=True)
+
+ # now new additions are allowed
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].mac,
+ self.pg0.remote_hosts[200].ip4).add_vpp_config()
+
+ # add the first neighbor we configured has been re-used
+ self.assertFalse(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[0].ip4))
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[200].ip4))
+
+ #
+ # change the config to age old neighbors
+ #
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=2,
+ recycle=True)
+
+ self.vapi.cli("sh ip4 neighbor-sorted")
+
+ #
+ # expect probes from all these ARP entries as they age
+ # 3 probes for each neighbor 3*200 = 600
+ rxs = self.pg0.get_capture(600, timeout=8)
+
+ for ii in range(3):
+ for jj in range(200):
+ rx = rxs[ii*200 + jj]
+ # rx.show()
+
+ #
+ # 3 probes sent then 1 more second to see if a reply comes, before
+ # they age out
+ #
+ for jj in range(1, 201):
+ self.wait_for_no_nbr(self.pg0.sw_if_index,
+ self.pg0.remote_hosts[jj].ip4)
+
+ self.assertFalse(self.vapi.ip_neighbor_dump(sw_if_index=0xffffffff,
+ af=vaf.ADDRESS_IP4))
+
+ #
+ # load up some neighbours again, then disable the aging
+ # they should still be there in 10 seconds time
+ #
+ for ii in range(10):
+ VppNeighbor(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[ii].mac,
+ self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+ self.vapi.ip_neighbor_config(af=vaf.ADDRESS_IP4,
+ max_number=200,
+ max_age=0,
+ recycle=False)
+
+ self.sleep(10)
+ self.assertTrue(find_nbr(self,
+ self.pg0.sw_if_index,
+ self.pg0.remote_hosts[0].ip4))
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)