cnat: add flow hash config to cnat translation
[vpp.git] / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import (
19     ip_address,
20     ip_network,
21     IPv4Address,
22     IPv6Address,
23     IPv4Network,
24     IPv6Network,
25 )
26
27 from vpp_object import VppObject
28 from vpp_papi import VppEnum
29
# Number of copies of each crafted packet sent in one send step
# (used as `p1 * N_PKTS` by CnatTestContext).
N_PKTS = 15
# Number of remote hosts generated on pg0 and pg1 by TestCNatTranslation.
N_REMOTE_HOSTS = 3

# Positions of the two endpoints inside a translation path tuple, which is
# built as (source endpoint, destination endpoint).
# NOTE(review): SRC is not referenced in the visible part of the file.
SRC = 0
DST = 1
35
36
class CnatCommonTestCase(VppTestCase):
    """CNat common test class

    Shared base of the CNat test cases; it only contributes the VPP
    startup configuration below.
    """

    # The session scanner is kept off while the tests run, otherwise the
    # sessions under test would time out.
    extra_vpp_config = [
        "cnat", "{",
        "session-db-buckets", "64",
        "session-cleanup-timeout", "0.1",
        "session-max-age", "1",
        "tcp-max-age", "1",
        "scanner", "off",
        "}",
    ]

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super().tearDownClass()
67
68
class Endpoint(object):
    """CNat endpoint

    An endpoint is an (address, port) pair, optionally bound to an
    interface.  The address is chosen from the constructor arguments in
    this order:

    * ``pg`` and ``pgi``: address of ``pg.remote_hosts[pgi]``;
    * ``pg`` only: no address (``None``), ``sw_if_index`` is set to the
      interface's index instead;
    * ``ip``: the given address;
    * nothing: the unspecified address ("::" or "0.0.0.0").
    """

    def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
        self.port = port
        self.is_v6 = is_v6
        self.sw_if_index = INVALID_INDEX
        if pg is not None and pgi is not None:
            # pg interface specified and remote index
            self.ip = self.get_ip46(pg.remote_hosts[pgi])
        elif pg is not None:
            # interface only: the address is left unset
            self.ip = None
            self.sw_if_index = pg.sw_if_index
        elif ip is not None:
            self.ip = ip
        else:
            self.ip = "::" if self.is_v6 else "0.0.0.0"

    def get_ip46(self, obj):
        """Return ``obj.ip6`` for a v6 endpoint, ``obj.ip4`` otherwise."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def update(self, **kwargs):
        """Re-initialise this endpoint in place.

        ``kwargs`` are the constructor arguments; anything not passed
        falls back to the constructor default (nothing is carried over
        from the previous state).
        """
        self.__init__(**kwargs)

    def udpate(self, **kwargs):
        """Historical misspelling of :meth:`update`, kept for callers."""
        self.update(**kwargs)

    def _vpp_if_af(self):
        """Return the API address-family enum matching ``is_v6``."""
        if self.is_v6:
            return VppEnum.vl_api_address_family_t.ADDRESS_IP6
        return VppEnum.vl_api_address_family_t.ADDRESS_IP4

    def encode(self):
        """Return the endpoint as the dict passed to the cnat API calls."""
        return {
            "addr": self.ip,
            "port": self.port,
            "sw_if_index": self.sw_if_index,
            "if_af": self._vpp_if_af(),
        }

    def __str__(self):
        return "%s:%d" % (self.ip, self.port)
110
111
class Translation(VppObject):
    """A cnat translation: a VIP, an L4 protocol and backend paths.

    ``paths`` is a list of (source endpoint, destination endpoint) tuples
    and ``fhc`` is the value sent as ``flow_hash_config``.  ``id`` is None
    until add_vpp_config() has stored the id returned by VPP.
    """

    def __init__(self, test, iproto, vip, paths, fhc):
        self._test = test
        self.id = None
        self.iproto = iproto
        self.vip = vip
        self.paths = paths
        self.fhc = fhc

    def __str__(self):
        described = (self.vip, self.iproto, self.paths)
        return "%s %s %s" % described

    def _vl4_proto(self):
        """Map the scapy layer class to the API protocol enum.

        Raises KeyError for anything other than UDP or TCP.
        """
        api_proto = VppEnum.vl_api_ip_proto_t
        by_layer = {
            UDP: api_proto.IP_API_PROTO_UDP,
            TCP: api_proto.IP_API_PROTO_TCP,
        }
        return by_layer[self.iproto]

    def _encoded_paths(self):
        """Return the paths as a list of API-ready dicts."""
        encoded = []
        for src_ep, dst_ep in self.paths:
            encoded.append({"src_ep": src_ep.encode(), "dst_ep": dst_ep.encode()})
        return encoded

    def add_vpp_config(self):
        """Create or update the translation in VPP and remember its id."""
        payload = {
            "vip": self.vip.encode(),
            "ip_proto": self._vl4_proto(),
            "n_paths": len(self.paths),
            "paths": self._encoded_paths(),
            "flow_hash_config": self.fhc,
        }
        reply = self._test.vapi.cnat_translation_update(payload)
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id
        return self

    def remove_vpp_config(self):
        """Delete the translation; it must have been added before."""
        assert self.id is not None
        self._test.vapi.cnat_translation_del(id=self.id)
        return self

    def query_vpp_config(self):
        """Return the dumped translation carrying our id, or None."""
        dumped = self._test.vapi.cnat_translation_dump()
        matches = (
            entry.translation for entry in dumped if self.id == entry.translation.id
        )
        return next(matches, None)
161
162
class CnatTestContext(object):
    """
    Helper that sends a flow through VPP and checks how it was NATed.

    Usage :

    ctx = CnatTestContext(self, TCP, is_v6=True)

    # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
    ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)

    # We expect this to be NATed as
    # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
    ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)

    # After running cnat_expect, we can send back the received packet
    # and expect it be 'unnated' so that we get the original packet
    ctx.cnat_send_return().cnat_expect_return()

    # same thing for ICMP errors
    ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    Wherever a host id is expected, an int is an index into the remote
    hosts of the given pg interface; any other value is used directly as
    the address.
    """

    def __init__(self, test, L4PROTO, is_v6):
        self.L4PROTO = L4PROTO
        self.is_v6 = is_v6
        self._test = test

    def get_ip46(self, obj):
        """Return ``obj.ip6`` for a v6 context, ``obj.ip4`` otherwise."""
        if self.is_v6:
            return obj.ip6
        return obj.ip4

    def _resolve_addr(self, pg, host_id):
        """Return the address designated by ``host_id`` (see class doc)."""
        if isinstance(host_id, int):
            return self.get_ip46(pg.remote_hosts[host_id])
        return host_id

    @property
    def IP46(self):
        """The scapy IP layer class matching the context's family."""
        return IPv6 if self.is_v6 else IP

    def cnat_send(
        self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
    ):
        """Send N_PKTS copies of one packet on ``src_pg``.

        The received packets are stored in ``self.rxs`` unless
        ``no_replies`` is set, in which case the test asserts that nothing
        comes out and ``self.rxs`` is left untouched.  Returns self so
        calls can be chained.
        """
        # A non-int source id is an address: it belongs in src_addr (it
        # used to be written to dst_addr, leaving src_addr stale or unset).
        self.src_addr = self._resolve_addr(src_pg, src_id)
        self.dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.src_port = src_port  # also ICMP id
        self.dst_port = dst_port  # also ICMP type

        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoRequest(id=self.src_port)
        p1 = (
            Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
            / self.IP46(src=self.src_addr, dst=self.dst_addr)
            / l4
            / Raw()
        )

        if no_replies:
            self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
        else:
            self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
        # remembered so the return traffic can be sent the other way round
        self.expected_src_pg = src_pg
        self.expected_dst_pg = dst_pg
        return self

    def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
        """Check every packet in ``self.rxs`` against the expected tuple.

        With ``src_port`` None the source port (or ICMP id) is not
        predicted: it is read from the first received packet and the
        others must match it.  Returns self.
        """
        self.expect_src_addr = self._resolve_addr(src_pg, src_id)
        self.expect_dst_addr = self._resolve_addr(dst_pg, dst_id)
        self.expect_src_port = src_port
        self.expect_dst_port = dst_port

        if self.expect_src_port is None:
            if self.L4PROTO in [TCP, UDP]:
                self.expect_src_port = self.rxs[0][self.L4PROTO].sport
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self.expect_src_port = self.rxs[0][self.L4PROTO].id
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id

        for rx in self.rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
            self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
                self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
        return self

    def cnat_send_return(self):
        """This sends the return traffic

        The packet is built from the tuple validated by cnat_expect, with
        source and destination swapped, and the replies are stored in
        ``self.return_rxs``.  Returns self.
        """
        if self.L4PROTO in [TCP, UDP]:
            l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
        elif self.L4PROTO in [ICMP] and not self.is_v6:
            # icmp type 0 if echo reply
            l4 = self.L4PROTO(id=self.expect_src_port, type=0)
        elif self.L4PROTO in [ICMP] and self.is_v6:
            l4 = ICMPv6EchoReply(id=self.expect_src_port)
        src_mac = self.expected_dst_pg.remote_mac
        p1 = (
            Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / l4
            / Raw()
        )

        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_return(self):
        """Check that the return packets mirror what cnat_send sent.

        Returns self.
        """
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            if self.L4PROTO in [TCP, UDP]:
                self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
                self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
            elif self.L4PROTO in [ICMP] and not self.is_v6:
                # icmp type 0 if echo reply
                self._test.assertEqual(rx[self.L4PROTO].type, 0)
                self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
            elif self.L4PROTO in [ICMP] and self.is_v6:
                self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
        return self

    def cnat_send_icmp_return_error(self):
        """
        This called after cnat_expect will send an icmp error
        on the reverse path

        The error embeds the IP layer of the first packet in ``self.rxs``;
        the replies are stored in ``self.return_rxs``.  Returns self.
        """
        ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
        InnerIP = self.rxs[0][self.IP46]
        p1 = (
            Ether(
                src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
            )
            / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
            / ICMPelem
            / InnerIP
        )
        self.return_rxs = self._test.send_and_expect(
            self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
        )
        return self

    def cnat_expect_icmp_error_return(self):
        """Check the returned ICMP errors, outer and embedded headers.

        Both the outer packet and the packet quoted inside the error must
        carry the addresses and ports originally given to cnat_send.  The
        embedded L4 header is read as TCP when the context protocol is TCP
        and as UDP otherwise.  Returns self.
        """
        ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
        IP46err = IPerror6 if self.is_v6 else IPerror
        L4err = TCPerror if self.L4PROTO is TCP else UDPerror
        for rx in self.return_rxs:
            self._test.assert_packet_checksums_valid(rx)
            self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
            self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
            self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
            self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
        return self
338
339
340 # -------------------------------------------------------------------
341 # -------------------------------------------------------------------
342 # -------------------------------------------------------------------
343 # -------------------------------------------------------------------
344
345
class TestCNatTranslation(CnatCommonTestCase):
    """CNat Translation

    Tests of VIP translations: forwarding to the backend, return
    traffic, backend updates, ICMP error handling and the per-translation
    flow hash configuration.
    """

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        super(TestCNatTranslation, self).setUp()

        # Both lists always exist so tearDown can rely on them, even when
        # a test fails before it has created its translations.
        self.translations = []
        self.mbtranslations = []

        self.create_pg_interfaces(range(3))
        self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
        self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv4_neighbors()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        # the multi-backend (flow hash) translations are removed as well
        for translation in self.translations + self.mbtranslations:
            translation.remove_vpp_config()

        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    def cnat_fhc_translation(self):
        """CNat Translation flow hash config

        For every multi-backend translation, check that two flows which
        differ only by their source port are sent to the same backend
        port.
        """
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        for nbr, translation in enumerate(self.mbtranslations):
            vip = translation.vip
            backend_ports = [path[DST].port for path in translation.paths]

            #
            # Flows to the VIP with same ips and different source ports are
            # loadbalanced identically in both cases of flow hash 0x03
            # (src ip and dst ip) and 0x08 (dst port)
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                chosen = []
                for flow_sport in (sport, sport + 122):
                    # from client to vip
                    ctx.cnat_send(
                        self.pg0, src_pgi, flow_sport, self.pg1, vip.ip, vip.port
                    )
                    dport = ctx.rxs[0][ctx.L4PROTO].dport
                    self.assertIn(dport, backend_ports)
                    ctx.cnat_expect(
                        self.pg0, src_pgi, flow_sport, self.pg1, nbr, dport
                    )
                    chosen.append(dport)

                self.assertEqual(chosen[0], chosen[1])

    def cnat_translation(self):
        """CNat Translation

        Runs the flows through every translation in self.translations,
        moves each translation to a new backend, then checks session
        expiry with the scanner and reloads some flows.
        """
        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        for nbr, translation in enumerate(self.translations):
            vip = translation.vip

            #
            # Test Flows to the VIP
            #
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
                # from client to vip
                ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                dst_port = translation.paths[0][DST].port
                ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
                # from vip to client
                ctx.cnat_send_return().cnat_expect_return()

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                ctx.cnat_send(
                    self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
                )

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
                ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)

            #
            # modify the translation to use a different backend
            #
            old_dst_port = translation.paths[0][DST].port
            translation.paths[0][DST].udpate(
                pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
            )
            translation.add_vpp_config()

            #
            # existing flows follow the old path
            #
            for src_pgi in range(N_REMOTE_HOSTS):
                for sport in [1234, 1233]:
                    # from client to vip
                    ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
                    ctx.cnat_expect(
                        self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
                    )
                    # from vip to client
                    ctx.cnat_send_return().cnat_expect_return()

            #
            # new flows go to the new backend
            #
            for src_pgi in range(N_REMOTE_HOSTS):
                ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
                ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)

            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")
        self.virtual_sleep(2)
        sessions = self.vapi.cnat_session_dump()
        self.assertEqual(len(sessions), 0)
        self.vapi.cli("test cnat scanner off")

        #
        # load some flows again and purge
        # (the purge itself is done by tearDown)
        #
        for translation in self.translations:
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
            for src_pgi in range(N_REMOTE_HOSTS):
                for sport in [1234, 1233]:
                    # from client to vip
                    ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
                    ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)

    def _test_icmp(self):
        """Check ICMP error handling for every translation."""
        #
        # Testing ICMP
        #
        for nbr, translation in enumerate(self.translations):
            vip = translation.vip
            ctx = CnatTestContext(self, translation.iproto, vip.is_v6)

            #
            # NATing ICMP errors
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
            dst_port = translation.paths[0][DST].port
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

            #
            # ICMP errors with no VIP associated should not be
            # modified
            #
            ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
            ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

    def _add_translation(self, iproto, vip_ip, vip_port, backends, fhc, is_v6=False):
        """Create a translation, configure it in VPP and return it.

        ``backends`` is a list of (pg1 remote host index, port) tuples;
        each one becomes a path whose source endpoint is left unspecified.
        """
        vip = Endpoint(ip=vip_ip, port=vip_port, is_v6=is_v6)
        paths = [
            (
                Endpoint(is_v6=is_v6),
                Endpoint(pg=self.pg1, pgi=pgi, port=port, is_v6=is_v6),
            )
            for pgi, port in backends
        ]
        return Translation(self, iproto, vip, paths, fhc).add_vpp_config()

    def _make_multi_backend_translations(self):
        """Create two IPv4 TCP translations with two backends each."""
        self.translations = []
        self.mbtranslations = []
        specs = (
            # hash only on dst ip and src ip
            ("30.0.0.5", [(0, 4001), (0, 4005)], 0x03),
            # hash only on dst port
            ("30.0.0.6", [(1, 4006), (1, 4007)], 0x08),
        )
        for vip_ip, backends, fhc in specs:
            self.mbtranslations.append(
                self._add_translation(TCP, vip_ip, 5555, backends, fhc)
            )

    def _make_translations(self, vip_ips, is_v6):
        """Create the three single-backend translations used by the tests.

        ``vip_ips`` gives the VIP address of each of them, in order; the
        i-th translation is backed by pg1 remote host i.
        """
        self.translations = []
        # (L4 protocol, VIP port, backend port)
        specs = ((TCP, 5555, 4001), (TCP, 5554, 4002), (UDP, 5553, 4003))
        for pgi, (vip_ip, spec) in enumerate(zip(vip_ips, specs)):
            iproto, vip_port, backend_port = spec
            self.translations.append(
                self._add_translation(
                    iproto, vip_ip, vip_port, [(pgi, backend_port)], 0x9F, is_v6=is_v6
                )
            )

    def _make_translations_v4(self):
        self._make_translations(("30.0.0.1", "30.0.0.2", "30.0.0.2"), is_v6=False)

    def _make_translations_v6(self):
        self._make_translations(("30::1", "30::2", "30::2"), is_v6=True)

    def test_icmp4(self):
        # """ CNat Translation icmp v4 """
        self._make_translations_v4()
        self._test_icmp()

    def test_icmp6(self):
        # """ CNat Translation icmp v6 """
        self._make_translations_v6()
        self._test_icmp()

    def test_cnat6(self):
        # """ CNat Translation ipv6 """
        self._make_translations_v6()
        self.cnat_translation()

    def test_cnat4(self):
        # """ CNat Translation ipv4 """
        self._make_translations_v4()
        self.cnat_translation()

    def test_cnat_fhc(self):
        # """ CNat Translation flow hash config """
        self._make_multi_backend_translations()
        self.cnat_fhc_translation()
688
689
class TestCNatSourceNAT(CnatCommonTestCase):
    """CNat Source NAT

    Traffic entering on pg0 is source-NATed to the address of the first
    remote host of pg2, except towards excluded prefixes.
    """

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def _enable_disable_snat(self, is_enable=True):
        """Enable or disable source NAT.

        Sets the SNAT addresses and the interface/prefix policy in both
        cases; ``is_enable`` selects whether the cnat-snat features are
        enabled or disabled on pg0 and whether the pg interfaces are added
        to or removed from the v4/v6 include tables.
        """
        enable = 1 if is_enable else 0
        self.vapi.cnat_set_snat_addresses(
            snat_ip4=self.pg2.remote_hosts[0].ip4,
            snat_ip6=self.pg2.remote_hosts[0].ip6,
            sw_if_index=INVALID_INDEX,
        )
        self.vapi.feature_enable_disable(
            enable=enable,
            arc_name="ip6-unicast",
            feature_name="cnat-snat-ip6",
            sw_if_index=self.pg0.sw_if_index,
        )
        self.vapi.feature_enable_disable(
            enable=enable,
            arc_name="ip4-unicast",
            feature_name="cnat-snat-ip4",
            sw_if_index=self.pg0.sw_if_index,
        )

        policy_tables = VppEnum.vl_api_cnat_snat_policy_table_t
        self.vapi.cnat_set_snat_policy(
            policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
        )
        for i in self.pg_interfaces:
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index,
                is_add=enable,
                table=policy_tables.CNAT_POLICY_INCLUDE_V6,
            )
            self.vapi.cnat_snat_policy_add_del_if(
                sw_if_index=i.sw_if_index,
                is_add=enable,
                table=policy_tables.CNAT_POLICY_INCLUDE_V4,
            )

    def setUp(self):
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))
        self.pg1.generate_remote_hosts(2)

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
            i.configure_ipv6_neighbors()
            i.configure_ipv4_neighbors()

        self._enable_disable_snat(is_enable=True)

    def tearDown(self):
        # undo what setUp enabled (this used to pass is_enable=True, which
        # enabled everything a second time instead of cleaning up)
        self._enable_disable_snat(is_enable=False)

        self.vapi.cnat_session_purge()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def test_snat_v6(self):
        # """ CNat Source Nat v6 """
        self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
        self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
        self.sourcenat_test_icmp_echo_conf(is_v6=True)

    def test_snat_v4(self):
        # """ CNat Source Nat v4 """
        self.sourcenat_test_tcp_udp_conf(TCP)
        self.sourcenat_test_tcp_udp_conf(UDP)
        self.sourcenat_test_icmp_echo_conf()

    def sourcenat_test_icmp_echo_conf(self, is_v6=False):
        """Check that an ICMP echo and its reply are source-NATed."""
        ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
        # 8 is ICMP type echo (v4 only)
        ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
        ctx.cnat_send_return().cnat_expect_return()

    def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
        """Check source NAT and the exclude-prefix policy for TCP or UDP."""
        ctx = CnatTestContext(self, L4PROTO, is_v6)
        # we should source NAT
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # exclude dst address of pg1.1 from snat
        if is_v6:
            exclude_prefix = ip_network(
                "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
            )
        else:
            exclude_prefix = ip_network(
                "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
            )

        # add remote host to exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)

        # We should not source NAT the id=1
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # But we should source NAT the id=0
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # remove remote host from exclude list
        self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
        self.vapi.cnat_session_purge()

        # We should source NAT again
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_return().cnat_expect_return()

        # test return ICMP error nating
        ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
        ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
        ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()

        self.vapi.cnat_session_purge()
827
828
class TestCNatDHCP(CnatCommonTestCase):
    """CNat Translation"""

    # Every test here follows the same pattern: cnat objects are bound to
    # pg interfaces, addresses are then added to / removed from those
    # interfaces, and the addresses reported back by cnat are compared with
    # the ones that were just configured.
    #
    # NOTE(review): the docstring title does not mention DHCP or address
    # tracking; it is kept unchanged because the test runner may display it
    # - confirm before rewording.
    #
    # The test_* methods are documented with comments rather than docstrings
    # so that the test descriptions reported by unittest stay as they are.

    @classmethod
    def setUpClass(cls):
        """Run the class-level setup inherited from CnatCommonTestCase."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Run the class-level teardown inherited from CnatCommonTestCase."""
        super().tearDownClass()

    def tearDown(self):
        """Set every pg interface admin-down, then run the inherited teardown."""
        for intf in self.pg_interfaces:
            intf.admin_down()
        super().tearDown()

    def make_addr(self, sw_if_index, addr_id, is_v6):
        """Build the textual test address for an interface.

        :param sw_if_index: interface index, encoded into the address
        :param addr_id: selects which test address of the interface to build
        :param is_v6: truthy for the IPv6 form, falsy for the IPv4 form
        :returns: ``172.16.<sw_if_index>.<addr_id>`` for IPv4, or
            ``fd01:<sw_if_index as hex>::<addr_id + 1>`` for IPv6
        """
        if not is_v6:
            return "172.16.%u.%u" % (sw_if_index, addr_id)
        # presumably the +1 avoids an all-zero host part for addr_id 0
        return "fd01:%x::%u" % (sw_if_index, addr_id + 1)

    def make_prefix(self, sw_if_index, addr_id, is_v6):
        """Return the make_addr() address as a host prefix (/128 or /32)."""
        host = self.make_addr(sw_if_index, addr_id, is_v6)
        template = "%s/128" if is_v6 else "%s/32"
        return template % host

    def check_resolved(self, tr, addr_id, is_v6=False):
        """Assert that VPP reports the expected addresses for a translation.

        The translation is queried from VPP and its VIP, plus the source and
        destination of each path, are compared (as strings) with the address
        make_addr() yields for the sw_if_index of the matching local endpoint.

        :param tr: translation to check; its ``vip`` and ``paths`` endpoints
            must carry a ``sw_if_index``
        :param addr_id: id of the address expected on every interface
        :param is_v6: address family of the expected addresses
        """

        def expected(endpoint):
            return self.make_addr(endpoint.sw_if_index, addr_id, is_v6)

        resolved = tr.query_vpp_config()
        self.assertEqual(str(resolved.vip.addr), expected(tr.vip))
        self.assertEqual(len(resolved.paths), len(tr.paths))
        for configured, reported in zip(tr.paths, resolved.paths):
            got_src, got_dst = reported.src_ep, reported.dst_ep
            want_src, want_dst = configured
            self.assertEqual(str(got_src.addr), expected(want_src))
            self.assertEqual(str(got_dst.addr), expected(want_dst))

    def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
        """Add or remove one generated test address on a pg interface.

        :param pg: interface to configure; only its ``sw_if_index`` is used
        :param addr_id: id of the address, as understood by make_prefix()
        :param is_add: truthy to add the address, falsy to remove it
        :param is_v6: address family of the address
        """
        ifindex = pg.sw_if_index
        self.vapi.sw_interface_add_del_address(
            sw_if_index=ifindex,
            prefix=self.make_prefix(ifindex, addr_id, is_v6),
            is_add=int(bool(is_add)),
        )

    def _test_dhcp_v46(self, is_v6):
        """Check that a translation follows interface address changes.

        A TCP translation is created with its VIP on pg0 and two paths, both
        sourced from pg1, towards pg2 and pg3. Addresses are then configured
        and swapped on all four interfaces, checking after each step that the
        translation read back from VPP shows the current addresses.

        :param is_v6: run the scenario with IPv6 (truthy) or IPv4 (falsy)
        """
        self.create_pg_interfaces(range(4))
        for intf in self.pg_interfaces:
            intf.admin_up()

        def endpoint(pg):
            return Endpoint(pg=pg, is_v6=is_v6)

        def set_address(pg, addr_id, is_add):
            self.add_del_address(pg, addr_id=addr_id, is_add=is_add, is_v6=is_v6)

        backends = [
            (endpoint(self.pg1), endpoint(target))
            for target in (self.pg2, self.pg3)
        ]
        vip = endpoint(self.pg0)
        # 0x9F is passed as Translation's fifth positional argument;
        # presumably the flow hash config - TODO confirm
        translation = Translation(self, TCP, vip, backends, 0x9F).add_vpp_config()

        # Give every interface its first address; the translation must
        # report it.
        for pg in self.pg_interfaces:
            set_address(pg, 0, True)
        self.check_resolved(translation, addr_id=0, is_v6=is_v6)

        # Swap in a second address on every interface (new one added before
        # the old one is removed); the translation must report the new one.
        for pg in self.pg_interfaces:
            set_address(pg, 1, True)
            set_address(pg, 0, False)
        self.check_resolved(translation, addr_id=1, is_v6=is_v6)

        # Undo the configuration.
        for pg in self.pg_interfaces:
            set_address(pg, 1, False)
        translation.remove_vpp_config()

    def test_dhcp_v4(self):
        # IPv4 run of the address-tracking scenario
        self._test_dhcp_v46(False)

    def test_dhcp_v6(self):
        # IPv6 run of the address-tracking scenario
        self._test_dhcp_v46(True)

    def test_dhcp_snat(self):
        # The snat addresses are configured by interface (pg0) instead of by
        # value; the addresses cnat reports must follow what is configured
        # on that interface, for both address families.
        self.create_pg_interfaces(range(1))
        for intf in self.pg_interfaces:
            intf.admin_up()
        self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)

        families = (False, True)  # is_v6 values: IPv4 first, then IPv6

        def check_snat(addr_id):
            reply = self.vapi.cnat_get_snat_addresses()
            self.assertEqual(
                str(reply.snat_ip4),
                self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=False),
            )
            self.assertEqual(
                str(reply.snat_ip6),
                self.make_addr(self.pg0.sw_if_index, addr_id=addr_id, is_v6=True),
            )

        # First address of each family on every interface.
        for pg in self.pg_interfaces:
            for is_v6 in families:
                self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
        check_snat(0)

        # Add the second addresses, then remove the first ones.
        for pg in self.pg_interfaces:
            for addr_id, is_add in ((1, True), (0, False)):
                for is_v6 in families:
                    self.add_del_address(
                        pg, addr_id=addr_id, is_add=is_add, is_v6=is_v6
                    )
        check_snat(1)

        # Undo the configuration.
        for pg in self.pg_interfaces:
            for is_v6 in families:
                self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
        self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
949
950
# Running this file as a script hands control to unittest's command-line
# entry point, with VppTestRunner supplied as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)