From 1dff6b3485ffac976d3b49b5daaf7ad687d103f8 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Wed, 20 Oct 2021 17:41:07 +0200 Subject: [PATCH] cnat: improve tests Type: improvement This is an attempt to make the cnat tests more readable by hiding the scapy packet generation under a common context concept. This in order to add tests covering a wider range of scenarios. As of now, only VIP, snat & DHCP being covered Change-Id: Ia84868984506bbb652fe974e9a6f54f7a3cc0bd9 Signed-off-by: Nathan Skrzypczak --- test/test_cnat.py | 1299 +++++++++++++++++++++++------------------------------ 1 file changed, 562 insertions(+), 737 deletions(-) diff --git a/test/test_cnat.py b/test/test_cnat.py index ff4c44033cb..198dd739d94 100644 --- a/test/test_cnat.py +++ b/test/test_cnat.py @@ -22,106 +22,113 @@ from vpp_object import VppObject from vpp_papi import VppEnum N_PKTS = 15 +N_REMOTE_HOSTS = 3 +SRC = 0 +DST = 1 -class Ep(object): - """ CNat endpoint """ - def __init__(self, ip=None, port=0, l4p=TCP, - sw_if_index=INVALID_INDEX, is_v6=False): - self.ip = ip - if ip is None: - self.ip = "::" if is_v6 else "0.0.0.0" - self.port = port - self.l4p = l4p - self.sw_if_index = sw_if_index - if is_v6: - self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6 - else: - self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4 +class CnatCommonTestCase(VppTestCase): + """ CNat common test class """ - def encode(self): - return {'addr': self.ip, - 'port': self.port, - 'sw_if_index': self.sw_if_index, - 'if_af': self.if_af} + # + # turn the scanner off whilst testing otherwise sessions + # will time out + # + extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] @classmethod - def from_pg(cls, pg, is_v6=False): - if pg is None: - return cls(is_v6=is_v6) - else: - return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6) - - @property - def isV6(self): - return ":" in self.ip + def setUpClass(cls): + super(CnatCommonTestCase, cls).setUpClass() - def __str__(self): - return ("%s:%d" % (self.ip, self.port)) + @classmethod + def tearDownClass(cls): + super(CnatCommonTestCase, cls).tearDownClass() -class EpTuple(object): +class Endpoint(object): """ CNat endpoint """ - def __init__(self, src, dst): - self.src = src - self.dst = dst + def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None): + self.port = port + self.is_v6 = is_v6 + self.sw_if_index = INVALID_INDEX + if pg is not None and pgi is not None: + # pg interface specified and remote index + self.ip = self.get_ip46(pg.remote_hosts[pgi]) + elif pg is not None: + self.ip = None + self.sw_if_index = pg.sw_if_index + elif ip is not None: + self.ip = ip + else: + self.ip = "::" if self.is_v6 else "0.0.0.0" + + def get_ip46(self, obj): + if self.is_v6: + return obj.ip6 + return obj.ip4 + + def udpate(self, **kwargs): + self.__init__(**kwargs) + + def _vpp_if_af(self): + if self.is_v6: + return VppEnum.vl_api_address_family_t.ADDRESS_IP6 + return VppEnum.vl_api_address_family_t.ADDRESS_IP4 def encode(self): - return {'src_ep': self.src.encode(), - 'dst_ep': self.dst.encode()} + return {'addr': self.ip, + 'port': self.port, + 'sw_if_index': self.sw_if_index, + 'if_af': self._vpp_if_af()} def __str__(self): - return ("%s->%s" % (self.src, self.dst)) + return ("%s:%d" % (self.ip, self.port)) -class VppCNatTranslation(VppObject): +class Translation(VppObject): def __init__(self, test, iproto, vip, paths): self._test = test self.vip = vip self.iproto = iproto self.paths = paths - self.encoded_paths = [] - for path in self.paths: - self.encoded_paths.append(path.encode()) + self.id = None def __str__(self): return ("%s %s %s" % (self.vip, self.iproto, self.paths)) - @property - def vl4_proto(self): + def _vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t return { UDP: ip_proto.IP_API_PROTO_UDP, TCP: ip_proto.IP_API_PROTO_TCP, }[self.iproto] + def _encoded_paths(self): + return [{'src_ep': src.encode(), + 'dst_ep': dst.encode()} for (src, dst) in self.paths] + def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( {'vip': self.vip.encode(), - 'ip_proto': self.vl4_proto, + 'ip_proto': self._vl4_proto(), 'n_paths': len(self.paths), - 'paths': self.encoded_paths}) + 'paths': self._encoded_paths()}) self._test.registry.register(self, self._test.logger) self.id = r.id - - def modify_vpp_config(self, paths): - self.paths = paths - self.encoded_paths = [] - for path in self.paths: - self.encoded_paths.append(path.encode()) - - r = self._test.vapi.cnat_translation_update( - {'vip': self.vip.encode(), - 'ip_proto': self.vl4_proto, - 'n_paths': len(self.paths), - 'paths': self.encoded_paths}) - self._test.registry.register(self, self._test.logger) + return self def remove_vpp_config(self): + assert(self.id is not None) self._test.vapi.cnat_translation_del(id=self.id) + return self def query_vpp_config(self): for t in self._test.vapi.cnat_translation_dump(): @@ -129,22 +136,190 @@ class VppCNatTranslation(VppObject): return t.translation return None - def object_id(self): - return ("cnat-translation-%s" % (self.vip)) - def get_stats(self): - c = self._test.statistics.get_counter("/net/cnat-translation") - return c[0][self.id] +class CnatTestContext(object): + """ + Usage : + ctx = CnatTestContext(self, TCP, is_v6=True) -class TestCNatTranslation(VppTestCase): + # send pg0.remote[0]:1234 -> pg1.remote[0]:6661 + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) + + # We expect this to be NATed as + # pg2.remote[0]: -> pg1.remote[0]:6661 + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) + + # After running cnat_expect, we can send back the received packet + # and expect it be 'unnated' so that we get the original packet + ctx.cnat_send_return().cnat_expect_return() + + # same thing for ICMP errors + ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() + """ + + def __init__(self, test, L4PROTO, is_v6): + self.L4PROTO = L4PROTO + self.is_v6 = is_v6 + self._test = test + + def get_ip46(self, obj): + if self.is_v6: + return obj.ip6 + return obj.ip4 + + @property + def IP46(self): + return IPv6 if self.is_v6 else IP + + def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, + no_replies=False): + if isinstance(src_id, int): + self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id]) + else: + self.dst_addr = src_id + if isinstance(dst_id, int): + self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id]) + else: + self.dst_addr = dst_id + self.src_port = src_port # also ICMP id + self.dst_port = dst_port # also ICMP type + + if self.L4PROTO in [TCP, UDP]: + l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port) + elif self.L4PROTO in [ICMP] and not self.is_v6: + l4 = self.L4PROTO(id=self.src_port, type=self.dst_port) + elif self.L4PROTO in [ICMP] and self.is_v6: + l4 = ICMPv6EchoRequest(id=self.src_port) + p1 = (Ether(src=src_pg.remote_mac, + dst=src_pg.local_mac) / + self.IP46(src=self.src_addr, dst=self.dst_addr) / + l4 / + Raw()) + + if no_replies: + self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg) + else: + self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg) + self.expected_src_pg = src_pg + self.expected_dst_pg = dst_pg + return self + + def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port): + if isinstance(src_id, int): + self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id]) + else: + self.expect_src_addr = src_id + if isinstance(dst_id, int): + self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id]) + else: + self.expect_dst_addr = dst_id + self.expect_src_port = src_port + self.expect_dst_port = dst_port + + if self.expect_src_port is None: + if self.L4PROTO in [TCP, UDP]: + self.expect_src_port = self.rxs[0][self.L4PROTO].sport + elif self.L4PROTO in [ICMP] and not self.is_v6: + self.expect_src_port = self.rxs[0][self.L4PROTO].id + elif self.L4PROTO in [ICMP] and self.is_v6: + self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id + + for rx in self.rxs: + self._test.assert_packet_checksums_valid(rx) + self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr) + self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr) + if self.L4PROTO in [TCP, UDP]: + self._test.assertEqual( + rx[self.L4PROTO].dport, self.expect_dst_port) + self._test.assertEqual( + rx[self.L4PROTO].sport, self.expect_src_port) + elif self.L4PROTO in [ICMP] and not self.is_v6: + self._test.assertEqual( + rx[self.L4PROTO].type, self.expect_dst_port) + self._test.assertEqual( + rx[self.L4PROTO].id, self.expect_src_port) + elif self.L4PROTO in [ICMP] and self.is_v6: + self._test.assertEqual( + rx[ICMPv6EchoRequest].id, self.expect_src_port) + return self + + def cnat_send_return(self): + """This sends the return traffic""" + if self.L4PROTO in [TCP, UDP]: + l4 = self.L4PROTO(sport=self.expect_dst_port, + dport=self.expect_src_port) + elif self.L4PROTO in [ICMP] and not self.is_v6: + # icmp type 0 if echo reply + l4 = self.L4PROTO(id=self.expect_src_port, type=0) + elif self.L4PROTO in [ICMP] and self.is_v6: + l4 = ICMPv6EchoReply(id=self.expect_src_port) + src_mac = self.expected_dst_pg.remote_mac + p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) / + self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) / + l4 / + Raw()) + + self.return_rxs = self._test.send_and_expect( + self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg) + return self + + def cnat_expect_return(self): + for rx in self.return_rxs: + self._test.assert_packet_checksums_valid(rx) + self._test.assertEqual(rx[self.IP46].dst, self.src_addr) + self._test.assertEqual(rx[self.IP46].src, self.dst_addr) + if self.L4PROTO in [TCP, UDP]: + self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port) + self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port) + elif self.L4PROTO in [ICMP] and not self.is_v6: + # icmp type 0 if echo reply + self._test.assertEqual(rx[self.L4PROTO].type, 0) + self._test.assertEqual(rx[self.L4PROTO].id, self.src_port) + elif self.L4PROTO in [ICMP] and self.is_v6: + self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port) + return self + + def cnat_send_icmp_return_error(self): + """ + This called after cnat_expect will send an icmp error + on the reverse path + """ + ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11) + InnerIP = self.rxs[0][self.IP46] + p1 = ( + Ether(src=self.expected_dst_pg.remote_mac, + dst=self.expected_dst_pg.local_mac) / + self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) / + ICMPelem / InnerIP) + self.return_rxs = self._test.send_and_expect( + self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg) + return self + + def cnat_expect_icmp_error_return(self): + ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP + IP46err = IPerror6 if self.is_v6 else IPerror + L4err = TCPerror if self.L4PROTO is TCP else UDPerror + for rx in self.return_rxs: + self._test.assert_packet_checksums_valid(rx) + self._test.assertEqual(rx[self.IP46].dst, self.src_addr) + self._test.assertEqual(rx[self.IP46].src, self.dst_addr) + self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr) + self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr) + self._test.assertEqual( + rx[ICMP46][IP46err][L4err].sport, self.src_port) + self._test.assertEqual( + rx[ICMP46][IP46err][L4err].dport, self.dst_port) + return self + +# ------------------------------------------------------------------- +# ------------------------------------------------------------------- +# ------------------------------------------------------------------- +# ------------------------------------------------------------------- + + +class TestCNatTranslation(CnatCommonTestCase): """ CNat Translation """ - extra_vpp_punt_config = ["cnat", "{", - "session-db-buckets", "64", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] @classmethod def setUpClass(cls): @@ -158,6 +333,8 @@ class TestCNatTranslation(VppTestCase): super(TestCNatTranslation, self).setUp() self.create_pg_interfaces(range(3)) + self.pg0.generate_remote_hosts(N_REMOTE_HOSTS) + self.pg1.generate_remote_hosts(N_REMOTE_HOSTS) for i in self.pg_interfaces: i.admin_up() @@ -165,188 +342,87 @@ class TestCNatTranslation(VppTestCase): i.resolve_arp() i.config_ip6() i.resolve_ndp() + i.configure_ipv4_neighbors() + i.configure_ipv6_neighbors() def tearDown(self): + for translation in self.translations: + translation.remove_vpp_config() + + self.vapi.cnat_session_purge() + self.assertFalse(self.vapi.cnat_session_dump()) + for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatTranslation, self).tearDown() - def cnat_create_translation(self, vip, nbr): - ip_v = "ip6" if vip.isV6 else "ip4" - dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) - sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0) - t1 = VppCNatTranslation( - self, vip.l4p, vip, - [EpTuple(sep, dep), EpTuple(sep, dep)]) - t1.add_vpp_config() - return t1 - - def cnat_test_translation(self, t1, nbr, sports, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP - vip = t1.vip + def cnat_translation(self): + """ CNat Translation """ + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + for nbr, translation in enumerate(self.translations): + vip = translation.vip - # - # Flows - # - for src in self.pg0.remote_hosts: - for sport in sports: + # + # Test Flows to the VIP + # + ctx = CnatTestContext(self, translation.iproto, vip.is_v6) + for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]): # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - self.vapi.cli("trace add pg-input 1") - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - self.logger.info(self.vapi.cli("show trace max 1")) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(self.pg1.remote_hosts[nbr], ip_v)) - self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) - self.assertEqual( - rx[ip_class].src, - getattr(src, ip_v)) - self.assertEqual(rx[vip.l4p].sport, sport) - + ctx.cnat_send(self.pg0, src_pgi, sport, + self.pg1, vip.ip, vip.port) + dst_port = translation.paths[0][DST].port + ctx.cnat_expect(self.pg0, src_pgi, sport, + self.pg1, nbr, dst_port) # from vip to client - p1 = (Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_mac) / - ip_class(src=getattr( - self.pg1.remote_hosts[nbr], - ip_v), - dst=getattr(src, ip_v)) / - vip.l4p(sport=4000 + nbr, dport=sport) / - Raw()) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(src, ip_v)) - self.assertEqual(rx[vip.l4p].dport, sport) - self.assertEqual(rx[ip_class].src, vip.ip) - self.assertEqual(rx[vip.l4p].sport, vip.port) + ctx.cnat_send_return().cnat_expect_return() # # packets to the VIP that do not match a # translation are dropped # - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=6666) / - Raw()) - - self.send_and_assert_no_replies(self.pg0, - p1 * N_PKTS, - self.pg1) + ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, + vip.ip, 6666, no_replies=True) # # packets from the VIP that do not match a # session are forwarded # - p1 = (Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_mac) / - ip_class(src=getattr( - self.pg1.remote_hosts[nbr], - ip_v), - dst=getattr(src, ip_v)) / - vip.l4p(sport=6666, dport=sport) / - Raw()) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - def cnat_test_translation_update(self, t1, sports, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP - vip = t1.vip - - # - # modify the translation to use a different backend - # - dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000) - sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) - t1.modify_vpp_config([EpTuple(sep, dep)]) - - # - # existing flows follow the old path - # - for src in self.pg0.remote_hosts: - for sport in sports: - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - - # - # new flows go to the new backend - # - for src in self.pg0.remote_hosts: - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=9999, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg2) - - def cnat_translation(self, vips, isV6=False): - """ CNat Translation """ - - ip_class = IPv6 if isV6 else IP - ip_v = "ip6" if isV6 else "ip4" - sports = [1234, 1233] - - # - # turn the scanner off whilst testing otherwise sessions - # will time out - # - self.vapi.cli("test cnat scanner off") - - sessions = self.vapi.cnat_session_dump() - - trs = [] - for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr)) - - self.logger.info(self.vapi.cli("sh cnat client")) - self.logger.info(self.vapi.cli("sh cnat translation")) + ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport) + ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport) + + # + # modify the translation to use a different backend + # + old_dst_port = translation.paths[0][DST].port + translation.paths[0][DST].udpate( + pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6) + translation.add_vpp_config() + + # + # existing flows follow the old path + # + for src_pgi in range(N_REMOTE_HOSTS): + for sport in [1234, 1233]: + # from client to vip + ctx.cnat_send(self.pg0, src_pgi, sport, + self.pg1, vip.ip, vip.port) + ctx.cnat_expect(self.pg0, src_pgi, sport, + self.pg1, nbr, old_dst_port) + # from vip to client + ctx.cnat_send_return().cnat_expect_return() + + # + # new flows go to the new backend + # + for src_pgi in range(N_REMOTE_HOSTS): + ctx.cnat_send(self.pg0, src_pgi, 9999, + self.pg2, vip.ip, vip.port) + ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000) - # - # translations - # - for nbr, vip in enumerate(vips): - self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6) - self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6) - if isV6: - self.logger.info(self.vapi.cli( - "sh ip6 fib %s" % self.pg0.remote_ip6)) - else: - self.logger.info(self.vapi.cli( - "sh ip fib %s" % self.pg0.remote_ip4)) self.logger.info(self.vapi.cli("sh cnat session verbose")) # @@ -369,168 +445,114 @@ class TestCNatTranslation(VppTestCase): # # load some flows again and purge # - for vip in vips: - for src in self.pg0.remote_hosts: - for sport in sports: + for translation in self.translations: + vip = translation.vip + ctx = CnatTestContext(self, translation.iproto, vip.is_v6) + for src_pgi in range(N_REMOTE_HOSTS): + for sport in [1234, 1233]: # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg2) - - for tr in trs: - tr.remove_vpp_config() - - self.assertTrue(self.vapi.cnat_session_dump()) - self.vapi.cnat_session_purge() - self.assertFalse(self.vapi.cnat_session_dump()) + ctx.cnat_send(self.pg0, src_pgi, sport, + self.pg2, vip.ip, vip.port) + ctx.cnat_expect(self.pg0, src_pgi, + sport, self.pg2, 0, 5000) - def test_icmp(self): - vips = [ - Ep("30.0.0.1", 5555), - Ep("30.0.0.2", 5554), - Ep("30.0.0.2", 5553, UDP), - Ep("30::1", 6666), - Ep("30::2", 5553, UDP), - ] - sport = 1234 - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv6_neighbors() - self.pg0.configure_ipv4_neighbors() - - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv6_neighbors() - self.pg1.configure_ipv4_neighbors() + def _test_icmp(self): - self.vapi.cli("test cnat scanner off") - trs = [] - for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr)) - - self.logger.info(self.vapi.cli("sh cnat client")) - self.logger.info(self.vapi.cli("sh cnat translation")) - - for nbr, vip in enumerate(vips): - if vip.isV6: - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - remote2_addr = self.pg2.remote_hosts[0].ip6 - else: - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - remote2_addr = self.pg2.remote_hosts[0].ip4 - IP46 = IPv6 if vip.isV6 else IP - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) - self.assertEqual(rx[IP46].src, client_addr) - self.assertEqual(rx[vip.l4p].sport, sport) - - InnerIP = rxs[0][IP46] - - ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP - ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11) - # from vip to client, ICMP error - p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP46(src=remote_addr, dst=client_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror - IP46error = IPerror6 if vip.isV6 else IPerror - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, vip.ip) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].sport, sport) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].dport, vip.port) - - # from other remote to client, ICMP error - # outside shouldn't be NAT-ed - p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) / - IP46(src=remote2_addr, dst=client_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror - IP46error = IPerror6 if vip.isV6 else IPerror - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote2_addr) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].sport, sport) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].dport, vip.port) - - self.vapi.cnat_session_purge() + # + # Testing ICMP + # + for nbr, translation in enumerate(self.translations): + vip = translation.vip + ctx = CnatTestContext(self, translation.iproto, vip.is_v6) + + # + # NATing ICMP errors + # + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port) + dst_port = translation.paths[0][DST].port + ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port) + ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() + + # + # ICMP errors with no VIP associated should not be + # modified + # + ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port) + dst_port = translation.paths[0][DST].port + ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port) + ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() + + def _make_translations_v4(self): + self.translations = [] + self.translations.append(Translation( + self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False), + [( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False), + )] + ).add_vpp_config()) + self.translations.append(Translation( + self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False), + [( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False), + )] + ).add_vpp_config()) + self.translations.append(Translation( + self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False), + [( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False), + )] + ).add_vpp_config()) + + def _make_translations_v6(self): + self.translations = [] + self.translations.append(Translation( + self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True), + [( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True), + )] + ).add_vpp_config()) + self.translations.append(Translation( + self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True), + [( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True), + )] + ).add_vpp_config()) + self.translations.append(Translation( + self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True), + [( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True), + )] + ).add_vpp_config()) + + def test_icmp4(self): + # """ CNat Translation icmp v4 """ + self._make_translations_v4() + self._test_icmp() + + def test_icmp6(self): + # """ CNat Translation icmp v6 """ + self._make_translations_v6() + self._test_icmp() def test_cnat6(self): # """ CNat Translation ipv6 """ - vips = [ - Ep("30::1", 5555), - Ep("30::2", 5554), - Ep("30::2", 5553, UDP), - ] - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv6_neighbors() - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv6_neighbors() - - self.cnat_translation(vips, isV6=True) + self._make_translations_v6() + self.cnat_translation() def test_cnat4(self): # """ CNat Translation ipv4 """ + self._make_translations_v4() + self.cnat_translation() - vips = [ - Ep("30.0.0.1", 5555), - Ep("30.0.0.2", 5554), - Ep("30.0.0.2", 5553, UDP), - ] - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv4_neighbors() - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv4_neighbors() - self.cnat_translation(vips) - - -class TestCNatSourceNAT(VppTestCase): +class TestCNatSourceNAT(CnatCommonTestCase): """ CNat Source NAT """ - extra_vpp_punt_config = ["cnat", "{", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] @classmethod def setUpClass(cls): @@ -540,35 +562,18 @@ class TestCNatSourceNAT(VppTestCase): def tearDownClass(cls): super(TestCNatSourceNAT, cls).tearDownClass() - def setUp(self): - super(TestCNatSourceNAT, self).setUp() - - self.create_pg_interfaces(range(3)) - - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - - self.pg0.configure_ipv6_neighbors() - self.pg0.configure_ipv4_neighbors() - self.pg1.generate_remote_hosts(2) - self.pg1.configure_ipv4_neighbors() - self.pg1.configure_ipv6_neighbors() - + def _enable_disable_snat(self, is_enable=True): self.vapi.cnat_set_snat_addresses( snat_ip4=self.pg2.remote_hosts[0].ip4, snat_ip6=self.pg2.remote_hosts[0].ip6, sw_if_index=INVALID_INDEX) self.vapi.feature_enable_disable( - enable=1, + enable=1 if is_enable else 0, arc_name="ip6-unicast", feature_name="cnat-snat-ip6", sw_if_index=self.pg0.sw_if_index) self.vapi.feature_enable_disable( - enable=1, + enable=1 if is_enable else 0, arc_name="ip4-unicast", feature_name="cnat-snat-ip4", sw_if_index=self.pg0.sw_if_index) @@ -578,13 +583,32 @@ class TestCNatSourceNAT(VppTestCase): policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX) for i in self.pg_interfaces: self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1, + sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, table=policie_tbls.CNAT_POLICY_INCLUDE_V6) self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1, + sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, table=policie_tbls.CNAT_POLICY_INCLUDE_V4) + def setUp(self): + super(TestCNatSourceNAT, self).setUp() + + self.create_pg_interfaces(range(3)) + self.pg1.generate_remote_hosts(2) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + i.configure_ipv6_neighbors() + i.configure_ipv4_neighbors() + + self._enable_disable_snat(is_enable=True) + def tearDown(self): + self._enable_disable_snat(is_enable=True) + self.vapi.cnat_session_purge() for i in self.pg_interfaces: i.unconfig_ip4() @@ -594,272 +618,72 @@ class TestCNatSourceNAT(VppTestCase): def test_snat_v6(self): # """ CNat Source Nat v6 """ - self.sourcenat_test_tcp_udp_conf(TCP, isV6=True) - self.sourcenat_test_tcp_udp_conf(UDP, isV6=True) - self.sourcenat_test_icmp_err_conf(isV6=True) - self.sourcenat_test_icmp_echo6_conf() + self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True) + self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True) + self.sourcenat_test_icmp_echo_conf(is_v6=True) def test_snat_v4(self): # """ CNat Source Nat v4 """ self.sourcenat_test_tcp_udp_conf(TCP) self.sourcenat_test_tcp_udp_conf(UDP) - self.sourcenat_test_icmp_err_conf() - self.sourcenat_test_icmp_echo4_conf() - - def sourcenat_test_icmp_echo6_conf(self): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - - # ping from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IPv6(src=client_addr, dst=remote_addr) / - ICMPv6EchoRequest(id=0xfeed) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assertEqual(rx[IPv6].src, src_nat_addr) - self.assert_packet_checksums_valid(rx) - - received_id = rx[0][ICMPv6EchoRequest].id - # ping reply from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IPv6(src=remote_addr, dst=src_nat_addr) / - ICMPv6EchoReply(id=received_id)) - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IPv6].src, remote_addr) - self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed) - - def sourcenat_test_icmp_echo4_conf(self): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - - # ping from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - ICMP(type=8, id=0xfeed) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assertEqual(rx[IP46].src, src_nat_addr) - self.assert_packet_checksums_valid(rx) - - received_id = rx[0][ICMP].id - # ping reply from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - ICMP(type=0, id=received_id)) - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote_addr) - self.assertEqual(rx[ICMP].id, 0xfeed) - - def sourcenat_test_icmp_err_conf(self, isV6=False): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - if isV6: - IP46 = IPv6 - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - ICMP46 = ICMPv6DestUnreach - ICMPelem = ICMPv6DestUnreach(code=1) - IP46error = IPerror6 - else: - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - IP46error = IPerror - ICMP46 = ICMP - ICMPelem = ICMP(type=11) - - # from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - TCP(sport=sports[nbr], dport=dports[nbr]) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[TCP].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - sport = rx[TCP].sport - - InnerIP = rxs[0][IP46] - # from outside to pods, ICMP error - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote_addr) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr) - self.assertEqual(rx[ICMP46][IP46error] - [TCPerror].sport, sports[nbr]) - self.assertEqual(rx[ICMP46][IP46error] - [TCPerror].dport, dports[nbr]) - - def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - if isV6: - IP46 = IPv6 - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - exclude_prefix = ip_network( - "%s/100" % remote_addr, strict=False) - else: - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - exclude_prefix = ip_network( - "%s/16" % remote_addr, strict=False) - # from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - l4p(sport=sports[nbr], dport=dports[nbr]) / - Raw()) - - self.vapi.cli("trace add pg-input 1") - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - self.logger.info(self.vapi.cli("show trace max 1")) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - sport = rx[l4p].sport - - # from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - l4p(sport=dports[nbr], dport=sport) / - Raw()) - - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, client_addr) - self.assertEqual(rx[l4p].dport, sports[nbr]) - self.assertEqual(rx[l4p].sport, dports[nbr]) - self.assertEqual(rx[IP46].src, remote_addr) - - # add remote host to exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=1) - self.vapi.cnat_session_purge() - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, client_addr) - - # remove remote host from exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=0) - self.vapi.cnat_session_purge() - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - - self.vapi.cnat_session_purge() - - -class TestCNatDHCP(VppTestCase): + self.sourcenat_test_icmp_echo_conf() + + def sourcenat_test_icmp_echo_conf(self, is_v6=False): + ctx = CnatTestContext(self, ICMP, is_v6=is_v6) + # 8 is ICMP type echo (v4 only) + ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8) + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8) + ctx.cnat_send_return().cnat_expect_return() + + def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False): + ctx = CnatTestContext(self, L4PROTO, is_v6) + # we should source NAT + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) + ctx.cnat_send_return().cnat_expect_return() + + # exclude dst address of pg1.1 from snat + if is_v6: + exclude_prefix = ip_network( + "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False) + else: + exclude_prefix = ip_network( + "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False) + + # add remote host to exclude list + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=1) + + # We should not source NAT the id=1 + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) + ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661) + ctx.cnat_send_return().cnat_expect_return() + + # But we should source NAT the id=0 + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) + ctx.cnat_send_return().cnat_expect_return() + + # remove remote host from exclude list + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=0) + self.vapi.cnat_session_purge() + + # We should source NAT again + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661) + ctx.cnat_send_return().cnat_expect_return() + + # test return ICMP error nating + ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) + ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661) + ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() + + self.vapi.cnat_session_purge() + + +class TestCNatDHCP(CnatCommonTestCase): """ CNat Translation """ - extra_vpp_punt_config = ["cnat", "{", - "session-db-buckets", "64", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] @classmethod def setUpClass(cls): @@ -874,100 +698,101 @@ class TestCNatDHCP(VppTestCase): i.admin_down() super(TestCNatDHCP, self).tearDown() - def create_translation(self, vip_pg, *args, is_v6=False): - vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6) - paths = [] - for (src_pg, dst_pg) in args: - paths.append(EpTuple( - Ep.from_pg(src_pg, is_v6=is_v6), - Ep.from_pg(dst_pg, is_v6=is_v6) - )) - t1 = VppCNatTranslation(self, TCP, vip, paths) - t1.add_vpp_config() - return t1 - - def make_addr(self, sw_if_index, i, is_v6): + def make_addr(self, sw_if_index, addr_id, is_v6): if is_v6: - return "fd01:%x::%u" % (sw_if_index, i + 1) - else: - return "172.16.%u.%u" % (sw_if_index, i) + return "fd01:%x::%u" % (sw_if_index, addr_id + 1) + return "172.16.%u.%u" % (sw_if_index, addr_id) - def make_prefix(self, sw_if_index, i, is_v6): + def make_prefix(self, sw_if_index, addr_id, is_v6): if is_v6: - return "%s/128" % self.make_addr(sw_if_index, i, is_v6) - else: - return "%s/32" % self.make_addr(sw_if_index, i, is_v6) - - def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False): - qt1 = tr.query_vpp_config() - self.assertEqual(str(qt1.vip.addr), self.make_addr( - vip_pg.sw_if_index, i, is_v6)) - for (src_pg, dst_pg), path in zip(args, qt1.paths): - if src_pg: - self.assertEqual(str(path.src_ep.addr), self.make_addr( - src_pg.sw_if_index, i, is_v6)) - if dst_pg: - self.assertEqual(str(path.dst_ep.addr), self.make_addr( - dst_pg.sw_if_index, i, is_v6)) - - def config_ips(self, rng, is_add=1, is_v6=False): - for pg, i in product(self.pg_interfaces, rng): - self.vapi.sw_interface_add_del_address( - sw_if_index=pg.sw_if_index, - prefix=self.make_prefix(pg.sw_if_index, i, is_v6), - is_add=is_add) - - def test_dhcp_v4(self): - self.create_pg_interfaces(range(5)) + return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6) + return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6) + + def check_resolved(self, tr, addr_id, is_v6=False): + qt = tr.query_vpp_config() + self.assertEqual(str(qt.vip.addr), self.make_addr( + tr.vip.sw_if_index, addr_id, is_v6)) + self.assertEqual(len(qt.paths), len(tr.paths)) + for path_tr, path_qt in zip(tr.paths, qt.paths): + src_qt = path_qt.src_ep + dst_qt = path_qt.dst_ep + src_tr, dst_tr = path_tr + self.assertEqual(str(src_qt.addr), self.make_addr( + src_tr.sw_if_index, addr_id, is_v6)) + self.assertEqual(str(dst_qt.addr), self.make_addr( + dst_tr.sw_if_index, addr_id, is_v6)) + + def add_del_address(self, pg, addr_id, is_add=True, is_v6=False): + self.vapi.sw_interface_add_del_address( + sw_if_index=pg.sw_if_index, + prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6), + is_add=1 if is_add else 0) + + def _test_dhcp_v46(self, is_v6): + self.create_pg_interfaces(range(4)) for i in self.pg_interfaces: i.admin_up() - pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) - t1 = self.create_translation(*pglist) - self.config_ips([0]) - self.check_resolved(t1, *pglist) - self.config_ips([1]) - self.config_ips([0], is_add=0) - self.check_resolved(t1, *pglist, i=1) - self.config_ips([1], is_add=0) - t1.remove_vpp_config() + paths = [ + (Endpoint(pg=self.pg1, is_v6=is_v6), + Endpoint(pg=self.pg2, is_v6=is_v6)), + (Endpoint(pg=self.pg1, is_v6=is_v6), + Endpoint(pg=self.pg3, is_v6=is_v6)) + ] + ep = Endpoint(pg=self.pg0, is_v6=is_v6) + t = Translation(self, TCP, ep, paths).add_vpp_config() + # Add an address on every interface + # and check it is reflected in the cnat config + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6) + self.check_resolved(t, addr_id=0, is_v6=is_v6) + # Add a new address on every interface, remove the old one + # and check it is reflected in the cnat config + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6) + self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6) + self.check_resolved(t, addr_id=1, is_v6=is_v6) + # remove the configuration + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6) + t.remove_vpp_config() + + def test_dhcp_v4(self): + self._test_dhcp_v46(False) def test_dhcp_v6(self): - self.create_pg_interfaces(range(5)) - for i in self.pg_interfaces: - i.admin_up() - pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) - t1 = self.create_translation(*pglist, is_v6=True) - self.config_ips([0], is_v6=True) - self.check_resolved(t1, *pglist, is_v6=True) - self.config_ips([1], is_v6=True) - self.config_ips([0], is_add=0, is_v6=True) - self.check_resolved(t1, *pglist, i=1, is_v6=True) - self.config_ips([1], is_add=0, is_v6=True) - t1.remove_vpp_config() + self._test_dhcp_v46(True) def test_dhcp_snat(self): self.create_pg_interfaces(range(1)) for i in self.pg_interfaces: i.admin_up() self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) - self.config_ips([0], is_v6=False) - self.config_ips([0], is_v6=True) + # Add an address on every interface + # and check it is reflected in the cnat config + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False) + self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True) r = self.vapi.cnat_get_snat_addresses() self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, 0, False)) + self.pg0.sw_if_index, addr_id=0, is_v6=False)) self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, 0, True)) - self.config_ips([1], is_v6=False) - self.config_ips([1], is_v6=True) - self.config_ips([0], is_add=0, is_v6=False) - self.config_ips([0], is_add=0, is_v6=True) + self.pg0.sw_if_index, addr_id=0, is_v6=True)) + # Add a new address on every interface, remove the old one + # and check it is reflected in the cnat config + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False) + self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True) + self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False) + self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True) r = self.vapi.cnat_get_snat_addresses() self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, 1, False)) + self.pg0.sw_if_index, addr_id=1, is_v6=False)) self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, 1, True)) - self.config_ips([1], is_add=0, is_v6=False) - self.config_ips([1], is_add=0, is_v6=True) + self.pg0.sw_if_index, addr_id=1, is_v6=True)) + # remove the configuration + for pg in self.pg_interfaces: + self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False) + self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True) self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) -- 2.16.6