tests: Use errno value rather than a specific int
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import INVALID_INDEX
8 from itertools import product
9
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP, TCP, ICMP
13 from scapy.layers.inet import IPerror, TCPerror, UDPerror
14 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
15 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
16
17 from ipaddress import ip_network
18
19 from vpp_object import VppObject
20 from vpp_papi import VppEnum
21
22 N_PKTS = 15
23 N_REMOTE_HOSTS = 3
24
25 SRC = 0
26 DST = 1
27
28
29 class CnatCommonTestCase(VppTestCase):
30     """CNat common test class"""
31
32     #
33     # turn the scanner off whilst testing otherwise sessions
34     # will time out
35     #
36     extra_vpp_config = [
37         "cnat",
38         "{",
39         "session-db-buckets",
40         "64",
41         "session-cleanup-timeout",
42         "0.1",
43         "session-max-age",
44         "1",
45         "tcp-max-age",
46         "1",
47         "scanner",
48         "off",
49         "}",
50     ]
51
52     @classmethod
53     def setUpClass(cls):
54         super(CnatCommonTestCase, cls).setUpClass()
55
56     @classmethod
57     def tearDownClass(cls):
58         super(CnatCommonTestCase, cls).tearDownClass()
59
60
61 class Endpoint(object):
62     """CNat endpoint"""
63
64     def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
65         self.port = port
66         self.is_v6 = is_v6
67         self.sw_if_index = INVALID_INDEX
68         if pg is not None and pgi is not None:
69             # pg interface specified and remote index
70             self.ip = self.get_ip46(pg.remote_hosts[pgi])
71         elif pg is not None:
72             self.ip = None
73             self.sw_if_index = pg.sw_if_index
74         elif ip is not None:
75             self.ip = ip
76         else:
77             self.ip = "::" if self.is_v6 else "0.0.0.0"
78
79     def get_ip46(self, obj):
80         if self.is_v6:
81             return obj.ip6
82         return obj.ip4
83
84     def udpate(self, **kwargs):
85         self.__init__(**kwargs)
86
87     def _vpp_if_af(self):
88         if self.is_v6:
89             return VppEnum.vl_api_address_family_t.ADDRESS_IP6
90         return VppEnum.vl_api_address_family_t.ADDRESS_IP4
91
92     def encode(self):
93         return {
94             "addr": self.ip,
95             "port": self.port,
96             "sw_if_index": self.sw_if_index,
97             "if_af": self._vpp_if_af(),
98         }
99
100     def __str__(self):
101         return "%s:%d" % (self.ip, self.port)
102
103
104 class Translation(VppObject):
105     def __init__(self, test, iproto, vip, paths, fhc):
106         self._test = test
107         self.vip = vip
108         self.iproto = iproto
109         self.paths = paths
110         self.fhc = fhc
111         self.id = None
112
113     def __str__(self):
114         return "%s %s %s" % (self.vip, self.iproto, self.paths)
115
116     def _vl4_proto(self):
117         ip_proto = VppEnum.vl_api_ip_proto_t
118         return {
119             UDP: ip_proto.IP_API_PROTO_UDP,
120             TCP: ip_proto.IP_API_PROTO_TCP,
121         }[self.iproto]
122
123     def _encoded_paths(self):
124         return [
125             {"src_ep": src.encode(), "dst_ep": dst.encode()}
126             for (src, dst) in self.paths
127         ]
128
129     def add_vpp_config(self):
130         r = self._test.vapi.cnat_translation_update(
131             {
132                 "vip": self.vip.encode(),
133                 "ip_proto": self._vl4_proto(),
134                 "n_paths": len(self.paths),
135                 "paths": self._encoded_paths(),
136                 "flow_hash_config": self.fhc,
137             }
138         )
139         self._test.registry.register(self, self._test.logger)
140         self.id = r.id
141         return self
142
143     def remove_vpp_config(self):
144         assert self.id is not None
145         self._test.vapi.cnat_translation_del(id=self.id)
146         return self
147
148     def query_vpp_config(self):
149         for t in self._test.vapi.cnat_translation_dump():
150             if self.id == t.translation.id:
151                 return t.translation
152         return None
153
154
155 class CnatTestContext(object):
156     """
157     Usage :
158
159     ctx = CnatTestContext(self, TCP, is_v6=True)
160
161     # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
162     ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
163
164     # We expect this to be NATed as
165     # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
166     ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
167
168     # After running cnat_expect, we can send back the received packet
169     # and expect it be 'unnated' so that we get the original packet
170     ctx.cnat_send_return().cnat_expect_return()
171
172     # same thing for ICMP errors
173     ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
174     """
175
176     def __init__(self, test, L4PROTO, is_v6):
177         self.L4PROTO = L4PROTO
178         self.is_v6 = is_v6
179         self._test = test
180
181     def get_ip46(self, obj):
182         if self.is_v6:
183             return obj.ip6
184         return obj.ip4
185
186     @property
187     def IP46(self):
188         return IPv6 if self.is_v6 else IP
189
190     def cnat_send(
191         self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
192     ):
193         if isinstance(src_id, int):
194             self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
195         else:
196             self.dst_addr = src_id
197         if isinstance(dst_id, int):
198             self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
199         else:
200             self.dst_addr = dst_id
201         self.src_port = src_port  # also ICMP id
202         self.dst_port = dst_port  # also ICMP type
203
204         if self.L4PROTO in [TCP, UDP]:
205             l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
206         elif self.L4PROTO in [ICMP] and not self.is_v6:
207             l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
208         elif self.L4PROTO in [ICMP] and self.is_v6:
209             l4 = ICMPv6EchoRequest(id=self.src_port)
210         p1 = (
211             Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
212             / self.IP46(src=self.src_addr, dst=self.dst_addr)
213             / l4
214             / Raw()
215         )
216
217         if no_replies:
218             self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
219         else:
220             self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
221         self.expected_src_pg = src_pg
222         self.expected_dst_pg = dst_pg
223         return self
224
225     def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
226         if isinstance(src_id, int):
227             self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
228         else:
229             self.expect_src_addr = src_id
230         if isinstance(dst_id, int):
231             self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
232         else:
233             self.expect_dst_addr = dst_id
234         self.expect_src_port = src_port
235         self.expect_dst_port = dst_port
236
237         if self.expect_src_port is None:
238             if self.L4PROTO in [TCP, UDP]:
239                 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
240             elif self.L4PROTO in [ICMP] and not self.is_v6:
241                 self.expect_src_port = self.rxs[0][self.L4PROTO].id
242             elif self.L4PROTO in [ICMP] and self.is_v6:
243                 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
244
245         for rx in self.rxs:
246             self._test.assert_packet_checksums_valid(rx)
247             self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
248             self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
249             if self.L4PROTO in [TCP, UDP]:
250                 self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
251                 self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
252             elif self.L4PROTO in [ICMP] and not self.is_v6:
253                 self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
254                 self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
255             elif self.L4PROTO in [ICMP] and self.is_v6:
256                 self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
257         return self
258
259     def cnat_send_return(self):
260         """This sends the return traffic"""
261         if self.L4PROTO in [TCP, UDP]:
262             l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
263         elif self.L4PROTO in [ICMP] and not self.is_v6:
264             # icmp type 0 if echo reply
265             l4 = self.L4PROTO(id=self.expect_src_port, type=0)
266         elif self.L4PROTO in [ICMP] and self.is_v6:
267             l4 = ICMPv6EchoReply(id=self.expect_src_port)
268         src_mac = self.expected_dst_pg.remote_mac
269         p1 = (
270             Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
271             / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
272             / l4
273             / Raw()
274         )
275
276         self.return_rxs = self._test.send_and_expect(
277             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
278         )
279         return self
280
281     def cnat_expect_return(self):
282         for rx in self.return_rxs:
283             self._test.assert_packet_checksums_valid(rx)
284             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
285             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
286             if self.L4PROTO in [TCP, UDP]:
287                 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
288                 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
289             elif self.L4PROTO in [ICMP] and not self.is_v6:
290                 # icmp type 0 if echo reply
291                 self._test.assertEqual(rx[self.L4PROTO].type, 0)
292                 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
293             elif self.L4PROTO in [ICMP] and self.is_v6:
294                 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
295         return self
296
297     def cnat_send_icmp_return_error(self):
298         """
299         This called after cnat_expect will send an icmp error
300         on the reverse path
301         """
302         ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
303         InnerIP = self.rxs[0][self.IP46]
304         p1 = (
305             Ether(
306                 src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
307             )
308             / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
309             / ICMPelem
310             / InnerIP
311         )
312         self.return_rxs = self._test.send_and_expect(
313             self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
314         )
315         return self
316
317     def cnat_expect_icmp_error_return(self):
318         ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
319         IP46err = IPerror6 if self.is_v6 else IPerror
320         L4err = TCPerror if self.L4PROTO is TCP else UDPerror
321         for rx in self.return_rxs:
322             self._test.assert_packet_checksums_valid(rx)
323             self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
324             self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
325             self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
326             self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
327             self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
328             self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
329         return self
330
331
332 # -------------------------------------------------------------------
333 # -------------------------------------------------------------------
334 # -------------------------------------------------------------------
335 # -------------------------------------------------------------------
336
337
338 class TestCNatTranslation(CnatCommonTestCase):
339     """CNat Translation"""
340
341     @classmethod
342     def setUpClass(cls):
343         super(TestCNatTranslation, cls).setUpClass()
344
345     @classmethod
346     def tearDownClass(cls):
347         super(TestCNatTranslation, cls).tearDownClass()
348
349     def setUp(self):
350         super(TestCNatTranslation, self).setUp()
351
352         self.create_pg_interfaces(range(3))
353         self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
354         self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
355
356         for i in self.pg_interfaces:
357             i.admin_up()
358             i.config_ip4()
359             i.resolve_arp()
360             i.config_ip6()
361             i.resolve_ndp()
362             i.configure_ipv4_neighbors()
363             i.configure_ipv6_neighbors()
364
365     def tearDown(self):
366         for translation in self.translations:
367             translation.remove_vpp_config()
368
369         self.vapi.cnat_session_purge()
370         self.assertFalse(self.vapi.cnat_session_dump())
371
372         for i in self.pg_interfaces:
373             i.unconfig_ip4()
374             i.unconfig_ip6()
375             i.admin_down()
376         super(TestCNatTranslation, self).tearDown()
377
378     def cnat_fhc_translation(self):
379         """CNat Translation"""
380         self.logger.info(self.vapi.cli("sh cnat client"))
381         self.logger.info(self.vapi.cli("sh cnat translation"))
382
383         for nbr, translation in enumerate(self.mbtranslations):
384             vip = translation.vip
385
386             #
387             # Flows to the VIP with same ips and different source ports are loadbalanced identically
388             # in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
389             #
390             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
391             for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
392                 # from client to vip
393                 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
394                 dport1 = ctx.rxs[0][ctx.L4PROTO].dport
395                 ctx._test.assertIn(
396                     dport1,
397                     [translation.paths[0][DST].port, translation.paths[1][DST].port],
398                 )
399                 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
400
401                 ctx.cnat_send(
402                     self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
403                 )
404                 dport2 = ctx.rxs[0][ctx.L4PROTO].dport
405                 ctx._test.assertIn(
406                     dport2,
407                     [translation.paths[0][DST].port, translation.paths[1][DST].port],
408                 )
409                 ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
410
411                 ctx._test.assertEqual(dport1, dport2)
412
413     def cnat_translation(self):
414         """CNat Translation"""
415         self.logger.info(self.vapi.cli("sh cnat client"))
416         self.logger.info(self.vapi.cli("sh cnat translation"))
417
418         for nbr, translation in enumerate(self.translations):
419             vip = translation.vip
420
421             #
422             # Test Flows to the VIP
423             #
424             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
425             for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
426                 # from client to vip
427                 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
428                 dst_port = translation.paths[0][DST].port
429                 ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
430                 # from vip to client
431                 ctx.cnat_send_return().cnat_expect_return()
432
433                 #
434                 # packets to the VIP that do not match a
435                 # translation are dropped
436                 #
437                 ctx.cnat_send(
438                     self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
439                 )
440
441                 #
442                 # packets from the VIP that do not match a
443                 # session are forwarded
444                 #
445                 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
446                 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
447
448             #
449             # modify the translation to use a different backend
450             #
451             old_dst_port = translation.paths[0][DST].port
452             translation.paths[0][DST].udpate(
453                 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
454             )
455             translation.add_vpp_config()
456
457             #
458             # existing flows follow the old path
459             #
460             for src_pgi in range(N_REMOTE_HOSTS):
461                 for sport in [1234, 1233]:
462                     # from client to vip
463                     ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
464                     ctx.cnat_expect(
465                         self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
466                     )
467                     # from vip to client
468                     ctx.cnat_send_return().cnat_expect_return()
469
470             #
471             # new flows go to the new backend
472             #
473             for src_pgi in range(N_REMOTE_HOSTS):
474                 ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
475                 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
476
477             self.logger.info(self.vapi.cli("sh cnat session verbose"))
478
479         #
480         # turn the scanner back on and wait until the sessions
481         # all disapper
482         #
483         self.vapi.cli("test cnat scanner on")
484         self.virtual_sleep(2)
485         sessions = self.vapi.cnat_session_dump()
486         self.assertEqual(len(sessions), 0)
487         self.vapi.cli("test cnat scanner off")
488
489         #
490         # load some flows again and purge
491         #
492         for translation in self.translations:
493             vip = translation.vip
494             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
495             for src_pgi in range(N_REMOTE_HOSTS):
496                 for sport in [1234, 1233]:
497                     # from client to vip
498                     ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
499                     ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
500
501     def _test_icmp(self):
502         #
503         # Testing ICMP
504         #
505         for nbr, translation in enumerate(self.translations):
506             vip = translation.vip
507             ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
508
509             #
510             # NATing ICMP errors
511             #
512             ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
513             dst_port = translation.paths[0][DST].port
514             ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
515             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
516
517             #
518             # ICMP errors with no VIP associated should not be
519             # modified
520             #
521             ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
522             dst_port = translation.paths[0][DST].port
523             ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
524             ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
525
526     def _make_multi_backend_translations(self):
527         self.translations = []
528         self.mbtranslations = []
529         self.mbtranslations.append(
530             Translation(
531                 self,
532                 TCP,
533                 Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
534                 [
535                     (
536                         Endpoint(is_v6=False),
537                         Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
538                     ),
539                     (
540                         Endpoint(is_v6=False),
541                         Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
542                     ),
543                 ],
544                 0x03,  # hash only on dst ip and src ip
545             ).add_vpp_config()
546         )
547         self.mbtranslations.append(
548             Translation(
549                 self,
550                 TCP,
551                 Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
552                 [
553                     (
554                         Endpoint(is_v6=False),
555                         Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
556                     ),
557                     (
558                         Endpoint(is_v6=False),
559                         Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
560                     ),
561                 ],
562                 0x08,  # hash only on dst port
563             ).add_vpp_config()
564         )
565
566     def _make_translations_v4(self):
567         self.translations = []
568         self.translations.append(
569             Translation(
570                 self,
571                 TCP,
572                 Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
573                 [
574                     (
575                         Endpoint(is_v6=False),
576                         Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
577                     )
578                 ],
579                 0x9F,
580             ).add_vpp_config()
581         )
582         self.translations.append(
583             Translation(
584                 self,
585                 TCP,
586                 Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
587                 [
588                     (
589                         Endpoint(is_v6=False),
590                         Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
591                     )
592                 ],
593                 0x9F,
594             ).add_vpp_config()
595         )
596         self.translations.append(
597             Translation(
598                 self,
599                 UDP,
600                 Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
601                 [
602                     (
603                         Endpoint(is_v6=False),
604                         Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
605                     )
606                 ],
607                 0x9F,
608             ).add_vpp_config()
609         )
610
611     def _make_translations_v6(self):
612         self.translations = []
613         self.translations.append(
614             Translation(
615                 self,
616                 TCP,
617                 Endpoint(ip="30::1", port=5555, is_v6=True),
618                 [
619                     (
620                         Endpoint(is_v6=True),
621                         Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
622                     )
623                 ],
624                 0x9F,
625             ).add_vpp_config()
626         )
627         self.translations.append(
628             Translation(
629                 self,
630                 TCP,
631                 Endpoint(ip="30::2", port=5554, is_v6=True),
632                 [
633                     (
634                         Endpoint(is_v6=True),
635                         Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
636                     )
637                 ],
638                 0x9F,
639             ).add_vpp_config()
640         )
641         self.translations.append(
642             Translation(
643                 self,
644                 UDP,
645                 Endpoint(ip="30::2", port=5553, is_v6=True),
646                 [
647                     (
648                         Endpoint(is_v6=True),
649                         Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
650                     )
651                 ],
652                 0x9F,
653             ).add_vpp_config()
654         )
655
656     def test_icmp4(self):
657         # """ CNat Translation icmp v4 """
658         self._make_translations_v4()
659         self._test_icmp()
660
661     def test_icmp6(self):
662         # """ CNat Translation icmp v6 """
663         self._make_translations_v6()
664         self._test_icmp()
665
666     def test_cnat6(self):
667         # """ CNat Translation ipv6 """
668         self._make_translations_v6()
669         self.cnat_translation()
670
671     def test_cnat4(self):
672         # """ CNat Translation ipv4 """
673         self._make_translations_v4()
674         self.cnat_translation()
675
676     def test_cnat_fhc(self):
677         # """ CNat Translation flow hash config """
678         self._make_multi_backend_translations()
679         self.cnat_fhc_translation()
680
681
682 class TestCNatSourceNAT(CnatCommonTestCase):
683     """CNat Source NAT"""
684
685     @classmethod
686     def setUpClass(cls):
687         super(TestCNatSourceNAT, cls).setUpClass()
688
689     @classmethod
690     def tearDownClass(cls):
691         super(TestCNatSourceNAT, cls).tearDownClass()
692
693     def _enable_disable_snat(self, is_enable=True):
694         self.vapi.cnat_set_snat_addresses(
695             snat_ip4=self.pg2.remote_hosts[0].ip4,
696             snat_ip6=self.pg2.remote_hosts[0].ip6,
697             sw_if_index=INVALID_INDEX,
698         )
699         self.vapi.feature_enable_disable(
700             enable=1 if is_enable else 0,
701             arc_name="ip6-unicast",
702             feature_name="cnat-snat-ip6",
703             sw_if_index=self.pg0.sw_if_index,
704         )
705         self.vapi.feature_enable_disable(
706             enable=1 if is_enable else 0,
707             arc_name="ip4-unicast",
708             feature_name="cnat-snat-ip4",
709             sw_if_index=self.pg0.sw_if_index,
710         )
711
712         policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
713         self.vapi.cnat_set_snat_policy(
714             policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
715         )
716         for i in self.pg_interfaces:
717             self.vapi.cnat_snat_policy_add_del_if(
718                 sw_if_index=i.sw_if_index,
719                 is_add=1 if is_enable else 0,
720                 table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
721             )
722             self.vapi.cnat_snat_policy_add_del_if(
723                 sw_if_index=i.sw_if_index,
724                 is_add=1 if is_enable else 0,
725                 table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
726             )
727
728     def setUp(self):
729         super(TestCNatSourceNAT, self).setUp()
730
731         self.create_pg_interfaces(range(3))
732         self.pg1.generate_remote_hosts(2)
733
734         for i in self.pg_interfaces:
735             i.admin_up()
736             i.config_ip4()
737             i.resolve_arp()
738             i.config_ip6()
739             i.resolve_ndp()
740             i.configure_ipv6_neighbors()
741             i.configure_ipv4_neighbors()
742
743         self._enable_disable_snat(is_enable=True)
744
745     def tearDown(self):
746         self._enable_disable_snat(is_enable=True)
747
748         self.vapi.cnat_session_purge()
749         for i in self.pg_interfaces:
750             i.unconfig_ip4()
751             i.unconfig_ip6()
752             i.admin_down()
753         super(TestCNatSourceNAT, self).tearDown()
754
755     def test_snat_v6(self):
756         # """ CNat Source Nat v6 """
757         self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
758         self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
759         self.sourcenat_test_icmp_echo_conf(is_v6=True)
760
761     def test_snat_v4(self):
762         # """ CNat Source Nat v4 """
763         self.sourcenat_test_tcp_udp_conf(TCP)
764         self.sourcenat_test_tcp_udp_conf(UDP)
765         self.sourcenat_test_icmp_echo_conf()
766
767     def sourcenat_test_icmp_echo_conf(self, is_v6=False):
768         ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
769         # 8 is ICMP type echo (v4 only)
770         ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
771         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
772         ctx.cnat_send_return().cnat_expect_return()
773
774     def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
775         ctx = CnatTestContext(self, L4PROTO, is_v6)
776         # we should source NAT
777         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
778         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
779         ctx.cnat_send_return().cnat_expect_return()
780
781         # exclude dst address of pg1.1 from snat
782         if is_v6:
783             exclude_prefix = ip_network(
784                 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
785             )
786         else:
787             exclude_prefix = ip_network(
788                 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
789             )
790
791         # add remote host to exclude list
792         self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
793
794         # We should not source NAT the id=1
795         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
796         ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
797         ctx.cnat_send_return().cnat_expect_return()
798
799         # But we should source NAT the id=0
800         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
801         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
802         ctx.cnat_send_return().cnat_expect_return()
803
804         # remove remote host from exclude list
805         self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
806         self.vapi.cnat_session_purge()
807
808         # We should source NAT again
809         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
810         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
811         ctx.cnat_send_return().cnat_expect_return()
812
813         # test return ICMP error nating
814         ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
815         ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
816         ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
817
818         self.vapi.cnat_session_purge()
819
820
821 class TestCNatDHCP(CnatCommonTestCase):
822     """CNat Translation"""
823
824     @classmethod
825     def setUpClass(cls):
826         super(TestCNatDHCP, cls).setUpClass()
827
828     @classmethod
829     def tearDownClass(cls):
830         super(TestCNatDHCP, cls).tearDownClass()
831
832     def tearDown(self):
833         for i in self.pg_interfaces:
834             i.admin_down()
835         super(TestCNatDHCP, self).tearDown()
836
837     def make_addr(self, sw_if_index, addr_id, is_v6):
838         if is_v6:
839             return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
840         return "172.16.%u.%u" % (sw_if_index, addr_id)
841
842     def make_prefix(self, sw_if_index, addr_id, is_v6):
843         if is_v6:
844             return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
845         return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
846
847     def check_resolved(self, tr, addr_id, is_v6=False):
848         qt = tr.query_vpp_config()
849         self.assertEqual(
850             str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
851         )
852         self.assertEqual(len(qt.paths), len(tr.paths))
853         for path_tr, path_qt in zip(tr.paths, qt.paths):
854             src_qt = path_qt.src_ep
855             dst_qt = path_qt.dst_ep
856             src_tr, dst_tr = path_tr
857             self.assertEqual(
858                 str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
859             )
860             self.assertEqual(
861                 str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
862             )
863
864     def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
865         self.vapi.sw_interface_add_del_address(
866             sw_if_index=pg.sw_if_index,
867             prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
868             is_add=1 if is_add else 0,
869         )
870
871     def _test_dhcp_v46(self, is_v6):
872         self.create_pg_interfaces(range(4))
873         for i in self.pg_interfaces:
874             i.admin_up()
875         paths = [
876             (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
877             (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
878         ]
879         ep = Endpoint(pg=self.pg0, is_v6=is_v6)
880         t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
881         # Add an address on every interface
882         # and check it is reflected in the cnat config
883         for pg in self.pg_interfaces:
884             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
885         self.check_resolved(t, addr_id=0, is_v6=is_v6)
886         # Add a new address on every interface, remove the old one
887         # and check it is reflected in the cnat config
888         for pg in self.pg_interfaces:
889             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
890             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
891         self.check_resolved(t, addr_id=1, is_v6=is_v6)
892         # remove the configuration
893         for pg in self.pg_interfaces:
894             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
895         t.remove_vpp_config()
896
897     def test_dhcp_v4(self):
898         self._test_dhcp_v46(False)
899
900     def test_dhcp_v6(self):
901         self._test_dhcp_v46(True)
902
903     def test_dhcp_snat(self):
904         self.create_pg_interfaces(range(1))
905         for i in self.pg_interfaces:
906             i.admin_up()
907         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
908         # Add an address on every interface
909         # and check it is reflected in the cnat config
910         for pg in self.pg_interfaces:
911             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
912             self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
913         r = self.vapi.cnat_get_snat_addresses()
914         self.assertEqual(
915             str(r.snat_ip4),
916             self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
917         )
918         self.assertEqual(
919             str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
920         )
921         # Add a new address on every interface, remove the old one
922         # and check it is reflected in the cnat config
923         for pg in self.pg_interfaces:
924             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
925             self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
926             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
927             self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
928         r = self.vapi.cnat_get_snat_addresses()
929         self.assertEqual(
930             str(r.snat_ip4),
931             self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
932         )
933         self.assertEqual(
934             str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
935         )
936         # remove the configuration
937         for pg in self.pg_interfaces:
938             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
939             self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
940         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
941
942
943 if __name__ == "__main__":
944     unittest.main(testRunner=VppTestRunner)