5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
18 from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
31 class CnatCommonTestCase(VppTestCase):
32 """ CNat common test class """
35 # turn the scanner off whilst testing otherwise sessions
38 extra_vpp_punt_config = ["cnat", "{",
39 "session-db-buckets", "64",
40 "session-cleanup-timeout", "0.1",
41 "session-max-age", "1",
43 "scanner", "off", "}"]
47 super(CnatCommonTestCase, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; adds nothing of its own."""
    super(CnatCommonTestCase, cls).tearDownClass()
54 class Endpoint(object):
57 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
60 self.sw_if_index = INVALID_INDEX
61 if pg is not None and pgi is not None:
62 # pg interface specified and remote index
63 self.ip = self.get_ip46(pg.remote_hosts[pgi])
66 self.sw_if_index = pg.sw_if_index
70 self.ip = "::" if self.is_v6 else "0.0.0.0"
72 def get_ip46(self, obj):
def udpate(self, **kwargs):
    """Re-initialise this object in place by calling ``__init__(**kwargs)``.

    This simply re-runs ``__init__``: any argument left out of ``kwargs``
    is passed as ``__init__``'s own default, not as the object's current
    value, so callers must repeat every setting they want to keep.

    The name is a historical misspelling of "update"; it is kept because
    existing callers use it.  New code should prefer ``update``.
    """
    self.__init__(**kwargs)

# Correctly spelled alias for the method above.
update = udpate
82 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
83 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
86 return {'addr': self.ip,
88 'sw_if_index': self.sw_if_index,
89 'if_af': self._vpp_if_af()}
92 return ("%s:%d" % (self.ip, self.port))
95 class Translation(VppObject):
97 def __init__(self, test, iproto, vip, paths):
105 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
107 def _vl4_proto(self):
108 ip_proto = VppEnum.vl_api_ip_proto_t
110 UDP: ip_proto.IP_API_PROTO_UDP,
111 TCP: ip_proto.IP_API_PROTO_TCP,
114 def _encoded_paths(self):
115 return [{'src_ep': src.encode(),
116 'dst_ep': dst.encode()} for (src, dst) in self.paths]
118 def add_vpp_config(self):
119 r = self._test.vapi.cnat_translation_update(
120 {'vip': self.vip.encode(),
121 'ip_proto': self._vl4_proto(),
122 'n_paths': len(self.paths),
123 'paths': self._encoded_paths()})
124 self._test.registry.register(self, self._test.logger)
128 def remove_vpp_config(self):
129 assert(self.id is not None)
130 self._test.vapi.cnat_translation_del(id=self.id)
133 def query_vpp_config(self):
134 for t in self._test.vapi.cnat_translation_dump():
135 if self.id == t.translation.id:
140 class CnatTestContext(object):
144 ctx = CnatTestContext(self, TCP, is_v6=True)
146 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
147 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
149 # We expect this to be NATed as
150 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
151 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
# After running cnat_expect, we can send back the received packet
# and expect it to be un-NATed, so that we get the original packet back
155 ctx.cnat_send_return().cnat_expect_return()
157 # same thing for ICMP errors
158 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
161 def __init__(self, test, L4PROTO, is_v6):
162 self.L4PROTO = L4PROTO
166 def get_ip46(self, obj):
173 return IPv6 if self.is_v6 else IP
def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
              no_replies=False):
    """Build one packet for this context's L4 protocol and send N_PKTS copies.

    :param src_pg: pg interface the packets are injected on; its
        remote/local MACs are used for the Ethernet header.
    :param src_id: an int selects ``src_pg.remote_hosts[src_id]`` (resolved
        with ``get_ip46``); any other value is used as the source address
        itself.
    :param src_port: L4 source port; doubles as the ICMP id.
    :param dst_pg: pg interface the packets are expected on.
    :param dst_id: same convention as ``src_id``, against ``dst_pg``.
    :param dst_port: L4 destination port; doubles as the ICMPv4 type
        (ignored for ICMPv6, which always sends an echo request).
    :param no_replies: when true, assert that nothing comes out of
        ``dst_pg``; ``self.rxs`` is then left untouched.
    :returns: ``self`` (NOTE(review): the original return statement was
        not visible; callers shown in this file ignore the result).
    """
    # NOTE(review): the reviewed extract of this method had gaps (end of the
    # signature, both ``else:`` lines, the tail of the packet expression,
    # the ``if no_replies:`` test and whatever followed the last
    # assignment).  They are filled in from the surrounding code --
    # ``no_replies`` is passed by a caller, ``l4`` is otherwise unused --
    # and should be confirmed against the full file.
    if isinstance(src_id, int):
        self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
    else:
        # Fix: this branch used to assign self.dst_addr, which left
        # src_addr unset (or stale) for an explicit source address.
        self.src_addr = src_id
    if isinstance(dst_id, int):
        self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
    else:
        self.dst_addr = dst_id
    self.src_port = src_port  # also ICMP id
    self.dst_port = dst_port  # also ICMP type

    if self.L4PROTO in [TCP, UDP]:
        l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
    elif self.L4PROTO in [ICMP] and not self.is_v6:
        l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
    elif self.L4PROTO in [ICMP] and self.is_v6:
        l4 = ICMPv6EchoRequest(id=self.src_port)
    p1 = (Ether(src=src_pg.remote_mac,
                dst=src_pg.local_mac) /
          self.IP46(src=self.src_addr, dst=self.dst_addr) /
          l4 /
          Raw())

    if no_replies:
        self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
    else:
        self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
    # Remembered so the return-traffic helpers know which way to send.
    self.expected_src_pg = src_pg
    self.expected_dst_pg = dst_pg
    return self
208 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
209 if isinstance(src_id, int):
210 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
212 self.expect_src_addr = src_id
213 if isinstance(dst_id, int):
214 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
216 self.expect_dst_addr = dst_id
217 self.expect_src_port = src_port
218 self.expect_dst_port = dst_port
220 if self.expect_src_port is None:
221 if self.L4PROTO in [TCP, UDP]:
222 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
223 elif self.L4PROTO in [ICMP] and not self.is_v6:
224 self.expect_src_port = self.rxs[0][self.L4PROTO].id
225 elif self.L4PROTO in [ICMP] and self.is_v6:
226 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
229 self._test.assert_packet_checksums_valid(rx)
230 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
231 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
232 if self.L4PROTO in [TCP, UDP]:
233 self._test.assertEqual(
234 rx[self.L4PROTO].dport, self.expect_dst_port)
235 self._test.assertEqual(
236 rx[self.L4PROTO].sport, self.expect_src_port)
237 elif self.L4PROTO in [ICMP] and not self.is_v6:
238 self._test.assertEqual(
239 rx[self.L4PROTO].type, self.expect_dst_port)
240 self._test.assertEqual(
241 rx[self.L4PROTO].id, self.expect_src_port)
242 elif self.L4PROTO in [ICMP] and self.is_v6:
243 self._test.assertEqual(
244 rx[ICMPv6EchoRequest].id, self.expect_src_port)
247 def cnat_send_return(self):
248 """This sends the return traffic"""
249 if self.L4PROTO in [TCP, UDP]:
250 l4 = self.L4PROTO(sport=self.expect_dst_port,
251 dport=self.expect_src_port)
252 elif self.L4PROTO in [ICMP] and not self.is_v6:
253 # icmp type 0 if echo reply
254 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
255 elif self.L4PROTO in [ICMP] and self.is_v6:
256 l4 = ICMPv6EchoReply(id=self.expect_src_port)
257 src_mac = self.expected_dst_pg.remote_mac
258 p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
259 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
263 self.return_rxs = self._test.send_and_expect(
264 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
267 def cnat_expect_return(self):
268 for rx in self.return_rxs:
269 self._test.assert_packet_checksums_valid(rx)
270 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
271 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
272 if self.L4PROTO in [TCP, UDP]:
273 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
274 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
275 elif self.L4PROTO in [ICMP] and not self.is_v6:
276 # icmp type 0 if echo reply
277 self._test.assertEqual(rx[self.L4PROTO].type, 0)
278 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
279 elif self.L4PROTO in [ICMP] and self.is_v6:
280 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
283 def cnat_send_icmp_return_error(self):
285 This called after cnat_expect will send an icmp error
288 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
289 InnerIP = self.rxs[0][self.IP46]
291 Ether(src=self.expected_dst_pg.remote_mac,
292 dst=self.expected_dst_pg.local_mac) /
293 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
295 self.return_rxs = self._test.send_and_expect(
296 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
299 def cnat_expect_icmp_error_return(self):
300 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
301 IP46err = IPerror6 if self.is_v6 else IPerror
302 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
303 for rx in self.return_rxs:
304 self._test.assert_packet_checksums_valid(rx)
305 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
306 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
307 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
308 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
309 self._test.assertEqual(
310 rx[ICMP46][IP46err][L4err].sport, self.src_port)
311 self._test.assertEqual(
312 rx[ICMP46][IP46err][L4err].dport, self.dst_port)
315 # -------------------------------------------------------------------
316 # -------------------------------------------------------------------
317 # -------------------------------------------------------------------
318 # -------------------------------------------------------------------
321 class TestCNatTranslation(CnatCommonTestCase):
322 """ CNat Translation """
326 super(TestCNatTranslation, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; adds nothing of its own."""
    super(TestCNatTranslation, cls).tearDownClass()
333 super(TestCNatTranslation, self).setUp()
335 self.create_pg_interfaces(range(3))
336 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
337 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
339 for i in self.pg_interfaces:
345 i.configure_ipv4_neighbors()
346 i.configure_ipv6_neighbors()
349 for translation in self.translations:
350 translation.remove_vpp_config()
352 self.vapi.cnat_session_purge()
353 self.assertFalse(self.vapi.cnat_session_dump())
355 for i in self.pg_interfaces:
359 super(TestCNatTranslation, self).tearDown()
361 def cnat_translation(self):
362 """ CNat Translation """
363 self.logger.info(self.vapi.cli("sh cnat client"))
364 self.logger.info(self.vapi.cli("sh cnat translation"))
366 for nbr, translation in enumerate(self.translations):
367 vip = translation.vip
370 # Test Flows to the VIP
372 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
373 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
375 ctx.cnat_send(self.pg0, src_pgi, sport,
376 self.pg1, vip.ip, vip.port)
377 dst_port = translation.paths[0][DST].port
378 ctx.cnat_expect(self.pg0, src_pgi, sport,
379 self.pg1, nbr, dst_port)
381 ctx.cnat_send_return().cnat_expect_return()
384 # packets to the VIP that do not match a
385 # translation are dropped
387 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
388 vip.ip, 6666, no_replies=True)
391 # packets from the VIP that do not match a
392 # session are forwarded
394 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
395 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
398 # modify the translation to use a different backend
400 old_dst_port = translation.paths[0][DST].port
401 translation.paths[0][DST].udpate(
402 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
403 translation.add_vpp_config()
406 # existing flows follow the old path
408 for src_pgi in range(N_REMOTE_HOSTS):
409 for sport in [1234, 1233]:
411 ctx.cnat_send(self.pg0, src_pgi, sport,
412 self.pg1, vip.ip, vip.port)
413 ctx.cnat_expect(self.pg0, src_pgi, sport,
414 self.pg1, nbr, old_dst_port)
416 ctx.cnat_send_return().cnat_expect_return()
419 # new flows go to the new backend
421 for src_pgi in range(N_REMOTE_HOSTS):
422 ctx.cnat_send(self.pg0, src_pgi, 9999,
423 self.pg2, vip.ip, vip.port)
424 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
426 self.logger.info(self.vapi.cli("sh cnat session verbose"))
429 # turn the scanner back on and wait until the sessions
432 self.vapi.cli("test cnat scanner on")
433 self.virtual_sleep(2)
434 sessions = self.vapi.cnat_session_dump()
435 self.assertEqual(len(sessions), 0)
436 self.vapi.cli("test cnat scanner off")
439 # load some flows again and purge
441 for translation in self.translations:
442 vip = translation.vip
443 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
444 for src_pgi in range(N_REMOTE_HOSTS):
445 for sport in [1234, 1233]:
447 ctx.cnat_send(self.pg0, src_pgi, sport,
448 self.pg2, vip.ip, vip.port)
449 ctx.cnat_expect(self.pg0, src_pgi,
450 sport, self.pg2, 0, 5000)
452 def _test_icmp(self):
457 for nbr, translation in enumerate(self.translations):
458 vip = translation.vip
459 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
464 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
465 dst_port = translation.paths[0][DST].port
466 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
467 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
470 # ICMP errors with no VIP associated should not be
473 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
474 dst_port = translation.paths[0][DST].port
475 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
476 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
478 def _make_translations_v4(self):
479 self.translations = []
480 self.translations.append(Translation(
481 self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
483 Endpoint(is_v6=False),
484 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
487 self.translations.append(Translation(
488 self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
490 Endpoint(is_v6=False),
491 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
494 self.translations.append(Translation(
495 self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
497 Endpoint(is_v6=False),
498 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
502 def _make_translations_v6(self):
503 self.translations = []
504 self.translations.append(Translation(
505 self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
507 Endpoint(is_v6=True),
508 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
511 self.translations.append(Translation(
512 self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
514 Endpoint(is_v6=True),
515 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
518 self.translations.append(Translation(
519 self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
521 Endpoint(is_v6=True),
522 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
526 def test_icmp4(self):
527 # """ CNat Translation icmp v4 """
528 self._make_translations_v4()
531 def test_icmp6(self):
532 # """ CNat Translation icmp v6 """
533 self._make_translations_v6()
def test_cnat6(self):
    # """ CNat Translation ipv6 """
    # Build the IPv6 translation set, then run the shared translation
    # scenario against it.
    self._make_translations_v6()
    self.cnat_translation()
def test_cnat4(self):
    # """ CNat Translation ipv4 """
    # Build the IPv4 translation set, then run the shared translation
    # scenario against it.
    self._make_translations_v4()
    self.cnat_translation()
547 class TestCNatSourceNAT(CnatCommonTestCase):
548 """ CNat Source NAT """
552 super(TestCNatSourceNAT, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; adds nothing of its own."""
    super(TestCNatSourceNAT, cls).tearDownClass()
558 def _enable_disable_snat(self, is_enable=True):
559 self.vapi.cnat_set_snat_addresses(
560 snat_ip4=self.pg2.remote_hosts[0].ip4,
561 snat_ip6=self.pg2.remote_hosts[0].ip6,
562 sw_if_index=INVALID_INDEX)
563 self.vapi.feature_enable_disable(
564 enable=1 if is_enable else 0,
565 arc_name="ip6-unicast",
566 feature_name="cnat-snat-ip6",
567 sw_if_index=self.pg0.sw_if_index)
568 self.vapi.feature_enable_disable(
569 enable=1 if is_enable else 0,
570 arc_name="ip4-unicast",
571 feature_name="cnat-snat-ip4",
572 sw_if_index=self.pg0.sw_if_index)
574 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
575 self.vapi.cnat_set_snat_policy(
576 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
577 for i in self.pg_interfaces:
578 self.vapi.cnat_snat_policy_add_del_if(
579 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
580 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
581 self.vapi.cnat_snat_policy_add_del_if(
582 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
583 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
586 super(TestCNatSourceNAT, self).setUp()
588 self.create_pg_interfaces(range(3))
589 self.pg1.generate_remote_hosts(2)
591 for i in self.pg_interfaces:
597 i.configure_ipv6_neighbors()
598 i.configure_ipv4_neighbors()
600 self._enable_disable_snat(is_enable=True)
603 self._enable_disable_snat(is_enable=True)
605 self.vapi.cnat_session_purge()
606 for i in self.pg_interfaces:
610 super(TestCNatSourceNAT, self).tearDown()
def test_snat_v6(self):
    # """ CNat Source Nat v6 """
    # TCP first, then UDP, then the ICMP echo scenario -- all over IPv6.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto, is_v6=True)
    self.sourcenat_test_icmp_echo_conf(is_v6=True)
def test_snat_v4(self):
    # """ CNat Source Nat v4 """
    # TCP first, then UDP, then the ICMP echo scenario -- each helper is
    # left on its IPv4 default.
    for l4_proto in (TCP, UDP):
        self.sourcenat_test_tcp_udp_conf(l4_proto)
    self.sourcenat_test_icmp_echo_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
    """Check source NAT of an ICMP echo and of its reply.

    Sends an echo from pg0 remote host 0 to pg1 remote host 0, expects it
    to reach pg1 with pg2 remote host 0 as source (the ICMP id is taken
    from the first received packet), then sends the return traffic and
    checks what comes back.

    :param is_v6: run the exchange over IPv6 instead of IPv4.
    """
    echo_id = 0xfeed
    # 8 is ICMP type echo (v4 only)
    echo_type = 8

    flow = CnatTestContext(self, ICMP, is_v6=is_v6)
    flow.cnat_send(self.pg0, 0, echo_id, self.pg1, 0, echo_type)
    flow.cnat_expect(self.pg2, 0, None, self.pg1, 0, echo_type)
    flow.cnat_send_return().cnat_expect_return()
631 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
632 ctx = CnatTestContext(self, L4PROTO, is_v6)
633 # we should source NAT
634 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
635 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
636 ctx.cnat_send_return().cnat_expect_return()
638 # exclude dst address of pg1.1 from snat
640 exclude_prefix = ip_network(
641 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
643 exclude_prefix = ip_network(
644 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
646 # add remote host to exclude list
647 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
648 prefix=exclude_prefix, is_add=1)
650 # We should not source NAT the id=1
651 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
652 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
653 ctx.cnat_send_return().cnat_expect_return()
655 # But we should source NAT the id=0
656 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
657 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
658 ctx.cnat_send_return().cnat_expect_return()
660 # remove remote host from exclude list
661 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
662 prefix=exclude_prefix, is_add=0)
663 self.vapi.cnat_session_purge()
665 # We should source NAT again
666 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
667 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
668 ctx.cnat_send_return().cnat_expect_return()
670 # test return ICMP error nating
671 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
672 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
673 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
675 self.vapi.cnat_session_purge()
678 class TestCNatDHCP(CnatCommonTestCase):
679 """ CNat Translation """
683 super(TestCNatDHCP, cls).setUpClass()
def tearDownClass(cls):
    """Run the parent class's class-level teardown; adds nothing of its own."""
    super(TestCNatDHCP, cls).tearDownClass()
690 for i in self.pg_interfaces:
692 super(TestCNatDHCP, self).tearDown()
694 def make_addr(self, sw_if_index, addr_id, is_v6):
696 return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
697 return "172.16.%u.%u" % (sw_if_index, addr_id)
699 def make_prefix(self, sw_if_index, addr_id, is_v6):
701 return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
702 return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
def check_resolved(self, tr, addr_id, is_v6=False):
    """Assert that translation ``tr`` resolved to the ``addr_id`` addresses.

    Fetches the translation with ``tr.query_vpp_config()`` and compares,
    as strings, its VIP address and every path's source/destination
    address with ``make_addr(<endpoint sw_if_index>, addr_id, is_v6)``.
    The number of paths must match as well.

    :param tr: the translation object to check.
    :param addr_id: address id handed to ``make_addr``.
    :param is_v6: address family flag handed to ``make_addr``.
    """
    def expected_addr(endpoint):
        # Address the endpoint's interface is expected to have.
        return self.make_addr(endpoint.sw_if_index, addr_id, is_v6)

    resolved = tr.query_vpp_config()
    self.assertEqual(str(resolved.vip.addr), expected_addr(tr.vip))
    self.assertEqual(len(resolved.paths), len(tr.paths))
    for wanted, got in zip(tr.paths, resolved.paths):
        got_src, got_dst = got.src_ep, got.dst_ep
        want_src, want_dst = wanted
        self.assertEqual(str(got_src.addr), expected_addr(want_src))
        self.assertEqual(str(got_dst.addr), expected_addr(want_dst))
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
    """Add or remove one address on interface ``pg``.

    The prefix comes from ``make_prefix(pg.sw_if_index, addr_id, is_v6)``
    and is passed to ``vapi.sw_interface_add_del_address``.

    :param pg: interface whose ``sw_if_index`` is targeted.
    :param addr_id: address id handed to ``make_prefix``.
    :param is_add: truthy to add the address, falsy to remove it.
    :param is_v6: address family flag handed to ``make_prefix``.
    """
    if_index = pg.sw_if_index
    request = {
        'sw_if_index': if_index,
        'prefix': self.make_prefix(if_index, addr_id, is_v6),
        # The API call is given 1/0 rather than a bool.
        'is_add': int(bool(is_add)),
    }
    self.vapi.sw_interface_add_del_address(**request)
724 def _test_dhcp_v46(self, is_v6):
725 self.create_pg_interfaces(range(4))
726 for i in self.pg_interfaces:
729 (Endpoint(pg=self.pg1, is_v6=is_v6),
730 Endpoint(pg=self.pg2, is_v6=is_v6)),
731 (Endpoint(pg=self.pg1, is_v6=is_v6),
732 Endpoint(pg=self.pg3, is_v6=is_v6))
734 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
735 t = Translation(self, TCP, ep, paths).add_vpp_config()
736 # Add an address on every interface
737 # and check it is reflected in the cnat config
738 for pg in self.pg_interfaces:
739 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
740 self.check_resolved(t, addr_id=0, is_v6=is_v6)
741 # Add a new address on every interface, remove the old one
742 # and check it is reflected in the cnat config
743 for pg in self.pg_interfaces:
744 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
745 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
746 self.check_resolved(t, addr_id=1, is_v6=is_v6)
747 # remove the configuration
748 for pg in self.pg_interfaces:
749 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
750 t.remove_vpp_config()
def test_dhcp_v4(self):
    # Run the shared interface-address scenario with is_v6=False (IPv4).
    self._test_dhcp_v46(False)
def test_dhcp_v6(self):
    # Run the shared interface-address scenario with is_v6=True (IPv6).
    self._test_dhcp_v46(True)
758 def test_dhcp_snat(self):
759 self.create_pg_interfaces(range(1))
760 for i in self.pg_interfaces:
762 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
763 # Add an address on every interface
764 # and check it is reflected in the cnat config
765 for pg in self.pg_interfaces:
766 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
767 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
768 r = self.vapi.cnat_get_snat_addresses()
769 self.assertEqual(str(r.snat_ip4), self.make_addr(
770 self.pg0.sw_if_index, addr_id=0, is_v6=False))
771 self.assertEqual(str(r.snat_ip6), self.make_addr(
772 self.pg0.sw_if_index, addr_id=0, is_v6=True))
773 # Add a new address on every interface, remove the old one
774 # and check it is reflected in the cnat config
775 for pg in self.pg_interfaces:
776 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
777 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
778 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
779 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
780 r = self.vapi.cnat_get_snat_addresses()
781 self.assertEqual(str(r.snat_ip4), self.make_addr(
782 self.pg0.sw_if_index, addr_id=1, is_v6=False))
783 self.assertEqual(str(r.snat_ip6), self.make_addr(
784 self.pg0.sw_if_index, addr_id=1, is_v6=True))
785 # remove the configuration
786 for pg in self.pg_interfaces:
787 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
788 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
789 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
# When executed directly as a script, run this module's tests with
# VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)