5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import (
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
# Common base class for the CNat test cases below; it extends VppTestCase.
37 class CnatCommonTestCase(VppTestCase):
38 """CNat common test class"""
41 # turn the scanner off whilst testing otherwise sessions
# One of the configuration tokens supplied by this class (its value is not visible here).
49 "session-cleanup-timeout",
# Class-level setup defers to the framework base class.
62 super(CnatCommonTestCase, cls).setUpClass()
65 def tearDownClass(cls):
# Class-level teardown defers to the framework base class.
66 super(CnatCommonTestCase, cls).tearDownClass()
# One CNat endpoint: an IP address and port, optionally bound to a pg
# interface through sw_if_index.
69 class Endpoint(object):
# pg/pgi pick an interface and an index into its remote_hosts; port, is_v6
# and ip have defaults so that Endpoint() describes an "any" endpoint.
72 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
# Default to "no interface"; replaced by pg.sw_if_index further down.
75 self.sw_if_index = INVALID_INDEX
76 if pg is not None and pgi is not None:
77 # pg interface specified and remote index
78 self.ip = self.get_ip46(pg.remote_hosts[pgi])
81 self.sw_if_index = pg.sw_if_index
# Unspecified address of the selected family.
85 self.ip = "::" if self.is_v6 else "0.0.0.0"
# Called above with a remote host object to obtain its address.
87 def get_ip46(self, obj):
# NOTE(review): the method name is spelled "udpate" (sic). Callers in this
# file use that spelling, so renaming it needs a coordinated change.
92 def udpate(self, **kwargs):
# Re-invokes __init__ with the given keywords; any keyword left out takes
# the __init__ default rather than keeping the current value.
93 self.__init__(**kwargs)
# VPP API address-family enum value for this endpoint (IPv6 or IPv4).
97 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
98 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
# Fields of the encoded (API) representation of the endpoint.
104 "sw_if_index": self.sw_if_index,
105 "if_af": self._vpp_if_af(),
# Human-readable "ip:port" form.
109 return "%s:%d" % (self.ip, self.port)
# VppObject wrapper for one CNat translation, driven through the
# cnat_translation_update / _del / _dump API calls.
112 class Translation(VppObject):
# test: owning test case; iproto: scapy L4 class (see _vl4_proto);
# vip: endpoint being translated; paths: (src, dst) endpoint pairs;
# fhc: flow hash config value passed to the API.
113 def __init__(self, test, iproto, vip, paths, fhc):
# String form: "<vip> <iproto> <paths>".
122 return "%s %s %s" % (self.vip, self.iproto, self.paths)
# Maps the scapy UDP/TCP classes to the VPP ip_proto enum values.
124 def _vl4_proto(self):
125 ip_proto = VppEnum.vl_api_ip_proto_t
127 UDP: ip_proto.IP_API_PROTO_UDP,
128 TCP: ip_proto.IP_API_PROTO_TCP,
# Encodes each (src, dst) pair of self.paths as a {"src_ep", "dst_ep"} dict.
131 def _encoded_paths(self):
133 {"src_ep": src.encode(), "dst_ep": dst.encode()}
134 for (src, dst) in self.paths
# Pushes the translation to VPP and registers the object with the test registry.
137 def add_vpp_config(self):
138 r = self._test.vapi.cnat_translation_update(
140 "vip": self.vip.encode(),
141 "ip_proto": self._vl4_proto(),
142 "n_paths": len(self.paths),
143 "paths": self._encoded_paths(),
144 "flow_hash_config": self.fhc,
147 self._test.registry.register(self, self._test.logger)
# Deletes the translation by id; the id must already be known.
151 def remove_vpp_config(self):
152 assert self.id is not None
153 self._test.vapi.cnat_translation_del(id=self.id)
# Looks for this translation's id in the dump returned by VPP.
156 def query_vpp_config(self):
157 for t in self._test.vapi.cnat_translation_dump():
158 if self.id == t.translation.id:
# Helper that sends a flow, checks how it was NATed, then sends and checks
# the return traffic. State from one call (addresses, ports, received
# packets) is kept on the instance for the following calls.
163 class CnatTestContext(object):
167 ctx = CnatTestContext(self, TCP, is_v6=True)
169 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
170 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
172 # We expect this to be NATed as
173 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
174 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
176 # After running cnat_expect, we can send back the received packet
177 # and expect it be 'unnated' so that we get the original packet
178 ctx.cnat_send_return().cnat_expect_return()
180 # same thing for ICMP errors
181 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
# test: the running test case; L4PROTO: scapy class (TCP, UDP or ICMP);
# is_v6: selects IPv6 instead of IPv4.
184 def __init__(self, test, L4PROTO, is_v6):
185 self.L4PROTO = L4PROTO
# Called with a remote host object to obtain its address.
189 def get_ip46(self, obj):
# Scapy IP layer class for this context's address family; the rest of the
# class uses it as self.IP46.
196 return IPv6 if self.is_v6 else IP
def cnat_send(
    self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
):
    """Build a packet for this context's protocol and inject it on src_pg.

    src_id / dst_id are either an int index into the interface's
    remote_hosts, or a literal address used as-is.
    src_port / dst_port double as ICMP id / ICMP type for ICMPv4. For
    ICMPv6 an echo request is sent and only the id is used.

    With no_replies=True the packets are expected to be dropped. Otherwise
    the packets received on dst_pg are stored in self.rxs. The interfaces
    are remembered for the return-traffic helpers.
    """
    if isinstance(src_id, int):
        self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        # Literal source address. This used to be written to dst_addr,
        # which left src_addr stale or unset for the packet built below.
        self.src_addr = src_id
    if isinstance(dst_id, int):
        self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.dst_addr = dst_id
    self.src_port = src_port  # also ICMP id
    self.dst_port = dst_port  # also ICMP type

    if self.L4PROTO in [TCP, UDP]:
        l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
    elif self.L4PROTO in [ICMP] and not self.is_v6:
        l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
    elif self.L4PROTO in [ICMP] and self.is_v6:
        l4 = ICMPv6EchoRequest(id=self.src_port)
    # NOTE(review): the tail of the layer stack (l4 / Raw()) and the final
    # "return self" were not visible when this method was reviewed; confirm
    # them against the full file.
    p1 = (
        Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
        / self.IP46(src=self.src_addr, dst=self.dst_addr)
        / l4
        / Raw()
    )

    if no_replies:
        self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
    else:
        self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
    self.expected_src_pg = src_pg
    self.expected_dst_pg = dst_pg
    return self
# Checks the packets stored in self.rxs against the expected addresses and
# ports. As in cnat_send, src_id/dst_id are either an int index into
# remote_hosts or a literal address.
233 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
234 if isinstance(src_id, int):
235 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
237 self.expect_src_addr = src_id
238 if isinstance(dst_id, int):
239 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
241 self.expect_dst_addr = dst_id
242 self.expect_src_port = src_port
243 self.expect_dst_port = dst_port
# src_port=None means "any port": the value is taken from the first received
# packet and remembered, so the return-traffic helpers can reuse it.
245 if self.expect_src_port is None:
246 if self.L4PROTO in [TCP, UDP]:
247 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
248 elif self.L4PROTO in [ICMP] and not self.is_v6:
249 self.expect_src_port = self.rxs[0][self.L4PROTO].id
250 elif self.L4PROTO in [ICMP] and self.is_v6:
251 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
# Checks applied to a received packet rx: checksums, then addresses, then
# the L4 fields.
254 self._test.assert_packet_checksums_valid(rx)
255 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
256 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
257 if self.L4PROTO in [TCP, UDP]:
258 self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
259 self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
260 elif self.L4PROTO in [ICMP] and not self.is_v6:
261 self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
262 self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
263 elif self.L4PROTO in [ICMP] and self.is_v6:
# For ICMPv6 only the echo id is compared.
264 self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
# Sends the reply direction of the flow checked by cnat_expect. The expected
# addresses and ports are swapped, the packet is injected on
# expected_dst_pg, and the answer is awaited on expected_src_pg.
267 def cnat_send_return(self):
268 """This sends the return traffic"""
269 if self.L4PROTO in [TCP, UDP]:
270 l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
271 elif self.L4PROTO in [ICMP] and not self.is_v6:
272 # ICMP type 0 is echo reply
273 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
274 elif self.L4PROTO in [ICMP] and self.is_v6:
275 l4 = ICMPv6EchoReply(id=self.expect_src_port)
276 src_mac = self.expected_dst_pg.remote_mac
278 Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
279 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
# Received replies are kept in self.return_rxs for cnat_expect_return.
284 self.return_rxs = self._test.send_and_expect(
285 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
# Checks that every packet in self.return_rxs mirrors the original flow from
# cnat_send: addresses and ports are those of the first packet, reversed.
289 def cnat_expect_return(self):
290 for rx in self.return_rxs:
291 self._test.assert_packet_checksums_valid(rx)
292 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
293 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
294 if self.L4PROTO in [TCP, UDP]:
295 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
296 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
297 elif self.L4PROTO in [ICMP] and not self.is_v6:
298 # ICMP type 0 is echo reply
299 self._test.assertEqual(rx[self.L4PROTO].type, 0)
300 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
301 elif self.L4PROTO in [ICMP] and self.is_v6:
302 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
# Sends an ICMP error in the return direction, built from the first packet
# received by the forward send (self.rxs[0]).
305 def cnat_send_icmp_return_error(self):
307 This called after cnat_expect will send an icmp error
# ICMPv6 Destination Unreachable (code 1) for v6, ICMP type 11 for v4.
310 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
# IP layer of the first forward packet as received after NAT.
311 InnerIP = self.rxs[0][self.IP46]
314 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
316 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
# Received packets are kept in self.return_rxs for cnat_expect_icmp_error_return.
320 self.return_rxs = self._test.send_and_expect(
321 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
# Checks the ICMP errors in self.return_rxs. Both the outer IP header and
# the IP/L4 headers embedded in the error must carry the original
# addresses and ports from cnat_send.
325 def cnat_expect_icmp_error_return(self):
326 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
327 IP46err = IPerror6 if self.is_v6 else IPerror
# Any protocol other than TCP is looked up as UDPerror.
328 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
329 for rx in self.return_rxs:
330 self._test.assert_packet_checksums_valid(rx)
331 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
332 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
333 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
334 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
335 self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
336 self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
342 # -------------------------------------------------------------------
343 # -------------------------------------------------------------------
# Tests for CNat VIP translations (single and multi backend).
346 class TestCNatTranslation(CnatCommonTestCase):
347 """CNat Translation"""
351 super(TestCNatTranslation, cls).setUpClass()
354 def tearDownClass(cls):
355 super(TestCNatTranslation, cls).tearDownClass()
# Per-test setup: three pg interfaces, with N_REMOTE_HOSTS remote hosts
# generated on pg0 and pg1.
358 super(TestCNatTranslation, self).setUp()
360 self.create_pg_interfaces(range(3))
361 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
362 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
364 for i in self.pg_interfaces:
370 i.configure_ipv4_neighbors()
371 i.configure_ipv6_neighbors()
# Per-test cleanup: remove the translations, purge sessions, and check that
# no session is left.
# NOTE(review): only self.translations is walked here; translations stored in
# self.mbtranslations are not removed by this loop. Confirm they are cleaned
# up elsewhere.
374 for translation in self.translations:
375 translation.remove_vpp_config()
377 self.vapi.cnat_session_purge()
378 self.assertFalse(self.vapi.cnat_session_dump())
380 for i in self.pg_interfaces:
384 super(TestCNatTranslation, self).tearDown()
# Exercises the multi-backend translations in self.mbtranslations. Two flows
# that differ only in source port must be sent to the same backend port.
386 def cnat_fhc_translation(self):
387 """CNat Translation"""
388 self.logger.info(self.vapi.cli("sh cnat client"))
389 self.logger.info(self.vapi.cli("sh cnat translation"))
391 for nbr, translation in enumerate(self.mbtranslations):
392 vip = translation.vip
395 # Flows to the VIP with same ips and different source ports are loadbalanced identically
396 # in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
398 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
399 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
401 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
# Backend port chosen for the first flow.
402 dport1 = ctx.rxs[0][ctx.L4PROTO].dport
405 [translation.paths[0][DST].port, translation.paths[1][DST].port],
407 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
# Second flow: same addresses, source port shifted by 122.
410 self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
412 dport2 = ctx.rxs[0][ctx.L4PROTO].dport
415 [translation.paths[0][DST].port, translation.paths[1][DST].port],
417 ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
# Both flows must have landed on the same backend port.
419 ctx._test.assertEqual(dport1, dport2)
# End-to-end scenario over self.translations: forward and return traffic
# through the VIP, non-matching traffic, re-pointing a translation at a new
# backend, session expiry through the scanner, then a fresh load of flows.
421 def cnat_translation(self):
422 """CNat Translation"""
423 self.logger.info(self.vapi.cli("sh cnat client"))
424 self.logger.info(self.vapi.cli("sh cnat translation"))
426 for nbr, translation in enumerate(self.translations):
427 vip = translation.vip
430 # Test Flows to the VIP
432 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
433 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
435 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
436 dst_port = translation.paths[0][DST].port
# The translation index nbr is also used as the pg1 remote host index.
437 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
439 ctx.cnat_send_return().cnat_expect_return()
442 # packets to the VIP that do not match a
443 # translation are dropped
446 self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
450 # packets from the VIP that do not match a
451 # session are forwarded
453 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
454 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
457 # modify the translation to use a different backend
# Keep the old backend port so existing flows can still be checked against it.
459 old_dst_port = translation.paths[0][DST].port
# "udpate" (sic) is the method name defined on Endpoint.
460 translation.paths[0][DST].udpate(
461 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
463 translation.add_vpp_config()
466 # existing flows follow the old path
468 for src_pgi in range(N_REMOTE_HOSTS):
469 for sport in [1234, 1233]:
471 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
473 self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
476 ctx.cnat_send_return().cnat_expect_return()
479 # new flows go to the new backend
481 for src_pgi in range(N_REMOTE_HOSTS):
482 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
483 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
485 self.logger.info(self.vapi.cli("sh cnat session verbose"))
488 # turn the scanner back on and wait until the sessions
491 self.vapi.cli("test cnat scanner on")
492 self.virtual_sleep(2)
# After the wait, no session may remain.
493 sessions = self.vapi.cnat_session_dump()
494 self.assertEqual(len(sessions), 0)
495 self.vapi.cli("test cnat scanner off")
498 # load some flows again and purge
500 for translation in self.translations:
501 vip = translation.vip
502 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
503 for src_pgi in range(N_REMOTE_HOSTS):
504 for sport in [1234, 1233]:
506 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
507 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
# For each translation: send a flow to the VIP, check the NATed packet, then
# send an ICMP error back and check it is translated. The same is repeated
# for a flow to pg2 that does not go through a VIP.
509 def _test_icmp(self):
513 for nbr, translation in enumerate(self.translations):
514 vip = translation.vip
515 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
520 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
521 dst_port = translation.paths[0][DST].port
522 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
523 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
526 # ICMP errors with no VIP associated should not be
529 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
# NOTE(review): dst_port is reassigned here but not used by the calls below.
530 dst_port = translation.paths[0][DST].port
531 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
532 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
# Builds two IPv4 translations with two backends each and stores them in
# self.mbtranslations; self.translations is left empty.
534 def _make_multi_backend_translations(self):
535 self.translations = []
536 self.mbtranslations = []
# VIP 30.0.0.5:5555 -> pg1 remote host 0, ports 4001 and 4005.
537 self.mbtranslations.append(
541 Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
544 Endpoint(is_v6=False),
545 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
548 Endpoint(is_v6=False),
549 Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
552 0x03, # hash only on dst ip and src ip
# VIP 30.0.0.6:5555 -> pg1 remote host 1, ports 4006 and 4007.
555 self.mbtranslations.append(
559 Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
562 Endpoint(is_v6=False),
563 Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
566 Endpoint(is_v6=False),
567 Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
570 0x08, # hash only on dst port
# Builds three single-backend IPv4 translations in self.translations.
# Translation k targets pg1 remote host k, which the tests rely on when they
# pass the translation index as the host index.
574 def _make_translations_v4(self):
575 self.translations = []
576 self.translations.append(
580 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
583 Endpoint(is_v6=False),
584 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
590 self.translations.append(
594 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
597 Endpoint(is_v6=False),
598 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
# Same VIP address as the previous translation, different VIP port.
604 self.translations.append(
608 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
611 Endpoint(is_v6=False),
612 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
# IPv6 counterpart of _make_translations_v4: three single-backend
# translations, translation k targeting pg1 remote host k.
619 def _make_translations_v6(self):
620 self.translations = []
621 self.translations.append(
625 Endpoint(ip="30::1", port=5555, is_v6=True),
628 Endpoint(is_v6=True),
629 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
635 self.translations.append(
639 Endpoint(ip="30::2", port=5554, is_v6=True),
642 Endpoint(is_v6=True),
643 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
# Same VIP address as the previous translation, different VIP port.
649 self.translations.append(
653 Endpoint(ip="30::2", port=5553, is_v6=True),
656 Endpoint(is_v6=True),
657 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
# Test entry points: each builds the translations for one address family (or
# the multi-backend set) and then runs the matching scenario. The test
# descriptions are kept as comments rather than docstrings.
664 def test_icmp4(self):
665 # """ CNat Translation icmp v4 """
666 self._make_translations_v4()
669 def test_icmp6(self):
670 # """ CNat Translation icmp v6 """
671 self._make_translations_v6()
674 def test_cnat6(self):
675 # """ CNat Translation ipv6 """
676 self._make_translations_v6()
677 self.cnat_translation()
679 def test_cnat4(self):
680 # """ CNat Translation ipv4 """
681 self._make_translations_v4()
682 self.cnat_translation()
684 def test_cnat_fhc(self):
685 # """ CNat Translation flow hash config """
686 self._make_multi_backend_translations()
687 self.cnat_fhc_translation()
# Tests for CNat source NAT; class-level setup and teardown defer to the base class.
690 class TestCNatSourceNAT(CnatCommonTestCase):
691 """CNat Source NAT"""
695 super(TestCNatSourceNAT, cls).setUpClass()
698 def tearDownClass(cls):
699 super(TestCNatSourceNAT, cls).tearDownClass()
# Configures source NAT. The SNAT addresses come from pg2's first remote
# host, the cnat-snat features are toggled on pg0, and every pg interface is
# added to (or removed from) the v4 and v6 include tables.
701 def _enable_disable_snat(self, is_enable=True):
# The SNAT addresses are passed with the same values whatever is_enable is.
702 self.vapi.cnat_set_snat_addresses(
703 snat_ip4=self.pg2.remote_hosts[0].ip4,
704 snat_ip6=self.pg2.remote_hosts[0].ip6,
705 sw_if_index=INVALID_INDEX,
707 self.vapi.feature_enable_disable(
708 enable=1 if is_enable else 0,
709 arc_name="ip6-unicast",
710 feature_name="cnat-snat-ip6",
711 sw_if_index=self.pg0.sw_if_index,
713 self.vapi.feature_enable_disable(
714 enable=1 if is_enable else 0,
715 arc_name="ip4-unicast",
716 feature_name="cnat-snat-ip4",
717 sw_if_index=self.pg0.sw_if_index,
720 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
721 self.vapi.cnat_set_snat_policy(
722 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
724 for i in self.pg_interfaces:
725 self.vapi.cnat_snat_policy_add_del_if(
726 sw_if_index=i.sw_if_index,
727 is_add=1 if is_enable else 0,
728 table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
730 self.vapi.cnat_snat_policy_add_del_if(
731 sw_if_index=i.sw_if_index,
732 is_add=1 if is_enable else 0,
733 table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
# Per-test setup: three pg interfaces, two remote hosts on pg1, neighbors
# configured, then SNAT enabled.
737 super(TestCNatSourceNAT, self).setUp()
739 self.create_pg_interfaces(range(3))
740 self.pg1.generate_remote_hosts(2)
742 for i in self.pg_interfaces:
748 i.configure_ipv6_neighbors()
749 i.configure_ipv4_neighbors()
751 self._enable_disable_snat(is_enable=True)
# Per-test cleanup.
# NOTE(review): this call sits on the teardown side but passes is_enable=True,
# exactly as setup does, so SNAT is re-enabled rather than disabled. It looks
# like is_enable=False was intended; confirm before changing.
754 self._enable_disable_snat(is_enable=True)
756 self.vapi.cnat_session_purge()
757 for i in self.pg_interfaces:
761 super(TestCNatSourceNAT, self).tearDown()
# Source NAT over IPv6: TCP, then UDP, then ICMP echo.
763 def test_snat_v6(self):
764 # """ CNat Source Nat v6 """
765 self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
766 self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
767 self.sourcenat_test_icmp_echo_conf(is_v6=True)
# Source NAT over IPv4: the helpers default to is_v6=False.
769 def test_snat_v4(self):
770 # """ CNat Source Nat v4 """
771 self.sourcenat_test_tcp_udp_conf(TCP)
772 self.sourcenat_test_tcp_udp_conf(UDP)
773 self.sourcenat_test_icmp_echo_conf()
# Sends an ICMP echo from pg0 to pg1 and expects it source-NATed to pg2's
# first remote host, then checks the reply is translated back. In the ICMP
# context the port arguments carry the ICMP id (0xFEED) and, for v4, the type.
775 def sourcenat_test_icmp_echo_conf(self, is_v6=False):
776 ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
777 # 8 is ICMP type echo (v4 only)
778 ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
# A source "port" of None accepts whatever id the NAT picked.
779 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
780 ctx.cnat_send_return().cnat_expect_return()
# Source NAT scenario for TCP or UDP: plain SNAT, an exclude prefix that
# bypasses SNAT for pg1 remote host 1, removal of that prefix, and finally
# translation of a returned ICMP error.
782 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
783 ctx = CnatTestContext(self, L4PROTO, is_v6)
784 # we should source NAT
785 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
786 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
787 ctx.cnat_send_return().cnat_expect_return()
789 # exclude dst address of pg1.1 from snat
# strict=False lets ip_network accept a host address and mask it down to
# the network (/100 for v6, /16 for v4).
791 exclude_prefix = ip_network(
792 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
795 exclude_prefix = ip_network(
796 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
799 # add remote host to exclude list
800 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
802 # We should not source NAT the id=1
803 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
804 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
805 ctx.cnat_send_return().cnat_expect_return()
807 # But we should source NAT the id=0
808 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
809 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
810 ctx.cnat_send_return().cnat_expect_return()
812 # remove remote host from exclude list
813 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
# Sessions are purged right after the exclusion is removed, before the next
# flow to host 1 is checked.
814 self.vapi.cnat_session_purge()
816 # We should source NAT again
817 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
818 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
819 ctx.cnat_send_return().cnat_expect_return()
821 # test return ICMP error nating
822 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
823 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
824 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
826 self.vapi.cnat_session_purge()
# Tests that CNat configuration follows interface address changes.
# NOTE(review): the docstring below is identical to TestCNatTranslation's
# ("CNat Translation"), which looks like a copy/paste leftover. Confirm
# whether a DHCP-specific description was intended.
829 class TestCNatDHCP(CnatCommonTestCase):
830 """CNat Translation"""
834 super(TestCNatDHCP, cls).setUpClass()
837 def tearDownClass(cls):
838 super(TestCNatDHCP, cls).tearDownClass()
841 for i in self.pg_interfaces:
843 super(TestCNatDHCP, self).tearDown()
# Builds a deterministic test address from the interface index and an
# address id. The IPv6 form uses addr_id + 1; the IPv4 form uses addr_id as-is.
845 def make_addr(self, sw_if_index, addr_id, is_v6):
847 return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
848 return "172.16.%u.%u" % (sw_if_index, addr_id)
# Same address as make_addr, as a host prefix (/128 for v6, /32 for v4).
850 def make_prefix(self, sw_if_index, addr_id, is_v6):
852 return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
853 return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
# Compares the translation as reported by VPP with the addresses that
# make_addr predicts for the VIP's interface and each path's interfaces.
855 def check_resolved(self, tr, addr_id, is_v6=False):
856 qt = tr.query_vpp_config()
858 str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
860 self.assertEqual(len(qt.paths), len(tr.paths))
# Walk the configured and the reported paths side by side.
861 for path_tr, path_qt in zip(tr.paths, qt.paths):
862 src_qt = path_qt.src_ep
863 dst_qt = path_qt.dst_ep
864 src_tr, dst_tr = path_tr
866 str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
869 str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
# Adds or removes the make_prefix() address for addr_id on the given pg interface.
872 def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
873 self.vapi.sw_interface_add_del_address(
874 sw_if_index=pg.sw_if_index,
875 prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
876 is_add=1 if is_add else 0,
# Creates a translation whose VIP and path endpoints are given by interface
# only (no pgi, no ip), then changes the interface addresses and checks that
# the translation reported by VPP follows them.
879 def _test_dhcp_v46(self, is_v6):
880 self.create_pg_interfaces(range(4))
881 for i in self.pg_interfaces:
# Two paths, both sourced from pg1: one towards pg2, one towards pg3.
884 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
885 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
# The VIP is tied to pg0.
887 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
888 t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
889 # Add an address on every interface
890 # and check it is reflected in the cnat config
891 for pg in self.pg_interfaces:
892 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
893 self.check_resolved(t, addr_id=0, is_v6=is_v6)
894 # Add a new address on every interface, remove the old one
895 # and check it is reflected in the cnat config
896 for pg in self.pg_interfaces:
897 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
898 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
899 self.check_resolved(t, addr_id=1, is_v6=is_v6)
900 # remove the configuration
901 for pg in self.pg_interfaces:
902 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
903 t.remove_vpp_config()
# Runs the address-tracking scenario for IPv4.
905 def test_dhcp_v4(self):
906 self._test_dhcp_v46(False)
# Runs the address-tracking scenario for IPv6.
908 def test_dhcp_v6(self):
909 self._test_dhcp_v46(True)
# Binds the SNAT addresses to pg0 by interface index, then checks that
# cnat_get_snat_addresses follows the v4 and v6 addresses configured on it.
911 def test_dhcp_snat(self):
912 self.create_pg_interfaces(range(1))
913 for i in self.pg_interfaces:
915 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
916 # Add an address on every interface
917 # and check it is reflected in the cnat config
918 for pg in self.pg_interfaces:
919 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
920 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
921 r = self.vapi.cnat_get_snat_addresses()
924 self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
927 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
929 # Add a new address on every interface, remove the old one
930 # and check it is reflected in the cnat config
931 for pg in self.pg_interfaces:
932 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
933 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
934 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
935 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
936 r = self.vapi.cnat_get_snat_addresses()
939 self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
942 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
944 # remove the configuration
945 for pg in self.pg_interfaces:
946 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
947 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
# Detach the SNAT addresses from the interface again.
948 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
# Script entry point: run the tests with the VPP test runner.
# NOTE(review): no `import unittest` is visible among the imports reviewed
# here; confirm the module imports it.
951 if __name__ == "__main__":
952 unittest.main(testRunner=VppTestRunner)