5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import (
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
37 class CnatCommonTestCase(VppTestCase):
38 """CNat common test class"""
41 # turn the scanner off whilst testing otherwise sessions
49 "session-cleanup-timeout",
62 super(CnatCommonTestCase, cls).setUpClass()
65 def tearDownClass(cls):
66 super(CnatCommonTestCase, cls).tearDownClass()
69 class Endpoint(object):
72 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
75 self.sw_if_index = INVALID_INDEX
76 if pg is not None and pgi is not None:
77 # pg interface specified and remote index
78 self.ip = self.get_ip46(pg.remote_hosts[pgi])
81 self.sw_if_index = pg.sw_if_index
85 self.ip = "::" if self.is_v6 else "0.0.0.0"
87 def get_ip46(self, obj):
def udpate(self, **kwargs):
    """Re-initialise this object in place from keyword arguments.

    The constructor is simply run again with ``kwargs``, so any constructor
    argument that is not supplied falls back to its default value.

    The method name is a misspelling of "update"; it is kept as-is because
    callers in this file invoke it under this name.
    """
    type(self).__init__(self, **kwargs)
97 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
98 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
104 "sw_if_index": self.sw_if_index,
105 "if_af": self._vpp_if_af(),
109 return "%s:%d" % (self.ip, self.port)
112 class Translation(VppObject):
113 def __init__(self, test, iproto, vip, paths):
121 return "%s %s %s" % (self.vip, self.iproto, self.paths)
def _vl4_proto(self):
    """Map this translation's scapy L4 class to the VPP API protocol enum.

    Only UDP and TCP are mapped; any other ``self.iproto`` raises KeyError.
    """
    api_protos = VppEnum.vl_api_ip_proto_t
    by_layer = {
        UDP: api_protos.IP_API_PROTO_UDP,
        TCP: api_protos.IP_API_PROTO_TCP,
    }
    return by_layer[self.iproto]
130 def _encoded_paths(self):
132 {"src_ep": src.encode(), "dst_ep": dst.encode()}
133 for (src, dst) in self.paths
136 def add_vpp_config(self):
137 r = self._test.vapi.cnat_translation_update(
139 "vip": self.vip.encode(),
140 "ip_proto": self._vl4_proto(),
141 "n_paths": len(self.paths),
142 "paths": self._encoded_paths(),
145 self._test.registry.register(self, self._test.logger)
149 def remove_vpp_config(self):
150 assert self.id is not None
151 self._test.vapi.cnat_translation_del(id=self.id)
154 def query_vpp_config(self):
155 for t in self._test.vapi.cnat_translation_dump():
156 if self.id == t.translation.id:
161 class CnatTestContext(object):
165 ctx = CnatTestContext(self, TCP, is_v6=True)
167 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
168 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
170 # We expect this to be NATed as
171 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
172 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
174 # After running cnat_expect, we can send back the received packet
175 # and expect it be 'unnated' so that we get the original packet
176 ctx.cnat_send_return().cnat_expect_return()
178 # same thing for ICMP errors
179 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
182 def __init__(self, test, L4PROTO, is_v6):
183 self.L4PROTO = L4PROTO
187 def get_ip46(self, obj):
194 return IPv6 if self.is_v6 else IP
def cnat_send(
    self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
):
    """Build one packet for the context's L4 protocol and send N_PKTS copies.

    :param src_pg: interface the packets are injected on; its remote and
        local MACs form the Ethernet header.
    :param src_id: an int index into ``src_pg.remote_hosts``, otherwise a
        value used verbatim as the source address.
    :param src_port: L4 source port; for ICMP it is used as the ICMP id.
    :param dst_pg: interface the packets are expected on (or must not show
        up on when ``no_replies`` is set).
    :param dst_id: an int index into ``dst_pg.remote_hosts``, otherwise a
        value used verbatim as the destination address.
    :param dst_port: L4 destination port; for ICMPv4 it is used as the ICMP
        type. It is not put on the wire for ICMPv6, which always sends an
        echo request.
    :param no_replies: when true, assert that nothing is received; the
        previously captured ``self.rxs`` is left untouched in that case.
    :returns: ``self`` so that calls can be chained.
    :raises ValueError: if the context's L4 protocol is not TCP, UDP or ICMP.
    """
    if isinstance(src_id, int):
        self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        # A literal address: it must land in src_addr. The previous code
        # stored it in dst_addr, where it was immediately overwritten,
        # leaving src_addr stale or unset.
        self.src_addr = src_id
    if isinstance(dst_id, int):
        self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.dst_addr = dst_id
    self.src_port = src_port  # also ICMP id
    self.dst_port = dst_port  # also ICMP type

    if self.L4PROTO in [TCP, UDP]:
        l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
    elif self.L4PROTO in [ICMP] and not self.is_v6:
        l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
    elif self.L4PROTO in [ICMP] and self.is_v6:
        l4 = ICMPv6EchoRequest(id=self.src_port)
    else:
        # Fail with a clear message rather than on an unbound local below.
        raise ValueError("unsupported L4 protocol: %r" % (self.L4PROTO,))
    p1 = (
        Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
        / self.IP46(src=self.src_addr, dst=self.dst_addr)
        / l4
        / Raw()
    )

    if no_replies:
        self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
    else:
        self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
    # Remembered so the return-path helpers know which interfaces to use.
    self.expected_src_pg = src_pg
    self.expected_dst_pg = dst_pg
    return self
def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
    """Check every packet captured by the last send against an expected flow.

    ``src_id`` / ``dst_id`` are either an int index into the matching
    interface's ``remote_hosts`` or a value used verbatim as the address.
    Passing ``src_port=None`` means "any source port": the value found in
    the first captured packet is adopted, and all packets must then agree
    with it. For ICMP, the source port is compared with the ICMP id and
    (v4 only) the destination port with the ICMP type.

    The expected addresses and ports are stored on the context for the
    return-path helpers. Returns ``self``.
    """
    self.expect_src_addr = (
        self.get_ip46(src_pg.remote_hosts[src_id])
        if isinstance(src_id, int)
        else src_id
    )
    self.expect_dst_addr = (
        self.get_ip46(dst_pg.remote_hosts[dst_id])
        if isinstance(dst_id, int)
        else dst_id
    )
    self.expect_src_port = src_port
    self.expect_dst_port = dst_port

    proto = self.L4PROTO
    has_ports = proto in (TCP, UDP)
    is_icmp = proto in (ICMP,)

    if self.expect_src_port is None:
        # Wildcard source port: learn it from the first received packet.
        if has_ports:
            self.expect_src_port = self.rxs[0][proto].sport
        elif is_icmp:
            id_layer = ICMPv6EchoRequest if self.is_v6 else proto
            self.expect_src_port = self.rxs[0][id_layer].id

    check = self._test.assertEqual
    for pkt in self.rxs:
        self._test.assert_packet_checksums_valid(pkt)
        check(pkt[self.IP46].dst, self.expect_dst_addr)
        check(pkt[self.IP46].src, self.expect_src_addr)
        if has_ports:
            check(pkt[proto].dport, self.expect_dst_port)
            check(pkt[proto].sport, self.expect_src_port)
        elif is_icmp and not self.is_v6:
            check(pkt[proto].type, self.expect_dst_port)
            check(pkt[proto].id, self.expect_src_port)
        elif is_icmp:
            check(pkt[ICMPv6EchoRequest].id, self.expect_src_port)
    return self
def cnat_send_return(self):
    """Send the return traffic for the flow last checked by cnat_expect.

    The reply mirrors the expected (translated) flow: addresses and ports
    are swapped, and ICMP flows are answered with an echo reply. It is
    injected on the interface the forward packets were expected on, and the
    capture from the original source interface is kept in
    ``self.return_rxs``. Returns ``self``.
    """
    proto = self.L4PROTO
    if proto in (TCP, UDP):
        reply_l4 = proto(sport=self.expect_dst_port, dport=self.expect_src_port)
    elif proto in (ICMP,) and not self.is_v6:
        # ICMP type 0 is echo reply
        reply_l4 = proto(id=self.expect_src_port, type=0)
    elif proto in (ICMP,) and self.is_v6:
        reply_l4 = ICMPv6EchoReply(id=self.expect_src_port)

    reply_pg = self.expected_dst_pg
    peer_mac = reply_pg.remote_mac
    frame = (
        Ether(src=peer_mac, dst=reply_pg.local_mac)
        / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
        / reply_l4
        / Raw()
    )

    self.return_rxs = self._test.send_and_expect(
        reply_pg, frame * N_PKTS, self.expected_src_pg
    )
    return self
def cnat_expect_return(self):
    """Check that the captured return traffic mirrors the originally sent flow.

    Each packet in ``self.return_rxs`` must carry valid checksums and the
    addresses/ports recorded by cnat_send with source and destination
    swapped. ICMPv4 replies must be type 0 with the original id; ICMPv6
    replies must be echo replies with the original id. Returns ``self``.
    """
    proto = self.L4PROTO
    has_ports = proto in (TCP, UDP)
    is_icmp = proto in (ICMP,)
    check = self._test.assertEqual

    for pkt in self.return_rxs:
        self._test.assert_packet_checksums_valid(pkt)
        check(pkt[self.IP46].dst, self.src_addr)
        check(pkt[self.IP46].src, self.dst_addr)
        if has_ports:
            check(pkt[proto].dport, self.src_port)
            check(pkt[proto].sport, self.dst_port)
        elif is_icmp and not self.is_v6:
            # ICMP type 0 is echo reply
            check(pkt[proto].type, 0)
            check(pkt[proto].id, self.src_port)
        elif is_icmp:
            check(pkt[ICMPv6EchoReply].id, self.src_port)
    return self
def cnat_send_icmp_return_error(self):
    """
    Send an ICMP error on the reverse path of the flow last checked.

    Must be called after cnat_expect: the IP layer of the first packet
    captured by the forward send is embedded as the offending packet. The
    error is an ICMPv6 destination-unreachable (code 1) for IPv6, or an
    ICMP type 11 message for IPv4. The capture from the original source
    interface is stored in ``self.return_rxs``. Returns ``self``.
    """
    if self.is_v6:
        icmp_error = ICMPv6DestUnreach(code=1)
    else:
        icmp_error = ICMP(type=11)
    offending = self.rxs[0][self.IP46]

    reply_pg = self.expected_dst_pg
    frame = (
        Ether(src=reply_pg.remote_mac, dst=reply_pg.local_mac)
        / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
        / icmp_error
        / offending
    )
    self.return_rxs = self._test.send_and_expect(
        reply_pg, frame * N_PKTS, self.expected_src_pg
    )
    return self
def cnat_expect_icmp_error_return(self):
    """Check that a returned ICMP error was translated back, inside and out.

    For every packet in ``self.return_rxs`` the outer IP header must be the
    reverse of the originally sent flow, while the packet embedded in the
    ICMP error must show the original (pre-NAT) addresses and ports.
    Anything other than TCP is looked up as a UDP error layer.
    Returns ``self``.
    """
    if self.is_v6:
        icmp_layer, inner_ip_layer = ICMPv6DestUnreach, IPerror6
    else:
        icmp_layer, inner_ip_layer = ICMP, IPerror
    inner_l4_layer = TCPerror if self.L4PROTO is TCP else UDPerror
    check = self._test.assertEqual

    for pkt in self.return_rxs:
        self._test.assert_packet_checksums_valid(pkt)
        check(pkt[self.IP46].dst, self.src_addr)
        check(pkt[self.IP46].src, self.dst_addr)
        # The embedded (offending) packet as quoted by the ICMP error.
        check(pkt[icmp_layer][inner_ip_layer].src, self.src_addr)
        check(pkt[icmp_layer][inner_ip_layer].dst, self.dst_addr)
        check(pkt[icmp_layer][inner_ip_layer][inner_l4_layer].sport, self.src_port)
        check(pkt[icmp_layer][inner_ip_layer][inner_l4_layer].dport, self.dst_port)
    return self
338 # -------------------------------------------------------------------
339 # -------------------------------------------------------------------
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
344 class TestCNatTranslation(CnatCommonTestCase):
345 """CNat Translation"""
349 super(TestCNatTranslation, cls).setUpClass()
352 def tearDownClass(cls):
353 super(TestCNatTranslation, cls).tearDownClass()
356 super(TestCNatTranslation, self).setUp()
358 self.create_pg_interfaces(range(3))
359 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
360 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
362 for i in self.pg_interfaces:
368 i.configure_ipv4_neighbors()
369 i.configure_ipv6_neighbors()
372 for translation in self.translations:
373 translation.remove_vpp_config()
375 self.vapi.cnat_session_purge()
376 self.assertFalse(self.vapi.cnat_session_dump())
378 for i in self.pg_interfaces:
382 super(TestCNatTranslation, self).tearDown()
384 def cnat_translation(self):
385 """CNat Translation"""
386 self.logger.info(self.vapi.cli("sh cnat client"))
387 self.logger.info(self.vapi.cli("sh cnat translation"))
389 for nbr, translation in enumerate(self.translations):
390 vip = translation.vip
393 # Test Flows to the VIP
395 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
396 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
398 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
399 dst_port = translation.paths[0][DST].port
400 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
402 ctx.cnat_send_return().cnat_expect_return()
405 # packets to the VIP that do not match a
406 # translation are dropped
409 self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
413 # packets from the VIP that do not match a
414 # session are forwarded
416 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
417 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
420 # modify the translation to use a different backend
422 old_dst_port = translation.paths[0][DST].port
423 translation.paths[0][DST].udpate(
424 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
426 translation.add_vpp_config()
429 # existing flows follow the old path
431 for src_pgi in range(N_REMOTE_HOSTS):
432 for sport in [1234, 1233]:
434 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
436 self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
439 ctx.cnat_send_return().cnat_expect_return()
442 # new flows go to the new backend
444 for src_pgi in range(N_REMOTE_HOSTS):
445 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
446 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
448 self.logger.info(self.vapi.cli("sh cnat session verbose"))
451 # turn the scanner back on and wait until the sessions
454 self.vapi.cli("test cnat scanner on")
455 self.virtual_sleep(2)
456 sessions = self.vapi.cnat_session_dump()
457 self.assertEqual(len(sessions), 0)
458 self.vapi.cli("test cnat scanner off")
461 # load some flows again and purge
463 for translation in self.translations:
464 vip = translation.vip
465 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
466 for src_pgi in range(N_REMOTE_HOSTS):
467 for sport in [1234, 1233]:
469 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
470 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
472 def _test_icmp(self):
476 for nbr, translation in enumerate(self.translations):
477 vip = translation.vip
478 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
483 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
484 dst_port = translation.paths[0][DST].port
485 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
486 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
489 # ICMP errors with no VIP associated should not be
492 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
493 dst_port = translation.paths[0][DST].port
494 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
495 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
497 def _make_translations_v4(self):
498 self.translations = []
499 self.translations.append(
503 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
506 Endpoint(is_v6=False),
507 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
512 self.translations.append(
516 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
519 Endpoint(is_v6=False),
520 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
525 self.translations.append(
529 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
532 Endpoint(is_v6=False),
533 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
539 def _make_translations_v6(self):
540 self.translations = []
541 self.translations.append(
545 Endpoint(ip="30::1", port=5555, is_v6=True),
548 Endpoint(is_v6=True),
549 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
554 self.translations.append(
558 Endpoint(ip="30::2", port=5554, is_v6=True),
561 Endpoint(is_v6=True),
562 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
567 self.translations.append(
571 Endpoint(ip="30::2", port=5553, is_v6=True),
574 Endpoint(is_v6=True),
575 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
581 def test_icmp4(self):
582 # """ CNat Translation icmp v4 """
583 self._make_translations_v4()
586 def test_icmp6(self):
587 # """ CNat Translation icmp v6 """
588 self._make_translations_v6()
591 def test_cnat6(self):
592 # """ CNat Translation ipv6 """
593 self._make_translations_v6()
594 self.cnat_translation()
596 def test_cnat4(self):
597 # """ CNat Translation ipv4 """
598 self._make_translations_v4()
599 self.cnat_translation()
602 class TestCNatSourceNAT(CnatCommonTestCase):
603 """CNat Source NAT"""
607 super(TestCNatSourceNAT, cls).setUpClass()
610 def tearDownClass(cls):
611 super(TestCNatSourceNAT, cls).tearDownClass()
def _enable_disable_snat(self, is_enable=True):
    """Turn the cnat source-NAT setup used by these tests on or off.

    The SNAT addresses (first remote host of pg2) and the interface/prefix
    SNAT policy are applied on every call, whatever ``is_enable`` is. The
    flag only drives the per-arc feature on pg0 and whether every pg
    interface is added to or removed from the v6 and v4 include tables.
    """
    flag = 1 if is_enable else 0

    snat_host = self.pg2.remote_hosts[0]
    self.vapi.cnat_set_snat_addresses(
        snat_ip4=snat_host.ip4,
        snat_ip6=snat_host.ip6,
        sw_if_index=INVALID_INDEX,
    )

    # IPv6 first, then IPv4, as in the original call order.
    for arc, feature in (
        ("ip6-unicast", "cnat-snat-ip6"),
        ("ip4-unicast", "cnat-snat-ip4"),
    ):
        self.vapi.feature_enable_disable(
            enable=flag,
            arc_name=arc,
            feature_name=feature,
            sw_if_index=self.pg0.sw_if_index,
        )

    tables = VppEnum.vl_api_cnat_snat_policy_table_t
    self.vapi.cnat_set_snat_policy(
        policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
    )
    for pg in self.pg_interfaces:
        for table in (tables.CNAT_POLICY_INCLUDE_V6, tables.CNAT_POLICY_INCLUDE_V4):
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=pg.sw_if_index,
                is_add=flag,
                table=table,
            )
649 super(TestCNatSourceNAT, self).setUp()
651 self.create_pg_interfaces(range(3))
652 self.pg1.generate_remote_hosts(2)
654 for i in self.pg_interfaces:
660 i.configure_ipv6_neighbors()
661 i.configure_ipv4_neighbors()
663 self._enable_disable_snat(is_enable=True)
666 self._enable_disable_snat(is_enable=True)
668 self.vapi.cnat_session_purge()
669 for i in self.pg_interfaces:
673 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    # CNat Source Nat v6
    # (kept as a comment: a docstring would change what unittest's
    # shortDescription() reports for this test)
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto, is_v6=True)
    self.sourcenat_test_icmp_echo_conf(is_v6=True)
def test_snat_v4(self):
    # CNat Source Nat v4
    # (kept as a comment: a docstring would change what unittest's
    # shortDescription() reports for this test)
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto)
    self.sourcenat_test_icmp_echo_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
    """Check that an ICMP echo from pg0 to pg1 is source-NATed and un-NATed.

    The echo is expected to leave with the address of pg2's first remote
    host as source and any ICMP id; the reply must then come back to the
    sender with the original id.
    """
    echo_id = 0xFEED
    # 8 is the ICMP echo-request type; it only matters for IPv4.
    echo_type = 8

    ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
    ctx.cnat_send(self.pg0, 0, echo_id, self.pg1, 0, echo_type)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, echo_type)
    reply_ctx = ctx.cnat_send_return()
    reply_ctx.cnat_expect_return()
def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
    """Exercise source NAT and its exclude-prefix list for one L4 protocol.

    The sender is always pg0 remote host 0, port 1234, towards port 6661 on
    a pg1 remote host. "Source-NATed" means the packet is expected with the
    address of pg2's first remote host as source and any source port.

    Steps: plain SNAT; exclusion of the prefix covering pg1 host 1 (that
    flow must pass untouched while host 0 is still NATed); removal of the
    exclusion plus a session purge (host 1 is NATed again); finally an ICMP
    error on the return path of a NATed flow.
    """
    ctx = CnatTestContext(self, L4PROTO, is_v6)

    def send_to(host_id):
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, host_id, 6661)

    def expect_snat(host_id):
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, host_id, 6661)

    def expect_untouched(host_id):
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, host_id, 6661)

    def check_return():
        ctx.cnat_send_return().cnat_expect_return()

    # we should source NAT
    send_to(0)
    expect_snat(0)
    check_return()

    # exclude dst address of pg1.1 from snat
    excluded_host = self.pg1.remote_hosts[1]
    if is_v6:
        prefix_text = "%s/100" % excluded_host.ip6
    else:
        prefix_text = "%s/16" % excluded_host.ip4
    # strict=False: host bits below the prefix length are masked off
    exclude_prefix = ip_network(prefix_text, strict=False)

    # add remote host to exclude list
    self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)

    # We should not source NAT the id=1
    send_to(1)
    expect_untouched(1)
    check_return()

    # But we should source NAT the id=0
    send_to(0)
    expect_snat(0)
    check_return()

    # remove remote host from exclude list
    self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
    self.vapi.cnat_session_purge()

    # We should source NAT again
    send_to(1)
    expect_snat(1)
    check_return()

    # test return ICMP error nating
    send_to(1)
    expect_snat(1)
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    self.vapi.cnat_session_purge()
741 class TestCNatDHCP(CnatCommonTestCase):
742 """CNat Translation"""
746 super(TestCNatDHCP, cls).setUpClass()
749 def tearDownClass(cls):
750 super(TestCNatDHCP, cls).tearDownClass()
753 for i in self.pg_interfaces:
755 super(TestCNatDHCP, self).tearDown()
def make_addr(self, sw_if_index, addr_id, is_v6):
    """Return the test address string for an interface index and address id.

    IPv4 gives ``172.16.<sw_if_index>.<addr_id>``. IPv6 gives
    ``fd01:<sw_if_index in hex>::<addr_id + 1>``; note the +1 applies to
    IPv6 only.
    """
    if not is_v6:
        return "172.16.%u.%u" % (sw_if_index, addr_id)
    return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
def make_prefix(self, sw_if_index, addr_id, is_v6):
    """Return the host prefix (/128 for IPv6, /32 for IPv4) of a test address.

    The address part comes from ``self.make_addr`` with the same arguments.
    """
    host_prefix_fmt = "%s/128" if is_v6 else "%s/32"
    return host_prefix_fmt % self.make_addr(sw_if_index, addr_id, is_v6)
def check_resolved(self, tr, addr_id, is_v6=False):
    """Assert that VPP resolved a translation's endpoints to the test addresses.

    The translation is read back through ``tr.query_vpp_config()``. Its VIP
    and the source/destination endpoint of every path must equal the
    address ``make_addr`` yields for the sw_if_index of the corresponding
    locally configured endpoint, and the number of paths must match.
    """

    def expected_addr(endpoint):
        return self.make_addr(endpoint.sw_if_index, addr_id, is_v6)

    live = tr.query_vpp_config()
    self.assertEqual(str(live.vip.addr), expected_addr(tr.vip))
    self.assertEqual(len(live.paths), len(tr.paths))
    for (want_src, want_dst), got in zip(tr.paths, live.paths):
        self.assertEqual(str(got.src_ep.addr), expected_addr(want_src))
        self.assertEqual(str(got.dst_ep.addr), expected_addr(want_dst))
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
    """Add or remove one generated test address on an interface.

    The prefix comes from ``make_prefix`` for the interface's sw_if_index,
    and ``is_add`` is sent to the API as 1 or 0.
    """
    if_index = pg.sw_if_index
    prefix = self.make_prefix(if_index, addr_id, is_v6)
    self.vapi.sw_interface_add_del_address(
        sw_if_index=if_index,
        prefix=prefix,
        is_add=int(bool(is_add)),
    )
791 def _test_dhcp_v46(self, is_v6):
792 self.create_pg_interfaces(range(4))
793 for i in self.pg_interfaces:
796 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
797 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
799 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
800 t = Translation(self, TCP, ep, paths).add_vpp_config()
801 # Add an address on every interface
802 # and check it is reflected in the cnat config
803 for pg in self.pg_interfaces:
804 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
805 self.check_resolved(t, addr_id=0, is_v6=is_v6)
806 # Add a new address on every interface, remove the old one
807 # and check it is reflected in the cnat config
808 for pg in self.pg_interfaces:
809 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
810 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
811 self.check_resolved(t, addr_id=1, is_v6=is_v6)
812 # remove the configuration
813 for pg in self.pg_interfaces:
814 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
815 t.remove_vpp_config()
817 def test_dhcp_v4(self):
818 self._test_dhcp_v46(False)
820 def test_dhcp_v6(self):
821 self._test_dhcp_v46(True)
823 def test_dhcp_snat(self):
824 self.create_pg_interfaces(range(1))
825 for i in self.pg_interfaces:
827 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
828 # Add an address on every interface
829 # and check it is reflected in the cnat config
830 for pg in self.pg_interfaces:
831 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
832 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
833 r = self.vapi.cnat_get_snat_addresses()
836 self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
839 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
841 # Add a new address on every interface, remove the old one
842 # and check it is reflected in the cnat config
843 for pg in self.pg_interfaces:
844 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
845 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
846 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
847 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
848 r = self.vapi.cnat_get_snat_addresses()
851 self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
854 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
856 # remove the configuration
857 for pg in self.pg_interfaces:
858 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
859 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
860 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
863 if __name__ == "__main__":
864 unittest.main(testRunner=VppTestRunner)