import scapy.compat
from scapy.packet import Raw
-from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
from scapy.contrib.mpls import MPLS
NUM_PKTS = 67
+# scapy removed these attributes.
+# we asked that they be restored: https://github.com/secdev/scapy/pull/1878
+# semantic names have more meaning than numbers. so here they are.
+ARP.who_has = 1
+ARP.is_at = 2
+
def verify_filter(capture, sent):
if not len(capture) == len(sent):
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
- i.ip6_disable()
i.set_table_ip4(0)
i.set_table_ip6(0)
i.disable_mpls()
#
# put the connected routes back
#
+ self.logger.info(self.vapi.cli("sh log"))
self.pg2.admin_up()
self.pg2.config_ip6()
self.pg2.resolve_ndp()
tbl.add_vpp_config()
self.tables.append(tbl)
- # use pg0 as the core facing interface
+ # use pg0 as the core facing interface, don't resolve ARP
self.pg0.admin_up()
self.pg0.config_ip4()
- self.pg0.resolve_arp()
self.pg0.enable_mpls()
# use the other 2 for customer facing L2 links
self.assertEqual(rx_eth.src, tx_eth.src)
self.assertEqual(rx_eth.dst, tx_eth.dst)
+ def verify_arp_req(self, rx, smac, sip, dip):
+ ether = rx[Ether]
+ self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
+ self.assertEqual(ether.src, smac)
+
+ arp = rx[ARP]
+ self.assertEqual(arp.hwtype, 1)
+ self.assertEqual(arp.ptype, 0x800)
+ self.assertEqual(arp.hwlen, 6)
+ self.assertEqual(arp.plen, 4)
+ self.assertEqual(arp.op, ARP.who_has)
+ self.assertEqual(arp.hwsrc, smac)
+ self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
+ self.assertEqual(arp.psrc, sip)
+ self.assertEqual(arp.pdst, dip)
+
def test_vpws(self):
""" Virtual Private Wire Service """
#
# Inject a packet from the customer/L2 side
+ # there's no resolved ARP entry so the first packet we see should be
+ # an ARP request
+ #
+ tx1 = pcore[MPLS].payload
+ rx1 = self.send_and_expect(self.pg1, [tx1], self.pg0)
+
+ self.verify_arp_req(rx1[0],
+ self.pg0.local_mac,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4)
+
#
+ # resolve the ARP entries and send again
+ #
+ self.pg0.resolve_arp()
tx1 = pcore[MPLS].payload * NUM_PKTS
rx1 = self.send_and_expect(self.pg1, tx1, self.pg0)
def test_vpls(self):
""" Virtual Private LAN Service """
+
+ # we skipped this in the setup
+ self.pg0.resolve_arp()
+
#
# Create a L2 MPLS tunnels
#