http_static: fix memory hss_session using after be freed
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import INVALID_INDEX
8 from itertools import product
9 from config import config
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP, TCP, ICMP
14 from scapy.layers.inet import IPerror, TCPerror, UDPerror
15 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
16 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
17
18 from ipaddress import ip_network
19
20 from vpp_object import VppObject
21 from vpp_papi import VppEnum
22
# Number of copies of each crafted packet injected per send (p1 * N_PKTS).
N_PKTS = 15
# Number of remote hosts generated on pg0/pg1 by the translation tests.
N_REMOTE_HOSTS = 3

# Positions of the source and destination endpoints in a (src, dst) path tuple.
SRC = 0
DST = 1
28
29
class CnatCommonTestCase(VppTestCase):
    """CNat common test class"""

    # Startup configuration for the cnat plugin, written as keyword/value
    # pairs. The scanner is switched off whilst testing, otherwise the
    # sessions a test has just created would time out underneath it.
    extra_vpp_config = (
        ["cnat", "{"]
        + ["session-db-buckets", "64"]
        + ["session-cleanup-timeout", "0.1"]
        + ["session-max-age", "1"]
        + ["tcp-max-age", "1"]
        + ["scanner", "off"]
        + ["}"]
    )

    @classmethod
    def setUpClass(cls):
        # Nothing cnat specific to prepare: defer to the framework.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Nothing cnat specific to release: defer to the framework.
        super().tearDownClass()
60
61
class Endpoint(object):
    """CNat endpoint

    One side of a cnat path, or a VIP: an address (or the interface the
    address is to be taken from), an L4 port and an address family.
    """

    def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
        """Build an endpoint.

        The address is chosen from the first matching case:

        * ``pg`` and ``pgi`` given: the address of ``pg.remote_hosts[pgi]``
        * only ``pg`` given: no address, ``sw_if_index`` is set to the
          interface's index instead
        * ``ip`` given: that literal address
        * otherwise: the unspecified address of the selected family

        :param pg: packet-generator interface, or None
        :param pgi: index into ``pg.remote_hosts``, or None
        :param port: L4 port
        :param is_v6: True for IPv6, False for IPv4
        :param ip: literal address, only used when ``pg`` is None
        """
        self.port = port
        self.is_v6 = is_v6
        self.sw_if_index = INVALID_INDEX
        if pg is not None and pgi is not None:
            # pg interface specified and remote index
            self.ip = self.get_ip46(pg.remote_hosts[pgi])
        elif pg is not None:
            # interface only: leave the address empty and pass the interface
            # index along (presumably resolved on the VPP side - confirm)
            self.ip = None
            self.sw_if_index = pg.sw_if_index
        elif ip is not None:
            self.ip = ip
        else:
            self.ip = "::" if self.is_v6 else "0.0.0.0"

    def get_ip46(self, obj):
        """Return ``obj.ip6`` or ``obj.ip4`` according to this endpoint's family."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def update(self, **kwargs):
        """Re-initialise the endpoint in place.

        Accepts the same keyword arguments as the constructor; anything not
        passed falls back to the constructor default, it is not preserved.
        """
        self.__init__(**kwargs)

    # Historical, misspelled name of update(); kept so that existing callers
    # keep working.
    udpate = update

    def _vpp_if_af(self):
        """Return the VPP address-family enum value for this endpoint."""
        if self.is_v6:
            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
        return VppEnum.vl_api_address_family_t.ADDRESS_IP4

    def encode(self):
        """Return the endpoint as the dict handed to the cnat API calls."""
        return {
            "addr": self.ip,
            "port": self.port,
            "sw_if_index": self.sw_if_index,
            "if_af": self._vpp_if_af(),
        }

    def __str__(self):
        return "%s:%d" % (self.ip, self.port)
103
104
class Translation(VppObject):
    """A cnat translation (VIP plus backend paths) driven through the VPP API."""

    def __init__(self, test, iproto, vip, paths, fhc):
        """Record the translation parameters; nothing is sent to VPP yet.

        :param test: test case giving access to ``vapi``, ``registry``, ``logger``
        :param iproto: scapy L4 class, ``TCP`` or ``UDP``
        :param vip: Endpoint describing the virtual IP and port
        :param paths: list of ``(src, dst)`` Endpoint tuples
        :param fhc: value passed as ``flow_hash_config``
        """
        self._test = test
        self.iproto = iproto
        self.vip = vip
        self.paths = paths
        self.fhc = fhc
        # filled in by add_vpp_config() from the API reply
        self.id = None

    def __str__(self):
        return " ".join(str(part) for part in (self.vip, self.iproto, self.paths))

    def _vl4_proto(self):
        """Map the scapy L4 class to the VPP ip_proto enum (KeyError otherwise)."""
        protos = VppEnum.vl_api_ip_proto_t
        by_layer = {
            UDP: protos.IP_API_PROTO_UDP,
            TCP: protos.IP_API_PROTO_TCP,
        }
        return by_layer[self.iproto]

    def _encoded_paths(self):
        """Return the paths in the form expected by the API call."""
        encoded = []
        for src_ep, dst_ep in self.paths:
            encoded.append({"src_ep": src_ep.encode(), "dst_ep": dst_ep.encode()})
        return encoded

    def add_vpp_config(self):
        """Create or update the translation in VPP and remember its id."""
        request = {
            "vip": self.vip.encode(),
            "ip_proto": self._vl4_proto(),
            "n_paths": len(self.paths),
            "paths": self._encoded_paths(),
            "flow_hash_config": self.fhc,
        }
        reply = self._test.vapi.cnat_translation_update(request)
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id
        return self

    def remove_vpp_config(self):
        """Delete the translation from VPP; it must have been added first."""
        assert self.id is not None
        self._test.vapi.cnat_translation_del(id=self.id)
        return self

    def query_vpp_config(self):
        """Return VPP's view of this translation, or None when it is absent."""
        dump = self._test.vapi.cnat_translation_dump()
        found = (
            entry.translation for entry in dump if self.id == entry.translation.id
        )
        return next(found, None)
154
155
class CnatTestContext(object):
    """
    Helper that sends packets through cnat and checks how they were rewritten.

    Usage :

    ctx = CnatTestContext(self, TCP, is_v6=True)

    # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)

    # We expect this to be NATed as
    # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)

    # After running cnat_expect, we can send back the received packet
    # and expect it be 'unnated' so that we get the original packet
    ctx.cnat_send_return().cnat_expect_return()

    # same thing for ICMP errors
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
    """

    def __init__(self, test, L4PROTO, is_v6):
        """
        :param test: test case used to send packets and to make assertions
        :param L4PROTO: scapy layer class, one of TCP, UDP or ICMP
        :param is_v6: True to build and check IPv6 packets, False for IPv4
        """
        self.L4PROTO = L4PROTO
        self.is_v6 = is_v6
        self._test = test

    def get_ip46(self, obj):
        """Return ``obj.ip6`` or ``obj.ip4`` according to the context family."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def _resolve_addr(self, pg, host_id):
        """Turn a host designator into an address.

        An int is an index into ``pg.remote_hosts``; anything else is taken
        to be the address itself and returned unchanged.
        """
        if isinstance(host_id, int):
            return self.get_ip46(pg.remote_hosts[host_id])
        return host_id

    @property
    def IP46(self):
        """scapy IP layer class matching the context family."""
        return IPv6 if self.is_v6 else IP

    def cnat_send(
        self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
    ):
        """Inject N_PKTS packets on ``src_pg`` and capture them on ``dst_pg``.

        :param src_pg: interface the packets are injected on
        :param src_id: remote host index on ``src_pg``, or a literal address
        :param src_port: L4 source port; doubles as the ICMP id
        :param dst_pg: interface the packets are expected on
        :param dst_id: remote host index on ``dst_pg``, or a literal address
        :param dst_port: L4 destination port; doubles as the ICMPv4 type
        :param no_replies: when True assert that nothing is forwarded; the
            previously captured ``self.rxs`` is left untouched in that case
        :returns: self, so that calls can be chained
        """
        # A literal source address used to be stored in dst_addr by mistake,
        # leaving src_addr unset (or stale from an earlier send).
        self.src_addr = self._resolve_addr(src_pg, src_id)
        self.dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.src_port = src_port  # also ICMP id
        self.dst_port = dst_port  # also ICMP type

        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoRequest(id=self.src_port)
        p1 = (
            Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
            / self.IP46(src=self.src_addr, dst=self.dst_addr)
            / l4
            / Raw()
        )

        if no_replies:
            self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
        else:
            self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
        # remembered so that the return traffic can be sent the other way
        self.expected_src_pg = src_pg
        self.expected_dst_pg = dst_pg
        return self

    def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
        """Check the packets captured by the last cnat_send().

        The arguments describe the addresses and ports the captured packets
        must carry, using the same conventions as cnat_send(). A ``src_port``
        of None accepts whatever source port (or ICMP id) the first captured
        packet has and requires all the others to match it.

        :returns: self, so that calls can be chained
        """
        self.expect_src_addr = self._resolve_addr(src_pg, src_id)
        self.expect_dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.expect_src_port = src_port
        self.expect_dst_port = dst_port

        if self.expect_src_port is None:
            # learn the translated source port from the first packet
            if self.L4PROTO in [TCP, UDP]:
                self.expect_src_port = self.rxs[0][self.L4PROTO].sport
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self.expect_src_port = self.rxs[0][self.L4PROTO].id
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id

        for rx in self.rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
            self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
        return self

    def cnat_send_return(self):
        """This sends the return traffic

        The reply is built from the values recorded by cnat_expect(), with
        source and destination swapped, and injected on the interface the
        forward packets were captured on.
        """
        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            # icmp type 0 if echo reply
            l4 = self.L4PROTO(id=self.expect_src_port, type=0)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoReply(id=self.expect_src_port)
        src_mac = self.expected_dst_pg.remote_mac
        p1 = (
            Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / l4
            / Raw()
        )

        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_return(self):
        """Check that the return traffic mirrors what cnat_send() sent."""
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                # icmp type 0 if echo reply
                self._test.assertEqual(rx[self.L4PROTO].type, 0)
                self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
        return self

    def cnat_send_icmp_return_error(self):
        """
        This called after cnat_expect will send an icmp error
        on the reverse path

        The error embeds the IP layer of the first packet captured by the
        last cnat_send().
        """
        ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
        InnerIP = self.rxs[0][self.IP46]
        p1 = (
            Ether(
                src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
            )
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / ICMPelem
            / InnerIP
        )
        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_icmp_error_return(self):
        """Check the ICMP errors received on the reverse path.

        Both the outer header and the embedded (offending) packet must carry
        the addresses and ports used by the original cnat_send(). Any L4
        protocol other than TCP is checked as UDP.
        """
        ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
        IP46err = IPerror6 if self.is_v6 else IPerror
        L4err = TCPerror if self.L4PROTO is TCP else UDPerror
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
        return self
331
332
333 # -------------------------------------------------------------------
334 # -------------------------------------------------------------------
335 # -------------------------------------------------------------------
336 # -------------------------------------------------------------------
337
338
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
class TestCNatTranslation(CnatCommonTestCase):
    """CNat Translation"""

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces, with remote hosts on pg0 and pg1."""
        super(TestCNatTranslation, self).setUp()

        self.create_pg_interfaces(range(3))
        self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
        self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv4_neighbors()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """Remove the translations, purge the sessions, unconfigure interfaces."""
        for translation in self.translations:
            translation.remove_vpp_config()

        self.vapi.cnat_session_purge()
        # the purge must leave no session behind
        self.assertFalse(self.vapi.cnat_session_dump())

        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    def cnat_fhc_translation(self):
        """CNat Translation flow hash config"""
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        for nbr, translation in enumerate(self.mbtranslations):
            vip = translation.vip
            # every backend of the translation is an acceptable destination
            backend_ports = [path[DST].port for path in translation.paths]

            #
            # Flows to the VIP with the same IPs and different source ports
            # are load-balanced identically in both cases of flow hash
            # 0x03 (src ip and dst ip) and 0x08 (dst port)
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                dport1 = ctx.rxs[0][ctx.L4PROTO].dport
                self.assertIn(dport1, backend_ports)
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)

                # same flow from another source port
                ctx.cnat_send(
                    self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
                )
                dport2 = ctx.rxs[0][ctx.L4PROTO].dport
                self.assertIn(dport2, backend_ports)
                ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)

                # the source port must not influence the backend choice
                self.assertEqual(dport1, dport2)

    def cnat_translation(self):
        """CNat Translation"""
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        for nbr, translation in enumerate(self.translations):
            vip = translation.vip

            #
            # Test Flows to the VIP
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                dst_port = translation.paths[0][DST].port
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                ctx.cnat_send(
                    self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
                )

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
                ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)

            #
            # modify the translation to use a different backend
            #
            old_dst_port = translation.paths[0][DST].port
            translation.paths[0][DST].udpate(
                pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
            )
            translation.add_vpp_config()

            #
            # existing flows follow the old path
            #
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

            #
            # new flows go to the new backend
            #
            for src_pgi in range(N_REMOTE_HOSTS):
                ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)

            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")
        self.virtual_sleep(2)
        sessions = self.vapi.cnat_session_dump()
        self.assertEqual(len(sessions), 0)
        self.vapi.cli("test cnat scanner off")

        #
        # load some flows again; the purge itself is done by tearDown()
        #
        for translation in self.translations:
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)

    def _test_icmp(self):
        """Check how ICMP errors on the reverse path are translated."""
        #
        # Testing ICMP
        #
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)

            #
            # NATing ICMP errors
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
            dst_port = translation.paths[0][DST].port
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

            #
            # ICMP errors with no VIP associated should not be
            # modified
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    def _new_translation(self, iproto, vip_ip, vip_port, backends, fhc, is_v6=False):
        """Build a translation, push it to VPP and return it.

        :param iproto: scapy L4 class, TCP or UDP
        :param vip_ip: literal VIP address
        :param vip_port: VIP port
        :param backends: list of ``(pgi, port)`` pairs; each one becomes a
            path from an unspecified source endpoint to
            ``pg1.remote_hosts[pgi]`` on ``port``
        :param fhc: flow hash config of the translation
        :param is_v6: address family of every endpoint involved
        """
        vip = Endpoint(ip=vip_ip, port=vip_port, is_v6=is_v6)
        paths = [
            (
                Endpoint(is_v6=is_v6),
                Endpoint(pg=self.pg1, pgi=pgi, port=port, is_v6=is_v6),
            )
            for pgi, port in backends
        ]
        return Translation(self, iproto, vip, paths, fhc).add_vpp_config()

    def _make_multi_backend_translations(self):
        """Configure two TCP translations with two backends each."""
        self.translations = []
        self.mbtranslations = []
        # Appended one by one so that the list always reflects what has
        # actually been configured, even if a later call fails.
        self.mbtranslations.append(
            self._new_translation(
                TCP,
                "30.0.0.5",
                5555,
                [(0, 4001), (0, 4005)],
                0x03,  # hash only on dst ip and src ip
            )
        )
        self.mbtranslations.append(
            self._new_translation(
                TCP,
                "30.0.0.6",
                5555,
                [(1, 4006), (1, 4007)],
                0x08,  # hash only on dst port
            )
        )

    def _make_translations_v4(self):
        """Configure the three single-backend IPv4 translations."""
        self.translations = []
        for iproto, vip_ip, vip_port, pgi, port in (
            (TCP, "30.0.0.1", 5555, 0, 4001),
            (TCP, "30.0.0.2", 5554, 1, 4002),
            (UDP, "30.0.0.2", 5553, 2, 4003),
        ):
            self.translations.append(
                self._new_translation(
                    iproto, vip_ip, vip_port, [(pgi, port)], 0x9F, is_v6=False
                )
            )

    def _make_translations_v6(self):
        """Configure the three single-backend IPv6 translations."""
        self.translations = []
        for iproto, vip_ip, vip_port, pgi, port in (
            (TCP, "30::1", 5555, 0, 4001),
            (TCP, "30::2", 5554, 1, 4002),
            (UDP, "30::2", 5553, 2, 4003),
        ):
            self.translations.append(
                self._new_translation(
                    iproto, vip_ip, vip_port, [(pgi, port)], 0x9F, is_v6=True
                )
            )

    def test_icmp4(self):
        # """ CNat Translation icmp v4 """
        self._make_translations_v4()
        self._test_icmp()

    def test_icmp6(self):
        # """ CNat Translation icmp v6 """
        self._make_translations_v6()
        self._test_icmp()

    def test_cnat6(self):
        # """ CNat Translation ipv6 """
        self._make_translations_v6()
        self.cnat_translation()

    def test_cnat4(self):
        # """ CNat Translation ipv4 """
        self._make_translations_v4()
        self.cnat_translation()

    def test_cnat_fhc(self):
        # """ CNat Translation flow hash config """
        self._make_multi_backend_translations()
        self.cnat_fhc_translation()
682
683
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
class TestCNatSourceNAT(CnatCommonTestCase):
    """CNat Source NAT"""

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def _enable_disable_snat(self, is_enable=True):
        """Switch source NAT on or off.

        The SNAT addresses (first remote host of pg2) and the SNAT policy are
        applied on every call. Only the cnat-snat features on pg0 and the
        membership of the pg interfaces in the include tables follow
        ``is_enable``.
        """
        flag = 1 if is_enable else 0

        self.vapi.cnat_set_snat_addresses(
            snat_ip4=self.pg2.remote_hosts[0].ip4,
            snat_ip6=self.pg2.remote_hosts[0].ip6,
            sw_if_index=INVALID_INDEX,
        )
        for arc_name, feature_name in (
            ("ip6-unicast", "cnat-snat-ip6"),
            ("ip4-unicast", "cnat-snat-ip4"),
        ):
            self.vapi.feature_enable_disable(
                enable=flag,
                arc_name=arc_name,
                feature_name=feature_name,
                sw_if_index=self.pg0.sw_if_index,
            )

        policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
        self.vapi.cnat_set_snat_policy(
            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
        )
        for i in self.pg_interfaces:
            for table in (
                policie_tbls.CNAT_POLICY_INCLUDE_V6,
                policie_tbls.CNAT_POLICY_INCLUDE_V4,
            ):
                self.vapi.cnat_snat_policy_add_del_if(
                    sw_if_index=i.sw_if_index,
                    is_add=flag,
                    table=table,
                )

    def setUp(self):
        """Create three pg interfaces, two remote hosts on pg1, enable SNAT."""
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))
        self.pg1.generate_remote_hosts(2)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv6_neighbors()
            i.configure_ipv4_neighbors()

        self._enable_disable_snat(is_enable=True)

    def tearDown(self):
        """Undo setUp(): disable SNAT, purge sessions, unconfigure interfaces."""
        # This used to pass is_enable=True, enabling SNAT a second time
        # instead of removing what setUp() configured.
        self._enable_disable_snat(is_enable=False)

        self.vapi.cnat_session_purge()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def test_snat_v6(self):
        # """ CNat Source Nat v6 """
        self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
        self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
        self.sourcenat_test_icmp_echo_conf(is_v6=True)

    def test_snat_v4(self):
        # """ CNat Source Nat v4 """
        self.sourcenat_test_tcp_udp_conf(TCP)
        self.sourcenat_test_tcp_udp_conf(UDP)
        self.sourcenat_test_icmp_echo_conf()

    def sourcenat_test_icmp_echo_conf(self, is_v6=False):
        """Check that an ICMP echo from pg0 is source NATed and un-NATed back."""
        ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
        # 8 is ICMP type echo (v4 only)
        ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
        ctx.cnat_send_return().cnat_expect_return()

    def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
        """Check source NAT, the exclude-prefix list and ICMP error NATing.

        :param L4PROTO: scapy L4 class, TCP or UDP
        :param is_v6: True to run the scenario over IPv6
        """
        ctx = CnatTestContext(self, L4PROTO, is_v6)
        # we should source NAT
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # exclude dst address of pg1.1 from snat
        excluded_host = self.pg1.remote_hosts[1]
        if is_v6:
            exclude_prefix = ip_network("%s/100" % excluded_host.ip6, strict=False)
        else:
            exclude_prefix = ip_network("%s/16" % excluded_host.ip4, strict=False)

        # add remote host to exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)

        # We should not source NAT the id=1
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # But we should source NAT the id=0
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # remove remote host from exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
        self.vapi.cnat_session_purge()

        # We should source NAT again
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # test return ICMP error nating
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

        self.vapi.cnat_session_purge()
821         self.vapi.cnat_session_purge()
822
823
# Checks that cnat configuration expressed in terms of interfaces (translation
# endpoints, SNAT addresses) follows the addresses configured on those
# interfaces as they are added and removed.
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
class TestCNatDHCP(CnatCommonTestCase):
    """CNat DHCP"""

    @classmethod
    def setUpClass(cls):
        super(TestCNatDHCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatDHCP, cls).tearDownClass()

    def tearDown(self):
        # Each test creates and brings up its own pg interfaces; put them
        # back down before handing over to the parent tearDown.
        for pg in self.pg_interfaces:
            pg.admin_down()
        super(TestCNatDHCP, self).tearDown()

    def make_addr(self, sw_if_index, addr_id, is_v6):
        """Build the textual test address for an interface / address id pair.

        :param sw_if_index: interface index, encoded into the address.
        :param addr_id: small integer distinguishing successive addresses.
        :param is_v6: build an IPv6 address when true, IPv4 otherwise.
        :returns: ``fd01:<sw_if_index in hex>::<addr_id + 1>`` for IPv6 or
            ``172.16.<sw_if_index>.<addr_id>`` for IPv4.
        """
        if is_v6:
            return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
        return "172.16.%u.%u" % (sw_if_index, addr_id)

    def make_prefix(self, sw_if_index, addr_id, is_v6):
        """Return the make_addr() address as a host prefix (/128 or /32)."""
        prefix_len = 128 if is_v6 else 32
        return "%s/%u" % (self.make_addr(sw_if_index, addr_id, is_v6), prefix_len)

    def check_resolved(self, tr, addr_id, is_v6=False):
        """Assert that VPP reports ``tr`` with the addresses for ``addr_id``.

        The translation is queried back from VPP; its VIP and the source and
        destination endpoint of every path must carry the address that
        make_addr() yields for the corresponding interface.

        :param tr: translation object exposing ``vip``, ``paths`` and
            ``query_vpp_config()``.
        :param addr_id: address id expected on every interface.
        :param is_v6: compare against IPv6 addresses when true.
        """
        reported = tr.query_vpp_config()
        self.assertEqual(
            str(reported.vip.addr),
            self.make_addr(tr.vip.sw_if_index, addr_id, is_v6),
        )
        # Check the lengths first: zip() would silently truncate otherwise.
        self.assertEqual(len(reported.paths), len(tr.paths))
        for (src_ep, dst_ep), reported_path in zip(tr.paths, reported.paths):
            self.assertEqual(
                str(reported_path.src_ep.addr),
                self.make_addr(src_ep.sw_if_index, addr_id, is_v6),
            )
            self.assertEqual(
                str(reported_path.dst_ep.addr),
                self.make_addr(dst_ep.sw_if_index, addr_id, is_v6),
            )

    def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
        """Add or remove the make_prefix() host prefix on interface ``pg``.

        :param pg: interface whose ``sw_if_index`` selects both the target
            interface and the address to build.
        :param addr_id: address id passed to make_prefix().
        :param is_add: add the address when true, delete it otherwise.
        :param is_v6: use the IPv6 variant of the address when true.
        """
        self.vapi.sw_interface_add_del_address(
            sw_if_index=pg.sw_if_index,
            prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
            is_add=1 if is_add else 0,
        )

    def _check_snat_addresses(self, addr_id):
        """Assert the SNAT v4/v6 addresses match pg0's ``addr_id`` addresses."""
        reply = self.vapi.cnat_get_snat_addresses()
        self.assertEqual(
            str(reply.snat_ip4),
            self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=False),
        )
        self.assertEqual(
            str(reply.snat_ip6),
            self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=True),
        )

    def _test_dhcp_v46(self, is_v6):
        """Check a translation built from interfaces tracks their addresses.

        :param is_v6: run the scenario with IPv6 addresses when true.
        """
        self.create_pg_interfaces(range(4))
        for pg in self.pg_interfaces:
            pg.admin_up()
        # Endpoints are built from an interface only (no remote-host index),
        # so the addresses checked below come from the interface config.
        paths = [
            (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
            (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
        ]
        vip = Endpoint(pg=self.pg0, is_v6=is_v6)
        # NOTE(review): 0x9F is Translation's last positional argument; its
        # meaning is defined by Translation - confirm there before changing.
        translation = Translation(self, TCP, vip, paths, 0x9F).add_vpp_config()
        # Add an address on every interface
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
        self.check_resolved(translation, addr_id=0, is_v6=is_v6)
        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
            self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
        self.check_resolved(translation, addr_id=1, is_v6=is_v6)
        # remove the configuration
        for pg in self.pg_interfaces:
            self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
        translation.remove_vpp_config()

    def test_dhcp_v4(self):
        # IPv4 flavour of the translation scenario.
        self._test_dhcp_v46(False)

    def test_dhcp_v6(self):
        # IPv6 flavour of the translation scenario.
        self._test_dhcp_v46(True)

    def test_dhcp_snat(self):
        # SNAT addresses are configured by interface (pg0) and must follow
        # the v4 and v6 addresses present on that interface.
        address_families = (False, True)  # is_v6 values: IPv4 first, then IPv6
        self.create_pg_interfaces(range(1))
        for pg in self.pg_interfaces:
            pg.admin_up()
        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
        # Add an address on every interface
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            for is_v6 in address_families:
                self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
        self._check_snat_addresses(addr_id=0)
        # Add a new address on every interface, remove the old one
        # and check it is reflected in the cnat config
        for pg in self.pg_interfaces:
            for is_v6 in address_families:
                self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
            for is_v6 in address_families:
                self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
        self._check_snat_addresses(addr_id=1)
        # remove the configuration
        for pg in self.pg_interfaces:
            for is_v6 in address_families:
                self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
945
946
# When executed directly as a script, run this module's tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)