#!/usr/bin/env python
import unittest
-import socket
-import struct
from framework import VppTestCase, VppTestRunner
from vpp_object import VppObject
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr, \
- ICMPv6NDOptDstLLAddr, ICMPv6ND_NA
+ ICMPv6ND_NA
from scapy.utils6 import in6_getnsma, in6_getnsmac
from socket import AF_INET, AF_INET6
from scapy.utils import inet_pton, inet_ntop
-from util import Host, mactobinary
+from util import mactobinary
class VppGbpEndpoint(VppObject):
"""
- GDB Endpoint
+ GBP Endpoint
"""
@property
if is_ip6:
self.proto = DpoProto.DPO_PROTO_IP6
self.af = AF_INET6
+ self.is_ip6 = True
+ self.ip_len = 128
else:
self.proto = DpoProto.DPO_PROTO_IP4
self.af = AF_INET
+ self.is_ip6 = False
+ self.ip_len = 32
self.ip_n = inet_pton(self.af, ip)
self.floating_ip_n = inet_pton(self.af, fip)
class VppGbpRecirc(VppObject):
"""
- GDB Recirculation Interface
+ GBP Recirculation Interface
"""
def __init__(self, test, epg, recirc, is_ext=False):
class VppGbpSubnet(VppObject):
"""
- GDB Subnet
+ GBP Subnet
"""
def __init__(self, test, table_id, address, address_len,
ss = self._test.vapi.gbp_subnet_dump()
for s in ss:
if s.subnet.table_id == self.table_id and \
- s.subnet.address_length == self.address_len:
+ s.subnet.address_length == self.address_len and \
+ s.subnet.is_ip6 == self.is_ip6:
if self.is_ip6:
if s.subnet.address == self.address_n:
return True
class VppGbpEndpointGroup(VppObject):
"""
- GDB Endpoint Group
+ GBP Endpoint Group
"""
def __init__(self, test, epg, rd, bd, uplink,
class VppGbpContract(VppObject):
"""
- GDB Contract
+ GBP Contract
"""
def __init__(self, test, src_epg, dst_epg, acl_index):
return False
+class VppGbpAcl(VppObject):
+ """
+ GBP Acl
+ """
+
+ def __init__(self, test):
+ self._test = test
+ self.acl_index = 4294967295
+
+ def create_rule(self, is_ipv6=0, permit_deny=0, proto=-1,
+ s_prefix=0, s_ip='\x00\x00\x00\x00', sport_from=0,
+ sport_to=65535, d_prefix=0, d_ip='\x00\x00\x00\x00',
+ dport_from=0, dport_to=65535):
+ if proto == -1 or proto == 0:
+ sport_to = 0
+ dport_to = sport_to
+ elif proto == 1 or proto == 58:
+ sport_to = 255
+ dport_to = sport_to
+ rule = ({'is_permit': permit_deny, 'is_ipv6': is_ipv6, 'proto': proto,
+ 'srcport_or_icmptype_first': sport_from,
+ 'srcport_or_icmptype_last': sport_to,
+ 'src_ip_prefix_len': s_prefix,
+ 'src_ip_addr': s_ip,
+ 'dstport_or_icmpcode_first': dport_from,
+ 'dstport_or_icmpcode_last': dport_to,
+ 'dst_ip_prefix_len': d_prefix,
+ 'dst_ip_addr': d_ip})
+ return rule
+
+ def add_vpp_config(self, rules):
+
+ reply = self._test.vapi.acl_add_replace(self.acl_index,
+ r=rules,
+ tag='GBPTest')
+ self.acl_index = reply.acl_index
+ return self.acl_index
+
+ def remove_vpp_config(self):
+ self._test.vapi.acl_del(self.acl_index)
+
+ def __str__(self):
+ return self.object_id()
+
+ def object_id(self):
+ return "gbp-acl;[%d]" % (self.acl_index)
+
+ def query_vpp_config(self):
+ cs = self._test.vapi.acl_dump()
+ for c in cs:
+ if c.acl_index == self.acl_index:
+ return True
+ return False
+
+
class TestGBP(VppTestCase):
""" GBP Test Case """
super(TestGBP, self).setUp()
self.create_pg_interfaces(range(9))
- self.create_loopback_interfaces(range(9))
+ self.create_loopback_interfaces(9)
self.router_mac = "00:11:22:33:44:55"
self.assertEqual(r[IP].dst, tx[0][IP].dst)
return rx
+ def send_and_expect_natted6(self, src, tx, dst, src_ip):
+ rx = self.send_and_expect(src, tx, dst)
+
+ for r in rx:
+ self.assertEqual(r[Ether].src, tx[0][Ether].src)
+ self.assertEqual(r[Ether].dst, tx[0][Ether].dst)
+ self.assertEqual(r[IPv6].src, src_ip)
+ self.assertEqual(r[IPv6].dst, tx[0][IPv6].dst)
+ return rx
+
def send_and_expect_unnatted(self, src, tx, dst, dst_ip):
rx = self.send_and_expect(src, tx, dst)
self.assertEqual(r[IP].src, tx[0][IP].src)
return rx
+ def send_and_expect_unnatted6(self, src, tx, dst, dst_ip):
+ rx = self.send_and_expect(src, tx, dst)
+
+ for r in rx:
+ self.assertEqual(r[Ether].src, tx[0][Ether].src)
+ self.assertEqual(r[Ether].dst, tx[0][Ether].dst)
+ self.assertEqual(r[IPv6].dst, dst_ip)
+ self.assertEqual(r[IPv6].src, tx[0][IPv6].src)
+ return rx
+
def send_and_expect_double_natted(self, src, tx, dst, src_ip, dst_ip):
rx = self.send_and_expect(src, tx, dst)
self.assertEqual(r[IP].src, src_ip)
return rx
+ def send_and_expect_double_natted6(self, src, tx, dst, src_ip, dst_ip):
+ rx = self.send_and_expect(src, tx, dst)
+
+ for r in rx:
+ self.assertEqual(r[Ether].src, self.router_mac)
+ self.assertEqual(r[Ether].dst, dst.remote_mac)
+ self.assertEqual(r[IPv6].dst, dst_ip)
+ self.assertEqual(r[IPv6].src, src_ip)
+ return rx
+
def test_gbp(self):
""" Group Based Policy """
nat_table = VppIpTable(self, 20)
nat_table.add_vpp_config()
+ nat_table = VppIpTable(self, 20, is_ip6=True)
+ nat_table.add_vpp_config()
#
# Bridge Domains
self.vapi.nat44_interface_add_del_feature(epg.bvi.sw_if_index,
is_inside=1,
is_add=1)
- # self.vapi.nat66_add_del_interface(epg.bvi.sw_if_index,
- # is_inside=1,
- # is_add=1)
+ self.vapi.nat66_add_del_interface(epg.bvi.sw_if_index,
+ is_inside=1,
+ is_add=1)
self.vapi.sw_interface_add_del_address(epg.bvi.sw_if_index,
epg.bvi_ip4_n,
self.vapi.sw_interface_add_del_address(epg.bvi.sw_if_index,
epg.bvi_ip6_n,
128,
- is_ipv6=1)
+ is_ipv6=True)
# EPG uplink interfaces in the BD
epg.uplink.set_table_ip4(epg.rd)
+ epg.uplink.set_table_ip6(epg.rd)
self.vapi.sw_interface_set_l2_bridge(epg.uplink.sw_if_index,
epg.bd)
for recirc in recircs:
# EPG's ingress recirculation interface maps to its RD
recirc.recirc.set_table_ip4(recirc.epg.rd)
+ recirc.recirc.set_table_ip6(recirc.epg.rd)
# in the bridge to allow DVR. L2 emulation to punt to L3
self.vapi.sw_interface_set_l2_bridge(recirc.recirc.sw_if_index,
self.vapi.sw_interface_set_l2_emulation(
recirc.recirc.sw_if_index)
- if recirc.is_ext:
- # recirc interfaces on NAT EPGs are outside and an
- # output feature
- self.vapi.nat44_interface_add_del_output_feature(
- recirc.recirc.sw_if_index,
- is_inside=0,
- is_add=1)
- else:
- self.vapi.nat44_interface_add_del_feature(
- recirc.recirc.sw_if_index,
- is_inside=0,
- is_add=1)
- # self.vapi.nat66_add_del_interface(
- # recirc.recirc.sw_if_index,
- # is_inside=0,
- # is_add=1)
+ self.vapi.nat44_interface_add_del_feature(
+ recirc.recirc.sw_if_index,
+ is_inside=0,
+ is_add=1)
+ self.vapi.nat66_add_del_interface(
+ recirc.recirc.sw_if_index,
+ is_inside=0,
+ is_add=1)
recirc.add_vpp_config()
# adj-fibs due to the fact the the BVI address has /32 and
# the subnet is not attached.
#
- r = VppIpRoute(self, ep.ip, 32,
+ r = VppIpRoute(self, ep.ip, ep.ip_len,
[VppRoutePath(ep.ip,
ep.epg.bvi.sw_if_index,
proto=ep.proto)],
ep.floating_ip_n,
vrf_id=0,
addr_only=1)
- # else:
- # self.vapi.nat66_add_del_static_mapping(ep.ip_n,
- # ep.floating_ip_n,
- # vrf_id=20)
+ else:
+ self.vapi.nat66_add_del_static_mapping(ep.ip_n,
+ ep.floating_ip_n,
+ vrf_id=0)
# VPP EP create ...
ep.add_vpp_config()
self.vapi.bd_ip_mac_add_del(bd_id=epg_nat.bd,
mac=ep.bin_mac,
ip=ep.floating_ip_n,
- is_ipv6=0,
+ is_ipv6=ep.is_ip6,
is_add=1)
# floating IPs route via EPG recirc
- r = VppIpRoute(self, ep.floating_ip, 32,
+ r = VppIpRoute(self, ep.floating_ip, ep.ip_len,
[VppRoutePath(ep.floating_ip,
ep.recirc.recirc.sw_if_index,
is_dvr=1,
#
# A uni-directional contract from EPG 220 -> 221
#
- c1 = VppGbpContract(self, 220, 221, 0)
+ acl = VppGbpAcl(self)
+ rule = acl.create_rule(permit_deny=1, proto=17)
+ rule2 = acl.create_rule(is_ipv6=1, permit_deny=1, proto=17)
+ acl_index = acl.add_vpp_config([rule, rule2])
+ c1 = VppGbpContract(self, 220, 221, acl_index)
c1.add_vpp_config()
self.send_and_expect_bridged(self.pg0,
#
# contract for the return direction
#
- c2 = VppGbpContract(self, 221, 220, 0)
+ c2 = VppGbpContract(self, 221, 220, acl_index)
c2.add_vpp_config()
self.send_and_expect_bridged(self.pg0,
#
# A uni-directional contract from EPG 220 -> 222 'L3 routed'
#
- c3 = VppGbpContract(self, 220, 222, 0)
+ c3 = VppGbpContract(self, 220, 222, acl_index)
c3.add_vpp_config()
self.logger.info(self.vapi.cli("sh gbp contract"))
c2.remove_vpp_config()
c1.remove_vpp_config()
c3.remove_vpp_config()
+ acl.remove_vpp_config()
self.send_and_assert_no_replies(self.pg2,
pkt_inter_epg_221_to_220 * 65)
sw_if_index=recirc_nat.recirc.sw_if_index,
epg=epg_nat.epg)
se2.add_vpp_config()
+ se16 = VppGbpSubnet(self, 0, "::", 0,
+ is_internal=False,
+ sw_if_index=recirc_nat.recirc.sw_if_index,
+ epg=epg_nat.epg,
+ is_ip6=True)
+ se16.add_vpp_config()
# in the NAT RD an external subnet via the NAT EPG's uplink
se3 = VppGbpSubnet(self, 20, "0.0.0.0", 0,
is_internal=False,
sw_if_index=epg_nat.uplink.sw_if_index,
epg=epg_nat.epg)
+ se36 = VppGbpSubnet(self, 20, "::", 0,
+ is_internal=False,
+ sw_if_index=epg_nat.uplink.sw_if_index,
+ epg=epg_nat.epg,
+ is_ip6=True)
se4 = VppGbpSubnet(self, 20, "11.0.0.0", 8,
is_internal=False,
sw_if_index=epg_nat.uplink.sw_if_index,
epg=epg_nat.epg)
se3.add_vpp_config()
+ se36.add_vpp_config()
se4.add_vpp_config()
self.logger.info(self.vapi.cli("sh ip fib 0.0.0.0/0"))
self.logger.info(self.vapi.cli("sh ip fib 11.0.0.1"))
+ self.logger.info(self.vapi.cli("sh ip6 fib ::/0"))
+ self.logger.info(self.vapi.cli("sh ip6 fib %s" %
+ eps[4].floating_ip))
+ #
+ # From an EP to an outside addess: IN2OUT
+ #
pkt_inter_epg_220_to_global = (Ether(src=self.pg0.remote_mac,
dst=self.router_mac) /
IP(src=eps[0].ip, dst="1.1.1.1") /
self.send_and_assert_no_replies(self.pg0,
pkt_inter_epg_220_to_global * 65)
- c4 = VppGbpContract(self, 220, 333, 0)
+ acl2 = VppGbpAcl(self)
+ rule = acl2.create_rule(permit_deny=1, proto=17, sport_from=1234,
+ sport_to=1234, dport_from=1234, dport_to=1234)
+ rule2 = acl2.create_rule(is_ipv6=1, permit_deny=1, proto=17,
+ sport_from=1234, sport_to=1234,
+ dport_from=1234, dport_to=1234)
+
+ acl_index2 = acl2.add_vpp_config([rule, rule2])
+ c4 = VppGbpContract(self, 220, 333, acl_index2)
c4.add_vpp_config()
self.send_and_expect_natted(self.pg0,
pkt_inter_epg_220_to_global * 65,
self.pg7,
- "11.0.0.1")
+ eps[0].floating_ip)
+
+ pkt_inter_epg_220_to_global = (Ether(src=self.pg0.remote_mac,
+ dst=self.router_mac) /
+ IPv6(src=eps[4].ip, dst="6001::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.send_and_expect_natted6(self.pg0,
+ pkt_inter_epg_220_to_global * 65,
+ self.pg7,
+ eps[4].floating_ip)
+ #
+ # From a global address to an EP: OUT2IN
+ #
pkt_inter_epg_220_from_global = (Ether(src=self.router_mac,
dst=self.pg0.remote_mac) /
IP(dst=eps[0].floating_ip,
self.send_and_assert_no_replies(self.pg7,
pkt_inter_epg_220_from_global * 65)
- c5 = VppGbpContract(self, 333, 220, 0)
+ c5 = VppGbpContract(self, 333, 220, acl_index2)
c5.add_vpp_config()
self.send_and_expect_unnatted(self.pg7,
pkt_inter_epg_220_from_global * 65,
- self.pg0,
- "10.0.0.1")
+ eps[0].itf,
+ eps[0].ip)
+ pkt_inter_epg_220_from_global = (Ether(src=self.router_mac,
+ dst=self.pg0.remote_mac) /
+ IPv6(dst=eps[4].floating_ip,
+ src="6001::1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.send_and_expect_unnatted6(self.pg7,
+ pkt_inter_epg_220_from_global * 65,
+ eps[4].itf,
+ eps[4].ip)
+
+ #
+ # From a local VM to another local VM using resp. public addresses:
+ # IN2OUT2IN
+ #
pkt_intra_epg_220_global = (Ether(src=self.pg0.remote_mac,
dst=self.router_mac) /
IP(src=eps[0].ip,
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- self.send_and_expect_double_natted(self.pg0,
+ self.send_and_expect_double_natted(eps[0].itf,
pkt_intra_epg_220_global * 65,
- self.pg1,
- "11.0.0.1",
- "10.0.0.2")
+ eps[1].itf,
+ eps[0].floating_ip,
+ eps[1].ip)
+
+ pkt_intra_epg_220_global = (Ether(src=self.pg4.remote_mac,
+ dst=self.router_mac) /
+ IPv6(src=eps[4].ip,
+ dst=eps[5].floating_ip) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.send_and_expect_double_natted6(eps[4].itf,
+ pkt_intra_epg_220_global * 65,
+ eps[5].itf,
+ eps[4].floating_ip,
+ eps[5].ip)
#
# cleanup
vrf_id=0,
addr_only=1,
is_add=0)
- # else:
- # self.vapi.nat66_add_del_static_mapping(ep.ip_n,
- # ep.floating_ip_n,
- # vrf_id=0,
- # is_add=0)
+ else:
+ self.vapi.nat66_add_del_static_mapping(ep.ip_n,
+ ep.floating_ip_n,
+ vrf_id=0,
+ is_add=0)
for epg in epgs:
# IP config on the BVI interfaces
epg.bvi_ip4_n,
32,
is_add=0)
+ self.vapi.sw_interface_add_del_address(epg.bvi.sw_if_index,
+ epg.bvi_ip6_n,
+ 128,
+ is_add=0,
+ is_ipv6=True)
self.logger.info(self.vapi.cli("sh int addr"))
epg.uplink.set_table_ip4(0)
+ epg.uplink.set_table_ip6(0)
if epg != epgs[0] and epg != epgs[3]:
epg.bvi.set_table_ip4(0)
+ epg.bvi.set_table_ip6(0)
self.vapi.nat44_interface_add_del_feature(epg.bvi.sw_if_index,
is_inside=1,
is_add=0)
- # self.vapi.nat66_add_del_interface(epg.bvi.sw_if_index,
- # is_inside=1,
- # is_add=0)
+ self.vapi.nat66_add_del_interface(epg.bvi.sw_if_index,
+ is_inside=1,
+ is_add=0)
for recirc in recircs:
recirc.recirc.set_table_ip4(0)
-
- if recirc.is_ext:
- self.vapi.nat44_interface_add_del_output_feature(
- recirc.recirc.sw_if_index,
- is_inside=0,
- is_add=0)
- else:
- self.vapi.nat44_interface_add_del_feature(
- recirc.recirc.sw_if_index,
- is_inside=0,
- is_add=0)
- # self.vapi.nat66_add_del_interface(
- # recirc.recirc.sw_if_index,
- # is_inside=0,
- # is_add=0)
+ recirc.recirc.set_table_ip6(0)
+
+ self.vapi.nat44_interface_add_del_feature(
+ recirc.recirc.sw_if_index,
+ is_inside=0,
+ is_add=0)
+ self.vapi.nat66_add_del_interface(
+ recirc.recirc.sw_if_index,
+ is_inside=0,
+ is_add=0)
if __name__ == '__main__':