tests: replace pycodestyle with black
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import (
19     ip_address,
20     ip_network,
21     IPv4Address,
22     IPv6Address,
23     IPv4Network,
24     IPv6Network,
25 )
26
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
29
30 N_PKTS = 15
31 N_REMOTE_HOSTS = 3
32
33 SRC = 0
34 DST = 1
35
36
37 class CnatCommonTestCase(VppTestCase):
38     """CNat common test class"""
39
40     #
41     # turn the scanner off whilst testing otherwise sessions
42     # will time out
43     #
44     extra_vpp_punt_config = [
45         "cnat",
46         "{",
47         "session-db-buckets",
48         "64",
49         "session-cleanup-timeout",
50         "0.1",
51         "session-max-age",
52         "1",
53         "tcp-max-age",
54         "1",
55         "scanner",
56         "off",
57         "}",
58     ]
59
60     @classmethod
61     def setUpClass(cls):
62         super(CnatCommonTestCase, cls).setUpClass()
63
64     @classmethod
65     def tearDownClass(cls):
66         super(CnatCommonTestCase, cls).tearDownClass()
67
68
69 class Endpoint(object):
70     """CNat endpoint"""
71
72     def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
73         self.port = port
74         self.is_v6 = is_v6
75         self.sw_if_index = INVALID_INDEX
76         if pg is not None and pgi is not None:
77             # pg interface specified and remote index
78             self.ip = self.get_ip46(pg.remote_hosts[pgi])
79         elif pg is not None:
80             self.ip = None
81             self.sw_if_index = pg.sw_if_index
82         elif ip is not None:
83             self.ip = ip
84         else:
85             self.ip = "::" if self.is_v6 else "0.0.0.0"
86
87     def get_ip46(self, obj):
88         if self.is_v6:
89             return obj.ip6
90         return obj.ip4
91
92     def udpate(self, **kwargs):
93         self.__init__(**kwargs)
94
95     def _vpp_if_af(self):
96         if self.is_v6:
97             return VppEnum.vl_api_address_family_t.ADDRESS_IP6
98         return VppEnum.vl_api_address_family_t.ADDRESS_IP4
99
100     def encode(self):
101         return {
102             "addr": self.ip,
103             "port": self.port,
104             "sw_if_index": self.sw_if_index,
105             "if_af": self._vpp_if_af(),
106         }
107
108     def __str__(self):
109         return "%s:%d" % (self.ip, self.port)
110
111
112 class Translation(VppObject):
113     def __init__(self, test, iproto, vip, paths):
114         self._test = test
115         self.vip = vip
116         self.iproto = iproto
117         self.paths = paths
118         self.id = None
119
120     def __str__(self):
121         return "%s %s %s" % (self.vip, self.iproto, self.paths)
122
123     def _vl4_proto(self):
124         ip_proto = VppEnum.vl_api_ip_proto_t
125         return {
126             UDP: ip_proto.IP_API_PROTO_UDP,
127             TCP: ip_proto.IP_API_PROTO_TCP,
128         }[self.iproto]
129
130     def _encoded_paths(self):
131         return [
132             {"src_ep": src.encode(), "dst_ep": dst.encode()}
133             for (src, dst) in self.paths
134         ]
135
136     def add_vpp_config(self):
137         r = self._test.vapi.cnat_translation_update(
138             {
139                 "vip": self.vip.encode(),
140                 "ip_proto": self._vl4_proto(),
141                 "n_paths": len(self.paths),
142                 "paths": self._encoded_paths(),
143             }
144         )
145         self._test.registry.register(self, self._test.logger)
146         self.id = r.id
147         return self
148
149     def remove_vpp_config(self):
150         assert self.id is not None
151         self._test.vapi.cnat_translation_del(id=self.id)
152         return self
153
154     def query_vpp_config(self):
155         for t in self._test.vapi.cnat_translation_dump():
156             if self.id == t.translation.id:
157                 return t.translation
158         return None
159
160
161 class CnatTestContext(object):
162     """
163     Usage :
164
165     ctx = CnatTestContext(self, TCP, is_v6=True)
166
167     # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
168     ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
169
170     # We expect this to be NATed as
171     # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
172     ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
173
174     # After running cnat_expect, we can send back the received packet
175     # and expect it be 'unnated' so that we get the original packet
176     ctx.cnat_send_return().cnat_expect_return()
177
178     # same thing for ICMP errors
179     ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
180     """
181
182     def __init__(self, test, L4PROTO, is_v6):
183         self.L4PROTO = L4PROTO
184         self.is_v6 = is_v6
185         self._test = test
186
187     def get_ip46(self, obj):
188         if self.is_v6:
189             return obj.ip6
190         return obj.ip4
191
192     @property
193     def IP46(self):
194         return IPv6 if self.is_v6 else IP
195
196     def cnat_send(
197         self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
198     ):
199         if isinstance(src_id, int):
200             self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
201         else:
202             self.dst_addr = src_id
203         if isinstance(dst_id, int):
204             self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
205         else:
206             self.dst_addr = dst_id
207         self.src_port = src_port  # also ICMP id
208         self.dst_port = dst_port  # also ICMP type
209
210         if self.L4PROTO in [TCP, UDP]:
211             l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
212         elif self.L4PROTO in [ICMP] and not self.is_v6:
213             l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
214         elif self.L4PROTO in [ICMP] and self.is_v6:
215             l4 = ICMPv6EchoRequest(id=self.src_port)
216         p1 = (
217             Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
218             / self.IP46(src=self.src_addr, dst=self.dst_addr)
219             / l4
220             / Raw()
221         )
222
223         if no_replies:
224             self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
225         else:
226             self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
227         self.expected_src_pg = src_pg
228         self.expected_dst_pg = dst_pg
229         return self
230
231     def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
232         if isinstance(src_id, int):
233             self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
234         else:
235             self.expect_src_addr = src_id
236         if isinstance(dst_id, int):
237             self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
238         else:
239             self.expect_dst_addr = dst_id
240         self.expect_src_port = src_port
241         self.expect_dst_port = dst_port
242
243         if self.expect_src_port is None:
244             if self.L4PROTO in [TCP, UDP]:
245                 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
246             elif self.L4PROTO in [ICMP] and not self.is_v6:
247                 self.expect_src_port = self.rxs[0][self.L4PROTO].id
248             elif self.L4PROTO in [ICMP] and self.is_v6:
249                 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
250
251         for rx in self.rxs:
252             self._test.assert_packet_checksums_valid(rx)
253             self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
254             self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
255             if self.L4PROTO in [TCP, UDP]:
256                 self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
257                 self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
258             elif self.L4PROTO in [ICMP] and not self.is_v6:
259                 self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
260                 self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
261             elif self.L4PROTO in [ICMP] and self.is_v6:
262                 self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
263         return self
264
265     def cnat_send_return(self):
266         """This sends the return traffic"""
267         if self.L4PROTO in [TCP, UDP]:
268             l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
269         elif self.L4PROTO in [ICMP] and not self.is_v6:
270             # icmp type 0 if echo reply
271             l4 = self.L4PROTO(id=self.expect_src_port, type=0)
272         elif self.L4PROTO in [ICMP] and self.is_v6:
273             l4 = ICMPv6EchoReply(id=self.expect_src_port)
274         src_mac = self.expected_dst_pg.remote_mac
275         p1 = (
276             Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
277             / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
278             / l4
279             / Raw()
280         )
281
282         self.return_rxs = self._test.send_and_expect(
283             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
284         )
285         return self
286
287     def cnat_expect_return(self):
288         for rx in self.return_rxs:
289             self._test.assert_packet_checksums_valid(rx)
290             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
291             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
292             if self.L4PROTO in [TCP, UDP]:
293                 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
294                 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
295             elif self.L4PROTO in [ICMP] and not self.is_v6:
296                 # icmp type 0 if echo reply
297                 self._test.assertEqual(rx[self.L4PROTO].type, 0)
298                 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
299             elif self.L4PROTO in [ICMP] and self.is_v6:
300                 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
301         return self
302
303     def cnat_send_icmp_return_error(self):
304         """
305         This called after cnat_expect will send an icmp error
306         on the reverse path
307         """
308         ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
309         InnerIP = self.rxs[0][self.IP46]
310         p1 = (
311             Ether(
312                 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
313             )
314             / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
315             / ICMPelem
316             / InnerIP
317         )
318         self.return_rxs = self._test.send_and_expect(
319             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
320         )
321         return self
322
323     def cnat_expect_icmp_error_return(self):
324         ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
325         IP46err = IPerror6 if self.is_v6 else IPerror
326         L4err = TCPerror if self.L4PROTO is TCP else UDPerror
327         for rx in self.return_rxs:
328             self._test.assert_packet_checksums_valid(rx)
329             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
330             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
331             self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
332             self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
333             self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
334             self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
335         return self
336
337
338 # -------------------------------------------------------------------
339 # -------------------------------------------------------------------
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
342
343
344 class TestCNatTranslation(CnatCommonTestCase):
345     """CNat Translation"""
346
347     @classmethod
348     def setUpClass(cls):
349         super(TestCNatTranslation, cls).setUpClass()
350
351     @classmethod
352     def tearDownClass(cls):
353         super(TestCNatTranslation, cls).tearDownClass()
354
355     def setUp(self):
356         super(TestCNatTranslation, self).setUp()
357
358         self.create_pg_interfaces(range(3))
359         self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
360         self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
361
362         for i in self.pg_interfaces:
363             i.admin_up()
364             i.config_ip4()
365             i.resolve_arp()
366             i.config_ip6()
367             i.resolve_ndp()
368             i.configure_ipv4_neighbors()
369             i.configure_ipv6_neighbors()
370
371     def tearDown(self):
372         for translation in self.translations:
373             translation.remove_vpp_config()
374
375         self.vapi.cnat_session_purge()
376         self.assertFalse(self.vapi.cnat_session_dump())
377
378         for i in self.pg_interfaces:
379             i.unconfig_ip4()
380             i.unconfig_ip6()
381             i.admin_down()
382         super(TestCNatTranslation, self).tearDown()
383
384     def cnat_translation(self):
385         """CNat Translation"""
386         self.logger.info(self.vapi.cli("sh cnat client"))
387         self.logger.info(self.vapi.cli("sh cnat translation"))
388
389         for nbr, translation in enumerate(self.translations):
390             vip = translation.vip
391
392             #
393             # Test Flows to the VIP
394             #
395             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
396             for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
397                 # from client to vip
398                 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
399                 dst_port = translation.paths[0][DST].port
400                 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
401                 # from vip to client
402                 ctx.cnat_send_return().cnat_expect_return()
403
404                 #
405                 # packets to the VIP that do not match a
406                 # translation are dropped
407                 #
408                 ctx.cnat_send(
409                     self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
410                 )
411
412                 #
413                 # packets from the VIP that do not match a
414                 # session are forwarded
415                 #
416                 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
417                 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
418
419             #
420             # modify the translation to use a different backend
421             #
422             old_dst_port = translation.paths[0][DST].port
423             translation.paths[0][DST].udpate(
424                 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
425             )
426             translation.add_vpp_config()
427
428             #
429             # existing flows follow the old path
430             #
431             for src_pgi in range(N_REMOTE_HOSTS):
432                 for sport in [1234, 1233]:
433                     # from client to vip
434                     ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
435                     ctx.cnat_expect(
436                         self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
437                     )
438                     # from vip to client
439                     ctx.cnat_send_return().cnat_expect_return()
440
441             #
442             # new flows go to the new backend
443             #
444             for src_pgi in range(N_REMOTE_HOSTS):
445                 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
446                 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
447
448             self.logger.info(self.vapi.cli("sh cnat session verbose"))
449
450         #
451         # turn the scanner back on and wait until the sessions
452         # all disapper
453         #
454         self.vapi.cli("test cnat scanner on")
455         self.virtual_sleep(2)
456         sessions = self.vapi.cnat_session_dump()
457         self.assertEqual(len(sessions), 0)
458         self.vapi.cli("test cnat scanner off")
459
460         #
461         # load some flows again and purge
462         #
463         for translation in self.translations:
464             vip = translation.vip
465             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
466             for src_pgi in range(N_REMOTE_HOSTS):
467                 for sport in [1234, 1233]:
468                     # from client to vip
469                     ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
470                     ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
471
472     def _test_icmp(self):
473
474         #
475         # Testing ICMP
476         #
477         for nbr, translation in enumerate(self.translations):
478             vip = translation.vip
479             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
480
481             #
482             # NATing ICMP errors
483             #
484             ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
485             dst_port = translation.paths[0][DST].port
486             ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
487             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
488
489             #
490             # ICMP errors with no VIP associated should not be
491             # modified
492             #
493             ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
494             dst_port = translation.paths[0][DST].port
495             ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
496             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
497
498     def _make_translations_v4(self):
499         self.translations = []
500         self.translations.append(
501             Translation(
502                 self,
503                 TCP,
504                 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
505                 [
506                     (
507                         Endpoint(is_v6=False),
508                         Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
509                     )
510                 ],
511             ).add_vpp_config()
512         )
513         self.translations.append(
514             Translation(
515                 self,
516                 TCP,
517                 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
518                 [
519                     (
520                         Endpoint(is_v6=False),
521                         Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
522                     )
523                 ],
524             ).add_vpp_config()
525         )
526         self.translations.append(
527             Translation(
528                 self,
529                 UDP,
530                 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
531                 [
532                     (
533                         Endpoint(is_v6=False),
534                         Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
535                     )
536                 ],
537             ).add_vpp_config()
538         )
539
540     def _make_translations_v6(self):
541         self.translations = []
542         self.translations.append(
543             Translation(
544                 self,
545                 TCP,
546                 Endpoint(ip="30::1", port=5555, is_v6=True),
547                 [
548                     (
549                         Endpoint(is_v6=True),
550                         Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
551                     )
552                 ],
553             ).add_vpp_config()
554         )
555         self.translations.append(
556             Translation(
557                 self,
558                 TCP,
559                 Endpoint(ip="30::2", port=5554, is_v6=True),
560                 [
561                     (
562                         Endpoint(is_v6=True),
563                         Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
564                     )
565                 ],
566             ).add_vpp_config()
567         )
568         self.translations.append(
569             Translation(
570                 self,
571                 UDP,
572                 Endpoint(ip="30::2", port=5553, is_v6=True),
573                 [
574                     (
575                         Endpoint(is_v6=True),
576                         Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
577                     )
578                 ],
579             ).add_vpp_config()
580         )
581
582     def test_icmp4(self):
583         # """ CNat Translation icmp v4 """
584         self._make_translations_v4()
585         self._test_icmp()
586
587     def test_icmp6(self):
588         # """ CNat Translation icmp v6 """
589         self._make_translations_v6()
590         self._test_icmp()
591
592     def test_cnat6(self):
593         # """ CNat Translation ipv6 """
594         self._make_translations_v6()
595         self.cnat_translation()
596
597     def test_cnat4(self):
598         # """ CNat Translation ipv4 """
599         self._make_translations_v4()
600         self.cnat_translation()
601
602
603 class TestCNatSourceNAT(CnatCommonTestCase):
604     """CNat Source NAT"""
605
606     @classmethod
607     def setUpClass(cls):
608         super(TestCNatSourceNAT, cls).setUpClass()
609
610     @classmethod
611     def tearDownClass(cls):
612         super(TestCNatSourceNAT, cls).tearDownClass()
613
614     def _enable_disable_snat(self, is_enable=True):
615         self.vapi.cnat_set_snat_addresses(
616             snat_ip4=self.pg2.remote_hosts[0].ip4,
617             snat_ip6=self.pg2.remote_hosts[0].ip6,
618             sw_if_index=INVALID_INDEX,
619         )
620         self.vapi.feature_enable_disable(
621             enable=1 if is_enable else 0,
622             arc_name="ip6-unicast",
623             feature_name="cnat-snat-ip6",
624             sw_if_index=self.pg0.sw_if_index,
625         )
626         self.vapi.feature_enable_disable(
627             enable=1 if is_enable else 0,
628             arc_name="ip4-unicast",
629             feature_name="cnat-snat-ip4",
630             sw_if_index=self.pg0.sw_if_index,
631         )
632
633         policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
634         self.vapi.cnat_set_snat_policy(
635             policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
636         )
637         for i in self.pg_interfaces:
638             self.vapi.cnat_snat_policy_add_del_if(
639                 sw_if_index=i.sw_if_index,
640                 is_add=1 if is_enable else 0,
641                 table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
642             )
643             self.vapi.cnat_snat_policy_add_del_if(
644                 sw_if_index=i.sw_if_index,
645                 is_add=1 if is_enable else 0,
646                 table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
647             )
648
649     def setUp(self):
650         super(TestCNatSourceNAT, self).setUp()
651
652         self.create_pg_interfaces(range(3))
653         self.pg1.generate_remote_hosts(2)
654
655         for i in self.pg_interfaces:
656             i.admin_up()
657             i.config_ip4()
658             i.resolve_arp()
659             i.config_ip6()
660             i.resolve_ndp()
661             i.configure_ipv6_neighbors()
662             i.configure_ipv4_neighbors()
663
664         self._enable_disable_snat(is_enable=True)
665
666     def tearDown(self):
667         self._enable_disable_snat(is_enable=True)
668
669         self.vapi.cnat_session_purge()
670         for i in self.pg_interfaces:
671             i.unconfig_ip4()
672             i.unconfig_ip6()
673             i.admin_down()
674         super(TestCNatSourceNAT, self).tearDown()
675
676     def test_snat_v6(self):
677         # """ CNat Source Nat v6 """
678         self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
679         self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
680         self.sourcenat_test_icmp_echo_conf(is_v6=True)
681
682     def test_snat_v4(self):
683         # """ CNat Source Nat v4 """
684         self.sourcenat_test_tcp_udp_conf(TCP)
685         self.sourcenat_test_tcp_udp_conf(UDP)
686         self.sourcenat_test_icmp_echo_conf()
687
688     def sourcenat_test_icmp_echo_conf(self, is_v6=False):
689         ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
690         # 8 is ICMP type echo (v4 only)
691         ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
692         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
693         ctx.cnat_send_return().cnat_expect_return()
694
695     def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
696         ctx = CnatTestContext(self, L4PROTO, is_v6)
697         # we should source NAT
698         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
699         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
700         ctx.cnat_send_return().cnat_expect_return()
701
702         # exclude dst address of pg1.1 from snat
703         if is_v6:
704             exclude_prefix = ip_network(
705                 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
706             )
707         else:
708             exclude_prefix = ip_network(
709                 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
710             )
711
712         # add remote host to exclude list
713         self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
714
715         # We should not source NAT the id=1
716         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
717         ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
718         ctx.cnat_send_return().cnat_expect_return()
719
720         # But we should source NAT the id=0
721         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
722         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
723         ctx.cnat_send_return().cnat_expect_return()
724
725         # remove remote host from exclude list
726         self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
727         self.vapi.cnat_session_purge()
728
729         # We should source NAT again
730         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
731         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
732         ctx.cnat_send_return().cnat_expect_return()
733
734         # test return ICMP error nating
735         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
736         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
737         ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
738
739         self.vapi.cnat_session_purge()
740
741
742 class TestCNatDHCP(CnatCommonTestCase):
743     """CNat Translation"""
744
745     @classmethod
746     def setUpClass(cls):
747         super(TestCNatDHCP, cls).setUpClass()
748
749     @classmethod
750     def tearDownClass(cls):
751         super(TestCNatDHCP, cls).tearDownClass()
752
753     def tearDown(self):
754         for i in self.pg_interfaces:
755             i.admin_down()
756         super(TestCNatDHCP, self).tearDown()
757
758     def make_addr(self, sw_if_index, addr_id, is_v6):
759         if is_v6:
760             return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
761         return "172.16.%u.%u" % (sw_if_index, addr_id)
762
763     def make_prefix(self, sw_if_index, addr_id, is_v6):
764         if is_v6:
765             return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
766         return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
767
768     def check_resolved(self, tr, addr_id, is_v6=False):
769         qt = tr.query_vpp_config()
770         self.assertEqual(
771             str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
772         )
773         self.assertEqual(len(qt.paths), len(tr.paths))
774         for path_tr, path_qt in zip(tr.paths, qt.paths):
775             src_qt = path_qt.src_ep
776             dst_qt = path_qt.dst_ep
777             src_tr, dst_tr = path_tr
778             self.assertEqual(
779                 str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
780             )
781             self.assertEqual(
782                 str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
783             )
784
785     def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
786         self.vapi.sw_interface_add_del_address(
787             sw_if_index=pg.sw_if_index,
788             prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
789             is_add=1 if is_add else 0,
790         )
791
792     def _test_dhcp_v46(self, is_v6):
793         self.create_pg_interfaces(range(4))
794         for i in self.pg_interfaces:
795             i.admin_up()
796         paths = [
797             (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
798             (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
799         ]
800         ep = Endpoint(pg=self.pg0, is_v6=is_v6)
801         t = Translation(self, TCP, ep, paths).add_vpp_config()
802         # Add an address on every interface
803         # and check it is reflected in the cnat config
804         for pg in self.pg_interfaces:
805             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
806         self.check_resolved(t, addr_id=0, is_v6=is_v6)
807         # Add a new address on every interface, remove the old one
808         # and check it is reflected in the cnat config
809         for pg in self.pg_interfaces:
810             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
811             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
812         self.check_resolved(t, addr_id=1, is_v6=is_v6)
813         # remove the configuration
814         for pg in self.pg_interfaces:
815             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
816         t.remove_vpp_config()
817
818     def test_dhcp_v4(self):
819         self._test_dhcp_v46(False)
820
821     def test_dhcp_v6(self):
822         self._test_dhcp_v46(True)
823
824     def test_dhcp_snat(self):
825         self.create_pg_interfaces(range(1))
826         for i in self.pg_interfaces:
827             i.admin_up()
828         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
829         # Add an address on every interface
830         # and check it is reflected in the cnat config
831         for pg in self.pg_interfaces:
832             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
833             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
834         r = self.vapi.cnat_get_snat_addresses()
835         self.assertEqual(
836             str(r.snat_ip4),
837             self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
838         )
839         self.assertEqual(
840             str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
841         )
842         # Add a new address on every interface, remove the old one
843         # and check it is reflected in the cnat config
844         for pg in self.pg_interfaces:
845             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
846             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
847             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
848             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
849         r = self.vapi.cnat_get_snat_addresses()
850         self.assertEqual(
851             str(r.snat_ip4),
852             self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
853         )
854         self.assertEqual(
855             str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
856         )
857         # remove the configuration
858         for pg in self.pg_interfaces:
859             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
860             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
861         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
862
863
864 if __name__ == "__main__":
865     unittest.main(testRunner=VppTestRunner)