5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import (
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
37 class CnatCommonTestCase(VppTestCase):
38 """CNat common test class"""
41 # turn the scanner off whilst testing otherwise sessions
44 extra_vpp_punt_config = [
49 "session-cleanup-timeout",
62 super(CnatCommonTestCase, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(CnatCommonTestCase, cls).tearDownClass()
69 class Endpoint(object):
72 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
75 self.sw_if_index = INVALID_INDEX
76 if pg is not None and pgi is not None:
77 # pg interface specified and remote index
78 self.ip = self.get_ip46(pg.remote_hosts[pgi])
81 self.sw_if_index = pg.sw_if_index
85 self.ip = "::" if self.is_v6 else "0.0.0.0"
87 def get_ip46(self, obj):
def udpate(self, **kwargs):
    """Re-initialise this endpoint in place.

    All keyword arguments are forwarded unchanged to ``__init__``.

    NOTE(review): the name looks like a misspelling of "update"; callers in
    this file use the same spelling, so it is kept as is.
    """
    self.__init__(**kwargs)
97 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
98 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
104 "sw_if_index": self.sw_if_index,
105 "if_af": self._vpp_if_af(),
109 return "%s:%d" % (self.ip, self.port)
112 class Translation(VppObject):
113 def __init__(self, test, iproto, vip, paths):
121 return "%s %s %s" % (self.vip, self.iproto, self.paths)
def _vl4_proto(self):
    """Return the VPP API IP-protocol enum value for this translation.

    Maps the scapy layer class held in ``self.iproto`` (``UDP`` or ``TCP``)
    to the matching ``vl_api_ip_proto_t`` member.

    Raises:
        KeyError: if ``self.iproto`` is neither ``UDP`` nor ``TCP``.
    """
    ip_proto = VppEnum.vl_api_ip_proto_t
    return {
        UDP: ip_proto.IP_API_PROTO_UDP,
        TCP: ip_proto.IP_API_PROTO_TCP,
    }[self.iproto]
130 def _encoded_paths(self):
132 {"src_ep": src.encode(), "dst_ep": dst.encode()}
133 for (src, dst) in self.paths
136 def add_vpp_config(self):
137 r = self._test.vapi.cnat_translation_update(
139 "vip": self.vip.encode(),
140 "ip_proto": self._vl4_proto(),
141 "n_paths": len(self.paths),
142 "paths": self._encoded_paths(),
145 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Delete this translation from VPP through ``cnat_translation_del``.

    ``self.id`` must already be set; this is asserted before the API call.
    """
    # without an id there is nothing that could be deleted
    assert self.id is not None
    self._test.vapi.cnat_translation_del(id=self.id)
def query_vpp_config(self):
    """Look this translation up in VPP's translation dump.

    Returns:
        The dumped ``translation`` record whose ``id`` equals ``self.id``,
        or ``None`` when no such translation is currently configured.
    """
    for t in self._test.vapi.cnat_translation_dump():
        if self.id == t.translation.id:
            # hand back the record itself so callers can inspect vip/paths
            return t.translation
    return None
161 class CnatTestContext(object):
165 ctx = CnatTestContext(self, TCP, is_v6=True)
167 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
168 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
170 # We expect this to be NATed as
171 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
172 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
174 # After running cnat_expect, we can send back the received packet
175 # and expect it be 'unnated' so that we get the original packet
176 ctx.cnat_send_return().cnat_expect_return()
178 # same thing for ICMP errors
179 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
182 def __init__(self, test, L4PROTO, is_v6):
183 self.L4PROTO = L4PROTO
187 def get_ip46(self, obj):
194 return IPv6 if self.is_v6 else IP
def cnat_send(
    self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
):
    """Build one packet for the flow and inject ``N_PKTS`` copies on ``src_pg``.

    ``src_id`` / ``dst_id`` are either an integer index into the pg
    interface's ``remote_hosts`` or a literal address that is used as is.
    For ICMP, ``src_port`` is used as the echo id and ``dst_port`` as the
    ICMP type (IPv4 only; IPv6 always sends an echo request).

    With ``no_replies=True`` the packets are expected to produce nothing on
    ``dst_pg``; otherwise the packets captured on ``dst_pg`` are stored in
    ``self.rxs``.

    Returns the context itself so calls can be chained.
    """
    if isinstance(src_id, int):
        self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        # A literal source address must land in src_addr; it was previously
        # written to dst_addr and then overwritten by the dst handling below.
        self.src_addr = src_id
    if isinstance(dst_id, int):
        self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.dst_addr = dst_id
    self.src_port = src_port  # also ICMP id
    self.dst_port = dst_port  # also ICMP type

    if self.L4PROTO in [TCP, UDP]:
        l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
    elif self.L4PROTO in [ICMP] and not self.is_v6:
        l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
    elif self.L4PROTO in [ICMP] and self.is_v6:
        l4 = ICMPv6EchoRequest(id=self.src_port)
    p1 = (
        Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
        / self.IP46(src=self.src_addr, dst=self.dst_addr)
        / l4
        / Raw()
    )

    if no_replies:
        self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
    else:
        self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
    # remembered so the return-traffic helpers know which way to send
    self.expected_src_pg = src_pg
    self.expected_dst_pg = dst_pg
    return self
def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
    """Check the packets held in ``self.rxs`` against the expected flow.

    ``src_id`` / ``dst_id`` are either an integer index into the pg
    interface's ``remote_hosts`` or a literal address that is used as is.
    ``src_port`` may be ``None``: the expected source port (ICMP echo id for
    ICMP) is then taken from the first received packet, and the remaining
    packets are checked against that value.

    For ICMP over IPv4 ``dst_port`` is compared with the ICMP type; for
    ICMPv6 only the echo-request id is checked.

    The resolved expectations are kept on the context (``expect_*``
    attributes). Returns the context itself so calls can be chained.
    """
    if isinstance(src_id, int):
        self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        self.expect_src_addr = src_id
    if isinstance(dst_id, int):
        self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.expect_dst_addr = dst_id
    self.expect_src_port = src_port
    self.expect_dst_port = dst_port

    if self.expect_src_port is None:
        # the source port was rewritten to an unknown value: learn it
        if self.L4PROTO in [TCP, UDP]:
            self.expect_src_port = self.rxs[0][self.L4PROTO].sport
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            self.expect_src_port = self.rxs[0][self.L4PROTO].id
        elif self.L4PROTO in [ICMP] and self.is_v6:
            self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id

    for rx in self.rxs:
        self._test.assert_packet_checksums_valid(rx)
        self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
        self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
        if self.L4PROTO in [TCP, UDP]:
            self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
            self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
            self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
    return self
265 def cnat_send_return(self):
266 """This sends the return traffic"""
267 if self.L4PROTO in [TCP, UDP]:
268 l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
269 elif self.L4PROTO in [ICMP] and not self.is_v6:
270 # icmp type 0 if echo reply
271 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
272 elif self.L4PROTO in [ICMP] and self.is_v6:
273 l4 = ICMPv6EchoReply(id=self.expect_src_port)
274 src_mac = self.expected_dst_pg.remote_mac
276 Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
277 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
282 self.return_rxs = self._test.send_and_expect(
283 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
def cnat_expect_return(self):
    """Verify the return packets look like replies to the original flow.

    Every packet in ``self.return_rxs`` must have valid checksums, be
    addressed to the original source address and come from the original
    destination address. TCP/UDP ports must be the original ones swapped;
    ICMP replies must carry the original echo id (and type 0 on IPv4).
    """
    check = self._test.assertEqual
    proto = self.L4PROTO
    for pkt in self.return_rxs:
        self._test.assert_packet_checksums_valid(pkt)
        ip_hdr = pkt[self.IP46]
        check(ip_hdr.dst, self.src_addr)
        check(ip_hdr.src, self.dst_addr)
        if proto in (TCP, UDP):
            check(pkt[proto].dport, self.src_port)
            check(pkt[proto].sport, self.dst_port)
        elif proto in (ICMP,):
            if self.is_v6:
                check(pkt[ICMPv6EchoReply].id, self.src_port)
            else:
                # ICMP type 0 is echo reply
                check(pkt[proto].type, 0)
                check(pkt[proto].id, self.src_port)
303 def cnat_send_icmp_return_error(self):
305 This called after cnat_expect will send an icmp error
308 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
309 InnerIP = self.rxs[0][self.IP46]
312 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
314 / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
318 self.return_rxs = self._test.send_and_expect(
319 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
def cnat_expect_icmp_error_return(self):
    """Verify the ICMP errors in ``self.return_rxs`` match the original flow.

    The outer IP header must be addressed to the original source and come
    from the original destination. The packet quoted inside the ICMP error
    must carry the original source/destination addresses and ports.
    Any protocol other than TCP is looked up as a UDP error layer.
    """
    if self.is_v6:
        icmp_layer, quoted_ip_layer = ICMPv6DestUnreach, IPerror6
    else:
        icmp_layer, quoted_ip_layer = ICMP, IPerror
    quoted_l4_layer = TCPerror if self.L4PROTO is TCP else UDPerror
    check = self._test.assertEqual
    for pkt in self.return_rxs:
        self._test.assert_packet_checksums_valid(pkt)
        outer = pkt[self.IP46]
        check(outer.dst, self.src_addr)
        check(outer.src, self.dst_addr)
        quoted_ip = pkt[icmp_layer][quoted_ip_layer]
        check(quoted_ip.src, self.src_addr)
        check(quoted_ip.dst, self.dst_addr)
        quoted_l4 = quoted_ip[quoted_l4_layer]
        check(quoted_l4.sport, self.src_port)
        check(quoted_l4.dport, self.dst_port)
338 # -------------------------------------------------------------------
339 # -------------------------------------------------------------------
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
344 class TestCNatTranslation(CnatCommonTestCase):
345 """CNat Translation"""
349 super(TestCNatTranslation, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(TestCNatTranslation, cls).tearDownClass()
356 super(TestCNatTranslation, self).setUp()
358 self.create_pg_interfaces(range(3))
359 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
360 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
362 for i in self.pg_interfaces:
368 i.configure_ipv4_neighbors()
369 i.configure_ipv6_neighbors()
372 for translation in self.translations:
373 translation.remove_vpp_config()
375 self.vapi.cnat_session_purge()
376 self.assertFalse(self.vapi.cnat_session_dump())
378 for i in self.pg_interfaces:
382 super(TestCNatTranslation, self).tearDown()
def cnat_translation(self):
    """Run the translation scenario over every entry of ``self.translations``.

    Per translation: flows to the VIP are NATed to the first path's backend
    and their replies un-NATed; traffic to the VIP on an untranslated port
    is dropped; traffic from the backend without a session is forwarded.
    The backend is then changed to pg2 host 0, port 5000: established flows
    keep the old backend port while new flows use the new backend.

    Finally the scanner is switched on until no session is left, and flows
    are loaded once more.
    """
    self.logger.info(self.vapi.cli("sh cnat client"))
    self.logger.info(self.vapi.cli("sh cnat translation"))

    for nbr, translation in enumerate(self.translations):
        vip = translation.vip

        #
        # Test Flows to the VIP
        #
        ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
        for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
            # from client to vip
            ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
            dst_port = translation.paths[0][DST].port
            ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
            # from vip to client
            ctx.cnat_send_return().cnat_expect_return()

            #
            # packets to the VIP that do not match a
            # translation are dropped
            #
            ctx.cnat_send(
                self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
            )

            #
            # packets from the VIP that do not match a
            # session are forwarded
            #
            ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
            ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)

        #
        # modify the translation to use a different backend
        #
        old_dst_port = translation.paths[0][DST].port
        translation.paths[0][DST].udpate(
            pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
        )
        translation.add_vpp_config()

        #
        # existing flows follow the old path
        #
        for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
            # from client to vip
            ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
            ctx.cnat_expect(
                self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
            )
            # from vip to client
            ctx.cnat_send_return().cnat_expect_return()

        #
        # new flows go to the new backend
        #
        for src_pgi in range(N_REMOTE_HOSTS):
            ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
            ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)

        self.logger.info(self.vapi.cli("sh cnat session verbose"))

    #
    # turn the scanner back on and wait until the sessions
    # have all been removed
    #
    self.vapi.cli("test cnat scanner on")
    self.virtual_sleep(2)
    sessions = self.vapi.cnat_session_dump()
    self.assertEqual(len(sessions), 0)
    self.vapi.cli("test cnat scanner off")

    #
    # load some flows again and purge
    #
    for translation in self.translations:
        vip = translation.vip
        ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
        for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
            # from client to vip
            ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
            ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
def _test_icmp(self):
    """Check ICMP error handling for every entry of ``self.translations``.

    Two flows are exercised per translation, each followed by an ICMP error
    sent back from the destination side:

    * a flow to the VIP, NATed to the first path's backend;
    * a flow straight to pg2 host 0 that does not go through a VIP and is
      expected to arrive unchanged.
    """
    for nbr, translation in enumerate(self.translations):
        vip = translation.vip
        ctx = CnatTestContext(self, translation.iproto, vip.is_v6)

        #
        # NATing ICMP errors
        #
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
        dst_port = translation.paths[0][DST].port
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

        #
        # ICMP errors with no VIP associated should not be
        # modified
        #
        ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
498 def _make_translations_v4(self):
499 self.translations = []
500 self.translations.append(
504 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
507 Endpoint(is_v6=False),
508 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
513 self.translations.append(
517 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
520 Endpoint(is_v6=False),
521 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
526 self.translations.append(
530 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
533 Endpoint(is_v6=False),
534 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
540 def _make_translations_v6(self):
541 self.translations = []
542 self.translations.append(
546 Endpoint(ip="30::1", port=5555, is_v6=True),
549 Endpoint(is_v6=True),
550 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
555 self.translations.append(
559 Endpoint(ip="30::2", port=5554, is_v6=True),
562 Endpoint(is_v6=True),
563 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
568 self.translations.append(
572 Endpoint(ip="30::2", port=5553, is_v6=True),
575 Endpoint(is_v6=True),
576 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
def test_icmp4(self):
    # """ CNat Translation icmp v4 """
    self._make_translations_v4()
    # building the translations alone verifies nothing: run the ICMP scenario
    self._test_icmp()
def test_icmp6(self):
    # """ CNat Translation icmp v6 """
    self._make_translations_v6()
    # building the translations alone verifies nothing: run the ICMP scenario
    self._test_icmp()
def test_cnat6(self):
    # """ CNat Translation ipv6 """
    # Build the IPv6 translation set, then run the shared translation scenario.
    self._make_translations_v6()
    self.cnat_translation()
def test_cnat4(self):
    # """ CNat Translation ipv4 """
    # Build the IPv4 translation set, then run the shared translation scenario.
    self._make_translations_v4()
    self.cnat_translation()
603 class TestCNatSourceNAT(CnatCommonTestCase):
604 """CNat Source NAT"""
608 super(TestCNatSourceNAT, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(TestCNatSourceNAT, cls).tearDownClass()
def _enable_disable_snat(self, is_enable=True):
    """Switch source NAT on or off for the test topology.

    The SNAT addresses are always set to pg2's first remote host (IPv4 and
    IPv6) and the SNAT policy to ``CNAT_POLICY_IF_PFX``, whatever
    ``is_enable`` is. ``is_enable`` controls:

    * the ``cnat-snat-ip6`` / ``cnat-snat-ip4`` features on pg0;
    * membership of every pg interface in the V6 and V4 include tables.
    """
    # the API takes 1/0 rather than a bool; compute it once
    flag = 1 if is_enable else 0

    self.vapi.cnat_set_snat_addresses(
        snat_ip4=self.pg2.remote_hosts[0].ip4,
        snat_ip6=self.pg2.remote_hosts[0].ip6,
        sw_if_index=INVALID_INDEX,
    )
    self.vapi.feature_enable_disable(
        enable=flag,
        arc_name="ip6-unicast",
        feature_name="cnat-snat-ip6",
        sw_if_index=self.pg0.sw_if_index,
    )
    self.vapi.feature_enable_disable(
        enable=flag,
        arc_name="ip4-unicast",
        feature_name="cnat-snat-ip4",
        sw_if_index=self.pg0.sw_if_index,
    )

    policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
    self.vapi.cnat_set_snat_policy(
        policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
    )
    for i in self.pg_interfaces:
        self.vapi.cnat_snat_policy_add_del_if(
            sw_if_index=i.sw_if_index,
            is_add=flag,
            table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
        )
        self.vapi.cnat_snat_policy_add_del_if(
            sw_if_index=i.sw_if_index,
            is_add=flag,
            table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
        )
650 super(TestCNatSourceNAT, self).setUp()
652 self.create_pg_interfaces(range(3))
653 self.pg1.generate_remote_hosts(2)
655 for i in self.pg_interfaces:
661 i.configure_ipv6_neighbors()
662 i.configure_ipv4_neighbors()
664 self._enable_disable_snat(is_enable=True)
667 self._enable_disable_snat(is_enable=True)
669 self.vapi.cnat_session_purge()
670 for i in self.pg_interfaces:
674 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    # """ CNat Source Nat v6 """
    # TCP first, then UDP, then the ICMP echo scenario, all over IPv6
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto, is_v6=True)
    self.sourcenat_test_icmp_echo_conf(is_v6=True)
def test_snat_v4(self):
    # """ CNat Source Nat v4 """
    # TCP first, then UDP, then the ICMP echo scenario, all over IPv4
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto)
    self.sourcenat_test_icmp_echo_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
    """Send an ICMP echo from pg0 to pg1 and check that it is source-NATed.

    The request must reach pg1 with pg2's first remote host as its source;
    the reply sent back must then be translated back to the original flow.
    """
    echo_id = 0xFEED
    # 8 is ICMP type echo (v4 only)
    echo_type = 8
    ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
    ctx.cnat_send(self.pg0, 0, echo_id, self.pg1, 0, echo_type)
    # source id None: the rewritten echo id is learned from the capture
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, echo_type)
    ctx.cnat_send_return().cnat_expect_return()
def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
    """Run the source-NAT scenario for one L4 protocol and address family.

    Steps:
      1. traffic pg0 -> pg1 host 0 is source-NATed to pg2 host 0;
      2. a prefix covering pg1 host 1 is added to the exclude list:
         traffic to host 1 is left untouched, traffic to host 0 is still
         NATed;
      3. the prefix is removed and sessions purged: traffic to host 1 is
         NATed again;
      4. an ICMP error returned for a NATed flow is translated back.

    Sessions are purged at the end.
    """
    ctx = CnatTestContext(self, L4PROTO, is_v6)
    # we should source NAT
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
    ctx.cnat_send_return().cnat_expect_return()

    # exclude dst address of pg1.1 from snat
    if is_v6:
        exclude_prefix = ip_network(
            "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
        )
    else:
        exclude_prefix = ip_network(
            "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
        )

    # add remote host to exclude list
    self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)

    # We should not source NAT the id=1
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
    ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
    ctx.cnat_send_return().cnat_expect_return()

    # But we should source NAT the id=0
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
    ctx.cnat_send_return().cnat_expect_return()

    # remove remote host from exclude list
    self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
    self.vapi.cnat_session_purge()

    # We should source NAT again
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
    ctx.cnat_send_return().cnat_expect_return()

    # test return ICMP error nating
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    self.vapi.cnat_session_purge()
742 class TestCNatDHCP(CnatCommonTestCase):
743 """CNat Translation"""
747 super(TestCNatDHCP, cls).setUpClass()
def tearDownClass(cls):
    """Class-level teardown; defers entirely to the parent class."""
    super(TestCNatDHCP, cls).tearDownClass()
754 for i in self.pg_interfaces:
756 super(TestCNatDHCP, self).tearDown()
def make_addr(self, sw_if_index, addr_id, is_v6):
    """Return the test address string for an interface / address-id pair.

    IPv6: ``fd01:<sw_if_index in hex>::<addr_id + 1>``.
    IPv4: ``172.16.<sw_if_index>.<addr_id>``.
    """
    if is_v6:
        return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
    return "172.16.%u.%u" % (sw_if_index, addr_id)
def make_prefix(self, sw_if_index, addr_id, is_v6):
    """Return the host prefix for the address built by ``make_addr``.

    The prefix length is ``/128`` for IPv6 and ``/32`` for IPv4.
    """
    if is_v6:
        return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
    return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
def check_resolved(self, tr, addr_id, is_v6=False):
    """Assert that VPP resolved every endpoint of ``tr`` to the test address.

    The translation is read back with ``tr.query_vpp_config()``. Its VIP and
    the source/destination endpoint of each path must equal the address
    ``make_addr`` yields for the endpoint's ``sw_if_index`` and ``addr_id``.
    The number of paths must match as well.
    """
    qt = tr.query_vpp_config()
    self.assertEqual(
        str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
    )
    self.assertEqual(len(qt.paths), len(tr.paths))
    for path_tr, path_qt in zip(tr.paths, qt.paths):
        src_qt = path_qt.src_ep
        dst_qt = path_qt.dst_ep
        src_tr, dst_tr = path_tr
        self.assertEqual(
            str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
        )
        self.assertEqual(
            str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
        )
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
    """Add or remove the test address ``addr_id`` on interface ``pg``.

    The prefix comes from ``make_prefix`` using the interface's
    ``sw_if_index``; ``is_add`` selects between adding and removing it.
    """
    self.vapi.sw_interface_add_del_address(
        sw_if_index=pg.sw_if_index,
        prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
        is_add=1 if is_add else 0,
    )
792 def _test_dhcp_v46(self, is_v6):
793 self.create_pg_interfaces(range(4))
794 for i in self.pg_interfaces:
797 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
798 (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
800 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
801 t = Translation(self, TCP, ep, paths).add_vpp_config()
802 # Add an address on every interface
803 # and check it is reflected in the cnat config
804 for pg in self.pg_interfaces:
805 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
806 self.check_resolved(t, addr_id=0, is_v6=is_v6)
807 # Add a new address on every interface, remove the old one
808 # and check it is reflected in the cnat config
809 for pg in self.pg_interfaces:
810 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
811 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
812 self.check_resolved(t, addr_id=1, is_v6=is_v6)
813 # remove the configuration
814 for pg in self.pg_interfaces:
815 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
816 t.remove_vpp_config()
def test_dhcp_v4(self):
    # IPv4 variant of the shared address-resolution scenario
    self._test_dhcp_v46(is_v6=False)
def test_dhcp_v6(self):
    # IPv6 variant of the shared address-resolution scenario
    self._test_dhcp_v46(is_v6=True)
824 def test_dhcp_snat(self):
825 self.create_pg_interfaces(range(1))
826 for i in self.pg_interfaces:
828 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
829 # Add an address on every interface
830 # and check it is reflected in the cnat config
831 for pg in self.pg_interfaces:
832 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
833 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
834 r = self.vapi.cnat_get_snat_addresses()
837 self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
840 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
842 # Add a new address on every interface, remove the old one
843 # and check it is reflected in the cnat config
844 for pg in self.pg_interfaces:
845 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
846 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
847 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
848 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
849 r = self.vapi.cnat_get_snat_addresses()
852 self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
855 str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
857 # remove the configuration
858 for pg in self.pg_interfaces:
859 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
860 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
861 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
# When executed as a script, run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)