cnat: Fix snat with dhcp
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
24 N_PKTS = 15
25
26
27 class Ep(object):
28     """ CNat endpoint """
29
30     def __init__(self, ip=None, port=0, l4p=TCP,
31                  sw_if_index=INVALID_INDEX, is_v6=False):
32         self.ip = ip
33         if ip is None:
34             self.ip = "::" if is_v6 else "0.0.0.0"
35         self.port = port
36         self.l4p = l4p
37         self.sw_if_index = sw_if_index
38         if is_v6:
39             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
40         else:
41             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
42
43     def encode(self):
44         return {'addr': self.ip,
45                 'port': self.port,
46                 'sw_if_index': self.sw_if_index,
47                 'if_af': self.if_af}
48
49     @classmethod
50     def from_pg(cls, pg, is_v6=False):
51         if pg is None:
52             return cls(is_v6=is_v6)
53         else:
54             return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
55
56     @property
57     def isV6(self):
58         return ":" in self.ip
59
60     def __str__(self):
61         return ("%s:%d" % (self.ip, self.port))
62
63
64 class EpTuple(object):
65     """ CNat endpoint """
66
67     def __init__(self, src, dst):
68         self.src = src
69         self.dst = dst
70
71     def encode(self):
72         return {'src_ep': self.src.encode(),
73                 'dst_ep': self.dst.encode()}
74
75     def __str__(self):
76         return ("%s->%s" % (self.src, self.dst))
77
78
79 class VppCNatTranslation(VppObject):
80
81     def __init__(self, test, iproto, vip, paths):
82         self._test = test
83         self.vip = vip
84         self.iproto = iproto
85         self.paths = paths
86         self.encoded_paths = []
87         for path in self.paths:
88             self.encoded_paths.append(path.encode())
89
90     def __str__(self):
91         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
92
93     @property
94     def vl4_proto(self):
95         ip_proto = VppEnum.vl_api_ip_proto_t
96         return {
97             UDP: ip_proto.IP_API_PROTO_UDP,
98             TCP: ip_proto.IP_API_PROTO_TCP,
99         }[self.iproto]
100
101     def add_vpp_config(self):
102         r = self._test.vapi.cnat_translation_update(
103             {'vip': self.vip.encode(),
104              'ip_proto': self.vl4_proto,
105              'n_paths': len(self.paths),
106              'paths': self.encoded_paths})
107         self._test.registry.register(self, self._test.logger)
108         self.id = r.id
109
110     def modify_vpp_config(self, paths):
111         self.paths = paths
112         self.encoded_paths = []
113         for path in self.paths:
114             self.encoded_paths.append(path.encode())
115
116         r = self._test.vapi.cnat_translation_update(
117             {'vip': self.vip.encode(),
118              'ip_proto': self.vl4_proto,
119              'n_paths': len(self.paths),
120              'paths': self.encoded_paths})
121         self._test.registry.register(self, self._test.logger)
122
123     def remove_vpp_config(self):
124         self._test.vapi.cnat_translation_del(id=self.id)
125
126     def query_vpp_config(self):
127         for t in self._test.vapi.cnat_translation_dump():
128             if self.id == t.translation.id:
129                 return t.translation
130         return None
131
132     def object_id(self):
133         return ("cnat-translation-%s" % (self.vip))
134
135     def get_stats(self):
136         c = self._test.statistics.get_counter("/net/cnat-translation")
137         return c[0][self.id]
138
139
140 class TestCNatTranslation(VppTestCase):
141     """ CNat Translation """
142     extra_vpp_punt_config = ["cnat", "{",
143                              "session-db-buckets", "64",
144                              "session-cleanup-timeout", "0.1",
145                              "session-max-age", "1",
146                              "tcp-max-age", "1",
147                              "scanner", "off", "}"]
148
149     @classmethod
150     def setUpClass(cls):
151         super(TestCNatTranslation, cls).setUpClass()
152
153     @classmethod
154     def tearDownClass(cls):
155         super(TestCNatTranslation, cls).tearDownClass()
156
157     def setUp(self):
158         super(TestCNatTranslation, self).setUp()
159
160         self.create_pg_interfaces(range(3))
161
162         for i in self.pg_interfaces:
163             i.admin_up()
164             i.config_ip4()
165             i.resolve_arp()
166             i.config_ip6()
167             i.resolve_ndp()
168
169     def tearDown(self):
170         for i in self.pg_interfaces:
171             i.unconfig_ip4()
172             i.unconfig_ip6()
173             i.admin_down()
174         super(TestCNatTranslation, self).tearDown()
175
176     def cnat_create_translation(self, vip, nbr):
177         ip_v = "ip6" if vip.isV6 else "ip4"
178         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179         sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180         t1 = VppCNatTranslation(
181             self, vip.l4p, vip,
182             [EpTuple(sep, dep), EpTuple(sep, dep)])
183         t1.add_vpp_config()
184         return t1
185
186     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187         ip_v = "ip6" if isV6 else "ip4"
188         ip_class = IPv6 if isV6 else IP
189         vip = t1.vip
190
191         #
192         # Flows
193         #
194         for src in self.pg0.remote_hosts:
195             for sport in sports:
196                 # from client to vip
197                 p1 = (Ether(dst=self.pg0.local_mac,
198                             src=src.mac) /
199                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200                       vip.l4p(sport=sport, dport=vip.port) /
201                       Raw())
202
203                 self.vapi.cli("trace add pg-input 1")
204                 rxs = self.send_and_expect(self.pg0,
205                                            p1 * N_PKTS,
206                                            self.pg1)
207                 self.logger.info(self.vapi.cli("show trace max 1"))
208
209                 for rx in rxs:
210                     self.assert_packet_checksums_valid(rx)
211                     self.assertEqual(
212                         rx[ip_class].dst,
213                         getattr(self.pg1.remote_hosts[nbr], ip_v))
214                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
215                     self.assertEqual(
216                         rx[ip_class].src,
217                         getattr(src, ip_v))
218                     self.assertEqual(rx[vip.l4p].sport, sport)
219
220                 # from vip to client
221                 p1 = (Ether(dst=self.pg1.local_mac,
222                             src=self.pg1.remote_mac) /
223                       ip_class(src=getattr(
224                           self.pg1.remote_hosts[nbr],
225                           ip_v),
226                           dst=getattr(src, ip_v)) /
227                       vip.l4p(sport=4000 + nbr, dport=sport) /
228                       Raw())
229
230                 rxs = self.send_and_expect(self.pg1,
231                                            p1 * N_PKTS,
232                                            self.pg0)
233
234                 for rx in rxs:
235                     self.assert_packet_checksums_valid(rx)
236                     self.assertEqual(
237                         rx[ip_class].dst,
238                         getattr(src, ip_v))
239                     self.assertEqual(rx[vip.l4p].dport, sport)
240                     self.assertEqual(rx[ip_class].src, vip.ip)
241                     self.assertEqual(rx[vip.l4p].sport, vip.port)
242
243                 #
244                 # packets to the VIP that do not match a
245                 # translation are dropped
246                 #
247                 p1 = (Ether(dst=self.pg0.local_mac,
248                             src=src.mac) /
249                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250                       vip.l4p(sport=sport, dport=6666) /
251                       Raw())
252
253                 self.send_and_assert_no_replies(self.pg0,
254                                                 p1 * N_PKTS,
255                                                 self.pg1)
256
257                 #
258                 # packets from the VIP that do not match a
259                 # session are forwarded
260                 #
261                 p1 = (Ether(dst=self.pg1.local_mac,
262                             src=self.pg1.remote_mac) /
263                       ip_class(src=getattr(
264                           self.pg1.remote_hosts[nbr],
265                           ip_v),
266                           dst=getattr(src, ip_v)) /
267                       vip.l4p(sport=6666, dport=sport) /
268                       Raw())
269
270                 rxs = self.send_and_expect(self.pg1,
271                                            p1 * N_PKTS,
272                                            self.pg0)
273
274     def cnat_test_translation_update(self, t1, sports, isV6=False):
275         ip_v = "ip6" if isV6 else "ip4"
276         ip_class = IPv6 if isV6 else IP
277         vip = t1.vip
278
279         #
280         # modify the translation to use a different backend
281         #
282         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284         t1.modify_vpp_config([EpTuple(sep, dep)])
285
286         #
287         # existing flows follow the old path
288         #
289         for src in self.pg0.remote_hosts:
290             for sport in sports:
291                 # from client to vip
292                 p1 = (Ether(dst=self.pg0.local_mac,
293                             src=src.mac) /
294                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295                       vip.l4p(sport=sport, dport=vip.port) /
296                       Raw())
297
298                 rxs = self.send_and_expect(self.pg0,
299                                            p1 * N_PKTS,
300                                            self.pg1)
301
302         #
303         # new flows go to the new backend
304         #
305         for src in self.pg0.remote_hosts:
306             p1 = (Ether(dst=self.pg0.local_mac,
307                         src=src.mac) /
308                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309                   vip.l4p(sport=9999, dport=vip.port) /
310                   Raw())
311
312             rxs = self.send_and_expect(self.pg0,
313                                        p1 * N_PKTS,
314                                        self.pg2)
315
316     def cnat_translation(self, vips, isV6=False):
317         """ CNat Translation """
318
319         ip_class = IPv6 if isV6 else IP
320         ip_v = "ip6" if isV6 else "ip4"
321         sports = [1234, 1233]
322
323         #
324         # turn the scanner off whilst testing otherwise sessions
325         # will time out
326         #
327         self.vapi.cli("test cnat scanner off")
328
329         sessions = self.vapi.cnat_session_dump()
330
331         trs = []
332         for nbr, vip in enumerate(vips):
333             trs.append(self.cnat_create_translation(vip, nbr))
334
335         self.logger.info(self.vapi.cli("sh cnat client"))
336         self.logger.info(self.vapi.cli("sh cnat translation"))
337
338         #
339         # translations
340         #
341         for nbr, vip in enumerate(vips):
342             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
344             if isV6:
345                 self.logger.info(self.vapi.cli(
346                     "sh ip6 fib %s" % self.pg0.remote_ip6))
347             else:
348                 self.logger.info(self.vapi.cli(
349                     "sh ip fib %s" % self.pg0.remote_ip4))
350             self.logger.info(self.vapi.cli("sh cnat session verbose"))
351
352         #
353         # turn the scanner back on and wait until the sessions
354         # all disapper
355         #
356         self.vapi.cli("test cnat scanner on")
357
358         n_tries = 0
359         sessions = self.vapi.cnat_session_dump()
360         while (len(sessions) and n_tries < 100):
361             n_tries += 1
362             sessions = self.vapi.cnat_session_dump()
363             self.sleep(2)
364             self.logger.info(self.vapi.cli("show cnat session verbose"))
365
366         self.assertTrue(n_tries < 100)
367         self.vapi.cli("test cnat scanner off")
368
369         #
370         # load some flows again and purge
371         #
372         for vip in vips:
373             for src in self.pg0.remote_hosts:
374                 for sport in sports:
375                     # from client to vip
376                     p1 = (Ether(dst=self.pg0.local_mac,
377                                 src=src.mac) /
378                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379                           vip.l4p(sport=sport, dport=vip.port) /
380                           Raw())
381                     self.send_and_expect(self.pg0,
382                                          p1 * N_PKTS,
383                                          self.pg2)
384
385         for tr in trs:
386             tr.remove_vpp_config()
387
388         self.assertTrue(self.vapi.cnat_session_dump())
389         self.vapi.cnat_session_purge()
390         self.assertFalse(self.vapi.cnat_session_dump())
391
392     def test_icmp(self):
393         vips = [
394             Ep("30.0.0.1", 5555),
395             Ep("30.0.0.2", 5554),
396             Ep("30.0.0.2", 5553, UDP),
397             Ep("30::1", 6666),
398             Ep("30::2", 5553, UDP),
399         ]
400         sport = 1234
401
402         self.pg0.generate_remote_hosts(len(vips))
403         self.pg0.configure_ipv6_neighbors()
404         self.pg0.configure_ipv4_neighbors()
405
406         self.pg1.generate_remote_hosts(len(vips))
407         self.pg1.configure_ipv6_neighbors()
408         self.pg1.configure_ipv4_neighbors()
409
410         self.vapi.cli("test cnat scanner off")
411         trs = []
412         for nbr, vip in enumerate(vips):
413             trs.append(self.cnat_create_translation(vip, nbr))
414
415         self.logger.info(self.vapi.cli("sh cnat client"))
416         self.logger.info(self.vapi.cli("sh cnat translation"))
417
418         for nbr, vip in enumerate(vips):
419             if vip.isV6:
420                 client_addr = self.pg0.remote_hosts[0].ip6
421                 remote_addr = self.pg1.remote_hosts[nbr].ip6
422                 remote2_addr = self.pg2.remote_hosts[0].ip6
423             else:
424                 client_addr = self.pg0.remote_hosts[0].ip4
425                 remote_addr = self.pg1.remote_hosts[nbr].ip4
426                 remote2_addr = self.pg2.remote_hosts[0].ip4
427             IP46 = IPv6 if vip.isV6 else IP
428             # from client to vip
429             p1 = (Ether(dst=self.pg0.local_mac,
430                         src=self.pg0.remote_hosts[0].mac) /
431                   IP46(src=client_addr, dst=vip.ip) /
432                   vip.l4p(sport=sport, dport=vip.port) /
433                   Raw())
434
435             rxs = self.send_and_expect(self.pg0,
436                                        p1 * N_PKTS,
437                                        self.pg1)
438
439             for rx in rxs:
440                 self.assert_packet_checksums_valid(rx)
441                 self.assertEqual(rx[IP46].dst, remote_addr)
442                 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443                 self.assertEqual(rx[IP46].src, client_addr)
444                 self.assertEqual(rx[vip.l4p].sport, sport)
445
446             InnerIP = rxs[0][IP46]
447
448             ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449             ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450             # from vip to client, ICMP error
451             p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452                   IP46(src=remote_addr, dst=client_addr) /
453                   ICMPelem / InnerIP)
454
455             rxs = self.send_and_expect(self.pg1,
456                                        p1 * N_PKTS,
457                                        self.pg0)
458
459             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460             IP46error = IPerror6 if vip.isV6 else IPerror
461             for rx in rxs:
462                 self.assert_packet_checksums_valid(rx)
463                 self.assertEqual(rx[IP46].src, vip.ip)
464                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466                 self.assertEqual(rx[ICMP46][IP46error]
467                                  [TCPUDPError].sport, sport)
468                 self.assertEqual(rx[ICMP46][IP46error]
469                                  [TCPUDPError].dport, vip.port)
470
471             # from other remote to client, ICMP error
472             # outside shouldn't be NAT-ed
473             p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474                   IP46(src=remote2_addr, dst=client_addr) /
475                   ICMPelem / InnerIP)
476
477             rxs = self.send_and_expect(self.pg1,
478                                        p1 * N_PKTS,
479                                        self.pg0)
480
481             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482             IP46error = IPerror6 if vip.isV6 else IPerror
483             for rx in rxs:
484                 self.assert_packet_checksums_valid(rx)
485                 self.assertEqual(rx[IP46].src, remote2_addr)
486                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488                 self.assertEqual(rx[ICMP46][IP46error]
489                                  [TCPUDPError].sport, sport)
490                 self.assertEqual(rx[ICMP46][IP46error]
491                                  [TCPUDPError].dport, vip.port)
492
493         self.vapi.cnat_session_purge()
494
495     def test_cnat6(self):
496         # """ CNat Translation ipv6 """
497         vips = [
498             Ep("30::1", 5555),
499             Ep("30::2", 5554),
500             Ep("30::2", 5553, UDP),
501         ]
502
503         self.pg0.generate_remote_hosts(len(vips))
504         self.pg0.configure_ipv6_neighbors()
505         self.pg1.generate_remote_hosts(len(vips))
506         self.pg1.configure_ipv6_neighbors()
507
508         self.cnat_translation(vips, isV6=True)
509
510     def test_cnat4(self):
511         # """ CNat Translation ipv4 """
512
513         vips = [
514             Ep("30.0.0.1", 5555),
515             Ep("30.0.0.2", 5554),
516             Ep("30.0.0.2", 5553, UDP),
517         ]
518
519         self.pg0.generate_remote_hosts(len(vips))
520         self.pg0.configure_ipv4_neighbors()
521         self.pg1.generate_remote_hosts(len(vips))
522         self.pg1.configure_ipv4_neighbors()
523
524         self.cnat_translation(vips)
525
526
527 class TestCNatSourceNAT(VppTestCase):
528     """ CNat Source NAT """
529     extra_vpp_punt_config = ["cnat", "{",
530                              "session-cleanup-timeout", "0.1",
531                              "session-max-age", "1",
532                              "tcp-max-age", "1",
533                              "scanner", "off", "}"]
534
535     @classmethod
536     def setUpClass(cls):
537         super(TestCNatSourceNAT, cls).setUpClass()
538
539     @classmethod
540     def tearDownClass(cls):
541         super(TestCNatSourceNAT, cls).tearDownClass()
542
543     def setUp(self):
544         super(TestCNatSourceNAT, self).setUp()
545
546         self.create_pg_interfaces(range(3))
547
548         for i in self.pg_interfaces:
549             i.admin_up()
550             i.config_ip4()
551             i.resolve_arp()
552             i.config_ip6()
553             i.resolve_ndp()
554
555         self.pg0.configure_ipv6_neighbors()
556         self.pg0.configure_ipv4_neighbors()
557         self.pg1.generate_remote_hosts(2)
558         self.pg1.configure_ipv4_neighbors()
559         self.pg1.configure_ipv6_neighbors()
560
561         self.vapi.cnat_set_snat_addresses(
562             snat_ip4=self.pg2.remote_hosts[0].ip4,
563             snat_ip6=self.pg2.remote_hosts[0].ip6,
564             sw_if_index=INVALID_INDEX)
565         self.vapi.feature_enable_disable(
566             enable=1,
567             arc_name="ip6-unicast",
568             feature_name="cnat-snat-ip6",
569             sw_if_index=self.pg0.sw_if_index)
570         self.vapi.feature_enable_disable(
571             enable=1,
572             arc_name="ip4-unicast",
573             feature_name="cnat-snat-ip4",
574             sw_if_index=self.pg0.sw_if_index)
575
576     def tearDown(self):
577         self.vapi.cnat_session_purge()
578         for i in self.pg_interfaces:
579             i.unconfig_ip4()
580             i.unconfig_ip6()
581             i.admin_down()
582         super(TestCNatSourceNAT, self).tearDown()
583
584     def test_snat_v6(self):
585         # """ CNat Source Nat v6 """
586         self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
587         self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
588         self.sourcenat_test_icmp_err_conf(isV6=True)
589         self.sourcenat_test_icmp_echo6_conf()
590
591     def test_snat_v4(self):
592         # """ CNat Source Nat v4 """
593         self.sourcenat_test_tcp_udp_conf(TCP)
594         self.sourcenat_test_tcp_udp_conf(UDP)
595         self.sourcenat_test_icmp_err_conf()
596         self.sourcenat_test_icmp_echo4_conf()
597
598     def sourcenat_test_icmp_echo6_conf(self):
599         sports = [1234, 1235]
600         dports = [6661, 6662]
601
602         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
603             client_addr = self.pg0.remote_hosts[0].ip6
604             remote_addr = self.pg1.remote_hosts[nbr].ip6
605             src_nat_addr = self.pg2.remote_hosts[0].ip6
606
607             # ping from pods to outside network
608             p1 = (
609                 Ether(dst=self.pg0.local_mac,
610                       src=self.pg0.remote_hosts[0].mac) /
611                 IPv6(src=client_addr, dst=remote_addr) /
612                 ICMPv6EchoRequest(id=0xfeed) /
613                 Raw())
614
615             rxs = self.send_and_expect(
616                 self.pg0,
617                 p1 * N_PKTS,
618                 self.pg1)
619
620             for rx in rxs:
621                 self.assertEqual(rx[IPv6].src, src_nat_addr)
622                 self.assert_packet_checksums_valid(rx)
623
624             received_id = rx[0][ICMPv6EchoRequest].id
625             # ping reply from outside to pods
626             p2 = (
627                 Ether(dst=self.pg1.local_mac,
628                       src=self.pg1.remote_hosts[nbr].mac) /
629                 IPv6(src=remote_addr, dst=src_nat_addr) /
630                 ICMPv6EchoReply(id=received_id))
631             rxs = self.send_and_expect(
632                 self.pg1,
633                 p2 * N_PKTS,
634                 self.pg0)
635
636             for rx in rxs:
637                 self.assert_packet_checksums_valid(rx)
638                 self.assertEqual(rx[IPv6].src, remote_addr)
639                 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
640
641     def sourcenat_test_icmp_echo4_conf(self):
642         sports = [1234, 1235]
643         dports = [6661, 6662]
644
645         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
646             IP46 = IP
647             client_addr = self.pg0.remote_hosts[0].ip4
648             remote_addr = self.pg1.remote_hosts[nbr].ip4
649             src_nat_addr = self.pg2.remote_hosts[0].ip4
650
651             # ping from pods to outside network
652             p1 = (
653                 Ether(dst=self.pg0.local_mac,
654                       src=self.pg0.remote_hosts[0].mac) /
655                 IP46(src=client_addr, dst=remote_addr) /
656                 ICMP(type=8, id=0xfeed) /
657                 Raw())
658
659             rxs = self.send_and_expect(
660                 self.pg0,
661                 p1 * N_PKTS,
662                 self.pg1)
663
664             for rx in rxs:
665                 self.assertEqual(rx[IP46].src, src_nat_addr)
666                 self.assert_packet_checksums_valid(rx)
667
668             received_id = rx[0][ICMP].id
669             # ping reply from outside to pods
670             p2 = (
671                 Ether(dst=self.pg1.local_mac,
672                       src=self.pg1.remote_hosts[nbr].mac) /
673                 IP46(src=remote_addr, dst=src_nat_addr) /
674                 ICMP(type=0, id=received_id))
675             rxs = self.send_and_expect(
676                 self.pg1,
677                 p2 * N_PKTS,
678                 self.pg0)
679
680             for rx in rxs:
681                 self.assert_packet_checksums_valid(rx)
682                 self.assertEqual(rx[IP46].src, remote_addr)
683                 self.assertEqual(rx[ICMP].id, 0xfeed)
684
685     def sourcenat_test_icmp_err_conf(self, isV6=False):
686         sports = [1234, 1235]
687         dports = [6661, 6662]
688
689         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
690             if isV6:
691                 IP46 = IPv6
692                 client_addr = self.pg0.remote_hosts[0].ip6
693                 remote_addr = self.pg1.remote_hosts[nbr].ip6
694                 src_nat_addr = self.pg2.remote_hosts[0].ip6
695                 ICMP46 = ICMPv6DestUnreach
696                 ICMPelem = ICMPv6DestUnreach(code=1)
697                 IP46error = IPerror6
698             else:
699                 IP46 = IP
700                 client_addr = self.pg0.remote_hosts[0].ip4
701                 remote_addr = self.pg1.remote_hosts[nbr].ip4
702                 src_nat_addr = self.pg2.remote_hosts[0].ip4
703                 IP46error = IPerror
704                 ICMP46 = ICMP
705                 ICMPelem = ICMP(type=11)
706
707             # from pods to outside network
708             p1 = (
709                 Ether(dst=self.pg0.local_mac,
710                       src=self.pg0.remote_hosts[0].mac) /
711                 IP46(src=client_addr, dst=remote_addr) /
712                 TCP(sport=sports[nbr], dport=dports[nbr]) /
713                 Raw())
714
715             rxs = self.send_and_expect(
716                 self.pg0,
717                 p1 * N_PKTS,
718                 self.pg1)
719             for rx in rxs:
720                 self.assert_packet_checksums_valid(rx)
721                 self.assertEqual(rx[IP46].dst, remote_addr)
722                 self.assertEqual(rx[TCP].dport, dports[nbr])
723                 self.assertEqual(rx[IP46].src, src_nat_addr)
724                 sport = rx[TCP].sport
725
726             InnerIP = rxs[0][IP46]
727             # from outside to pods, ICMP error
728             p2 = (
729                 Ether(dst=self.pg1.local_mac,
730                       src=self.pg1.remote_hosts[nbr].mac) /
731                 IP46(src=remote_addr, dst=src_nat_addr) /
732                 ICMPelem / InnerIP)
733
734             rxs = self.send_and_expect(
735                 self.pg1,
736                 p2 * N_PKTS,
737                 self.pg0)
738
739             for rx in rxs:
740                 self.assert_packet_checksums_valid(rx)
741                 self.assertEqual(rx[IP46].src, remote_addr)
742                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
743                 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
744                 self.assertEqual(rx[ICMP46][IP46error]
745                                  [TCPerror].sport, sports[nbr])
746                 self.assertEqual(rx[ICMP46][IP46error]
747                                  [TCPerror].dport, dports[nbr])
748
749     def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
750         sports = [1234, 1235]
751         dports = [6661, 6662]
752
753         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
754             if isV6:
755                 IP46 = IPv6
756                 client_addr = self.pg0.remote_hosts[0].ip6
757                 remote_addr = self.pg1.remote_hosts[nbr].ip6
758                 src_nat_addr = self.pg2.remote_hosts[0].ip6
759                 exclude_prefix = ip_network(
760                     "%s/100" % remote_addr, strict=False)
761             else:
762                 IP46 = IP
763                 client_addr = self.pg0.remote_hosts[0].ip4
764                 remote_addr = self.pg1.remote_hosts[nbr].ip4
765                 src_nat_addr = self.pg2.remote_hosts[0].ip4
766                 exclude_prefix = ip_network(
767                     "%s/16" % remote_addr, strict=False)
768             # from pods to outside network
769             p1 = (
770                 Ether(dst=self.pg0.local_mac,
771                       src=self.pg0.remote_hosts[0].mac) /
772                 IP46(src=client_addr, dst=remote_addr) /
773                 l4p(sport=sports[nbr], dport=dports[nbr]) /
774                 Raw())
775
776             self.vapi.cli("trace add pg-input 1")
777             rxs = self.send_and_expect(
778                 self.pg0,
779                 p1 * N_PKTS,
780                 self.pg1)
781             self.logger.info(self.vapi.cli("show trace max 1"))
782
783             for rx in rxs:
784                 self.assert_packet_checksums_valid(rx)
785                 self.assertEqual(rx[IP46].dst, remote_addr)
786                 self.assertEqual(rx[l4p].dport, dports[nbr])
787                 self.assertEqual(rx[IP46].src, src_nat_addr)
788                 sport = rx[l4p].sport
789
790             # from outside to pods
791             p2 = (
792                 Ether(dst=self.pg1.local_mac,
793                       src=self.pg1.remote_hosts[nbr].mac) /
794                 IP46(src=remote_addr, dst=src_nat_addr) /
795                 l4p(sport=dports[nbr], dport=sport) /
796                 Raw())
797
798             rxs = self.send_and_expect(
799                 self.pg1,
800                 p2 * N_PKTS,
801                 self.pg0)
802
803             for rx in rxs:
804                 self.assert_packet_checksums_valid(rx)
805                 self.assertEqual(rx[IP46].dst, client_addr)
806                 self.assertEqual(rx[l4p].dport, sports[nbr])
807                 self.assertEqual(rx[l4p].sport, dports[nbr])
808                 self.assertEqual(rx[IP46].src, remote_addr)
809
810             # add remote host to exclude list
811             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
812             self.vapi.cnat_session_purge()
813
814             rxs = self.send_and_expect(
815                 self.pg0,
816                 p1 * N_PKTS,
817                 self.pg1)
818             for rx in rxs:
819                 self.assert_packet_checksums_valid(rx)
820                 self.assertEqual(rx[IP46].dst, remote_addr)
821                 self.assertEqual(rx[l4p].dport, dports[nbr])
822                 self.assertEqual(rx[IP46].src, client_addr)
823
824             # remove remote host from exclude list
825             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
826             self.vapi.cnat_session_purge()
827
828             rxs = self.send_and_expect(
829                 self.pg0,
830                 p1 * N_PKTS,
831                 self.pg1)
832
833             for rx in rxs:
834                 self.assert_packet_checksums_valid(rx)
835                 self.assertEqual(rx[IP46].dst, remote_addr)
836                 self.assertEqual(rx[l4p].dport, dports[nbr])
837                 self.assertEqual(rx[IP46].src, src_nat_addr)
838
839             self.vapi.cnat_session_purge()
840
841
842 class TestCNatDHCP(VppTestCase):
843     """ CNat Translation """
844     extra_vpp_punt_config = ["cnat", "{",
845                              "session-db-buckets", "64",
846                              "session-cleanup-timeout", "0.1",
847                              "session-max-age", "1",
848                              "tcp-max-age", "1",
849                              "scanner", "off", "}"]
850
851     @classmethod
852     def setUpClass(cls):
853         super(TestCNatDHCP, cls).setUpClass()
854
855     @classmethod
856     def tearDownClass(cls):
857         super(TestCNatDHCP, cls).tearDownClass()
858
859     def tearDown(self):
860         for i in self.pg_interfaces:
861             i.admin_down()
862         super(TestCNatDHCP, self).tearDown()
863
864     def create_translation(self, vip_pg, *args, is_v6=False):
865         vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
866         paths = []
867         for (src_pg, dst_pg) in args:
868             paths.append(EpTuple(
869                 Ep.from_pg(src_pg, is_v6=is_v6),
870                 Ep.from_pg(dst_pg, is_v6=is_v6)
871             ))
872         t1 = VppCNatTranslation(self, TCP, vip, paths)
873         t1.add_vpp_config()
874         return t1
875
876     def make_addr(self, sw_if_index, i, is_v6):
877         if is_v6:
878             return "fd01:%x::%u" % (sw_if_index, i + 1)
879         else:
880             return "172.16.%u.%u" % (sw_if_index, i)
881
882     def make_prefix(self, sw_if_index, i, is_v6):
883         if is_v6:
884             return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
885         else:
886             return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
887
888     def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
889         qt1 = tr.query_vpp_config()
890         self.assertEqual(str(qt1.vip.addr), self.make_addr(
891             vip_pg.sw_if_index, i, is_v6))
892         for (src_pg, dst_pg), path in zip(args, qt1.paths):
893             if src_pg:
894                 self.assertEqual(str(path.src_ep.addr), self.make_addr(
895                     src_pg.sw_if_index, i, is_v6))
896             if dst_pg:
897                 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
898                     dst_pg.sw_if_index, i, is_v6))
899
900     def config_ips(self, rng, is_add=1, is_v6=False):
901         for pg, i in product(self.pg_interfaces, rng):
902             self.vapi.sw_interface_add_del_address(
903                 sw_if_index=pg.sw_if_index,
904                 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
905                 is_add=is_add)
906
907     def test_dhcp_v4(self):
908         self.create_pg_interfaces(range(5))
909         for i in self.pg_interfaces:
910             i.admin_up()
911         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
912         t1 = self.create_translation(*pglist)
913         self.config_ips([0])
914         self.check_resolved(t1, *pglist)
915         self.config_ips([1])
916         self.config_ips([0], is_add=0)
917         self.check_resolved(t1, *pglist, i=1)
918         self.config_ips([1], is_add=0)
919         t1.remove_vpp_config()
920
921     def test_dhcp_v6(self):
922         self.create_pg_interfaces(range(5))
923         for i in self.pg_interfaces:
924             i.admin_up()
925         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
926         t1 = self.create_translation(*pglist, is_v6=True)
927         self.config_ips([0], is_v6=True)
928         self.check_resolved(t1, *pglist, is_v6=True)
929         self.config_ips([1], is_v6=True)
930         self.config_ips([0], is_add=0, is_v6=True)
931         self.check_resolved(t1, *pglist, i=1, is_v6=True)
932         self.config_ips([1], is_add=0, is_v6=True)
933         t1.remove_vpp_config()
934
935     def test_dhcp_snat(self):
936         self.create_pg_interfaces(range(1))
937         for i in self.pg_interfaces:
938             i.admin_up()
939         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
940         self.config_ips([0], is_v6=False)
941         self.config_ips([0], is_v6=True)
942         r = self.vapi.cnat_get_snat_addresses()
943         self.assertEqual(str(r.snat_ip4), self.make_addr(
944             self.pg0.sw_if_index, 0, False))
945         self.assertEqual(str(r.snat_ip6), self.make_addr(
946             self.pg0.sw_if_index, 0, True))
947         self.config_ips([1], is_v6=False)
948         self.config_ips([1], is_v6=True)
949         self.config_ips([0], is_add=0, is_v6=False)
950         self.config_ips([0], is_add=0, is_v6=True)
951         r = self.vapi.cnat_get_snat_addresses()
952         self.assertEqual(str(r.snat_ip4), self.make_addr(
953             self.pg0.sw_if_index, 1, False))
954         self.assertEqual(str(r.snat_ip6), self.make_addr(
955             self.pg0.sw_if_index, 1, True))
956         self.config_ips([1], is_add=0, is_v6=False)
957         self.config_ips([1], is_add=0, is_v6=True)
958         self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
959
960
961 if __name__ == '__main__':
962     unittest.main(testRunner=VppTestRunner)