5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
31 class CnatCommonTestCase(VppTestCase):
32 """ CNat common test class """
35 # turn the scanner off whilst testing otherwise sessions
38 extra_vpp_punt_config = ["cnat", "{",
39 "session-db-buckets", "64",
40 "session-cleanup-timeout", "0.1",
41 "session-max-age", "1",
43 "scanner", "off", "}"]
47 super(CnatCommonTestCase, cls).setUpClass()
50 def tearDownClass(cls):
51 super(CnatCommonTestCase, cls).tearDownClass()
54 class Endpoint(object):
57 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
60 self.sw_if_index = INVALID_INDEX
61 if pg is not None and pgi is not None:
62 # pg interface specified and remote index
63 self.ip = self.get_ip46(pg.remote_hosts[pgi])
66 self.sw_if_index = pg.sw_if_index
70 self.ip = "::" if self.is_v6 else "0.0.0.0"
72 def get_ip46(self, obj):
def update(self, **kwargs):
    """Re-initialise this endpoint in place.

    The keyword arguments are forwarded unchanged to ``__init__``, so
    every argument that is omitted takes the constructor's default.
    """
    self.__init__(**kwargs)


# Historical misspelling of ``update``; kept as an alias because
# existing callers use this name.
udpate = update
82 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
83 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
86 return {'addr': self.ip,
88 'sw_if_index': self.sw_if_index,
89 'if_af': self._vpp_if_af()}
92 return ("%s:%d" % (self.ip, self.port))
95 class Translation(VppObject):
97 def __init__(self, test, iproto, vip, paths):
105 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
107 def _vl4_proto(self):
108 ip_proto = VppEnum.vl_api_ip_proto_t
110 UDP: ip_proto.IP_API_PROTO_UDP,
111 TCP: ip_proto.IP_API_PROTO_TCP,
def _encoded_paths(self):
    """Return the paths of this translation in API form.

    Each entry of ``self.paths`` is unpacked as a (source, destination)
    pair; both members are converted with their own ``encode()`` method
    and stored under the 'src_ep' / 'dst_ep' keys of one dict per path.
    """
    encoded = []
    for source_ep, destination_ep in self.paths:
        entry = {'src_ep': source_ep.encode(),
                 'dst_ep': destination_ep.encode()}
        encoded.append(entry)
    return encoded
118 def add_vpp_config(self):
119 r = self._test.vapi.cnat_translation_update(
120 {'vip': self.vip.encode(),
121 'ip_proto': self._vl4_proto(),
122 'n_paths': len(self.paths),
123 'paths': self._encoded_paths()})
124 self._test.registry.register(self, self._test.logger)
128 def remove_vpp_config(self):
129 assert(self.id is not None)
130 self._test.vapi.cnat_translation_del(id=self.id)
133 def query_vpp_config(self):
134 for t in self._test.vapi.cnat_translation_dump():
135 if self.id == t.translation.id:
140 class CnatTestContext(object):
144 ctx = CnatTestContext(self, TCP, is_v6=True)
146 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
147 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
149 # We expect this to be NATed as
150 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
151 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
153 # After running cnat_expect, we can send back the received packet
154 # and expect it to be 'unnated' so that we get the original packet
155 ctx.cnat_send_return().cnat_expect_return()
157 # same thing for ICMP errors
158 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
161 def __init__(self, test, L4PROTO, is_v6):
162 self.L4PROTO = L4PROTO
166 def get_ip46(self, obj):
173 return IPv6 if self.is_v6 else IP
def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
              no_replies=False):
    """Build one test packet and inject N_PKTS copies of it on src_pg.

    src_id / dst_id are either an integer index into the interface's
    ``remote_hosts`` list or an explicit address that is used as is.
    For ICMP, src_port carries the ICMP id and dst_port the ICMP type
    (the type is only applied for IPv4; IPv6 always sends an echo
    request).

    When no_replies is true the test asserts that nothing is received
    on dst_pg; otherwise the packets captured on dst_pg are stored in
    ``self.rxs``.  The interfaces are remembered in
    ``expected_src_pg`` / ``expected_dst_pg`` for the return-traffic
    helpers.

    Raises ValueError if the context's L4 protocol is not TCP, UDP or
    ICMP.  Returns self so that calls can be chained.
    """
    # An integer selects a generated remote host; anything else is
    # taken to be the address itself.
    if isinstance(src_id, int):
        self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        # Fix: an explicit source address belongs in src_addr.  It used
        # to be written to dst_addr, where the destination handling
        # below overwrote it and src_addr was left stale or unset.
        self.src_addr = src_id
    if isinstance(dst_id, int):
        self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.dst_addr = dst_id
    self.src_port = src_port  # also ICMP id
    self.dst_port = dst_port  # also ICMP type

    if self.L4PROTO in [TCP, UDP]:
        l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
    elif self.L4PROTO in [ICMP]:
        if self.is_v6:
            l4 = ICMPv6EchoRequest(id=self.src_port)
        else:
            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
    else:
        # Fail with a clear message instead of an unbound local below.
        raise ValueError("unsupported L4 protocol: %r" % (self.L4PROTO,))

    p1 = (Ether(src=src_pg.remote_mac,
                dst=src_pg.local_mac) /
          self.IP46(src=self.src_addr, dst=self.dst_addr) /
          l4 /
          Raw())

    if no_replies:
        self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
    else:
        self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
    self.expected_src_pg = src_pg
    self.expected_dst_pg = dst_pg
    return self
208 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
209 if isinstance(src_id, int):
210 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
212 self.expect_src_addr = src_id
213 if isinstance(dst_id, int):
214 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
216 self.expect_dst_addr = dst_id
217 self.expect_src_port = src_port
218 self.expect_dst_port = dst_port
220 if self.expect_src_port is None:
221 if self.L4PROTO in [TCP, UDP]:
222 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
223 elif self.L4PROTO in [ICMP] and not self.is_v6:
224 self.expect_src_port = self.rxs[0][self.L4PROTO].id
225 elif self.L4PROTO in [ICMP] and self.is_v6:
226 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
229 self._test.assert_packet_checksums_valid(rx)
230 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
231 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
232 if self.L4PROTO in [TCP, UDP]:
233 self._test.assertEqual(
234 rx[self.L4PROTO].dport, self.expect_dst_port)
235 self._test.assertEqual(
236 rx[self.L4PROTO].sport, self.expect_src_port)
237 elif self.L4PROTO in [ICMP] and not self.is_v6:
238 self._test.assertEqual(
239 rx[self.L4PROTO].type, self.expect_dst_port)
240 self._test.assertEqual(
241 rx[self.L4PROTO].id, self.expect_src_port)
242 elif self.L4PROTO in [ICMP] and self.is_v6:
243 self._test.assertEqual(
244 rx[ICMPv6EchoRequest].id, self.expect_src_port)
247 def cnat_send_return(self):
248 """This sends the return traffic"""
249 if self.L4PROTO in [TCP, UDP]:
250 l4 = self.L4PROTO(sport=self.expect_dst_port,
251 dport=self.expect_src_port)
252 elif self.L4PROTO in [ICMP] and not self.is_v6:
253 # icmp type 0 if echo reply
254 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
255 elif self.L4PROTO in [ICMP] and self.is_v6:
256 l4 = ICMPv6EchoReply(id=self.expect_src_port)
257 src_mac = self.expected_dst_pg.remote_mac
258 p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
259 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
263 self.return_rxs = self._test.send_and_expect(
264 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
267 def cnat_expect_return(self):
268 for rx in self.return_rxs:
269 self._test.assert_packet_checksums_valid(rx)
270 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
271 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
272 if self.L4PROTO in [TCP, UDP]:
273 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
274 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
275 elif self.L4PROTO in [ICMP] and not self.is_v6:
276 # icmp type 0 if echo reply
277 self._test.assertEqual(rx[self.L4PROTO].type, 0)
278 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
279 elif self.L4PROTO in [ICMP] and self.is_v6:
280 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
283 def cnat_send_icmp_return_error(self):
285 This called after cnat_expect will send an icmp error
288 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
289 InnerIP = self.rxs[0][self.IP46]
291 Ether(src=self.expected_dst_pg.remote_mac,
292 dst=self.expected_dst_pg.local_mac) /
293 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
295 self.return_rxs = self._test.send_and_expect(
296 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
299 def cnat_expect_icmp_error_return(self):
300 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
301 IP46err = IPerror6 if self.is_v6 else IPerror
302 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
303 for rx in self.return_rxs:
304 self._test.assert_packet_checksums_valid(rx)
305 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
306 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
307 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
308 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
309 self._test.assertEqual(
310 rx[ICMP46][IP46err][L4err].sport, self.src_port)
311 self._test.assertEqual(
312 rx[ICMP46][IP46err][L4err].dport, self.dst_port)
315 # -------------------------------------------------------------------
316 # -------------------------------------------------------------------
317 # -------------------------------------------------------------------
318 # -------------------------------------------------------------------
321 class TestCNatTranslation(CnatCommonTestCase):
322 """ CNat Translation """
326 super(TestCNatTranslation, cls).setUpClass()
329 def tearDownClass(cls):
330 super(TestCNatTranslation, cls).tearDownClass()
333 super(TestCNatTranslation, self).setUp()
335 self.create_pg_interfaces(range(3))
336 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
337 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
339 for i in self.pg_interfaces:
345 i.configure_ipv4_neighbors()
346 i.configure_ipv6_neighbors()
349 for translation in self.translations:
350 translation.remove_vpp_config()
352 self.vapi.cnat_session_purge()
353 self.assertFalse(self.vapi.cnat_session_dump())
355 for i in self.pg_interfaces:
359 super(TestCNatTranslation, self).tearDown()
361 def cnat_translation(self):
362 """ CNat Translation """
363 self.logger.info(self.vapi.cli("sh cnat client"))
364 self.logger.info(self.vapi.cli("sh cnat translation"))
366 for nbr, translation in enumerate(self.translations):
367 vip = translation.vip
370 # Test Flows to the VIP
372 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
373 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
375 ctx.cnat_send(self.pg0, src_pgi, sport,
376 self.pg1, vip.ip, vip.port)
377 dst_port = translation.paths[0][DST].port
378 ctx.cnat_expect(self.pg0, src_pgi, sport,
379 self.pg1, nbr, dst_port)
381 ctx.cnat_send_return().cnat_expect_return()
384 # packets to the VIP that do not match a
385 # translation are dropped
387 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
388 vip.ip, 6666, no_replies=True)
391 # packets from the VIP that do not match a
392 # session are forwarded
394 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
395 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
398 # modify the translation to use a different backend
400 old_dst_port = translation.paths[0][DST].port
401 translation.paths[0][DST].udpate(
402 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
403 translation.add_vpp_config()
406 # existing flows follow the old path
408 for src_pgi in range(N_REMOTE_HOSTS):
409 for sport in [1234, 1233]:
411 ctx.cnat_send(self.pg0, src_pgi, sport,
412 self.pg1, vip.ip, vip.port)
413 ctx.cnat_expect(self.pg0, src_pgi, sport,
414 self.pg1, nbr, old_dst_port)
416 ctx.cnat_send_return().cnat_expect_return()
419 # new flows go to the new backend
421 for src_pgi in range(N_REMOTE_HOSTS):
422 ctx.cnat_send(self.pg0, src_pgi, 9999,
423 self.pg2, vip.ip, vip.port)
424 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
426 self.logger.info(self.vapi.cli("sh cnat session verbose"))
429 # turn the scanner back on and wait until the sessions
432 self.vapi.cli("test cnat scanner on")
435 sessions = self.vapi.cnat_session_dump()
436 while (len(sessions) and n_tries < 100):
438 sessions = self.vapi.cnat_session_dump()
440 self.logger.info(self.vapi.cli("show cnat session verbose"))
442 self.assertTrue(n_tries < 100)
443 self.vapi.cli("test cnat scanner off")
446 # load some flows again and purge
448 for translation in self.translations:
449 vip = translation.vip
450 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
451 for src_pgi in range(N_REMOTE_HOSTS):
452 for sport in [1234, 1233]:
454 ctx.cnat_send(self.pg0, src_pgi, sport,
455 self.pg2, vip.ip, vip.port)
456 ctx.cnat_expect(self.pg0, src_pgi,
457 sport, self.pg2, 0, 5000)
459 def _test_icmp(self):
464 for nbr, translation in enumerate(self.translations):
465 vip = translation.vip
466 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
471 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
472 dst_port = translation.paths[0][DST].port
473 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
474 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
477 # ICMP errors with no VIP associated should not be
480 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
481 dst_port = translation.paths[0][DST].port
482 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
483 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
485 def _make_translations_v4(self):
486 self.translations = []
487 self.translations.append(Translation(
488 self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
490 Endpoint(is_v6=False),
491 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
494 self.translations.append(Translation(
495 self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
497 Endpoint(is_v6=False),
498 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
501 self.translations.append(Translation(
502 self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
504 Endpoint(is_v6=False),
505 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
509 def _make_translations_v6(self):
510 self.translations = []
511 self.translations.append(Translation(
512 self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
514 Endpoint(is_v6=True),
515 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
518 self.translations.append(Translation(
519 self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
521 Endpoint(is_v6=True),
522 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
525 self.translations.append(Translation(
526 self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
528 Endpoint(is_v6=True),
529 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
533 def test_icmp4(self):
534 # """ CNat Translation icmp v4 """
535 self._make_translations_v4()
538 def test_icmp6(self):
539 # """ CNat Translation icmp v6 """
540 self._make_translations_v6()
def test_cnat6(self):
    # """ CNat Translation ipv6 """
    # Create the IPv6 translation set first; the shared scenario then
    # iterates over self.translations.
    self._make_translations_v6()
    self.cnat_translation()
def test_cnat4(self):
    # """ CNat Translation ipv4 """
    # Create the IPv4 translation set first; the shared scenario then
    # iterates over self.translations.
    self._make_translations_v4()
    self.cnat_translation()
554 class TestCNatSourceNAT(CnatCommonTestCase):
555 """ CNat Source NAT """
559 super(TestCNatSourceNAT, cls).setUpClass()
562 def tearDownClass(cls):
563 super(TestCNatSourceNAT, cls).tearDownClass()
def _enable_disable_snat(self, is_enable=True):
    """Configure the SNAT addresses and toggle SNAT on the test interfaces.

    The SNAT source addresses (first remote host of pg2) and the
    CNAT_POLICY_IF_PFX policy are written on every call, whatever the
    value of is_enable.  is_enable only selects whether the cnat-snat
    features are enabled or disabled on pg0 and whether each pg
    interface is added to or removed from the include tables.
    """
    flag = 1 if is_enable else 0

    snat_host = self.pg2.remote_hosts[0]
    self.vapi.cnat_set_snat_addresses(
        snat_ip4=snat_host.ip4,
        snat_ip6=snat_host.ip6,
        sw_if_index=INVALID_INDEX)

    # IPv6 arc first, then IPv4, both on pg0.
    for arc, feature in (("ip6-unicast", "cnat-snat-ip6"),
                         ("ip4-unicast", "cnat-snat-ip4")):
        self.vapi.feature_enable_disable(
            enable=flag,
            arc_name=arc,
            feature_name=feature,
            sw_if_index=self.pg0.sw_if_index)

    self.vapi.cnat_set_snat_policy(
        policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)

    tables = VppEnum.vl_api_cnat_snat_policy_table_t
    for intf in self.pg_interfaces:
        # Per interface: the v6 include table, then the v4 one.
        for table in (tables.CNAT_POLICY_INCLUDE_V6,
                      tables.CNAT_POLICY_INCLUDE_V4):
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=intf.sw_if_index, is_add=flag, table=table)
593 super(TestCNatSourceNAT, self).setUp()
595 self.create_pg_interfaces(range(3))
596 self.pg1.generate_remote_hosts(2)
598 for i in self.pg_interfaces:
604 i.configure_ipv6_neighbors()
605 i.configure_ipv4_neighbors()
607 self._enable_disable_snat(is_enable=True)
610 self._enable_disable_snat(is_enable=True)
612 self.vapi.cnat_session_purge()
613 for i in self.pg_interfaces:
617 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    # """ CNat Source Nat v6 """
    # TCP and UDP share one scenario; ICMP echo has its own.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto, is_v6=True)
    self.sourcenat_test_icmp_echo_conf(is_v6=True)
def test_snat_v4(self):
    # """ CNat Source Nat v4 """
    # Same sequence as the v6 test, relying on the helpers' IPv4 default.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto)
    self.sourcenat_test_icmp_echo_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
    """Send an ICMP echo from pg0 to pg1 and check it is source NATed.

    The forwarded packet is expected to carry the address of pg2's
    first remote host as source, with whatever ICMP id was received;
    the reply is then sent back and checked against the original flow.
    """
    echo_id = 0xfeed
    # 8 is ICMP type echo (v4 only)
    echo_type = 8

    ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
    ctx.cnat_send(self.pg0, 0, echo_id, self.pg1, 0, echo_type)
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, echo_type)
    reply_ctx = ctx.cnat_send_return()
    reply_ctx.cnat_expect_return()
638 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
639 ctx = CnatTestContext(self, L4PROTO, is_v6)
640 # we should source NAT
641 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
642 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
643 ctx.cnat_send_return().cnat_expect_return()
645 # exclude dst address of pg1.1 from snat
647 exclude_prefix = ip_network(
648 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
650 exclude_prefix = ip_network(
651 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
653 # add remote host to exclude list
654 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
655 prefix=exclude_prefix, is_add=1)
657 # We should not source NAT the id=1
658 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
659 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
660 ctx.cnat_send_return().cnat_expect_return()
662 # But we should source NAT the id=0
663 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
664 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
665 ctx.cnat_send_return().cnat_expect_return()
667 # remove remote host from exclude list
668 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
669 prefix=exclude_prefix, is_add=0)
670 self.vapi.cnat_session_purge()
672 # We should source NAT again
673 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
674 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
675 ctx.cnat_send_return().cnat_expect_return()
677 # test return ICMP error nating
678 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
679 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
680 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
682 self.vapi.cnat_session_purge()
685 class TestCNatDHCP(CnatCommonTestCase):
686 """ CNat Translation """
690 super(TestCNatDHCP, cls).setUpClass()
693 def tearDownClass(cls):
694 super(TestCNatDHCP, cls).tearDownClass()
697 for i in self.pg_interfaces:
699 super(TestCNatDHCP, self).tearDown()
def make_addr(self, sw_if_index, addr_id, is_v6):
    """Return the test address string for an interface and address id.

    IPv6 addresses have the form ``fd01:<sw_if_index in hex>::<addr_id + 1>``;
    IPv4 addresses have the form ``172.16.<sw_if_index>.<addr_id>``.
    """
    # Without this guard the IPv6 return is unconditional and the IPv4
    # form can never be produced.
    if is_v6:
        return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
    return "172.16.%u.%u" % (sw_if_index, addr_id)
def make_prefix(self, sw_if_index, addr_id, is_v6):
    """Return make_addr()'s address as a host prefix string.

    The prefix length is /128 for IPv6 and /32 for IPv4.
    """
    # The length has to follow the address family; an unconditional
    # /128 would make the IPv4 form unreachable.
    prefix_len = 128 if is_v6 else 32
    address = self.make_addr(sw_if_index, addr_id, is_v6)
    return "%s/%d" % (address, prefix_len)
def check_resolved(self, tr, addr_id, is_v6=False):
    """Assert that the translation reported by VPP uses the expected addresses.

    The result of ``tr.query_vpp_config()`` is compared with the
    addresses that make_addr() yields for addr_id on the interface of
    the VIP and of each path endpoint configured in ``tr``.  The number
    of reported paths must match the number of configured paths.
    """
    reported = tr.query_vpp_config()

    vip_addr = str(reported.vip.addr)
    self.assertEqual(
        vip_addr, self.make_addr(tr.vip.sw_if_index, addr_id, is_v6))

    self.assertEqual(len(reported.paths), len(tr.paths))

    for (src_cfg, dst_cfg), path in zip(tr.paths, reported.paths):
        # Source endpoint first, then destination.
        pairs = ((path.src_ep, src_cfg), (path.dst_ep, dst_cfg))
        for ep_reported, ep_cfg in pairs:
            self.assertEqual(
                str(ep_reported.addr),
                self.make_addr(ep_cfg.sw_if_index, addr_id, is_v6))
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
    """Add or remove the make_prefix() test prefix on interface pg.

    is_add is passed to the API as 1 when truthy and 0 otherwise.
    """
    if_index = pg.sw_if_index
    self.vapi.sw_interface_add_del_address(
        sw_if_index=if_index,
        prefix=self.make_prefix(if_index, addr_id, is_v6),
        is_add=int(bool(is_add)))
731 def _test_dhcp_v46(self, is_v6):
732 self.create_pg_interfaces(range(4))
733 for i in self.pg_interfaces:
736 (Endpoint(pg=self.pg1, is_v6=is_v6),
737 Endpoint(pg=self.pg2, is_v6=is_v6)),
738 (Endpoint(pg=self.pg1, is_v6=is_v6),
739 Endpoint(pg=self.pg3, is_v6=is_v6))
741 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
742 t = Translation(self, TCP, ep, paths).add_vpp_config()
743 # Add an address on every interface
744 # and check it is reflected in the cnat config
745 for pg in self.pg_interfaces:
746 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
747 self.check_resolved(t, addr_id=0, is_v6=is_v6)
748 # Add a new address on every interface, remove the old one
749 # and check it is reflected in the cnat config
750 for pg in self.pg_interfaces:
751 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
752 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
753 self.check_resolved(t, addr_id=1, is_v6=is_v6)
754 # remove the configuration
755 for pg in self.pg_interfaces:
756 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
757 t.remove_vpp_config()
def test_dhcp_v4(self):
    # IPv4 run of the shared scenario.
    self._test_dhcp_v46(is_v6=False)
def test_dhcp_v6(self):
    # IPv6 run of the shared scenario.
    self._test_dhcp_v46(is_v6=True)
765 def test_dhcp_snat(self):
766 self.create_pg_interfaces(range(1))
767 for i in self.pg_interfaces:
769 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
770 # Add an address on every interface
771 # and check it is reflected in the cnat config
772 for pg in self.pg_interfaces:
773 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
774 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
775 r = self.vapi.cnat_get_snat_addresses()
776 self.assertEqual(str(r.snat_ip4), self.make_addr(
777 self.pg0.sw_if_index, addr_id=0, is_v6=False))
778 self.assertEqual(str(r.snat_ip6), self.make_addr(
779 self.pg0.sw_if_index, addr_id=0, is_v6=True))
780 # Add a new address on every interface, remove the old one
781 # and check it is reflected in the cnat config
782 for pg in self.pg_interfaces:
783 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
784 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
785 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
786 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
787 r = self.vapi.cnat_get_snat_addresses()
788 self.assertEqual(str(r.snat_ip4), self.make_addr(
789 self.pg0.sw_if_index, addr_id=1, is_v6=False))
790 self.assertEqual(str(r.snat_ip6), self.make_addr(
791 self.pg0.sw_if_index, addr_id=1, is_v6=True))
792 # remove the configuration
793 for pg in self.pg_interfaces:
794 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
795 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
796 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
# When executed as a script, run the tests through unittest using the
# VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)