tests docs: update python3 venv packages
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import (
19     ip_address,
20     ip_network,
21     IPv4Address,
22     IPv6Address,
23     IPv4Network,
24     IPv6Network,
25 )
26
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
29
# Number of copies of each test packet sent per burst (see CnatTestContext).
N_PKTS = 15
# Number of remote hosts generated on the pg interfaces of TestCNatTranslation.
N_REMOTE_HOSTS = 3

# Indices into a translation path tuple: (source endpoint, destination endpoint).
SRC = 0
DST = 1
35
36
class CnatCommonTestCase(VppTestCase):
    """CNat common test class"""

    # Startup configuration handed to VPP for every CNat test case.
    # The scanner is switched off whilst testing, otherwise sessions
    # would time out in the middle of a test.
    extra_vpp_config = (
        ["cnat", "{"]
        + ["session-db-buckets", "64"]
        + ["session-cleanup-timeout", "0.1"]
        + ["session-max-age", "1"]
        + ["tcp-max-age", "1"]
        + ["scanner", "off"]
        + ["}"]
    )

    @classmethod
    def setUpClass(cls):
        """Defer to the parent class set-up; nothing CNat specific is added."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Defer to the parent class tear-down; nothing CNat specific is added."""
        super().tearDownClass()
67
68
class Endpoint(object):
    """CNat endpoint: an address (or an interface) together with an L4 port.

    The address is chosen from the constructor arguments, in this order:

    * ``pg`` and ``pgi`` given: the address of ``pg.remote_hosts[pgi]``;
    * only ``pg`` given: no address (``None``); the endpoint instead carries
      ``pg.sw_if_index``;
    * ``ip`` given: that literal address;
    * otherwise: the unspecified address of the selected family.
    """

    def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
        self.port = port
        self.is_v6 = is_v6
        self.sw_if_index = INVALID_INDEX
        if pg is not None and pgi is not None:
            # pg interface specified and remote index
            self.ip = self.get_ip46(pg.remote_hosts[pgi])
        elif pg is not None:
            # interface only: the endpoint is identified by sw_if_index
            self.ip = None
            self.sw_if_index = pg.sw_if_index
        elif ip is not None:
            self.ip = ip
        else:
            self.ip = "::" if self.is_v6 else "0.0.0.0"

    def get_ip46(self, obj):
        """Return ``obj.ip6`` for a v6 endpoint, ``obj.ip4`` otherwise."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def update(self, **kwargs):
        """Re-initialise this endpoint in place.

        ``kwargs`` are the same keyword arguments as the constructor; any
        argument left out falls back to its constructor default, it is not
        preserved from the current state.
        """
        self.__init__(**kwargs)

    # Historical misspelling of update(), kept so existing callers still work.
    udpate = update

    def _vpp_if_af(self):
        """Return the VPP API address-family enum value for this endpoint."""
        if self.is_v6:
            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
        return VppEnum.vl_api_address_family_t.ADDRESS_IP4

    def encode(self):
        """Return the endpoint as the dict passed to the cnat API calls."""
        return {
            "addr": self.ip,
            "port": self.port,
            "sw_if_index": self.sw_if_index,
            "if_af": self._vpp_if_af(),
        }

    def __str__(self):
        return "%s:%d" % (self.ip, self.port)
110
111
class Translation(VppObject):
    """A CNat translation: a VIP endpoint, an L4 protocol and backend paths.

    ``paths`` is a list of ``(source endpoint, destination endpoint)`` tuples.
    ``id`` stays ``None`` until add_vpp_config() has stored the id returned
    by VPP.
    """

    def __init__(self, test, iproto, vip, paths):
        self._test = test
        self.vip = vip
        self.iproto = iproto
        self.paths = paths
        self.id = None

    def __str__(self):
        return "%s %s %s" % (self.vip, self.iproto, self.paths)

    def _vl4_proto(self):
        """Map the scapy layer class in ``iproto`` to the VPP API enum value."""
        api_protos = VppEnum.vl_api_ip_proto_t
        by_layer = {
            UDP: api_protos.IP_API_PROTO_UDP,
            TCP: api_protos.IP_API_PROTO_TCP,
        }
        return by_layer[self.iproto]

    def _encoded_paths(self):
        """Return the paths as a list of dicts of encoded endpoints."""
        encoded = []
        for path in self.paths:
            src_ep, dst_ep = path
            encoded.append({"src_ep": src_ep.encode(), "dst_ep": dst_ep.encode()})
        return encoded

    def add_vpp_config(self):
        """Send the translation to VPP, register it and record its id."""
        translation = {
            "vip": self.vip.encode(),
            "ip_proto": self._vl4_proto(),
            "n_paths": len(self.paths),
            "paths": self._encoded_paths(),
        }
        reply = self._test.vapi.cnat_translation_update(translation)
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id
        return self

    def remove_vpp_config(self):
        """Delete the translation from VPP; it must have been added first."""
        assert self.id is not None
        self._test.vapi.cnat_translation_del(id=self.id)
        return self

    def query_vpp_config(self):
        """Return the dumped translation whose id matches ours, else None."""
        dumped = self._test.vapi.cnat_translation_dump()
        return next(
            (entry.translation for entry in dumped if self.id == entry.translation.id),
            None,
        )
159
160
class CnatTestContext(object):
    """
    Send a flow through VPP and check how it comes out.

    The context remembers the last flow sent (addresses, ports, interfaces)
    and the packets received for it, which is what lets the send / expect
    helpers be chained.

    Usage :

    ctx = CnatTestContext(self, TCP, is_v6=True)

    # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)

    # We expect this to be NATed as
    # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)

    # After running cnat_expect, we can send back the received packet
    # and expect it be 'unnated' so that we get the original packet
    ctx.cnat_send_return().cnat_expect_return()

    # same thing for ICMP errors
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
    """

    def __init__(self, test, L4PROTO, is_v6):
        self.L4PROTO = L4PROTO
        self.is_v6 = is_v6
        self._test = test

    def get_ip46(self, obj):
        """Return ``obj.ip6`` for a v6 context, ``obj.ip4`` otherwise."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def _resolve_addr(self, pg, host_id):
        """Turn a host designator into an address.

        An int ``host_id`` is an index into ``pg.remote_hosts``; anything
        else is taken to be the address itself and returned unchanged.
        """
        if isinstance(host_id, int):
            return self.get_ip46(pg.remote_hosts[host_id])
        return host_id

    @property
    def IP46(self):
        """The scapy IP layer class matching the context's address family."""
        return IPv6 if self.is_v6 else IP

    def cnat_send(
        self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
    ):
        """Send N_PKTS copies of one packet into ``src_pg``.

        ``src_id`` / ``dst_id`` are either an index into the remote hosts of
        the matching interface or a literal address. For ICMP, ``src_port``
        is used as the ICMP id and ``dst_port`` as the ICMP type (v4 only).

        With ``no_replies`` the test asserts that nothing is received and
        the previously stored ``rxs`` are left untouched; otherwise the
        received packets are stored in ``rxs``.
        """
        # Both ends go through the same helper. The source side used to
        # store a literal address into dst_addr by mistake.
        self.src_addr = self._resolve_addr(src_pg, src_id)
        self.dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.src_port = src_port  # also ICMP id
        self.dst_port = dst_port  # also ICMP type

        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoRequest(id=self.src_port)
        p1 = (
            Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
            / self.IP46(src=self.src_addr, dst=self.dst_addr)
            / l4
            / Raw()
        )

        if no_replies:
            self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
        else:
            self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
        # remembered so the return-path helpers know which way to send
        self.expected_src_pg = src_pg
        self.expected_dst_pg = dst_pg
        return self

    def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
        """Check every packet in ``rxs`` against the expected flow.

        Addresses are given like in cnat_send(). Passing ``None`` as
        ``src_port`` accepts whatever source port (or ICMP id) the first
        received packet carries and then requires all packets to use it.
        """
        self.expect_src_addr = self._resolve_addr(src_pg, src_id)
        self.expect_dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.expect_src_port = src_port
        self.expect_dst_port = dst_port

        if self.expect_src_port is None:
            # learn the translated source port / ICMP id from the first packet
            if self.L4PROTO in [TCP, UDP]:
                self.expect_src_port = self.rxs[0][self.L4PROTO].sport
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self.expect_src_port = self.rxs[0][self.L4PROTO].id
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id

        for rx in self.rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
            self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
        return self

    def cnat_send_return(self):
        """This sends the return traffic"""
        # the reply swaps the addresses and ports checked by cnat_expect()
        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            # icmp type 0 if echo reply
            l4 = self.L4PROTO(id=self.expect_src_port, type=0)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoReply(id=self.expect_src_port)
        src_mac = self.expected_dst_pg.remote_mac
        p1 = (
            Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / l4
            / Raw()
        )

        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_return(self):
        """Check that the return traffic mirrors the flow given to cnat_send()."""
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                # icmp type 0 if echo reply
                self._test.assertEqual(rx[self.L4PROTO].type, 0)
                self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
        return self

    def cnat_send_icmp_return_error(self):
        """
        This called after cnat_expect will send an icmp error
        on the reverse path
        """
        ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
        # the error quotes the first packet that was received for the flow
        InnerIP = self.rxs[0][self.IP46]
        p1 = (
            Ether(
                src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
            )
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / ICMPelem
            / InnerIP
        )
        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_icmp_error_return(self):
        """Check the ICMP error got back with the original flow restored.

        Both the outer header and the packet quoted inside the error must
        carry the addresses and ports given to cnat_send().
        """
        ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
        IP46err = IPerror6 if self.is_v6 else IPerror
        # anything that is not TCP is looked up as a quoted UDP header
        L4err = TCPerror if self.L4PROTO is TCP else UDPerror
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
        return self
336
337
338 # -------------------------------------------------------------------
339 # -------------------------------------------------------------------
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
342
343
class TestCNatTranslation(CnatCommonTestCase):
    """CNat Translation"""

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces, all up with v4 and v6 configured."""
        super(TestCNatTranslation, self).setUp()

        # tearDown walks this list, so it has to exist even when a test
        # stops before it created its translations
        self.translations = []

        self.create_pg_interfaces(range(3))
        self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
        self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv4_neighbors()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Remove the translations, purge the sessions, undo the interfaces."""
        for translation in self.translations:
            translation.remove_vpp_config()

        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    def cnat_translation(self):
        """CNat Translation"""
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        # every client host is exercised with these two source ports
        sports = [1234, 1233]

        # translation number nbr has its backend on pg1 remote host nbr
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip

            #
            # Test Flows to the VIP
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), sports):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                dst_port = translation.paths[0][DST].port
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                ctx.cnat_send(
                    self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
                )

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
                ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)

            #
            # modify the translation to use a different backend
            #
            old_dst_port = translation.paths[0][DST].port
            translation.paths[0][DST].udpate(
                pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
            )
            translation.add_vpp_config()

            #
            # existing flows follow the old path
            #
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), sports):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

            #
            # new flows go to the new backend
            #
            for src_pgi in range(N_REMOTE_HOSTS):
                ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)

            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")
        self.virtual_sleep(2)
        sessions = self.vapi.cnat_session_dump()
        self.assertEqual(len(sessions), 0)
        self.vapi.cli("test cnat scanner off")

        #
        # load some flows again; they are purged (and the purge is
        # checked) by tearDown
        #
        for translation in self.translations:
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), sports):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)

    def _test_icmp(self):
        #
        # Testing ICMP
        #
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)

            #
            # NATing ICMP errors
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
            dst_port = translation.paths[0][DST].port
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

            #
            # ICMP errors with no VIP associated should not be
            # modified
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    def _make_translations(self, is_v6):
        """Create the three translations shared by the v4 and v6 tests.

        Each translation has a single path whose backend is the remote host
        of pg1 with the same index as the translation. The second and third
        translations share one VIP address and differ by port and protocol.
        """
        if is_v6:
            vip_addrs = ("30::1", "30::2", "30::2")
        else:
            vip_addrs = ("30.0.0.1", "30.0.0.2", "30.0.0.2")
        # (L4 protocol, VIP port, backend port), one entry per translation
        flavours = ((TCP, 5555, 4001), (TCP, 5554, 4002), (UDP, 5553, 4003))

        self.translations = []
        for pgi, (vip_addr, flavour) in enumerate(zip(vip_addrs, flavours)):
            iproto, vip_port, backend_port = flavour
            vip = Endpoint(ip=vip_addr, port=vip_port, is_v6=is_v6)
            path = (
                Endpoint(is_v6=is_v6),
                Endpoint(pg=self.pg1, pgi=pgi, port=backend_port, is_v6=is_v6),
            )
            self.translations.append(
                Translation(self, iproto, vip, [path]).add_vpp_config()
            )

    def _make_translations_v4(self):
        """Create the IPv4 translations and store them in self.translations."""
        self._make_translations(is_v6=False)

    def _make_translations_v6(self):
        """Create the IPv6 translations and store them in self.translations."""
        self._make_translations(is_v6=True)

    def test_icmp4(self):
        # CNat Translation icmp v4
        self._make_translations_v4()
        self._test_icmp()

    def test_icmp6(self):
        # CNat Translation icmp v6
        self._make_translations_v6()
        self._test_icmp()

    def test_cnat6(self):
        # CNat Translation ipv6
        self._make_translations_v6()
        self.cnat_translation()

    def test_cnat4(self):
        # CNat Translation ipv4
        self._make_translations_v4()
        self.cnat_translation()
600
601
class TestCNatSourceNAT(CnatCommonTestCase):
    """CNat Source NAT"""

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def _enable_disable_snat(self, is_enable=True):
        """Switch source NAT on or off for the test topology.

        The SNAT addresses (first remote host of pg2) and the IF_PFX policy
        are set on every call. ``is_enable`` selects whether the snat
        features are enabled or disabled on pg0, and whether the pg
        interfaces are added to or removed from the v4 / v6 include tables.
        """
        self.vapi.cnat_set_snat_addresses(
            snat_ip4=self.pg2.remote_hosts[0].ip4,
            snat_ip6=self.pg2.remote_hosts[0].ip6,
            sw_if_index=INVALID_INDEX,
        )
        self.vapi.feature_enable_disable(
            enable=1 if is_enable else 0,
            arc_name="ip6-unicast",
            feature_name="cnat-snat-ip6",
            sw_if_index=self.pg0.sw_if_index,
        )
        self.vapi.feature_enable_disable(
            enable=1 if is_enable else 0,
            arc_name="ip4-unicast",
            feature_name="cnat-snat-ip4",
            sw_if_index=self.pg0.sw_if_index,
        )

        policy_tables = VppEnum.vl_api_cnat_snat_policy_table_t
        self.vapi.cnat_set_snat_policy(
            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
        )
        for i in self.pg_interfaces:
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index,
                is_add=1 if is_enable else 0,
                table=policy_tables.CNAT_POLICY_INCLUDE_V6,
            )
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index,
                is_add=1 if is_enable else 0,
                table=policy_tables.CNAT_POLICY_INCLUDE_V4,
            )

    def setUp(self):
        """Create three pg interfaces with v4 and v6, then enable SNAT."""
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))
        self.pg1.generate_remote_hosts(2)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv6_neighbors()
            i.configure_ipv4_neighbors()

        self._enable_disable_snat(is_enable=True)

    def tearDown(self):
        """Undo setUp: disable SNAT, purge sessions, unconfigure interfaces."""
        # This used to pass is_enable=True, which enabled everything a second
        # time instead of removing what setUp had configured.
        self._enable_disable_snat(is_enable=False)

        self.vapi.cnat_session_purge()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def test_snat_v6(self):
        # CNat Source Nat v6
        self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
        self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
        self.sourcenat_test_icmp_echo_conf(is_v6=True)

    def test_snat_v4(self):
        # CNat Source Nat v4
        self.sourcenat_test_tcp_udp_conf(TCP)
        self.sourcenat_test_tcp_udp_conf(UDP)
        self.sourcenat_test_icmp_echo_conf()

    def sourcenat_test_icmp_echo_conf(self, is_v6=False):
        """Check an ICMP echo from pg0 is source NATed and its reply restored."""
        ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
        # 8 is ICMP type echo (v4 only)
        ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
        ctx.cnat_send_return().cnat_expect_return()

    def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
        """Check source NAT of ``L4PROTO`` flows, with and without exclusion.

        Flows from pg0 towards pg1 are expected to leave with the first
        remote host of pg2 as source, except towards a destination covered
        by an excluded prefix, which must go through unchanged.
        """
        ctx = CnatTestContext(self, L4PROTO, is_v6)
        # we should source NAT
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # exclude dst address of pg1.1 from snat
        if is_v6:
            exclude_prefix = ip_network(
                "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
            )
        else:
            exclude_prefix = ip_network(
                "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
            )

        # add remote host to exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)

        # We should not source NAT the id=1
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # But we should source NAT the id=0
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # remove remote host from exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
        self.vapi.cnat_session_purge()

        # We should source NAT again
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # test return ICMP error nating
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

        self.vapi.cnat_session_purge()
739
740
class TestCNatDHCP(CnatCommonTestCase):
    """CNat DHCP"""

    # These tests configure cnat with interfaces instead of addresses and
    # check that the cnat configuration follows the addresses that are
    # added to and removed from those interfaces.

    @classmethod
    def setUpClass(cls):
        super(TestCNatDHCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatDHCP, cls).tearDownClass()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.admin_down()
        super(TestCNatDHCP, self).tearDown()

    def make_addr(self, sw_if_index, addr_id, is_v6):
        """Return the test address number ``addr_id`` of an interface.

        The v6 form uses ``addr_id + 1`` as its last group, so address id 0
        is ``fd01:<index>::1``; the v4 form uses ``addr_id`` unchanged.
        """
        if is_v6:
            return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
        return "172.16.%u.%u" % (sw_if_index, addr_id)

    def make_prefix(self, sw_if_index, addr_id, is_v6):
        """Return make_addr() as a host prefix (/128 for v6, /32 for v4)."""
        if is_v6:
            return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
        return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)

    def check_resolved(self, tr, addr_id, is_v6=False):
        """Check VPP reports ``tr`` with address ``addr_id`` on every endpoint.

        The VIP and both endpoints of each path are compared against the
        address make_addr() gives for the endpoint's interface.
        """
        qt = tr.query_vpp_config()
        self.assertEqual(
            str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
        )
        self.assertEqual(len(qt.paths), len(tr.paths))
        for path_tr, path_qt in zip(tr.paths, qt.paths):
            src_qt = path_qt.src_ep
            dst_qt = path_qt.dst_ep
            src_tr, dst_tr = path_tr
            self.assertEqual(
                str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
            )
            self.assertEqual(
                str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
            )

    def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
        """Add or remove test address number ``addr_id`` on interface ``pg``."""
        self.vapi.sw_interface_add_del_address(
            sw_if_index=pg.sw_if_index,
            prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
            is_add=1 if is_add else 0,
        )

    def _check_snat_addresses(self, addr_id):
        """Check the SNAT addresses are pg0's test addresses ``addr_id``."""
        r = self.vapi.cnat_get_snat_addresses()
        self.assertEqual(
            str(r.snat_ip4),
            self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=False),
        )
        self.assertEqual(
            str(r.snat_ip6),
            self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=True),
        )

    def _test_dhcp_v46(self, is_v6):
        """Check a translation made of interfaces only follows their addresses."""
        self.create_pg_interfaces(range(4))
        for i in self.pg_interfaces:
            i.admin_up()
        paths = [
            (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
            (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
        ]
        ep = Endpoint(pg=self.pg0, is_v6=is_v6)
        t = Translation(self, TCP, ep, paths).add_vpp_config()
        # Add an address on every interface
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
        self.check_resolved(t, addr_id=0, is_v6=is_v6)
        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
        self.check_resolved(t, addr_id=1, is_v6=is_v6)
        # remove the configuration
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
        t.remove_vpp_config()

    def test_dhcp_v4(self):
        self._test_dhcp_v46(False)

    def test_dhcp_v6(self):
        self._test_dhcp_v46(True)

    def test_dhcp_snat(self):
        """Check the SNAT addresses follow the addresses configured on pg0."""
        self.create_pg_interfaces(range(1))
        for i in self.pg_interfaces:
            i.admin_up()
        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
        # Add an address on every interface
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
        self._check_snat_addresses(addr_id=0)
        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
        self._check_snat_addresses(addr_id=1)
        # remove the configuration
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
861
862
# Allow running this file directly: the tests are handed to unittest with
# the VppTestRunner imported from the test framework.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)