import unittest
from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment, Raw
+from scapy.contrib.mpls import MPLS
from scapy.all import fragment, fragment6, RandShort, defragment6
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto, \
+ VppMplsLabel, VppMplsRoute, VppMplsTable
from vpp_ipip_tun_interface import VppIpIpTunInterface
+from vpp_teib import VppTeib
from vpp_papi import VppEnum
from socket import AF_INET, AF_INET6, inet_pton
from util import reassemble4
@classmethod
def setUpClass(cls):
super(TestIPIP, cls).setUpClass()
- cls.create_pg_interfaces(range(2))
+ cls.create_pg_interfaces(range(3))
cls.interfaces = list(cls.pg_interfaces)
@classmethod
def setUp(self):
super(TestIPIP, self).setUp()
+ self.table = VppIpTable(self, 1, register=False)
+ self.table.add_vpp_config()
+
for i in self.interfaces:
i.admin_up()
+
+ self.pg2.set_table_ip4(self.table.table_id)
+ for i in self.interfaces:
i.config_ip4()
i.config_ip6()
i.disable_ipv6_ra()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
+ i.set_table_ip4(0)
i.admin_down()
+ self.table.remove_vpp_config()
+
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
self.pg1.generate_remote_hosts(5)
self.pg1.configure_ipv4_neighbors()
- e = VppEnum.vl_api_ipip_tunnel_flags_t
+ e = VppEnum.vl_api_tunnel_encap_decap_flags_t
d = VppEnum.vl_api_ip_dscp_t
self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[0].ip4,
- flags=e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DSCP).add_vpp_config()
+ flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
+ tun_dscp.add_vpp_config()
# IPv4 transport that copies the DCSP and ECN from the payload
tun_dscp_ecn = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[1].ip4,
- flags=(e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DSCP |
- e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_ECN)).add_vpp_config()
+ flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN))
+ tun_dscp_ecn.add_vpp_config()
# IPv4 transport that copies the ECN from the payload and sets the
# DF bit on encap. copies the ECN on decap
tun_ecn = VppIpIpTunInterface(
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[2].ip4,
- flags=(e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_ECN |
- e.IPIP_TUNNEL_API_FLAG_ENCAP_SET_DF |
- e.IPIP_TUNNEL_API_FLAG_DECAP_COPY_ECN)).add_vpp_config()
+ flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_DECAP_COPY_ECN))
+ tun_ecn.add_vpp_config()
# IPv4 transport that sets a fixed DSCP in the encap and copies
# the DF bit
tun = VppIpIpTunInterface(
self.pg0.local_ip4,
self.pg1.remote_hosts[3].ip4,
dscp=d.IP_API_DSCP_AF11,
- flags=e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DF).add_vpp_config()
+ flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF)
+ tun.add_vpp_config()
# array of all the tunnels
tuns = [tun_dscp, tun_dscp_ecn, tun_ecn, tun]
def payload(self, len):
return 'x' * len
+ def test_mipip4(self):
+ """ p2mp IPv4 tunnel Tests """
+
+ for itf in self.pg_interfaces[:2]:
+ #
+ # one underlay nh for each overlay/tunnel peer
+ #
+ itf.generate_remote_hosts(4)
+ itf.configure_ipv4_neighbors()
+
+ #
+ # Create an p2mo IPIP tunnel.
+ # - set it admin up
+ # - assign an IP Addres
+ # - Add a route via the tunnel
+ #
+ ipip_if = VppIpIpTunInterface(self, itf,
+ itf.local_ip4,
+ "0.0.0.0",
+ mode=(VppEnum.vl_api_tunnel_mode_t.
+ TUNNEL_API_MODE_MP))
+ ipip_if.add_vpp_config()
+ ipip_if.admin_up()
+ ipip_if.config_ip4()
+ ipip_if.generate_remote_hosts(4)
+
+ self.logger.info(self.vapi.cli("sh adj"))
+ self.logger.info(self.vapi.cli("sh ip fib"))
+
+ #
+ # ensure we don't match to the tunnel if the source address
+ # is all zeros
+ #
+ # tx = self.create_tunnel_stream_4o4(self.pg0,
+ # "0.0.0.0",
+ # itf.local_ip4,
+ # self.pg0.local_ip4,
+ # self.pg0.remote_ip4)
+ # self.send_and_assert_no_replies(self.pg0, tx)
+
+ #
+ # for-each peer
+ #
+ for ii in range(1, 4):
+ route_addr = "4.4.4.%d" % ii
+
+ #
+ # route traffic via the peer
+ #
+ route_via_tun = VppIpRoute(
+ self, route_addr, 32,
+ [VppRoutePath(ipip_if._remote_hosts[ii].ip4,
+ ipip_if.sw_if_index)])
+ route_via_tun.add_vpp_config()
+
+ #
+ # Add a TEIB entry resolves the peer
+ #
+ teib = VppTeib(self, ipip_if,
+ ipip_if._remote_hosts[ii].ip4,
+ itf._remote_hosts[ii].ip4)
+ teib.add_vpp_config()
+ self.logger.info(self.vapi.cli("sh adj nbr ipip0 %s" %
+ ipip_if._remote_hosts[ii].ip4))
+
+ #
+ # Send a packet stream that is routed into the tunnel
+ # - packets are IPIP encapped
+ #
+ inner = (IP(dst=route_addr, src="5.5.5.5") /
+ UDP(sport=1234, dport=1234) /
+ Raw(b'0x44' * 100))
+ tx_e = [(Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ inner) for x in range(63)]
+
+ rxs = self.send_and_expect(self.pg0, tx_e, itf)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].src, itf.local_ip4)
+ self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)
+
+ tx_i = [(Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ IP(src=itf._remote_hosts[ii].ip4,
+ dst=itf.local_ip4) /
+ IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(b'0x44' * 100)) for x in range(63)]
+
+ self.logger.info(self.vapi.cli("sh ipip tunnel-hash"))
+ rx = self.send_and_expect(self.pg0, tx_i, self.pg0)
+
+ #
+ # delete and re-add the TEIB
+ #
+ teib.remove_vpp_config()
+ self.send_and_assert_no_replies(self.pg0, tx_e)
+ self.send_and_assert_no_replies(self.pg0, tx_i)
+
+ teib.add_vpp_config()
+ rx = self.send_and_expect(self.pg0, tx_e, itf)
+ for rx in rxs:
+ self.assertEqual(rx[IP].src, itf.local_ip4)
+ self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)
+ rx = self.send_and_expect(self.pg0, tx_i, self.pg0)
+
+ #
+ # we can also send to the peer's address
+ #
+ inner = (IP(dst=teib.peer, src="5.5.5.5") /
+ UDP(sport=1234, dport=1234) /
+ Raw(b'0x44' * 100))
+ tx_e = [(Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ inner) for x in range(63)]
+
+ rxs = self.send_and_expect(self.pg0, tx_e, itf)
+
+ #
+ # with all of the peers in place, swap the ip-table of
+ # the ipip interface
+ #
+ table = VppIpTable(self, 2)
+ table.add_vpp_config()
+
+ ipip_if.unconfig_ip4()
+ ipip_if.set_table_ip4(self.table.table_id)
+ ipip_if.config_ip4()
+
+ #
+ # we should still be able to reach the peers from the new table
+ #
+ inner = (IP(dst=teib.peer, src="5.5.5.5") /
+ UDP(sport=1234, dport=1234) /
+ Raw(b'0x44' * 100))
+ tx_e = [(Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_mac) /
+ inner) for x in range(63)]
+
+ rxs = self.send_and_expect(self.pg2, tx_e, itf)
+
+ ipip_if.admin_down()
+ ipip_if.unconfig_ip4()
+ ipip_if.set_table_ip4(0)
+
class TestIPIP6(VppTestCase):
""" IPIP6 Test Case """
self.pg1.generate_remote_hosts(5)
self.pg1.configure_ipv6_neighbors()
- e = VppEnum.vl_api_ipip_tunnel_flags_t
+ e = VppEnum.vl_api_tunnel_encap_decap_flags_t
d = VppEnum.vl_api_ip_dscp_t
self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)
self.pg0,
self.pg0.local_ip6,
self.pg1.remote_hosts[0].ip6,
- flags=e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DSCP).add_vpp_config()
+ flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
+ tun_dscp.add_vpp_config()
# IPv4 transport that copies the DCSP and ECN from the payload
tun_dscp_ecn = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip6,
self.pg1.remote_hosts[1].ip6,
- flags=(e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DSCP |
- e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_ECN)).add_vpp_config()
+ flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN))
+ tun_dscp_ecn.add_vpp_config()
# IPv4 transport that copies the ECN from the payload and sets the
# DF bit on encap. copies the ECN on decap
tun_ecn = VppIpIpTunInterface(
self.pg0,
self.pg0.local_ip6,
self.pg1.remote_hosts[2].ip6,
- flags=(e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_ECN |
- e.IPIP_TUNNEL_API_FLAG_ENCAP_SET_DF |
- e.IPIP_TUNNEL_API_FLAG_DECAP_COPY_ECN)).add_vpp_config()
+ flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF |
+ e.TUNNEL_API_ENCAP_DECAP_FLAG_DECAP_COPY_ECN))
+ tun_ecn.add_vpp_config()
# IPv4 transport that sets a fixed DSCP in the encap and copies
# the DF bit
tun = VppIpIpTunInterface(
self.pg0.local_ip6,
self.pg1.remote_hosts[3].ip6,
dscp=d.IP_API_DSCP_AF11,
- flags=e.IPIP_TUNNEL_API_FLAG_ENCAP_COPY_DF).add_vpp_config()
+ flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF)
+ tun.add_vpp_config()
# array of all the tunnels
tuns = [tun_dscp, tun_dscp_ecn, tun_ecn, tun]
return 'x' * len
+class TestMPLS(VppTestCase):
+ """ MPLS Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestMPLS, cls).setUpClass()
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestMPLS, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestMPLS, self).setUp()
+ for i in self.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.config_ip6()
+ i.disable_ipv6_ra()
+ i.resolve_arp()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestMPLS, self).tearDown()
+
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_mpls(self):
+ """ MPLS over ip{6,4} test """
+
+ tbl = VppMplsTable(self, 0)
+ tbl.add_vpp_config()
+
+ self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)
+ f = FibPathProto
+
+ # IPv4 transport
+ tun4 = VppIpIpTunInterface(
+ self,
+ self.pg1,
+ self.pg1.local_ip4,
+ self.pg1.remote_ip4).add_vpp_config()
+ tun4.admin_up()
+ tun4.config_ip4()
+ tun4.enable_mpls()
+
+ # IPv6 transport
+ tun6 = VppIpIpTunInterface(
+ self,
+ self.pg1,
+ self.pg1.local_ip6,
+ self.pg1.remote_ip6).add_vpp_config()
+ tun6.admin_up()
+ tun6.config_ip6()
+ tun6.enable_mpls()
+
+ # ip routes into the tunnels with output labels
+ r4 = VppIpRoute(self, "1.1.1.1", 32,
+ [VppRoutePath(
+ tun4.remote_ip4,
+ tun4.sw_if_index,
+ labels=[VppMplsLabel(44)])]).add_vpp_config()
+ r6 = VppIpRoute(self, "1::1", 128,
+ [VppRoutePath(
+ tun6.remote_ip6,
+ tun6.sw_if_index,
+ labels=[VppMplsLabel(66)])]).add_vpp_config()
+
+ # deag MPLS routes from the tunnel
+ r4 = VppMplsRoute(self, 44, 1,
+ [VppRoutePath(
+ self.pg0.remote_ip4,
+ self.pg0.sw_if_index)]).add_vpp_config()
+ r6 = VppMplsRoute(self, 66, 1,
+ [VppRoutePath(
+ self.pg0.remote_ip6,
+ self.pg0.sw_if_index)],
+ eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
+
+ #
+ # Tunnel Encap
+ #
+ p4 = (self.p_ether / IP(src="2.2.2.2", dst="1.1.1.1") / self.p_payload)
+
+ rxs = self.send_and_expect(self.pg0, p4 * N_PACKETS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].src, self.pg1.local_ip4)
+ self.assertEqual(rx[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(rx[MPLS].label, 44)
+ inner = rx[MPLS].payload
+ self.assertEqual(inner.src, "2.2.2.2")
+ self.assertEqual(inner.dst, "1.1.1.1")
+
+ p6 = (self.p_ether / IPv6(src="2::2", dst="1::1") / self.p_payload)
+
+ rxs = self.send_and_expect(self.pg0, p6 * N_PACKETS, self.pg1)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].src, self.pg1.local_ip6)
+ self.assertEqual(rx[IPv6].dst, self.pg1.remote_ip6)
+ self.assertEqual(rx[MPLS].label, 66)
+ inner = rx[MPLS].payload
+ self.assertEqual(inner.src, "2::2")
+ self.assertEqual(inner.dst, "1::1")
+
+ #
+ # Tunnel Decap
+ #
+ p4 = (self.p_ether /
+ IP(src=self.pg1.remote_ip4,
+ dst=self.pg1.local_ip4) /
+ MPLS(label=44, ttl=4) /
+ IP(src="1.1.1.1",
+ dst="2.2.2.2") /
+ self.p_payload)
+
+ rxs = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].src, "1.1.1.1")
+ self.assertEqual(rx[IP].dst, "2.2.2.2")
+
+ p6 = (self.p_ether /
+ IPv6(src=self.pg1.remote_ip6,
+ dst=self.pg1.local_ip6) /
+ MPLS(label=66, ttl=4) /
+ IPv6(src="1::1",
+ dst="2::2") /
+ self.p_payload)
+
+ rxs = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].src, "1::1")
+ self.assertEqual(rx[IPv6].dst, "2::2")
+
+ tun4.disable_mpls()
+ tun6.disable_mpls()
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)