5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import INVALID_INDEX
8 from itertools import product
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP, TCP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror
14 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
15 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
17 from ipaddress import ip_network
19 from vpp_object import VppObject
20 from vpp_papi import VppEnum
29 class CnatCommonTestCase(VppTestCase):
30 """CNat common test class"""
33 # turn the scanner off whilst testing otherwise sessions
41 "session-cleanup-timeout",
54 super(CnatCommonTestCase, cls).setUpClass()
57 def tearDownClass(cls):
58 super(CnatCommonTestCase, cls).tearDownClass()
61 class Endpoint(object):
64 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
67 self.sw_if_index = INVALID_INDEX
68 if pg is not None and pgi is not None:
69 # pg interface specified and remote index
70 self.ip = self.get_ip46(pg.remote_hosts[pgi])
73 self.sw_if_index = pg.sw_if_index
77 self.ip = "::" if self.is_v6 else "0.0.0.0"
79 def get_ip46(self, obj):
84 def udpate(self, **kwargs):
85 self.__init__(**kwargs)
89 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
90 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
96 "sw_if_index": self.sw_if_index,
97 "if_af": self._vpp_if_af(),
101 return "%s:%d" % (self.ip, self.port)
104 class Translation(VppObject):
105 def __init__(self, test, iproto, vip, paths, fhc):
114 return "%s %s %s" % (self.vip, self.iproto, self.paths)
116 def _vl4_proto(self):
117 ip_proto = VppEnum.vl_api_ip_proto_t
119 UDP: ip_proto.IP_API_PROTO_UDP,
120 TCP: ip_proto.IP_API_PROTO_TCP,
123 def _encoded_paths(self):
125 {"src_ep": src.encode(), "dst_ep": dst.encode()}
126 for (src, dst) in self.paths
129 def add_vpp_config(self):
130 r = self._test.vapi.cnat_translation_update(
132 "vip": self.vip.encode(),
133 "ip_proto": self._vl4_proto(),
134 "n_paths": len(self.paths),
135 "paths": self._encoded_paths(),
136 "flow_hash_config": self.fhc,
139 self._test.registry.register(self, self._test.logger)
143 def remove_vpp_config(self):
144 assert self.id is not None
145 self._test.vapi.cnat_translation_del(id=self.id)
148 def query_vpp_config(self):
149 for t in self._test.vapi.cnat_translation_dump():
150 if self.id == t.translation.id:
155 class CnatTestContext(object):
159 ctx = CnatTestContext(self, TCP, is_v6=True)
161 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
162 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
164 # We expect this to be NATed as
165 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
166 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
168 # After running cnat_expect, we can send back the received packet
169 # and expect it be 'unnated' so that we get the original packet
170 ctx.cnat_send_return().cnat_expect_return()
172 # same thing for ICMP errors
173 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
176 def __init__(self, test, L4PROTO, is_v6):
177 self.L4PROTO = L4PROTO
181 def get_ip46(self, obj):
188 return IPv6 if self.is_v6 else IP
191 self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
193 if isinstance(src_id, int):
194 self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
196 self.dst_addr = src_id
197 if isinstance(dst_id, int):
198 self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
200 self.dst_addr = dst_id
201 self.src_port = src_port # also ICMP id
202 self.dst_port = dst_port # also ICMP type
204 if self.L4PROTO in [TCP, UDP]:
205 l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
206 elif self.L4PROTO in [ICMP] and not self.is_v6:
207 l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
208 elif self.L4PROTO in [ICMP] and self.is_v6:
209 l4 = ICMPv6EchoRequest(id=self.src_port)
211 Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
212 / self.IP46(src=self.src_addr, dst=self.dst_addr)
218 self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
220 self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
221 self.expected_src_pg = src_pg
222 self.expected_dst_pg = dst_pg
225 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
226 if isinstance(src_id, int):
227 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
229 self.expect_src_addr = src_id
230 if isinstance(dst_id, int):
231 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
233 self.expect_dst_addr = dst_id
234 self.expect_src_port = src_port
235 self.expect_dst_port = dst_port
237 if self.expect_src_port is None:
238 if self.L4PROTO in [TCP, UDP]:
239 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
240 elif self.L4PROTO in [ICMP] and not self.is_v6:
241 self.expect_src_port = self.rxs[0][self.L4PROTO].id
242 elif self.L4PROTO in [ICMP] and self.is_v6:
243 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
246 self._test.assert_packet_checksums_valid(rx)
247 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
248 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
249 if self.L4PROTO in [TCP, UDP]:
250 self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
251 self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
252 elif self.L4PROTO in [ICMP] and not self.is_v6:
253 self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
254 self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
255 elif self.L4PROTO in [ICMP] and self.is_v6:
256 self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
259 def cnat_send_return(self):
260 """This sends the return traffic"""
261 if self.L4PROTO in [TCP, UDP]:
262 l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
263 elif self.L4PROTO in [ICMP] and not self.is_v6:
264 # icmp type 0 if echo reply
265 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
266 elif self.L4PROTO in [ICMP] and self.is_v6:
267 l4 = ICMPv6EchoReply(id=self.expect_src_port)
268 src_mac = self.expected_dst_pg.remote_mac
270 Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
271 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
276 self.return_rxs = self._test.send_and_expect(
277 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
281 def cnat_expect_return(self):
282 for rx in self.return_rxs:
283 self._test.assert_packet_checksums_valid(rx)
284 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
285 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
286 if self.L4PROTO in [TCP, UDP]:
287 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
288 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
289 elif self.L4PROTO in [ICMP] and not self.is_v6:
290 # icmp type 0 if echo reply
291 self._test.assertEqual(rx[self.L4PROTO].type, 0)
292 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
293 elif self.L4PROTO in [ICMP] and self.is_v6:
294 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
297 def cnat_send_icmp_return_error(self):
299 This called after cnat_expect will send an icmp error
302 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
303 InnerIP = self.rxs[0][self.IP46]
306 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
308 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
312 self.return_rxs = self._test.send_and_expect(
313 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
317 def cnat_expect_icmp_error_return(self):
318 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
319 IP46err = IPerror6 if self.is_v6 else IPerror
320 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
321 for rx in self.return_rxs:
322 self._test.assert_packet_checksums_valid(rx)
323 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
324 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
325 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
326 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
327 self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
328 self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
332 # -------------------------------------------------------------------
333 # -------------------------------------------------------------------
334 # -------------------------------------------------------------------
335 # -------------------------------------------------------------------
338 class TestCNatTranslation(CnatCommonTestCase):
339 """CNat Translation"""
343 super(TestCNatTranslation, cls).setUpClass()
346 def tearDownClass(cls):
347 super(TestCNatTranslation, cls).tearDownClass()
350 super(TestCNatTranslation, self).setUp()
352 self.create_pg_interfaces(range(3))
353 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
354 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
356 for i in self.pg_interfaces:
362 i.configure_ipv4_neighbors()
363 i.configure_ipv6_neighbors()
366 for translation in self.translations:
367 translation.remove_vpp_config()
369 self.vapi.cnat_session_purge()
370 self.assertFalse(self.vapi.cnat_session_dump())
372 for i in self.pg_interfaces:
376 super(TestCNatTranslation, self).tearDown()
378 def cnat_fhc_translation(self):
379 """CNat Translation"""
380 self.logger.info(self.vapi.cli("sh cnat client"))
381 self.logger.info(self.vapi.cli("sh cnat translation"))
383 for nbr, translation in enumerate(self.mbtranslations):
384 vip = translation.vip
387 # Flows to the VIP with same ips and different source ports are loadbalanced identically
388 # in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
390 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
391 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
393 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
394 dport1 = ctx.rxs[0][ctx.L4PROTO].dport
397 [translation.paths[0][DST].port, translation.paths[1][DST].port],
399 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
402 self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
404 dport2 = ctx.rxs[0][ctx.L4PROTO].dport
407 [translation.paths[0][DST].port, translation.paths[1][DST].port],
409 ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
411 ctx._test.assertEqual(dport1, dport2)
413 def cnat_translation(self):
414 """CNat Translation"""
415 self.logger.info(self.vapi.cli("sh cnat client"))
416 self.logger.info(self.vapi.cli("sh cnat translation"))
418 for nbr, translation in enumerate(self.translations):
419 vip = translation.vip
422 # Test Flows to the VIP
424 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
425 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
427 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
428 dst_port = translation.paths[0][DST].port
429 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
431 ctx.cnat_send_return().cnat_expect_return()
434 # packets to the VIP that do not match a
435 # translation are dropped
438 self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
442 # packets from the VIP that do not match a
443 # session are forwarded
445 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
446 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
449 # modify the translation to use a different backend
451 old_dst_port = translation.paths[0][DST].port
452 translation.paths[0][DST].udpate(
453 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
455 translation.add_vpp_config()
458 # existing flows follow the old path
460 for src_pgi in range(N_REMOTE_HOSTS):
461 for sport in [1234, 1233]:
463 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
465 self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
468 ctx.cnat_send_return().cnat_expect_return()
471 # new flows go to the new backend
473 for src_pgi in range(N_REMOTE_HOSTS):
474 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
475 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
477 self.logger.info(self.vapi.cli("sh cnat session verbose"))
480 # turn the scanner back on and wait until the sessions
483 self.vapi.cli("test cnat scanner on")
484 self.virtual_sleep(2)
485 sessions = self.vapi.cnat_session_dump()
486 self.assertEqual(len(sessions), 0)
487 self.vapi.cli("test cnat scanner off")
490 # load some flows again and purge
492 for translation in self.translations:
493 vip = translation.vip
494 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
495 for src_pgi in range(N_REMOTE_HOSTS):
496 for sport in [1234, 1233]:
498 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
499 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
501 def _test_icmp(self):
505 for nbr, translation in enumerate(self.translations):
506 vip = translation.vip
507 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
512 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
513 dst_port = translation.paths[0][DST].port
514 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
515 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
518 # ICMP errors with no VIP associated should not be
521 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
522 dst_port = translation.paths[0][DST].port
523 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
524 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
526 def _make_multi_backend_translations(self):
527 self.translations = []
528 self.mbtranslations = []
529 self.mbtranslations.append(
533 Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
536 Endpoint(is_v6=False),
537 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
540 Endpoint(is_v6=False),
541 Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
544 0x03, # hash only on dst ip and src ip
547 self.mbtranslations.append(
551 Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
554 Endpoint(is_v6=False),
555 Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
558 Endpoint(is_v6=False),
559 Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
562 0x08, # hash only on dst port
566 def _make_translations_v4(self):
567 self.translations = []
568 self.translations.append(
572 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
575 Endpoint(is_v6=False),
576 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
582 self.translations.append(
586 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
589 Endpoint(is_v6=False),
590 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
596 self.translations.append(
600 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
603 Endpoint(is_v6=False),
604 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
611 def _make_translations_v6(self):
612 self.translations = []
613 self.translations.append(
617 Endpoint(ip="30::1", port=5555, is_v6=True),
620 Endpoint(is_v6=True),
621 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
627 self.translations.append(
631 Endpoint(ip="30::2", port=5554, is_v6=True),
634 Endpoint(is_v6=True),
635 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
641 self.translations.append(
645 Endpoint(ip="30::2", port=5553, is_v6=True),
648 Endpoint(is_v6=True),
649 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
656 def test_icmp4(self):
657 # """ CNat Translation icmp v4 """
658 self._make_translations_v4()
661 def test_icmp6(self):
662 # """ CNat Translation icmp v6 """
663 self._make_translations_v6()
666 def test_cnat6(self):
667 # """ CNat Translation ipv6 """
668 self._make_translations_v6()
669 self.cnat_translation()
671 def test_cnat4(self):
672 # """ CNat Translation ipv4 """
673 self._make_translations_v4()
674 self.cnat_translation()
676 def test_cnat_fhc(self):
677 # """ CNat Translation flow hash config """
678 self._make_multi_backend_translations()
679 self.cnat_fhc_translation()
682 class TestCNatSourceNAT(CnatCommonTestCase):
683 """CNat Source NAT"""
687 super(TestCNatSourceNAT, cls).setUpClass()
690 def tearDownClass(cls):
691 super(TestCNatSourceNAT, cls).tearDownClass()
693 def _enable_disable_snat(self, is_enable=True):
694 self.vapi.cnat_set_snat_addresses(
695 snat_ip4=self.pg2.remote_hosts[0].ip4,
696 snat_ip6=self.pg2.remote_hosts[0].ip6,
697 sw_if_index=INVALID_INDEX,
699 self.vapi.feature_enable_disable(
700 enable=1 if is_enable else 0,
701 arc_name="ip6-unicast",
702 feature_name="cnat-snat-ip6",
703 sw_if_index=self.pg0.sw_if_index,
705 self.vapi.feature_enable_disable(
706 enable=1 if is_enable else 0,
707 arc_name="ip4-unicast",
708 feature_name="cnat-snat-ip4",
709 sw_if_index=self.pg0.sw_if_index,
712 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
713 self.vapi.cnat_set_snat_policy(
714 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
716 for i in self.pg_interfaces:
717 self.vapi.cnat_snat_policy_add_del_if(
718 sw_if_index=i.sw_if_index,
719 is_add=1 if is_enable else 0,
720 table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
722 self.vapi.cnat_snat_policy_add_del_if(
723 sw_if_index=i.sw_if_index,
724 is_add=1 if is_enable else 0,
725 table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
729 super(TestCNatSourceNAT, self).setUp()
731 self.create_pg_interfaces(range(3))
732 self.pg1.generate_remote_hosts(2)
734 for i in self.pg_interfaces:
740 i.configure_ipv6_neighbors()
741 i.configure_ipv4_neighbors()
743 self._enable_disable_snat(is_enable=True)
746 self._enable_disable_snat(is_enable=True)
748 self.vapi.cnat_session_purge()
749 for i in self.pg_interfaces:
753 super(TestCNatSourceNAT, self).tearDown()
755 def test_snat_v6(self):
756 # """ CNat Source Nat v6 """
757 self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
758 self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
759 self.sourcenat_test_icmp_echo_conf(is_v6=True)
761 def test_snat_v4(self):
762 # """ CNat Source Nat v4 """
763 self.sourcenat_test_tcp_udp_conf(TCP)
764 self.sourcenat_test_tcp_udp_conf(UDP)
765 self.sourcenat_test_icmp_echo_conf()
767 def sourcenat_test_icmp_echo_conf(self, is_v6=False):
768 ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
769 # 8 is ICMP type echo (v4 only)
770 ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
771 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
772 ctx.cnat_send_return().cnat_expect_return()
774 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
775 ctx = CnatTestContext(self, L4PROTO, is_v6)
776 # we should source NAT
777 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
778 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
779 ctx.cnat_send_return().cnat_expect_return()
781 # exclude dst address of pg1.1 from snat
783 exclude_prefix = ip_network(
784 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
787 exclude_prefix = ip_network(
788 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
791 # add remote host to exclude list
792 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
794 # We should not source NAT the id=1
795 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
796 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
797 ctx.cnat_send_return().cnat_expect_return()
799 # But we should source NAT the id=0
800 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
801 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
802 ctx.cnat_send_return().cnat_expect_return()
804 # remove remote host from exclude list
805 self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
806 self.vapi.cnat_session_purge()
808 # We should source NAT again
809 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
810 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
811 ctx.cnat_send_return().cnat_expect_return()
813 # test return ICMP error nating
814 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
815 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
816 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
818 self.vapi.cnat_session_purge()
821 class TestCNatDHCP(CnatCommonTestCase):
822 """CNat Translation"""
826 super(TestCNatDHCP, cls).setUpClass()
829 def tearDownClass(cls):
830 super(TestCNatDHCP, cls).tearDownClass()
833 for i in self.pg_interfaces:
835 super(TestCNatDHCP, self).tearDown()
837 def make_addr(self, sw_if_index, addr_id, is_v6):
839 return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
840 return "172.16.%u.%u" % (sw_if_index, addr_id)
842 def make_prefix(self, sw_if_index, addr_id, is_v6):
844 return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
845 return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
847 def check_resolved(self, tr, addr_id, is_v6=False):
848 qt = tr.query_vpp_config()
850 str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
852 self.assertEqual(len(qt.paths), len(tr.paths))
853 for path_tr, path_qt in zip(tr.paths, qt.paths):
854 src_qt = path_qt.src_ep
855 dst_qt = path_qt.dst_ep
856 src_tr, dst_tr = path_tr
858 str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
861 str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
864 def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
865 self.vapi.sw_interface_add_del_address(
866 sw_if_index=pg.sw_if_index,
867 prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
868 is_add=1 if is_add else 0,
871 def _test_dhcp_v46(self, is_v6):
872 self.create_pg_interfaces(range(4))
873 for i in self.pg_interfaces:
876 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
877 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
879 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
880 t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
881 # Add an address on every interface
882 # and check it is reflected in the cnat config
883 for pg in self.pg_interfaces:
884 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
885 self.check_resolved(t, addr_id=0, is_v6=is_v6)
886 # Add a new address on every interface, remove the old one
887 # and check it is reflected in the cnat config
888 for pg in self.pg_interfaces:
889 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
890 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
891 self.check_resolved(t, addr_id=1, is_v6=is_v6)
892 # remove the configuration
893 for pg in self.pg_interfaces:
894 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
895 t.remove_vpp_config()
897 def test_dhcp_v4(self):
898 self._test_dhcp_v46(False)
900 def test_dhcp_v6(self):
901 self._test_dhcp_v46(True)
903 def test_dhcp_snat(self):
904 self.create_pg_interfaces(range(1))
905 for i in self.pg_interfaces:
907 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
908 # Add an address on every interface
909 # and check it is reflected in the cnat config
910 for pg in self.pg_interfaces:
911 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
912 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
913 r = self.vapi.cnat_get_snat_addresses()
916 self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
919 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
921 # Add a new address on every interface, remove the old one
922 # and check it is reflected in the cnat config
923 for pg in self.pg_interfaces:
924 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
925 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
926 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
927 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
928 r = self.vapi.cnat_get_snat_addresses()
931 self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
934 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
936 # remove the configuration
937 for pg in self.pg_interfaces:
938 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
939 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
940 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
943 if __name__ == "__main__":
944 unittest.main(testRunner=VppTestRunner)