#!/usr/bin/env python
import unittest
-import socket
from logging import *
from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
-from vpp_gre_interface import VppGreInterface
-from vpp_ip_route import IpRoute, IpPath
+from vpp_sub_interface import VppDot1QSubint
+from vpp_gre_interface import VppGreInterface, VppGre6Interface
+from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
from vpp_papi_provider import L2_VTR_OP
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP, GRE
+from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import ICMPv6ND_NS, ICMPv6ND_RA, IPv6, UDP
-from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6
from scapy.volatile import RandMAC, RandIP
+from util import ppp, ppc
+
+
+class GreTunnelTypes:
+ TT_L3 = 0
+ TT_TEB = 1
+ TT_ERSPAN = 2
+
class TestGRE(VppTestCase):
""" GRE Test Case """
def setUp(self):
super(TestGRE, self).setUp()
- # create 2 pg interfaces - set one in a non-default table.
- self.create_pg_interfaces(range(2))
+ # create 3 pg interfaces - set one in a non-default table.
+ self.create_pg_interfaces(range(3))
+ self.tbl = VppIpTable(self, 1)
+ self.tbl.add_vpp_config()
self.pg1.set_table_ip4(1)
+
for i in self.pg_interfaces:
i.admin_up()
- i.config_ip4()
- i.resolve_arp()
+
+ self.pg0.config_ip4()
+ self.pg0.resolve_arp()
+ self.pg1.config_ip4()
+ self.pg1.resolve_arp()
+ self.pg2.config_ip6()
+ self.pg2.resolve_ndp()
def tearDown(self):
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+ self.pg1.set_table_ip4(0)
super(TestGRE, self).tearDown()
def create_stream_ip4(self, src_if, src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_ip, dst=dst_ip) /
pkts.append(p)
return pkts
+ def create_stream_ip6(self, src_if, src_ip, dst_ip):
+ pkts = []
+ for i in range(0, 257):
+ info = self.create_packet_info(src_if, src_if)
+ payload = self.info_to_payload(info)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IPv6(src=src_ip, dst=dst_ip) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ info.data = p.copy()
+ pkts.append(p)
+ return pkts
+
def create_tunnel_stream_4o4(self, src_if,
tunnel_src, tunnel_dst,
src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
src_ip, dst_ip):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
pkts.append(p)
return pkts
+ def create_tunnel_stream_6o6(self, src_if,
+ tunnel_src, tunnel_dst,
+ src_ip, dst_ip):
+ pkts = []
+ for i in range(0, 257):
+ info = self.create_packet_info(src_if, src_if)
+ payload = self.info_to_payload(info)
+ p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IPv6(src=tunnel_src, dst=tunnel_dst) /
+ GRE() /
+ IPv6(src=src_ip, dst=dst_ip) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ info.data = p.copy()
+ pkts.append(p)
+ return pkts
+
def create_tunnel_stream_l2o4(self, src_if,
tunnel_src, tunnel_dst):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
tunnel_src, tunnel_dst, vlan):
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
pkts.append(p)
return pkts
- def verify_filter(self, capture, sent):
- if not len(capture) == len(sent):
- # filter out any IPv6 RAs from the captur
- for p in capture:
- if (p.haslayer(ICMPv6ND_RA)):
- capture.remove(p)
- return capture
-
def verify_tunneled_4o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
+ raise
+
+ def verify_tunneled_6o6(self, src_if, capture, sent,
+ tunnel_src, tunnel_dst):
+
+ self.assertEqual(len(capture), len(sent))
+
+ for i in range(len(capture)):
+ try:
+ tx = sent[i]
+ rx = capture[i]
+
+ tx_ip = tx[IPv6]
+ rx_ip = rx[IPv6]
+
+ self.assertEqual(rx_ip.src, tunnel_src)
+ self.assertEqual(rx_ip.dst, tunnel_dst)
+
+ rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_ip = rx_gre[IPv6]
+
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+
+ except:
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_tunneled_l2o4(self, src_if, capture, sent,
tunnel_src, tunnel_dst):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_tunneled_vlano4(self, src_if, capture, sent,
tunnel_src, tunnel_dst, vlan):
try:
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
except:
- capture.show()
+ ppc("Unexpected packets captured:", capture)
raise
for i in range(len(capture)):
self.assertEqual(rx_ip.ttl, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_decapped_4o4(self, src_if, capture, sent):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def verify_decapped_6o4(self, src_if, capture, sent):
- capture = self.verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
except:
- rx.show()
- tx.show()
+ self.logger.error(ppp("Rx:", rx))
+ self.logger.error(ppp("Tx:", tx))
raise
def test_gre(self):
- """ GRE tunnel Tests """
+ """ GRE IPv4 tunnel Tests """
#
# Create an L3 GRE tunnel.
gre_if.admin_up()
gre_if.config_ip4()
- route_via_tun = IpRoute(self, "4.4.4.4", 32,
- [IpPath("0.0.0.0", gre_if.sw_if_index)])
+ route_via_tun = VppIpRoute(self, "4.4.4.4", 32,
+ [VppRoutePath("0.0.0.0",
+ gre_if.sw_if_index)])
route_via_tun.add_vpp_config()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- error("GRE packets forwarded without DIP resolved")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="GRE packets forwarded without DIP resolved")
#
# Add a route that resolves the tunnel's destination
#
- route_tun_dst = IpRoute(self, "1.1.1.2", 32,
- [IpPath(self.pg0.remote_ip4, self.pg0.sw_if_index)])
+ route_tun_dst = VppIpRoute(self, "1.1.1.2", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index)])
route_tun_dst.add_vpp_config()
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_4o4(self.pg0, rx, tx,
self.pg0.local_ip4, "1.1.1.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_4o4(self.pg0, rx, tx)
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("GRE packets forwarded despite no SRC address match")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="GRE packets forwarded despite no SRC address match")
#
# Configure IPv6 on the PG interface so we can route IPv6
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("IPv6 GRE packets forwarded despite IPv6 not enabled on tunnel")
- error(rx.show())
- raise
+ self.pg0.assert_nothing_captured(remark="IPv6 GRE packets forwarded "
+ "despite IPv6 not enabled on tunnel")
#
# Enable IPv6 on the tunnel
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_6o4(self.pg0, rx, tx)
#
self.pg0.unconfig_ip6()
+ def test_gre6(self):
+ """ GRE IPv6 tunnel Tests """
+
+ self.pg1.config_ip6()
+ self.pg1.resolve_ndp()
+
+ #
+ # Create an L3 GRE tunnel.
+ # - set it admin up
+ # - assign an IP Address
+ # - Add a route via the tunnel
+ #
+ gre_if = VppGre6Interface(self,
+ self.pg2.local_ip6,
+ "1002::1")
+ gre_if.add_vpp_config()
+ gre_if.admin_up()
+ gre_if.config_ip6()
+
+ route_via_tun = VppIpRoute(
+ self, "4004::1", 128,
+ [VppRoutePath("0::0",
+ gre_if.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+
+ route_via_tun.add_vpp_config()
+
+ #
+ # Send a packet stream that is routed into the tunnel
+ # - they are all dropped since the tunnel's desintation IP
+ # is unresolved - or resolves via the default route - which
+ # which is a drop.
+ #
+ tx = self.create_stream_ip6(self.pg2, "5005::1", "4004::1")
+ self.pg2.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg2.assert_nothing_captured(
+ remark="GRE packets forwarded without DIP resolved")
+
+ #
+ # Add a route that resolves the tunnel's destination
+ #
+ route_tun_dst = VppIpRoute(
+ self, "1002::1", 128,
+ [VppRoutePath(self.pg2.remote_ip6,
+ self.pg2.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_tun_dst.add_vpp_config()
+
+ #
+ # Send a packet stream that is routed into the tunnel
+ # - packets are GRE encapped
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_ip6(self.pg2, "5005::1", "4004::1")
+ self.pg2.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(len(tx))
+ self.verify_tunneled_6o6(self.pg2, rx, tx,
+ self.pg2.local_ip6, "1002::1")
+
+ #
+ # Test decap. decapped packets go out pg1
+ #
+ tx = self.create_tunnel_stream_6o6(self.pg2,
+ "1002::1",
+ self.pg2.local_ip6,
+ "2001::1",
+ self.pg1.remote_ip6)
+ self.vapi.cli("clear trace")
+ self.pg2.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = self.pg1.get_capture(len(tx))
+
+ #
+ # RX'd packet is UDP over IPv6, test the GRE header is gone.
+ #
+ self.assertFalse(rx[0].haslayer(GRE))
+ self.assertEqual(rx[0][IPv6].dst, self.pg1.remote_ip6)
+
+ #
+ # test case cleanup
+ #
+ route_tun_dst.remove_vpp_config()
+ route_via_tun.remove_vpp_config()
+ gre_if.remove_vpp_config()
+
+ self.pg2.unconfig_ip6()
+ self.pg1.unconfig_ip6()
+
def test_gre_vrf(self):
""" GRE tunnel VRF Tests """
#
# Add a route via the tunnel - in the overlay
#
- route_via_tun = IpRoute(self, "9.9.9.9", 32,
- [IpPath("0.0.0.0", gre_if.sw_if_index)])
+ route_via_tun = VppIpRoute(self, "9.9.9.9", 32,
+ [VppRoutePath("0.0.0.0",
+ gre_if.sw_if_index)])
route_via_tun.add_vpp_config()
#
# Add a route that resolves the tunnel's destination - in the
# underlay table
#
- route_tun_dst = IpRoute(self, "2.2.2.2", 32, table_id=1,
- paths=[IpPath(self.pg1.remote_ip4,
- self.pg1.sw_if_index)])
+ route_tun_dst = VppIpRoute(self, "2.2.2.2", 32, table_id=1,
+ paths=[VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index)])
route_tun_dst.add_vpp_config()
#
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg1.get_capture()
+ rx = self.pg1.get_capture(len(tx))
self.verify_tunneled_4o4(self.pg1, rx, tx,
self.pg1.local_ip4, "2.2.2.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_decapped_4o4(self.pg0, rx, tx)
+ #
+ # Send tunneled packets that match the created tunnel and
+ # but arrive on an interface that is not in the tunnel's
+ # encap VRF, these are dropped
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_tunnel_stream_4o4(self.pg2,
+ "2.2.2.2",
+ self.pg1.local_ip4,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4)
+ self.pg1.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.pg0.assert_nothing_captured(
+ remark="GRE decap packets in wrong VRF")
+
#
# test case cleanup
#
#
# Add routes to resolve the tunnel destinations
#
- route_tun1_dst = IpRoute(self, "2.2.2.2", 32,
- [IpPath(self.pg0.remote_ip4,
- self.pg0.sw_if_index)])
- route_tun2_dst = IpRoute(self, "2.2.2.3", 32,
- [IpPath(self.pg0.remote_ip4,
- self.pg0.sw_if_index)])
+ route_tun1_dst = VppIpRoute(self, "2.2.2.2", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index)])
+ route_tun2_dst = VppIpRoute(self, "2.2.2.3", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index)])
route_tun1_dst.add_vpp_config()
route_tun2_dst.add_vpp_config()
#
gre_if1 = VppGreInterface(self, self.pg0.local_ip4,
"2.2.2.2",
- is_teb=1)
+ type=GreTunnelTypes.TT_TEB)
gre_if2 = VppGreInterface(self, self.pg0.local_ip4,
"2.2.2.3",
- is_teb=1)
+ type=GreTunnelTypes.TT_TEB)
gre_if1.add_vpp_config()
gre_if2.add_vpp_config()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_l2o4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.3")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_l2o4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.2")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_vlano4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.3",
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
+ rx = self.pg0.get_capture(len(tx))
self.verify_tunneled_vlano4(self.pg0, rx, tx,
self.pg0.local_ip4,
"2.2.2.2",