cnat: Prepare extended snat policies
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
24 N_PKTS = 15
25
26
27 class Ep(object):
28     """ CNat endpoint """
29
30     def __init__(self, ip=None, port=0, l4p=TCP,
31                  sw_if_index=INVALID_INDEX, is_v6=False):
32         self.ip = ip
33         if ip is None:
34             self.ip = "::" if is_v6 else "0.0.0.0"
35         self.port = port
36         self.l4p = l4p
37         self.sw_if_index = sw_if_index
38         if is_v6:
39             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
40         else:
41             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
42
43     def encode(self):
44         return {'addr': self.ip,
45                 'port': self.port,
46                 'sw_if_index': self.sw_if_index,
47                 'if_af': self.if_af}
48
49     @classmethod
50     def from_pg(cls, pg, is_v6=False):
51         if pg is None:
52             return cls(is_v6=is_v6)
53         else:
54             return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
55
56     @property
57     def isV6(self):
58         return ":" in self.ip
59
60     def __str__(self):
61         return ("%s:%d" % (self.ip, self.port))
62
63
64 class EpTuple(object):
65     """ CNat endpoint """
66
67     def __init__(self, src, dst):
68         self.src = src
69         self.dst = dst
70
71     def encode(self):
72         return {'src_ep': self.src.encode(),
73                 'dst_ep': self.dst.encode()}
74
75     def __str__(self):
76         return ("%s->%s" % (self.src, self.dst))
77
78
79 class VppCNatTranslation(VppObject):
80
81     def __init__(self, test, iproto, vip, paths):
82         self._test = test
83         self.vip = vip
84         self.iproto = iproto
85         self.paths = paths
86         self.encoded_paths = []
87         for path in self.paths:
88             self.encoded_paths.append(path.encode())
89
90     def __str__(self):
91         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
92
93     @property
94     def vl4_proto(self):
95         ip_proto = VppEnum.vl_api_ip_proto_t
96         return {
97             UDP: ip_proto.IP_API_PROTO_UDP,
98             TCP: ip_proto.IP_API_PROTO_TCP,
99         }[self.iproto]
100
101     def add_vpp_config(self):
102         r = self._test.vapi.cnat_translation_update(
103             {'vip': self.vip.encode(),
104              'ip_proto': self.vl4_proto,
105              'n_paths': len(self.paths),
106              'paths': self.encoded_paths})
107         self._test.registry.register(self, self._test.logger)
108         self.id = r.id
109
110     def modify_vpp_config(self, paths):
111         self.paths = paths
112         self.encoded_paths = []
113         for path in self.paths:
114             self.encoded_paths.append(path.encode())
115
116         r = self._test.vapi.cnat_translation_update(
117             {'vip': self.vip.encode(),
118              'ip_proto': self.vl4_proto,
119              'n_paths': len(self.paths),
120              'paths': self.encoded_paths})
121         self._test.registry.register(self, self._test.logger)
122
123     def remove_vpp_config(self):
124         self._test.vapi.cnat_translation_del(id=self.id)
125
126     def query_vpp_config(self):
127         for t in self._test.vapi.cnat_translation_dump():
128             if self.id == t.translation.id:
129                 return t.translation
130         return None
131
132     def object_id(self):
133         return ("cnat-translation-%s" % (self.vip))
134
135     def get_stats(self):
136         c = self._test.statistics.get_counter("/net/cnat-translation")
137         return c[0][self.id]
138
139
140 class TestCNatTranslation(VppTestCase):
141     """ CNat Translation """
142     extra_vpp_punt_config = ["cnat", "{",
143                              "session-db-buckets", "64",
144                              "session-cleanup-timeout", "0.1",
145                              "session-max-age", "1",
146                              "tcp-max-age", "1",
147                              "scanner", "off", "}"]
148
149     @classmethod
150     def setUpClass(cls):
151         super(TestCNatTranslation, cls).setUpClass()
152
153     @classmethod
154     def tearDownClass(cls):
155         super(TestCNatTranslation, cls).tearDownClass()
156
157     def setUp(self):
158         super(TestCNatTranslation, self).setUp()
159
160         self.create_pg_interfaces(range(3))
161
162         for i in self.pg_interfaces:
163             i.admin_up()
164             i.config_ip4()
165             i.resolve_arp()
166             i.config_ip6()
167             i.resolve_ndp()
168
169     def tearDown(self):
170         for i in self.pg_interfaces:
171             i.unconfig_ip4()
172             i.unconfig_ip6()
173             i.admin_down()
174         super(TestCNatTranslation, self).tearDown()
175
176     def cnat_create_translation(self, vip, nbr):
177         ip_v = "ip6" if vip.isV6 else "ip4"
178         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179         sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180         t1 = VppCNatTranslation(
181             self, vip.l4p, vip,
182             [EpTuple(sep, dep), EpTuple(sep, dep)])
183         t1.add_vpp_config()
184         return t1
185
186     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187         ip_v = "ip6" if isV6 else "ip4"
188         ip_class = IPv6 if isV6 else IP
189         vip = t1.vip
190
191         #
192         # Flows
193         #
194         for src in self.pg0.remote_hosts:
195             for sport in sports:
196                 # from client to vip
197                 p1 = (Ether(dst=self.pg0.local_mac,
198                             src=src.mac) /
199                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200                       vip.l4p(sport=sport, dport=vip.port) /
201                       Raw())
202
203                 self.vapi.cli("trace add pg-input 1")
204                 rxs = self.send_and_expect(self.pg0,
205                                            p1 * N_PKTS,
206                                            self.pg1)
207                 self.logger.info(self.vapi.cli("show trace max 1"))
208
209                 for rx in rxs:
210                     self.assert_packet_checksums_valid(rx)
211                     self.assertEqual(
212                         rx[ip_class].dst,
213                         getattr(self.pg1.remote_hosts[nbr], ip_v))
214                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
215                     self.assertEqual(
216                         rx[ip_class].src,
217                         getattr(src, ip_v))
218                     self.assertEqual(rx[vip.l4p].sport, sport)
219
220                 # from vip to client
221                 p1 = (Ether(dst=self.pg1.local_mac,
222                             src=self.pg1.remote_mac) /
223                       ip_class(src=getattr(
224                           self.pg1.remote_hosts[nbr],
225                           ip_v),
226                           dst=getattr(src, ip_v)) /
227                       vip.l4p(sport=4000 + nbr, dport=sport) /
228                       Raw())
229
230                 rxs = self.send_and_expect(self.pg1,
231                                            p1 * N_PKTS,
232                                            self.pg0)
233
234                 for rx in rxs:
235                     self.assert_packet_checksums_valid(rx)
236                     self.assertEqual(
237                         rx[ip_class].dst,
238                         getattr(src, ip_v))
239                     self.assertEqual(rx[vip.l4p].dport, sport)
240                     self.assertEqual(rx[ip_class].src, vip.ip)
241                     self.assertEqual(rx[vip.l4p].sport, vip.port)
242
243                 #
244                 # packets to the VIP that do not match a
245                 # translation are dropped
246                 #
247                 p1 = (Ether(dst=self.pg0.local_mac,
248                             src=src.mac) /
249                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250                       vip.l4p(sport=sport, dport=6666) /
251                       Raw())
252
253                 self.send_and_assert_no_replies(self.pg0,
254                                                 p1 * N_PKTS,
255                                                 self.pg1)
256
257                 #
258                 # packets from the VIP that do not match a
259                 # session are forwarded
260                 #
261                 p1 = (Ether(dst=self.pg1.local_mac,
262                             src=self.pg1.remote_mac) /
263                       ip_class(src=getattr(
264                           self.pg1.remote_hosts[nbr],
265                           ip_v),
266                           dst=getattr(src, ip_v)) /
267                       vip.l4p(sport=6666, dport=sport) /
268                       Raw())
269
270                 rxs = self.send_and_expect(self.pg1,
271                                            p1 * N_PKTS,
272                                            self.pg0)
273
274     def cnat_test_translation_update(self, t1, sports, isV6=False):
275         ip_v = "ip6" if isV6 else "ip4"
276         ip_class = IPv6 if isV6 else IP
277         vip = t1.vip
278
279         #
280         # modify the translation to use a different backend
281         #
282         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284         t1.modify_vpp_config([EpTuple(sep, dep)])
285
286         #
287         # existing flows follow the old path
288         #
289         for src in self.pg0.remote_hosts:
290             for sport in sports:
291                 # from client to vip
292                 p1 = (Ether(dst=self.pg0.local_mac,
293                             src=src.mac) /
294                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295                       vip.l4p(sport=sport, dport=vip.port) /
296                       Raw())
297
298                 rxs = self.send_and_expect(self.pg0,
299                                            p1 * N_PKTS,
300                                            self.pg1)
301
302         #
303         # new flows go to the new backend
304         #
305         for src in self.pg0.remote_hosts:
306             p1 = (Ether(dst=self.pg0.local_mac,
307                         src=src.mac) /
308                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309                   vip.l4p(sport=9999, dport=vip.port) /
310                   Raw())
311
312             rxs = self.send_and_expect(self.pg0,
313                                        p1 * N_PKTS,
314                                        self.pg2)
315
316     def cnat_translation(self, vips, isV6=False):
317         """ CNat Translation """
318
319         ip_class = IPv6 if isV6 else IP
320         ip_v = "ip6" if isV6 else "ip4"
321         sports = [1234, 1233]
322
323         #
324         # turn the scanner off whilst testing otherwise sessions
325         # will time out
326         #
327         self.vapi.cli("test cnat scanner off")
328
329         sessions = self.vapi.cnat_session_dump()
330
331         trs = []
332         for nbr, vip in enumerate(vips):
333             trs.append(self.cnat_create_translation(vip, nbr))
334
335         self.logger.info(self.vapi.cli("sh cnat client"))
336         self.logger.info(self.vapi.cli("sh cnat translation"))
337
338         #
339         # translations
340         #
341         for nbr, vip in enumerate(vips):
342             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
344             if isV6:
345                 self.logger.info(self.vapi.cli(
346                     "sh ip6 fib %s" % self.pg0.remote_ip6))
347             else:
348                 self.logger.info(self.vapi.cli(
349                     "sh ip fib %s" % self.pg0.remote_ip4))
350             self.logger.info(self.vapi.cli("sh cnat session verbose"))
351
352         #
353         # turn the scanner back on and wait until the sessions
354         # all disapper
355         #
356         self.vapi.cli("test cnat scanner on")
357
358         n_tries = 0
359         sessions = self.vapi.cnat_session_dump()
360         while (len(sessions) and n_tries < 100):
361             n_tries += 1
362             sessions = self.vapi.cnat_session_dump()
363             self.sleep(2)
364             self.logger.info(self.vapi.cli("show cnat session verbose"))
365
366         self.assertTrue(n_tries < 100)
367         self.vapi.cli("test cnat scanner off")
368
369         #
370         # load some flows again and purge
371         #
372         for vip in vips:
373             for src in self.pg0.remote_hosts:
374                 for sport in sports:
375                     # from client to vip
376                     p1 = (Ether(dst=self.pg0.local_mac,
377                                 src=src.mac) /
378                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379                           vip.l4p(sport=sport, dport=vip.port) /
380                           Raw())
381                     self.send_and_expect(self.pg0,
382                                          p1 * N_PKTS,
383                                          self.pg2)
384
385         for tr in trs:
386             tr.remove_vpp_config()
387
388         self.assertTrue(self.vapi.cnat_session_dump())
389         self.vapi.cnat_session_purge()
390         self.assertFalse(self.vapi.cnat_session_dump())
391
392     def test_icmp(self):
393         vips = [
394             Ep("30.0.0.1", 5555),
395             Ep("30.0.0.2", 5554),
396             Ep("30.0.0.2", 5553, UDP),
397             Ep("30::1", 6666),
398             Ep("30::2", 5553, UDP),
399         ]
400         sport = 1234
401
402         self.pg0.generate_remote_hosts(len(vips))
403         self.pg0.configure_ipv6_neighbors()
404         self.pg0.configure_ipv4_neighbors()
405
406         self.pg1.generate_remote_hosts(len(vips))
407         self.pg1.configure_ipv6_neighbors()
408         self.pg1.configure_ipv4_neighbors()
409
410         self.vapi.cli("test cnat scanner off")
411         trs = []
412         for nbr, vip in enumerate(vips):
413             trs.append(self.cnat_create_translation(vip, nbr))
414
415         self.logger.info(self.vapi.cli("sh cnat client"))
416         self.logger.info(self.vapi.cli("sh cnat translation"))
417
418         for nbr, vip in enumerate(vips):
419             if vip.isV6:
420                 client_addr = self.pg0.remote_hosts[0].ip6
421                 remote_addr = self.pg1.remote_hosts[nbr].ip6
422                 remote2_addr = self.pg2.remote_hosts[0].ip6
423             else:
424                 client_addr = self.pg0.remote_hosts[0].ip4
425                 remote_addr = self.pg1.remote_hosts[nbr].ip4
426                 remote2_addr = self.pg2.remote_hosts[0].ip4
427             IP46 = IPv6 if vip.isV6 else IP
428             # from client to vip
429             p1 = (Ether(dst=self.pg0.local_mac,
430                         src=self.pg0.remote_hosts[0].mac) /
431                   IP46(src=client_addr, dst=vip.ip) /
432                   vip.l4p(sport=sport, dport=vip.port) /
433                   Raw())
434
435             rxs = self.send_and_expect(self.pg0,
436                                        p1 * N_PKTS,
437                                        self.pg1)
438
439             for rx in rxs:
440                 self.assert_packet_checksums_valid(rx)
441                 self.assertEqual(rx[IP46].dst, remote_addr)
442                 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443                 self.assertEqual(rx[IP46].src, client_addr)
444                 self.assertEqual(rx[vip.l4p].sport, sport)
445
446             InnerIP = rxs[0][IP46]
447
448             ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449             ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450             # from vip to client, ICMP error
451             p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452                   IP46(src=remote_addr, dst=client_addr) /
453                   ICMPelem / InnerIP)
454
455             rxs = self.send_and_expect(self.pg1,
456                                        p1 * N_PKTS,
457                                        self.pg0)
458
459             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460             IP46error = IPerror6 if vip.isV6 else IPerror
461             for rx in rxs:
462                 self.assert_packet_checksums_valid(rx)
463                 self.assertEqual(rx[IP46].src, vip.ip)
464                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466                 self.assertEqual(rx[ICMP46][IP46error]
467                                  [TCPUDPError].sport, sport)
468                 self.assertEqual(rx[ICMP46][IP46error]
469                                  [TCPUDPError].dport, vip.port)
470
471             # from other remote to client, ICMP error
472             # outside shouldn't be NAT-ed
473             p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474                   IP46(src=remote2_addr, dst=client_addr) /
475                   ICMPelem / InnerIP)
476
477             rxs = self.send_and_expect(self.pg1,
478                                        p1 * N_PKTS,
479                                        self.pg0)
480
481             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482             IP46error = IPerror6 if vip.isV6 else IPerror
483             for rx in rxs:
484                 self.assert_packet_checksums_valid(rx)
485                 self.assertEqual(rx[IP46].src, remote2_addr)
486                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488                 self.assertEqual(rx[ICMP46][IP46error]
489                                  [TCPUDPError].sport, sport)
490                 self.assertEqual(rx[ICMP46][IP46error]
491                                  [TCPUDPError].dport, vip.port)
492
493         self.vapi.cnat_session_purge()
494
495     def test_cnat6(self):
496         # """ CNat Translation ipv6 """
497         vips = [
498             Ep("30::1", 5555),
499             Ep("30::2", 5554),
500             Ep("30::2", 5553, UDP),
501         ]
502
503         self.pg0.generate_remote_hosts(len(vips))
504         self.pg0.configure_ipv6_neighbors()
505         self.pg1.generate_remote_hosts(len(vips))
506         self.pg1.configure_ipv6_neighbors()
507
508         self.cnat_translation(vips, isV6=True)
509
510     def test_cnat4(self):
511         # """ CNat Translation ipv4 """
512
513         vips = [
514             Ep("30.0.0.1", 5555),
515             Ep("30.0.0.2", 5554),
516             Ep("30.0.0.2", 5553, UDP),
517         ]
518
519         self.pg0.generate_remote_hosts(len(vips))
520         self.pg0.configure_ipv4_neighbors()
521         self.pg1.generate_remote_hosts(len(vips))
522         self.pg1.configure_ipv4_neighbors()
523
524         self.cnat_translation(vips)
525
526
527 class TestCNatSourceNAT(VppTestCase):
528     """ CNat Source NAT """
529     extra_vpp_punt_config = ["cnat", "{",
530                              "session-cleanup-timeout", "0.1",
531                              "session-max-age", "1",
532                              "tcp-max-age", "1",
533                              "scanner", "off", "}"]
534
535     @classmethod
536     def setUpClass(cls):
537         super(TestCNatSourceNAT, cls).setUpClass()
538
539     @classmethod
540     def tearDownClass(cls):
541         super(TestCNatSourceNAT, cls).tearDownClass()
542
543     def setUp(self):
544         super(TestCNatSourceNAT, self).setUp()
545
546         self.create_pg_interfaces(range(3))
547
548         for i in self.pg_interfaces:
549             i.admin_up()
550             i.config_ip4()
551             i.resolve_arp()
552             i.config_ip6()
553             i.resolve_ndp()
554
555         self.pg0.configure_ipv6_neighbors()
556         self.pg0.configure_ipv4_neighbors()
557         self.pg1.generate_remote_hosts(2)
558         self.pg1.configure_ipv4_neighbors()
559         self.pg1.configure_ipv6_neighbors()
560
561         self.vapi.cnat_set_snat_addresses(
562             snat_ip4=self.pg2.remote_hosts[0].ip4,
563             snat_ip6=self.pg2.remote_hosts[0].ip6,
564             sw_if_index=INVALID_INDEX)
565         self.vapi.feature_enable_disable(
566             enable=1,
567             arc_name="ip6-unicast",
568             feature_name="cnat-snat-ip6",
569             sw_if_index=self.pg0.sw_if_index)
570         self.vapi.feature_enable_disable(
571             enable=1,
572             arc_name="ip4-unicast",
573             feature_name="cnat-snat-ip4",
574             sw_if_index=self.pg0.sw_if_index)
575
576         policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
577         self.vapi.cnat_set_snat_policy(
578             policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
579         for i in self.pg_interfaces:
580             self.vapi.cnat_snat_policy_add_del_if(
581                 sw_if_index=i.sw_if_index, is_add=1,
582                 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
583             self.vapi.cnat_snat_policy_add_del_if(
584                 sw_if_index=i.sw_if_index, is_add=1,
585                 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
586
587     def tearDown(self):
588         self.vapi.cnat_session_purge()
589         for i in self.pg_interfaces:
590             i.unconfig_ip4()
591             i.unconfig_ip6()
592             i.admin_down()
593         super(TestCNatSourceNAT, self).tearDown()
594
595     def test_snat_v6(self):
596         # """ CNat Source Nat v6 """
597         self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
598         self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
599         self.sourcenat_test_icmp_err_conf(isV6=True)
600         self.sourcenat_test_icmp_echo6_conf()
601
602     def test_snat_v4(self):
603         # """ CNat Source Nat v4 """
604         self.sourcenat_test_tcp_udp_conf(TCP)
605         self.sourcenat_test_tcp_udp_conf(UDP)
606         self.sourcenat_test_icmp_err_conf()
607         self.sourcenat_test_icmp_echo4_conf()
608
609     def sourcenat_test_icmp_echo6_conf(self):
610         sports = [1234, 1235]
611         dports = [6661, 6662]
612
613         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
614             client_addr = self.pg0.remote_hosts[0].ip6
615             remote_addr = self.pg1.remote_hosts[nbr].ip6
616             src_nat_addr = self.pg2.remote_hosts[0].ip6
617
618             # ping from pods to outside network
619             p1 = (
620                 Ether(dst=self.pg0.local_mac,
621                       src=self.pg0.remote_hosts[0].mac) /
622                 IPv6(src=client_addr, dst=remote_addr) /
623                 ICMPv6EchoRequest(id=0xfeed) /
624                 Raw())
625
626             rxs = self.send_and_expect(
627                 self.pg0,
628                 p1 * N_PKTS,
629                 self.pg1)
630
631             for rx in rxs:
632                 self.assertEqual(rx[IPv6].src, src_nat_addr)
633                 self.assert_packet_checksums_valid(rx)
634
635             received_id = rx[0][ICMPv6EchoRequest].id
636             # ping reply from outside to pods
637             p2 = (
638                 Ether(dst=self.pg1.local_mac,
639                       src=self.pg1.remote_hosts[nbr].mac) /
640                 IPv6(src=remote_addr, dst=src_nat_addr) /
641                 ICMPv6EchoReply(id=received_id))
642             rxs = self.send_and_expect(
643                 self.pg1,
644                 p2 * N_PKTS,
645                 self.pg0)
646
647             for rx in rxs:
648                 self.assert_packet_checksums_valid(rx)
649                 self.assertEqual(rx[IPv6].src, remote_addr)
650                 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
651
652     def sourcenat_test_icmp_echo4_conf(self):
653         sports = [1234, 1235]
654         dports = [6661, 6662]
655
656         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
657             IP46 = IP
658             client_addr = self.pg0.remote_hosts[0].ip4
659             remote_addr = self.pg1.remote_hosts[nbr].ip4
660             src_nat_addr = self.pg2.remote_hosts[0].ip4
661
662             # ping from pods to outside network
663             p1 = (
664                 Ether(dst=self.pg0.local_mac,
665                       src=self.pg0.remote_hosts[0].mac) /
666                 IP46(src=client_addr, dst=remote_addr) /
667                 ICMP(type=8, id=0xfeed) /
668                 Raw())
669
670             rxs = self.send_and_expect(
671                 self.pg0,
672                 p1 * N_PKTS,
673                 self.pg1)
674
675             for rx in rxs:
676                 self.assertEqual(rx[IP46].src, src_nat_addr)
677                 self.assert_packet_checksums_valid(rx)
678
679             received_id = rx[0][ICMP].id
680             # ping reply from outside to pods
681             p2 = (
682                 Ether(dst=self.pg1.local_mac,
683                       src=self.pg1.remote_hosts[nbr].mac) /
684                 IP46(src=remote_addr, dst=src_nat_addr) /
685                 ICMP(type=0, id=received_id))
686             rxs = self.send_and_expect(
687                 self.pg1,
688                 p2 * N_PKTS,
689                 self.pg0)
690
691             for rx in rxs:
692                 self.assert_packet_checksums_valid(rx)
693                 self.assertEqual(rx[IP46].src, remote_addr)
694                 self.assertEqual(rx[ICMP].id, 0xfeed)
695
696     def sourcenat_test_icmp_err_conf(self, isV6=False):
697         sports = [1234, 1235]
698         dports = [6661, 6662]
699
700         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
701             if isV6:
702                 IP46 = IPv6
703                 client_addr = self.pg0.remote_hosts[0].ip6
704                 remote_addr = self.pg1.remote_hosts[nbr].ip6
705                 src_nat_addr = self.pg2.remote_hosts[0].ip6
706                 ICMP46 = ICMPv6DestUnreach
707                 ICMPelem = ICMPv6DestUnreach(code=1)
708                 IP46error = IPerror6
709             else:
710                 IP46 = IP
711                 client_addr = self.pg0.remote_hosts[0].ip4
712                 remote_addr = self.pg1.remote_hosts[nbr].ip4
713                 src_nat_addr = self.pg2.remote_hosts[0].ip4
714                 IP46error = IPerror
715                 ICMP46 = ICMP
716                 ICMPelem = ICMP(type=11)
717
718             # from pods to outside network
719             p1 = (
720                 Ether(dst=self.pg0.local_mac,
721                       src=self.pg0.remote_hosts[0].mac) /
722                 IP46(src=client_addr, dst=remote_addr) /
723                 TCP(sport=sports[nbr], dport=dports[nbr]) /
724                 Raw())
725
726             rxs = self.send_and_expect(
727                 self.pg0,
728                 p1 * N_PKTS,
729                 self.pg1)
730             for rx in rxs:
731                 self.assert_packet_checksums_valid(rx)
732                 self.assertEqual(rx[IP46].dst, remote_addr)
733                 self.assertEqual(rx[TCP].dport, dports[nbr])
734                 self.assertEqual(rx[IP46].src, src_nat_addr)
735                 sport = rx[TCP].sport
736
737             InnerIP = rxs[0][IP46]
738             # from outside to pods, ICMP error
739             p2 = (
740                 Ether(dst=self.pg1.local_mac,
741                       src=self.pg1.remote_hosts[nbr].mac) /
742                 IP46(src=remote_addr, dst=src_nat_addr) /
743                 ICMPelem / InnerIP)
744
745             rxs = self.send_and_expect(
746                 self.pg1,
747                 p2 * N_PKTS,
748                 self.pg0)
749
750             for rx in rxs:
751                 self.assert_packet_checksums_valid(rx)
752                 self.assertEqual(rx[IP46].src, remote_addr)
753                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
754                 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
755                 self.assertEqual(rx[ICMP46][IP46error]
756                                  [TCPerror].sport, sports[nbr])
757                 self.assertEqual(rx[ICMP46][IP46error]
758                                  [TCPerror].dport, dports[nbr])
759
760     def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
761         sports = [1234, 1235]
762         dports = [6661, 6662]
763
764         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
765             if isV6:
766                 IP46 = IPv6
767                 client_addr = self.pg0.remote_hosts[0].ip6
768                 remote_addr = self.pg1.remote_hosts[nbr].ip6
769                 src_nat_addr = self.pg2.remote_hosts[0].ip6
770                 exclude_prefix = ip_network(
771                     "%s/100" % remote_addr, strict=False)
772             else:
773                 IP46 = IP
774                 client_addr = self.pg0.remote_hosts[0].ip4
775                 remote_addr = self.pg1.remote_hosts[nbr].ip4
776                 src_nat_addr = self.pg2.remote_hosts[0].ip4
777                 exclude_prefix = ip_network(
778                     "%s/16" % remote_addr, strict=False)
779             # from pods to outside network
780             p1 = (
781                 Ether(dst=self.pg0.local_mac,
782                       src=self.pg0.remote_hosts[0].mac) /
783                 IP46(src=client_addr, dst=remote_addr) /
784                 l4p(sport=sports[nbr], dport=dports[nbr]) /
785                 Raw())
786
787             self.vapi.cli("trace add pg-input 1")
788             rxs = self.send_and_expect(
789                 self.pg0,
790                 p1 * N_PKTS,
791                 self.pg1)
792             self.logger.info(self.vapi.cli("show trace max 1"))
793
794             for rx in rxs:
795                 self.assert_packet_checksums_valid(rx)
796                 self.assertEqual(rx[IP46].dst, remote_addr)
797                 self.assertEqual(rx[l4p].dport, dports[nbr])
798                 self.assertEqual(rx[IP46].src, src_nat_addr)
799                 sport = rx[l4p].sport
800
801             # from outside to pods
802             p2 = (
803                 Ether(dst=self.pg1.local_mac,
804                       src=self.pg1.remote_hosts[nbr].mac) /
805                 IP46(src=remote_addr, dst=src_nat_addr) /
806                 l4p(sport=dports[nbr], dport=sport) /
807                 Raw())
808
809             rxs = self.send_and_expect(
810                 self.pg1,
811                 p2 * N_PKTS,
812                 self.pg0)
813
814             for rx in rxs:
815                 self.assert_packet_checksums_valid(rx)
816                 self.assertEqual(rx[IP46].dst, client_addr)
817                 self.assertEqual(rx[l4p].dport, sports[nbr])
818                 self.assertEqual(rx[l4p].sport, dports[nbr])
819                 self.assertEqual(rx[IP46].src, remote_addr)
820
821             # add remote host to exclude list
822             self.vapi.cnat_snat_policy_add_del_exclude_pfx(
823                 prefix=exclude_prefix, is_add=1)
824             self.vapi.cnat_session_purge()
825
826             rxs = self.send_and_expect(
827                 self.pg0,
828                 p1 * N_PKTS,
829                 self.pg1)
830             for rx in rxs:
831                 self.assert_packet_checksums_valid(rx)
832                 self.assertEqual(rx[IP46].dst, remote_addr)
833                 self.assertEqual(rx[l4p].dport, dports[nbr])
834                 self.assertEqual(rx[IP46].src, client_addr)
835
836             # remove remote host from exclude list
837             self.vapi.cnat_snat_policy_add_del_exclude_pfx(
838                 prefix=exclude_prefix, is_add=0)
839             self.vapi.cnat_session_purge()
840
841             rxs = self.send_and_expect(
842                 self.pg0,
843                 p1 * N_PKTS,
844                 self.pg1)
845
846             for rx in rxs:
847                 self.assert_packet_checksums_valid(rx)
848                 self.assertEqual(rx[IP46].dst, remote_addr)
849                 self.assertEqual(rx[l4p].dport, dports[nbr])
850                 self.assertEqual(rx[IP46].src, src_nat_addr)
851
852             self.vapi.cnat_session_purge()
853
854
855 class TestCNatDHCP(VppTestCase):
856     """ CNat Translation """
857     extra_vpp_punt_config = ["cnat", "{",
858                              "session-db-buckets", "64",
859                              "session-cleanup-timeout", "0.1",
860                              "session-max-age", "1",
861                              "tcp-max-age", "1",
862                              "scanner", "off", "}"]
863
864     @classmethod
865     def setUpClass(cls):
866         super(TestCNatDHCP, cls).setUpClass()
867
868     @classmethod
869     def tearDownClass(cls):
870         super(TestCNatDHCP, cls).tearDownClass()
871
872     def tearDown(self):
873         for i in self.pg_interfaces:
874             i.admin_down()
875         super(TestCNatDHCP, self).tearDown()
876
877     def create_translation(self, vip_pg, *args, is_v6=False):
878         vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
879         paths = []
880         for (src_pg, dst_pg) in args:
881             paths.append(EpTuple(
882                 Ep.from_pg(src_pg, is_v6=is_v6),
883                 Ep.from_pg(dst_pg, is_v6=is_v6)
884             ))
885         t1 = VppCNatTranslation(self, TCP, vip, paths)
886         t1.add_vpp_config()
887         return t1
888
889     def make_addr(self, sw_if_index, i, is_v6):
890         if is_v6:
891             return "fd01:%x::%u" % (sw_if_index, i + 1)
892         else:
893             return "172.16.%u.%u" % (sw_if_index, i)
894
895     def make_prefix(self, sw_if_index, i, is_v6):
896         if is_v6:
897             return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
898         else:
899             return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
900
901     def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
902         qt1 = tr.query_vpp_config()
903         self.assertEqual(str(qt1.vip.addr), self.make_addr(
904             vip_pg.sw_if_index, i, is_v6))
905         for (src_pg, dst_pg), path in zip(args, qt1.paths):
906             if src_pg:
907                 self.assertEqual(str(path.src_ep.addr), self.make_addr(
908                     src_pg.sw_if_index, i, is_v6))
909             if dst_pg:
910                 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
911                     dst_pg.sw_if_index, i, is_v6))
912
913     def config_ips(self, rng, is_add=1, is_v6=False):
914         for pg, i in product(self.pg_interfaces, rng):
915             self.vapi.sw_interface_add_del_address(
916                 sw_if_index=pg.sw_if_index,
917                 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
918                 is_add=is_add)
919
920     def test_dhcp_v4(self):
921         self.create_pg_interfaces(range(5))
922         for i in self.pg_interfaces:
923             i.admin_up()
924         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
925         t1 = self.create_translation(*pglist)
926         self.config_ips([0])
927         self.check_resolved(t1, *pglist)
928         self.config_ips([1])
929         self.config_ips([0], is_add=0)
930         self.check_resolved(t1, *pglist, i=1)
931         self.config_ips([1], is_add=0)
932         t1.remove_vpp_config()
933
934     def test_dhcp_v6(self):
935         self.create_pg_interfaces(range(5))
936         for i in self.pg_interfaces:
937             i.admin_up()
938         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
939         t1 = self.create_translation(*pglist, is_v6=True)
940         self.config_ips([0], is_v6=True)
941         self.check_resolved(t1, *pglist, is_v6=True)
942         self.config_ips([1], is_v6=True)
943         self.config_ips([0], is_add=0, is_v6=True)
944         self.check_resolved(t1, *pglist, i=1, is_v6=True)
945         self.config_ips([1], is_add=0, is_v6=True)
946         t1.remove_vpp_config()
947
948     def test_dhcp_snat(self):
949         self.create_pg_interfaces(range(1))
950         for i in self.pg_interfaces:
951             i.admin_up()
952         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
953         self.config_ips([0], is_v6=False)
954         self.config_ips([0], is_v6=True)
955         r = self.vapi.cnat_get_snat_addresses()
956         self.assertEqual(str(r.snat_ip4), self.make_addr(
957             self.pg0.sw_if_index, 0, False))
958         self.assertEqual(str(r.snat_ip6), self.make_addr(
959             self.pg0.sw_if_index, 0, True))
960         self.config_ips([1], is_v6=False)
961         self.config_ips([1], is_v6=True)
962         self.config_ips([0], is_add=0, is_v6=False)
963         self.config_ips([0], is_add=0, is_v6=True)
964         r = self.vapi.cnat_get_snat_addresses()
965         self.assertEqual(str(r.snat_ip4), self.make_addr(
966             self.pg0.sw_if_index, 1, False))
967         self.assertEqual(str(r.snat_ip6), self.make_addr(
968             self.pg0.sw_if_index, 1, True))
969         self.config_ips([1], is_add=0, is_v6=False)
970         self.config_ips([1], is_add=0, is_v6=True)
971         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
972
973
974 if __name__ == '__main__':
975     unittest.main(testRunner=VppTestRunner)