5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import INVALID_INDEX
8 from itertools import product
9 from config import config
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP, TCP, ICMP
14 from scapy.layers.inet import IPerror, TCPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_network
20 from vpp_object import VppObject
21 from vpp_papi import VppEnum
30 class CnatCommonTestCase(VppTestCase):
31 """CNat common test class"""
34 # turn the scanner off whilst testing otherwise sessions
42 "session-cleanup-timeout",
55 super(CnatCommonTestCase, cls).setUpClass()
58 def tearDownClass(cls):
59 super(CnatCommonTestCase, cls).tearDownClass()
62 class Endpoint(object):
65 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
68 self.sw_if_index = INVALID_INDEX
69 if pg is not None and pgi is not None:
70 # pg interface specified and remote index
71 self.ip = self.get_ip46(pg.remote_hosts[pgi])
74 self.sw_if_index = pg.sw_if_index
78 self.ip = "::" if self.is_v6 else "0.0.0.0"
80 def get_ip46(self, obj):
85 def udpate(self, **kwargs):
86 self.__init__(**kwargs)
90 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
91 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
97 "sw_if_index": self.sw_if_index,
98 "if_af": self._vpp_if_af(),
102 return "%s:%d" % (self.ip, self.port)
105 class Translation(VppObject):
106 def __init__(self, test, iproto, vip, paths, fhc):
115 return "%s %s %s" % (self.vip, self.iproto, self.paths)
117 def _vl4_proto(self):
118 ip_proto = VppEnum.vl_api_ip_proto_t
120 UDP: ip_proto.IP_API_PROTO_UDP,
121 TCP: ip_proto.IP_API_PROTO_TCP,
124 def _encoded_paths(self):
126 {"src_ep": src.encode(), "dst_ep": dst.encode()}
127 for (src, dst) in self.paths
130 def add_vpp_config(self):
131 r = self._test.vapi.cnat_translation_update(
133 "vip": self.vip.encode(),
134 "ip_proto": self._vl4_proto(),
135 "n_paths": len(self.paths),
136 "paths": self._encoded_paths(),
137 "flow_hash_config": self.fhc,
140 self._test.registry.register(self, self._test.logger)
144 def remove_vpp_config(self):
145 assert self.id is not None
146 self._test.vapi.cnat_translation_del(id=self.id)
149 def query_vpp_config(self):
150 for t in self._test.vapi.cnat_translation_dump():
151 if self.id == t.translation.id:
156 class CnatTestContext(object):
160 ctx = CnatTestContext(self, TCP, is_v6=True)
162 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
163 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
165 # We expect this to be NATed as
166 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
167 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
169 # After running cnat_expect, we can send back the received packet
170 # and expect it be 'unnated' so that we get the original packet
171 ctx.cnat_send_return().cnat_expect_return()
173 # same thing for ICMP errors
174 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
177 def __init__(self, test, L4PROTO, is_v6):
178 self.L4PROTO = L4PROTO
182 def get_ip46(self, obj):
189 return IPv6 if self.is_v6 else IP
192 self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
194 if isinstance(src_id, int):
195 self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
197 self.dst_addr = src_id
198 if isinstance(dst_id, int):
199 self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
201 self.dst_addr = dst_id
202 self.src_port = src_port # also ICMP id
203 self.dst_port = dst_port # also ICMP type
205 if self.L4PROTO in [TCP, UDP]:
206 l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
207 elif self.L4PROTO in [ICMP] and not self.is_v6:
208 l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
209 elif self.L4PROTO in [ICMP] and self.is_v6:
210 l4 = ICMPv6EchoRequest(id=self.src_port)
212 Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
213 / self.IP46(src=self.src_addr, dst=self.dst_addr)
219 self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
221 self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
222 self.expected_src_pg = src_pg
223 self.expected_dst_pg = dst_pg
226 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
227 if isinstance(src_id, int):
228 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
230 self.expect_src_addr = src_id
231 if isinstance(dst_id, int):
232 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
234 self.expect_dst_addr = dst_id
235 self.expect_src_port = src_port
236 self.expect_dst_port = dst_port
238 if self.expect_src_port is None:
239 if self.L4PROTO in [TCP, UDP]:
240 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
241 elif self.L4PROTO in [ICMP] and not self.is_v6:
242 self.expect_src_port = self.rxs[0][self.L4PROTO].id
243 elif self.L4PROTO in [ICMP] and self.is_v6:
244 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
247 self._test.assert_packet_checksums_valid(rx)
248 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
249 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
250 if self.L4PROTO in [TCP, UDP]:
251 self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
252 self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
253 elif self.L4PROTO in [ICMP] and not self.is_v6:
254 self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
255 self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
256 elif self.L4PROTO in [ICMP] and self.is_v6:
257 self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
260 def cnat_send_return(self):
261 """This sends the return traffic"""
262 if self.L4PROTO in [TCP, UDP]:
263 l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
264 elif self.L4PROTO in [ICMP] and not self.is_v6:
265 # icmp type 0 if echo reply
266 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
267 elif self.L4PROTO in [ICMP] and self.is_v6:
268 l4 = ICMPv6EchoReply(id=self.expect_src_port)
269 src_mac = self.expected_dst_pg.remote_mac
271 Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
272 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
277 self.return_rxs = self._test.send_and_expect(
278 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
282 def cnat_expect_return(self):
283 for rx in self.return_rxs:
284 self._test.assert_packet_checksums_valid(rx)
285 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
286 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
287 if self.L4PROTO in [TCP, UDP]:
288 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
289 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
290 elif self.L4PROTO in [ICMP] and not self.is_v6:
291 # icmp type 0 if echo reply
292 self._test.assertEqual(rx[self.L4PROTO].type, 0)
293 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
294 elif self.L4PROTO in [ICMP] and self.is_v6:
295 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
298 def cnat_send_icmp_return_error(self):
300 This called after cnat_expect will send an icmp error
303 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
304 InnerIP = self.rxs[0][self.IP46]
307 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
309 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
313 self.return_rxs = self._test.send_and_expect(
314 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
318 def cnat_expect_icmp_error_return(self):
319 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
320 IP46err = IPerror6 if self.is_v6 else IPerror
321 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
322 for rx in self.return_rxs:
323 self._test.assert_packet_checksums_valid(rx)
324 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
325 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
326 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
327 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
328 self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
329 self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
333 # -------------------------------------------------------------------
334 # -------------------------------------------------------------------
335 # -------------------------------------------------------------------
336 # -------------------------------------------------------------------
339 @unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
340 class TestCNatTranslation(CnatCommonTestCase):
341 """CNat Translation"""
345 super(TestCNatTranslation, cls).setUpClass()
348 def tearDownClass(cls):
349 super(TestCNatTranslation, cls).tearDownClass()
352 super(TestCNatTranslation, self).setUp()
354 self.create_pg_interfaces(range(3))
355 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
356 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
358 for i in self.pg_interfaces:
364 i.configure_ipv4_neighbors()
365 i.configure_ipv6_neighbors()
368 for translation in self.translations:
369 translation.remove_vpp_config()
371 self.vapi.cnat_session_purge()
372 self.assertFalse(self.vapi.cnat_session_dump())
374 for i in self.pg_interfaces:
378 super(TestCNatTranslation, self).tearDown()
380 def cnat_fhc_translation(self):
381 """CNat Translation"""
382 self.logger.info(self.vapi.cli("sh cnat client"))
383 self.logger.info(self.vapi.cli("sh cnat translation"))
385 for nbr, translation in enumerate(self.mbtranslations):
386 vip = translation.vip
389 # Flows to the VIP with same ips and different source ports are loadbalanced identically
390 # in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
392 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
393 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
395 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
396 dport1 = ctx.rxs[0][ctx.L4PROTO].dport
399 [translation.paths[0][DST].port, translation.paths[1][DST].port],
401 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
404 self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
406 dport2 = ctx.rxs[0][ctx.L4PROTO].dport
409 [translation.paths[0][DST].port, translation.paths[1][DST].port],
411 ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
413 ctx._test.assertEqual(dport1, dport2)
415 def cnat_translation(self):
416 """CNat Translation"""
417 self.logger.info(self.vapi.cli("sh cnat client"))
418 self.logger.info(self.vapi.cli("sh cnat translation"))
420 for nbr, translation in enumerate(self.translations):
421 vip = translation.vip
424 # Test Flows to the VIP
426 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
427 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
429 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
430 dst_port = translation.paths[0][DST].port
431 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
433 ctx.cnat_send_return().cnat_expect_return()
436 # packets to the VIP that do not match a
437 # translation are dropped
440 self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
444 # packets from the VIP that do not match a
445 # session are forwarded
447 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
448 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
451 # modify the translation to use a different backend
453 old_dst_port = translation.paths[0][DST].port
454 translation.paths[0][DST].udpate(
455 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
457 translation.add_vpp_config()
460 # existing flows follow the old path
462 for src_pgi in range(N_REMOTE_HOSTS):
463 for sport in [1234, 1233]:
465 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
467 self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
470 ctx.cnat_send_return().cnat_expect_return()
473 # new flows go to the new backend
475 for src_pgi in range(N_REMOTE_HOSTS):
476 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
477 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
479 self.logger.info(self.vapi.cli("sh cnat session verbose"))
482 # turn the scanner back on and wait until the sessions
485 self.vapi.cli("test cnat scanner on")
486 self.virtual_sleep(2)
487 sessions = self.vapi.cnat_session_dump()
488 self.assertEqual(len(sessions), 0)
489 self.vapi.cli("test cnat scanner off")
492 # load some flows again and purge
494 for translation in self.translations:
495 vip = translation.vip
496 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
497 for src_pgi in range(N_REMOTE_HOSTS):
498 for sport in [1234, 1233]:
500 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
501 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
503 def _test_icmp(self):
507 for nbr, translation in enumerate(self.translations):
508 vip = translation.vip
509 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
514 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
515 dst_port = translation.paths[0][DST].port
516 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
517 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
520 # ICMP errors with no VIP associated should not be
523 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
524 dst_port = translation.paths[0][DST].port
525 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
526 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
528 def _make_multi_backend_translations(self):
529 self.translations = []
530 self.mbtranslations = []
531 self.mbtranslations.append(
535 Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
538 Endpoint(is_v6=False),
539 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
542 Endpoint(is_v6=False),
543 Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
546 0x03, # hash only on dst ip and src ip
549 self.mbtranslations.append(
553 Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
556 Endpoint(is_v6=False),
557 Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
560 Endpoint(is_v6=False),
561 Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
564 0x08, # hash only on dst port
568 def _make_translations_v4(self):
569 self.translations = []
570 self.translations.append(
574 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
577 Endpoint(is_v6=False),
578 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
584 self.translations.append(
588 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
591 Endpoint(is_v6=False),
592 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
598 self.translations.append(
602 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
605 Endpoint(is_v6=False),
606 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
613 def _make_translations_v6(self):
614 self.translations = []
615 self.translations.append(
619 Endpoint(ip="30::1", port=5555, is_v6=True),
622 Endpoint(is_v6=True),
623 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
629 self.translations.append(
633 Endpoint(ip="30::2", port=5554, is_v6=True),
636 Endpoint(is_v6=True),
637 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
643 self.translations.append(
647 Endpoint(ip="30::2", port=5553, is_v6=True),
650 Endpoint(is_v6=True),
651 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
658 def test_icmp4(self):
659 # """ CNat Translation icmp v4 """
660 self._make_translations_v4()
663 def test_icmp6(self):
664 # """ CNat Translation icmp v6 """
665 self._make_translations_v6()
668 def test_cnat6(self):
669 # """ CNat Translation ipv6 """
670 self._make_translations_v6()
671 self.cnat_translation()
673 def test_cnat4(self):
674 # """ CNat Translation ipv4 """
675 self._make_translations_v4()
676 self.cnat_translation()
678 def test_cnat_fhc(self):
679 # """ CNat Translation flow hash config """
680 self._make_multi_backend_translations()
681 self.cnat_fhc_translation()
684 @unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
685 class TestCNatSourceNAT(CnatCommonTestCase):
686 """CNat Source NAT"""
690 super(TestCNatSourceNAT, cls).setUpClass()
693 def tearDownClass(cls):
694 super(TestCNatSourceNAT, cls).tearDownClass()
696 def _enable_disable_snat(self, is_enable=True):
697 self.vapi.cnat_set_snat_addresses(
698 snat_ip4=self.pg2.remote_hosts[0].ip4,
699 snat_ip6=self.pg2.remote_hosts[0].ip6,
700 sw_if_index=INVALID_INDEX,
702 self.vapi.feature_enable_disable(
703 enable=1 if is_enable else 0,
704 arc_name="ip6-unicast",
705 feature_name="cnat-snat-ip6",
706 sw_if_index=self.pg0.sw_if_index,
708 self.vapi.feature_enable_disable(
709 enable=1 if is_enable else 0,
710 arc_name="ip4-unicast",
711 feature_name="cnat-snat-ip4",
712 sw_if_index=self.pg0.sw_if_index,
715 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
716 self.vapi.cnat_set_snat_policy(
717 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
719 for i in self.pg_interfaces:
720 self.vapi.cnat_snat_policy_add_del_if(
721 sw_if_index=i.sw_if_index,
722 is_add=1 if is_enable else 0,
723 table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
725 self.vapi.cnat_snat_policy_add_del_if(
726 sw_if_index=i.sw_if_index,
727 is_add=1 if is_enable else 0,
728 table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
732 super(TestCNatSourceNAT, self).setUp()
734 self.create_pg_interfaces(range(3))
735 self.pg1.generate_remote_hosts(2)
737 for i in self.pg_interfaces:
743 i.configure_ipv6_neighbors()
744 i.configure_ipv4_neighbors()
746 self._enable_disable_snat(is_enable=True)
749 self._enable_disable_snat(is_enable=True)
751 self.vapi.cnat_session_purge()
752 for i in self.pg_interfaces:
756 super(TestCNatSourceNAT, self).tearDown()
758 def test_snat_v6(self):
759 # """ CNat Source Nat v6 """
760 self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
761 self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
762 self.sourcenat_test_icmp_echo_conf(is_v6=True)
764 def test_snat_v4(self):
765 # """ CNat Source Nat v4 """
766 self.sourcenat_test_tcp_udp_conf(TCP)
767 self.sourcenat_test_tcp_udp_conf(UDP)
768 self.sourcenat_test_icmp_echo_conf()
770 def sourcenat_test_icmp_echo_conf(self, is_v6=False):
771 ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
772 # 8 is ICMP type echo (v4 only)
773 ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
774 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
775 ctx.cnat_send_return().cnat_expect_return()
777 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
778 ctx = CnatTestContext(self, L4PROTO, is_v6)
779 # we should source NAT
780 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
781 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
782 ctx.cnat_send_return().cnat_expect_return()
784 # exclude dst address of pg1.1 from snat
786 exclude_prefix = ip_network(
787 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
790 exclude_prefix = ip_network(
791 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
794 # add remote host to exclude list
795 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
797 # We should not source NAT the id=1
798 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
799 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
800 ctx.cnat_send_return().cnat_expect_return()
802 # But we should source NAT the id=0
803 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
804 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
805 ctx.cnat_send_return().cnat_expect_return()
807 # remove remote host from exclude list
808 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
809 self.vapi.cnat_session_purge()
811 # We should source NAT again
812 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
813 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
814 ctx.cnat_send_return().cnat_expect_return()
816 # test return ICMP error nating
817 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
818 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
819 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
821 self.vapi.cnat_session_purge()
824 @unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
825 class TestCNatDHCP(CnatCommonTestCase):
826 """CNat Translation"""
830 super(TestCNatDHCP, cls).setUpClass()
833 def tearDownClass(cls):
834 super(TestCNatDHCP, cls).tearDownClass()
837 for i in self.pg_interfaces:
839 super(TestCNatDHCP, self).tearDown()
841 def make_addr(self, sw_if_index, addr_id, is_v6):
843 return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
844 return "172.16.%u.%u" % (sw_if_index, addr_id)
846 def make_prefix(self, sw_if_index, addr_id, is_v6):
848 return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
849 return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
851 def check_resolved(self, tr, addr_id, is_v6=False):
852 qt = tr.query_vpp_config()
854 str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
856 self.assertEqual(len(qt.paths), len(tr.paths))
857 for path_tr, path_qt in zip(tr.paths, qt.paths):
858 src_qt = path_qt.src_ep
859 dst_qt = path_qt.dst_ep
860 src_tr, dst_tr = path_tr
862 str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
865 str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
868 def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
869 self.vapi.sw_interface_add_del_address(
870 sw_if_index=pg.sw_if_index,
871 prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
872 is_add=1 if is_add else 0,
875 def _test_dhcp_v46(self, is_v6):
876 self.create_pg_interfaces(range(4))
877 for i in self.pg_interfaces:
880 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
881 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
883 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
884 t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
885 # Add an address on every interface
886 # and check it is reflected in the cnat config
887 for pg in self.pg_interfaces:
888 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
889 self.check_resolved(t, addr_id=0, is_v6=is_v6)
890 # Add a new address on every interface, remove the old one
891 # and check it is reflected in the cnat config
892 for pg in self.pg_interfaces:
893 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
894 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
895 self.check_resolved(t, addr_id=1, is_v6=is_v6)
896 # remove the configuration
897 for pg in self.pg_interfaces:
898 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
899 t.remove_vpp_config()
901 def test_dhcp_v4(self):
902 self._test_dhcp_v46(False)
904 def test_dhcp_v6(self):
905 self._test_dhcp_v46(True)
907 def test_dhcp_snat(self):
908 self.create_pg_interfaces(range(1))
909 for i in self.pg_interfaces:
911 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
912 # Add an address on every interface
913 # and check it is reflected in the cnat config
914 for pg in self.pg_interfaces:
915 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
916 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
917 r = self.vapi.cnat_get_snat_addresses()
920 self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
923 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
925 # Add a new address on every interface, remove the old one
926 # and check it is reflected in the cnat config
927 for pg in self.pg_interfaces:
928 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
929 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
930 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
931 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
932 r = self.vapi.cnat_get_snat_addresses()
935 self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
938 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
940 # remove the configuration
941 for pg in self.pg_interfaces:
942 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
943 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
944 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
947 if __name__ == "__main__":
948 unittest.main(testRunner=VppTestRunner)