cnat: Add DHCP support
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
24 N_PKTS = 15
25
26
27 class Ep(object):
28     """ CNat endpoint """
29
30     def __init__(self, ip=None, port=0, l4p=TCP,
31                  sw_if_index=INVALID_INDEX, is_v6=False):
32         self.ip = ip
33         if ip is None:
34             self.ip = "::" if is_v6 else "0.0.0.0"
35         self.port = port
36         self.l4p = l4p
37         self.sw_if_index = sw_if_index
38         if is_v6:
39             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
40         else:
41             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
42
43     def encode(self):
44         return {'addr': self.ip,
45                 'port': self.port,
46                 'sw_if_index': self.sw_if_index,
47                 'if_af': self.if_af}
48
49     @classmethod
50     def from_pg(cls, pg, is_v6=False):
51         if pg is None:
52             return cls(is_v6=is_v6)
53         else:
54             return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
55
56     @property
57     def isV6(self):
58         return ":" in self.ip
59
60     def __str__(self):
61         return ("%s:%d" % (self.ip, self.port))
62
63
64 class EpTuple(object):
65     """ CNat endpoint """
66
67     def __init__(self, src, dst):
68         self.src = src
69         self.dst = dst
70
71     def encode(self):
72         return {'src_ep': self.src.encode(),
73                 'dst_ep': self.dst.encode()}
74
75     def __str__(self):
76         return ("%s->%s" % (self.src, self.dst))
77
78
79 class VppCNatTranslation(VppObject):
80
81     def __init__(self, test, iproto, vip, paths):
82         self._test = test
83         self.vip = vip
84         self.iproto = iproto
85         self.paths = paths
86         self.encoded_paths = []
87         for path in self.paths:
88             self.encoded_paths.append(path.encode())
89
90     def __str__(self):
91         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
92
93     @property
94     def vl4_proto(self):
95         ip_proto = VppEnum.vl_api_ip_proto_t
96         return {
97             UDP: ip_proto.IP_API_PROTO_UDP,
98             TCP: ip_proto.IP_API_PROTO_TCP,
99         }[self.iproto]
100
101     def add_vpp_config(self):
102         r = self._test.vapi.cnat_translation_update(
103             {'vip': self.vip.encode(),
104              'ip_proto': self.vl4_proto,
105              'n_paths': len(self.paths),
106              'paths': self.encoded_paths})
107         self._test.registry.register(self, self._test.logger)
108         self.id = r.id
109
110     def modify_vpp_config(self, paths):
111         self.paths = paths
112         self.encoded_paths = []
113         for path in self.paths:
114             self.encoded_paths.append(path.encode())
115
116         r = self._test.vapi.cnat_translation_update(
117             {'vip': self.vip.encode(),
118              'ip_proto': self.vl4_proto,
119              'n_paths': len(self.paths),
120              'paths': self.encoded_paths})
121         self._test.registry.register(self, self._test.logger)
122
123     def remove_vpp_config(self):
124         self._test.vapi.cnat_translation_del(id=self.id)
125
126     def query_vpp_config(self):
127         for t in self._test.vapi.cnat_translation_dump():
128             if self.id == t.translation.id:
129                 return t.translation
130         return None
131
132     def object_id(self):
133         return ("cnat-translation-%s" % (self.vip))
134
135     def get_stats(self):
136         c = self._test.statistics.get_counter("/net/cnat-translation")
137         return c[0][self.id]
138
139
140 class TestCNatTranslation(VppTestCase):
141     """ CNat Translation """
142     extra_vpp_punt_config = ["cnat", "{",
143                              "session-db-buckets", "64",
144                              "session-cleanup-timeout", "0.1",
145                              "session-max-age", "1",
146                              "tcp-max-age", "1",
147                              "scanner", "off", "}"]
148
149     @classmethod
150     def setUpClass(cls):
151         super(TestCNatTranslation, cls).setUpClass()
152
153     @classmethod
154     def tearDownClass(cls):
155         super(TestCNatTranslation, cls).tearDownClass()
156
157     def setUp(self):
158         super(TestCNatTranslation, self).setUp()
159
160         self.create_pg_interfaces(range(3))
161
162         for i in self.pg_interfaces:
163             i.admin_up()
164             i.config_ip4()
165             i.resolve_arp()
166             i.config_ip6()
167             i.resolve_ndp()
168
169     def tearDown(self):
170         for i in self.pg_interfaces:
171             i.unconfig_ip4()
172             i.unconfig_ip6()
173             i.admin_down()
174         super(TestCNatTranslation, self).tearDown()
175
176     def cnat_create_translation(self, vip, nbr):
177         ip_v = "ip6" if vip.isV6 else "ip4"
178         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179         sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180         t1 = VppCNatTranslation(
181             self, vip.l4p, vip,
182             [EpTuple(sep, dep), EpTuple(sep, dep)])
183         t1.add_vpp_config()
184         return t1
185
186     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187         ip_v = "ip6" if isV6 else "ip4"
188         ip_class = IPv6 if isV6 else IP
189         vip = t1.vip
190
191         #
192         # Flows
193         #
194         for src in self.pg0.remote_hosts:
195             for sport in sports:
196                 # from client to vip
197                 p1 = (Ether(dst=self.pg0.local_mac,
198                             src=src.mac) /
199                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200                       vip.l4p(sport=sport, dport=vip.port) /
201                       Raw())
202
203                 self.vapi.cli("trace add pg-input 1")
204                 rxs = self.send_and_expect(self.pg0,
205                                            p1 * N_PKTS,
206                                            self.pg1)
207                 self.logger.info(self.vapi.cli("show trace max 1"))
208
209                 for rx in rxs:
210                     self.assert_packet_checksums_valid(rx)
211                     self.assertEqual(
212                         rx[ip_class].dst,
213                         getattr(self.pg1.remote_hosts[nbr], ip_v))
214                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
215                     self.assertEqual(
216                         rx[ip_class].src,
217                         getattr(src, ip_v))
218                     self.assertEqual(rx[vip.l4p].sport, sport)
219
220                 # from vip to client
221                 p1 = (Ether(dst=self.pg1.local_mac,
222                             src=self.pg1.remote_mac) /
223                       ip_class(src=getattr(
224                           self.pg1.remote_hosts[nbr],
225                           ip_v),
226                           dst=getattr(src, ip_v)) /
227                       vip.l4p(sport=4000 + nbr, dport=sport) /
228                       Raw())
229
230                 rxs = self.send_and_expect(self.pg1,
231                                            p1 * N_PKTS,
232                                            self.pg0)
233
234                 for rx in rxs:
235                     self.assert_packet_checksums_valid(rx)
236                     self.assertEqual(
237                         rx[ip_class].dst,
238                         getattr(src, ip_v))
239                     self.assertEqual(rx[vip.l4p].dport, sport)
240                     self.assertEqual(rx[ip_class].src, vip.ip)
241                     self.assertEqual(rx[vip.l4p].sport, vip.port)
242
243                 #
244                 # packets to the VIP that do not match a
245                 # translation are dropped
246                 #
247                 p1 = (Ether(dst=self.pg0.local_mac,
248                             src=src.mac) /
249                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250                       vip.l4p(sport=sport, dport=6666) /
251                       Raw())
252
253                 self.send_and_assert_no_replies(self.pg0,
254                                                 p1 * N_PKTS,
255                                                 self.pg1)
256
257                 #
258                 # packets from the VIP that do not match a
259                 # session are forwarded
260                 #
261                 p1 = (Ether(dst=self.pg1.local_mac,
262                             src=self.pg1.remote_mac) /
263                       ip_class(src=getattr(
264                           self.pg1.remote_hosts[nbr],
265                           ip_v),
266                           dst=getattr(src, ip_v)) /
267                       vip.l4p(sport=6666, dport=sport) /
268                       Raw())
269
270                 rxs = self.send_and_expect(self.pg1,
271                                            p1 * N_PKTS,
272                                            self.pg0)
273
274         self.assertEqual(t1.get_stats()['packets'],
275                          N_PKTS *
276                          len(sports) *
277                          len(self.pg0.remote_hosts))
278
279     def cnat_test_translation_update(self, t1, sports, isV6=False):
280         ip_v = "ip6" if isV6 else "ip4"
281         ip_class = IPv6 if isV6 else IP
282         vip = t1.vip
283
284         #
285         # modify the translation to use a different backend
286         #
287         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
288         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
289         t1.modify_vpp_config([EpTuple(sep, dep)])
290
291         #
292         # existing flows follow the old path
293         #
294         for src in self.pg0.remote_hosts:
295             for sport in sports:
296                 # from client to vip
297                 p1 = (Ether(dst=self.pg0.local_mac,
298                             src=src.mac) /
299                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
300                       vip.l4p(sport=sport, dport=vip.port) /
301                       Raw())
302
303                 rxs = self.send_and_expect(self.pg0,
304                                            p1 * N_PKTS,
305                                            self.pg1)
306
307         #
308         # new flows go to the new backend
309         #
310         for src in self.pg0.remote_hosts:
311             p1 = (Ether(dst=self.pg0.local_mac,
312                         src=src.mac) /
313                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
314                   vip.l4p(sport=9999, dport=vip.port) /
315                   Raw())
316
317             rxs = self.send_and_expect(self.pg0,
318                                        p1 * N_PKTS,
319                                        self.pg2)
320
321     def cnat_translation(self, vips, isV6=False):
322         """ CNat Translation """
323
324         ip_class = IPv6 if isV6 else IP
325         ip_v = "ip6" if isV6 else "ip4"
326         sports = [1234, 1233]
327
328         #
329         # turn the scanner off whilst testing otherwise sessions
330         # will time out
331         #
332         self.vapi.cli("test cnat scanner off")
333
334         sessions = self.vapi.cnat_session_dump()
335
336         trs = []
337         for nbr, vip in enumerate(vips):
338             trs.append(self.cnat_create_translation(vip, nbr))
339
340         self.logger.info(self.vapi.cli("sh cnat client"))
341         self.logger.info(self.vapi.cli("sh cnat translation"))
342
343         #
344         # translations
345         #
346         for nbr, vip in enumerate(vips):
347             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
348             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
349             if isV6:
350                 self.logger.info(self.vapi.cli(
351                     "sh ip6 fib %s" % self.pg0.remote_ip6))
352             else:
353                 self.logger.info(self.vapi.cli(
354                     "sh ip fib %s" % self.pg0.remote_ip4))
355             self.logger.info(self.vapi.cli("sh cnat session verbose"))
356
357         #
358         # turn the scanner back on and wait until the sessions
359         # all disapper
360         #
361         self.vapi.cli("test cnat scanner on")
362
363         n_tries = 0
364         sessions = self.vapi.cnat_session_dump()
365         while (len(sessions) and n_tries < 100):
366             n_tries += 1
367             sessions = self.vapi.cnat_session_dump()
368             self.sleep(2)
369             self.logger.info(self.vapi.cli("show cnat session verbose"))
370
371         self.assertTrue(n_tries < 100)
372         self.vapi.cli("test cnat scanner off")
373
374         #
375         # load some flows again and purge
376         #
377         for vip in vips:
378             for src in self.pg0.remote_hosts:
379                 for sport in sports:
380                     # from client to vip
381                     p1 = (Ether(dst=self.pg0.local_mac,
382                                 src=src.mac) /
383                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
384                           vip.l4p(sport=sport, dport=vip.port) /
385                           Raw())
386                     self.send_and_expect(self.pg0,
387                                          p1 * N_PKTS,
388                                          self.pg2)
389
390         for tr in trs:
391             tr.remove_vpp_config()
392
393         self.assertTrue(self.vapi.cnat_session_dump())
394         self.vapi.cnat_session_purge()
395         self.assertFalse(self.vapi.cnat_session_dump())
396
397     def test_icmp(self):
398         vips = [
399             Ep("30.0.0.1", 5555),
400             Ep("30.0.0.2", 5554),
401             Ep("30.0.0.2", 5553, UDP),
402             Ep("30::1", 6666),
403             Ep("30::2", 5553, UDP),
404         ]
405         sport = 1234
406
407         self.pg0.generate_remote_hosts(len(vips))
408         self.pg0.configure_ipv6_neighbors()
409         self.pg0.configure_ipv4_neighbors()
410
411         self.pg1.generate_remote_hosts(len(vips))
412         self.pg1.configure_ipv6_neighbors()
413         self.pg1.configure_ipv4_neighbors()
414
415         self.vapi.cli("test cnat scanner off")
416         trs = []
417         for nbr, vip in enumerate(vips):
418             trs.append(self.cnat_create_translation(vip, nbr))
419
420         self.logger.info(self.vapi.cli("sh cnat client"))
421         self.logger.info(self.vapi.cli("sh cnat translation"))
422
423         for nbr, vip in enumerate(vips):
424             if vip.isV6:
425                 client_addr = self.pg0.remote_hosts[0].ip6
426                 remote_addr = self.pg1.remote_hosts[nbr].ip6
427                 remote2_addr = self.pg2.remote_hosts[0].ip6
428             else:
429                 client_addr = self.pg0.remote_hosts[0].ip4
430                 remote_addr = self.pg1.remote_hosts[nbr].ip4
431                 remote2_addr = self.pg2.remote_hosts[0].ip4
432             IP46 = IPv6 if vip.isV6 else IP
433             # from client to vip
434             p1 = (Ether(dst=self.pg0.local_mac,
435                         src=self.pg0.remote_hosts[0].mac) /
436                   IP46(src=client_addr, dst=vip.ip) /
437                   vip.l4p(sport=sport, dport=vip.port) /
438                   Raw())
439
440             rxs = self.send_and_expect(self.pg0,
441                                        p1 * N_PKTS,
442                                        self.pg1)
443
444             for rx in rxs:
445                 self.assert_packet_checksums_valid(rx)
446                 self.assertEqual(rx[IP46].dst, remote_addr)
447                 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
448                 self.assertEqual(rx[IP46].src, client_addr)
449                 self.assertEqual(rx[vip.l4p].sport, sport)
450
451             InnerIP = rxs[0][IP46]
452
453             ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
454             ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
455             # from vip to client, ICMP error
456             p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
457                   IP46(src=remote_addr, dst=client_addr) /
458                   ICMPelem / InnerIP)
459
460             rxs = self.send_and_expect(self.pg1,
461                                        p1 * N_PKTS,
462                                        self.pg0)
463
464             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
465             IP46error = IPerror6 if vip.isV6 else IPerror
466             for rx in rxs:
467                 self.assert_packet_checksums_valid(rx)
468                 self.assertEqual(rx[IP46].src, vip.ip)
469                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
470                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
471                 self.assertEqual(rx[ICMP46][IP46error]
472                                  [TCPUDPError].sport, sport)
473                 self.assertEqual(rx[ICMP46][IP46error]
474                                  [TCPUDPError].dport, vip.port)
475
476             # from other remote to client, ICMP error
477             # outside shouldn't be NAT-ed
478             p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
479                   IP46(src=remote2_addr, dst=client_addr) /
480                   ICMPelem / InnerIP)
481
482             rxs = self.send_and_expect(self.pg1,
483                                        p1 * N_PKTS,
484                                        self.pg0)
485
486             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
487             IP46error = IPerror6 if vip.isV6 else IPerror
488             for rx in rxs:
489                 self.assert_packet_checksums_valid(rx)
490                 self.assertEqual(rx[IP46].src, remote2_addr)
491                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
492                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
493                 self.assertEqual(rx[ICMP46][IP46error]
494                                  [TCPUDPError].sport, sport)
495                 self.assertEqual(rx[ICMP46][IP46error]
496                                  [TCPUDPError].dport, vip.port)
497
498         self.vapi.cnat_session_purge()
499
500     def test_cnat6(self):
501         # """ CNat Translation ipv6 """
502         vips = [
503             Ep("30::1", 5555),
504             Ep("30::2", 5554),
505             Ep("30::2", 5553, UDP),
506         ]
507
508         self.pg0.generate_remote_hosts(len(vips))
509         self.pg0.configure_ipv6_neighbors()
510         self.pg1.generate_remote_hosts(len(vips))
511         self.pg1.configure_ipv6_neighbors()
512
513         self.cnat_translation(vips, isV6=True)
514
515     def test_cnat4(self):
516         # """ CNat Translation ipv4 """
517
518         vips = [
519             Ep("30.0.0.1", 5555),
520             Ep("30.0.0.2", 5554),
521             Ep("30.0.0.2", 5553, UDP),
522         ]
523
524         self.pg0.generate_remote_hosts(len(vips))
525         self.pg0.configure_ipv4_neighbors()
526         self.pg1.generate_remote_hosts(len(vips))
527         self.pg1.configure_ipv4_neighbors()
528
529         self.cnat_translation(vips)
530
531
532 class TestCNatSourceNAT(VppTestCase):
533     """ CNat Source NAT """
534     extra_vpp_punt_config = ["cnat", "{",
535                              "session-max-age", "1",
536                              "tcp-max-age", "1", "}"]
537
538     @classmethod
539     def setUpClass(cls):
540         super(TestCNatSourceNAT, cls).setUpClass()
541
542     @classmethod
543     def tearDownClass(cls):
544         super(TestCNatSourceNAT, cls).tearDownClass()
545
546     def setUp(self):
547         super(TestCNatSourceNAT, self).setUp()
548
549         self.create_pg_interfaces(range(3))
550
551         for i in self.pg_interfaces:
552             i.admin_up()
553             i.config_ip4()
554             i.resolve_arp()
555             i.config_ip6()
556             i.resolve_ndp()
557
558         self.pg0.configure_ipv6_neighbors()
559         self.pg0.configure_ipv4_neighbors()
560         self.pg1.generate_remote_hosts(2)
561         self.pg1.configure_ipv4_neighbors()
562         self.pg1.configure_ipv6_neighbors()
563
564         self.vapi.cli("test cnat scanner off")
565         self.vapi.cnat_set_snat_addresses(
566             snat_ip4=self.pg2.remote_hosts[0].ip4,
567             snat_ip6=self.pg2.remote_hosts[0].ip6)
568         self.vapi.feature_enable_disable(
569             enable=1,
570             arc_name="ip6-unicast",
571             feature_name="ip6-cnat-snat",
572             sw_if_index=self.pg0.sw_if_index)
573         self.vapi.feature_enable_disable(
574             enable=1,
575             arc_name="ip4-unicast",
576             feature_name="ip4-cnat-snat",
577             sw_if_index=self.pg0.sw_if_index)
578
579     def tearDown(self):
580         self.vapi.cnat_session_purge()
581         for i in self.pg_interfaces:
582             i.unconfig_ip4()
583             i.unconfig_ip6()
584             i.admin_down()
585         super(TestCNatSourceNAT, self).tearDown()
586
587     def test_snat_v6(self):
588         # """ CNat Source Nat v6 """
589         self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
590         self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
591         self.sourcenat_test_icmp_err_conf(isV6=True)
592         self.sourcenat_test_icmp_echo6_conf()
593
594     def test_snat_v4(self):
595         # """ CNat Source Nat v4 """
596         self.sourcenat_test_tcp_udp_conf(TCP)
597         self.sourcenat_test_tcp_udp_conf(UDP)
598         self.sourcenat_test_icmp_err_conf()
599         self.sourcenat_test_icmp_echo4_conf()
600
601     def sourcenat_test_icmp_echo6_conf(self):
602         sports = [1234, 1235]
603         dports = [6661, 6662]
604
605         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
606             client_addr = self.pg0.remote_hosts[0].ip6
607             remote_addr = self.pg1.remote_hosts[nbr].ip6
608             src_nat_addr = self.pg2.remote_hosts[0].ip6
609
610             # ping from pods to outside network
611             p1 = (
612                 Ether(dst=self.pg0.local_mac,
613                       src=self.pg0.remote_hosts[0].mac) /
614                 IPv6(src=client_addr, dst=remote_addr) /
615                 ICMPv6EchoRequest(id=0xfeed) /
616                 Raw())
617
618             rxs = self.send_and_expect(
619                 self.pg0,
620                 p1 * N_PKTS,
621                 self.pg1)
622
623             for rx in rxs:
624                 self.assertEqual(rx[IPv6].src, src_nat_addr)
625                 self.assert_packet_checksums_valid(rx)
626
627             received_id = rx[0][ICMPv6EchoRequest].id
628             # ping reply from outside to pods
629             p2 = (
630                 Ether(dst=self.pg1.local_mac,
631                       src=self.pg1.remote_hosts[nbr].mac) /
632                 IPv6(src=remote_addr, dst=src_nat_addr) /
633                 ICMPv6EchoReply(id=received_id))
634             rxs = self.send_and_expect(
635                 self.pg1,
636                 p2 * N_PKTS,
637                 self.pg0)
638
639             for rx in rxs:
640                 self.assert_packet_checksums_valid(rx)
641                 self.assertEqual(rx[IPv6].src, remote_addr)
642                 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
643
644     def sourcenat_test_icmp_echo4_conf(self):
645         sports = [1234, 1235]
646         dports = [6661, 6662]
647
648         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
649             IP46 = IP
650             client_addr = self.pg0.remote_hosts[0].ip4
651             remote_addr = self.pg1.remote_hosts[nbr].ip4
652             src_nat_addr = self.pg2.remote_hosts[0].ip4
653
654             # ping from pods to outside network
655             p1 = (
656                 Ether(dst=self.pg0.local_mac,
657                       src=self.pg0.remote_hosts[0].mac) /
658                 IP46(src=client_addr, dst=remote_addr) /
659                 ICMP(type=8, id=0xfeed) /
660                 Raw())
661
662             rxs = self.send_and_expect(
663                 self.pg0,
664                 p1 * N_PKTS,
665                 self.pg1)
666
667             for rx in rxs:
668                 self.assertEqual(rx[IP46].src, src_nat_addr)
669                 self.assert_packet_checksums_valid(rx)
670
671             received_id = rx[0][ICMP].id
672             # ping reply from outside to pods
673             p2 = (
674                 Ether(dst=self.pg1.local_mac,
675                       src=self.pg1.remote_hosts[nbr].mac) /
676                 IP46(src=remote_addr, dst=src_nat_addr) /
677                 ICMP(type=0, id=received_id))
678             rxs = self.send_and_expect(
679                 self.pg1,
680                 p2 * N_PKTS,
681                 self.pg0)
682
683             for rx in rxs:
684                 self.assert_packet_checksums_valid(rx)
685                 self.assertEqual(rx[IP46].src, remote_addr)
686                 self.assertEqual(rx[ICMP].id, 0xfeed)
687
688     def sourcenat_test_icmp_err_conf(self, isV6=False):
689         sports = [1234, 1235]
690         dports = [6661, 6662]
691
692         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
693             if isV6:
694                 IP46 = IPv6
695                 client_addr = self.pg0.remote_hosts[0].ip6
696                 remote_addr = self.pg1.remote_hosts[nbr].ip6
697                 src_nat_addr = self.pg2.remote_hosts[0].ip6
698                 ICMP46 = ICMPv6DestUnreach
699                 ICMPelem = ICMPv6DestUnreach(code=1)
700                 IP46error = IPerror6
701             else:
702                 IP46 = IP
703                 client_addr = self.pg0.remote_hosts[0].ip4
704                 remote_addr = self.pg1.remote_hosts[nbr].ip4
705                 src_nat_addr = self.pg2.remote_hosts[0].ip4
706                 IP46error = IPerror
707                 ICMP46 = ICMP
708                 ICMPelem = ICMP(type=11)
709
710             # from pods to outside network
711             p1 = (
712                 Ether(dst=self.pg0.local_mac,
713                       src=self.pg0.remote_hosts[0].mac) /
714                 IP46(src=client_addr, dst=remote_addr) /
715                 TCP(sport=sports[nbr], dport=dports[nbr]) /
716                 Raw())
717
718             rxs = self.send_and_expect(
719                 self.pg0,
720                 p1 * N_PKTS,
721                 self.pg1)
722             for rx in rxs:
723                 self.assert_packet_checksums_valid(rx)
724                 self.assertEqual(rx[IP46].dst, remote_addr)
725                 self.assertEqual(rx[TCP].dport, dports[nbr])
726                 self.assertEqual(rx[IP46].src, src_nat_addr)
727                 sport = rx[TCP].sport
728
729             InnerIP = rxs[0][IP46]
730             # from outside to pods, ICMP error
731             p2 = (
732                 Ether(dst=self.pg1.local_mac,
733                       src=self.pg1.remote_hosts[nbr].mac) /
734                 IP46(src=remote_addr, dst=src_nat_addr) /
735                 ICMPelem / InnerIP)
736
737             rxs = self.send_and_expect(
738                 self.pg1,
739                 p2 * N_PKTS,
740                 self.pg0)
741
742             for rx in rxs:
743                 self.assert_packet_checksums_valid(rx)
744                 self.assertEqual(rx[IP46].src, remote_addr)
745                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
746                 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
747                 self.assertEqual(rx[ICMP46][IP46error]
748                                  [TCPerror].sport, sports[nbr])
749                 self.assertEqual(rx[ICMP46][IP46error]
750                                  [TCPerror].dport, dports[nbr])
751
752     def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
753         sports = [1234, 1235]
754         dports = [6661, 6662]
755
756         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
757             if isV6:
758                 IP46 = IPv6
759                 client_addr = self.pg0.remote_hosts[0].ip6
760                 remote_addr = self.pg1.remote_hosts[nbr].ip6
761                 src_nat_addr = self.pg2.remote_hosts[0].ip6
762                 exclude_prefix = ip_network(
763                     "%s/100" % remote_addr, strict=False)
764             else:
765                 IP46 = IP
766                 client_addr = self.pg0.remote_hosts[0].ip4
767                 remote_addr = self.pg1.remote_hosts[nbr].ip4
768                 src_nat_addr = self.pg2.remote_hosts[0].ip4
769                 exclude_prefix = ip_network(
770                     "%s/16" % remote_addr, strict=False)
771             # from pods to outside network
772             p1 = (
773                 Ether(dst=self.pg0.local_mac,
774                       src=self.pg0.remote_hosts[0].mac) /
775                 IP46(src=client_addr, dst=remote_addr) /
776                 l4p(sport=sports[nbr], dport=dports[nbr]) /
777                 Raw())
778
779             self.vapi.cli("trace add pg-input 1")
780             rxs = self.send_and_expect(
781                 self.pg0,
782                 p1 * N_PKTS,
783                 self.pg1)
784             self.logger.info(self.vapi.cli("show trace max 1"))
785
786             for rx in rxs:
787                 self.assert_packet_checksums_valid(rx)
788                 self.assertEqual(rx[IP46].dst, remote_addr)
789                 self.assertEqual(rx[l4p].dport, dports[nbr])
790                 self.assertEqual(rx[IP46].src, src_nat_addr)
791                 sport = rx[l4p].sport
792
793             # from outside to pods
794             p2 = (
795                 Ether(dst=self.pg1.local_mac,
796                       src=self.pg1.remote_hosts[nbr].mac) /
797                 IP46(src=remote_addr, dst=src_nat_addr) /
798                 l4p(sport=dports[nbr], dport=sport) /
799                 Raw())
800
801             rxs = self.send_and_expect(
802                 self.pg1,
803                 p2 * N_PKTS,
804                 self.pg0)
805
806             for rx in rxs:
807                 self.assert_packet_checksums_valid(rx)
808                 self.assertEqual(rx[IP46].dst, client_addr)
809                 self.assertEqual(rx[l4p].dport, sports[nbr])
810                 self.assertEqual(rx[l4p].sport, dports[nbr])
811                 self.assertEqual(rx[IP46].src, remote_addr)
812
813             # add remote host to exclude list
814             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
815             self.vapi.cnat_session_purge()
816
817             rxs = self.send_and_expect(
818                 self.pg0,
819                 p1 * N_PKTS,
820                 self.pg1)
821             for rx in rxs:
822                 self.assert_packet_checksums_valid(rx)
823                 self.assertEqual(rx[IP46].dst, remote_addr)
824                 self.assertEqual(rx[l4p].dport, dports[nbr])
825                 self.assertEqual(rx[IP46].src, client_addr)
826
827             # remove remote host from exclude list
828             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
829             self.vapi.cnat_session_purge()
830
831             rxs = self.send_and_expect(
832                 self.pg0,
833                 p1 * N_PKTS,
834                 self.pg1)
835
836             for rx in rxs:
837                 self.assert_packet_checksums_valid(rx)
838                 self.assertEqual(rx[IP46].dst, remote_addr)
839                 self.assertEqual(rx[l4p].dport, dports[nbr])
840                 self.assertEqual(rx[IP46].src, src_nat_addr)
841
842             self.vapi.cnat_session_purge()
843
844
845 class TestCNatDHCP(VppTestCase):
846     """ CNat Translation """
847     extra_vpp_punt_config = ["cnat", "{",
848                              "session-db-buckets", "64",
849                              "session-cleanup-timeout", "0.1",
850                              "session-max-age", "1",
851                              "tcp-max-age", "1",
852                              "scanner", "off", "}"]
853
854     @classmethod
855     def setUpClass(cls):
856         super(TestCNatDHCP, cls).setUpClass()
857
858     @classmethod
859     def tearDownClass(cls):
860         super(TestCNatDHCP, cls).tearDownClass()
861
862     def tearDown(self):
863         for i in self.pg_interfaces:
864             i.admin_down()
865         super(TestCNatDHCP, self).tearDown()
866
867     def create_translation(self, vip_pg, *args, is_v6=False):
868         vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
869         paths = []
870         for (src_pg, dst_pg) in args:
871             paths.append(EpTuple(
872                 Ep.from_pg(src_pg, is_v6=is_v6),
873                 Ep.from_pg(dst_pg, is_v6=is_v6)
874             ))
875         t1 = VppCNatTranslation(self, TCP, vip, paths)
876         t1.add_vpp_config()
877         return t1
878
879     def make_addr(self, sw_if_index, i, is_v6):
880         if is_v6:
881             return "fd01:%x::%u" % (sw_if_index, i + 1)
882         else:
883             return "172.16.%u.%u" % (sw_if_index, i)
884
885     def make_prefix(self, sw_if_index, i, is_v6):
886         if is_v6:
887             return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
888         else:
889             return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
890
891     def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
892         qt1 = tr.query_vpp_config()
893         self.assertEqual(str(qt1.vip.addr), self.make_addr(
894             vip_pg.sw_if_index, i, is_v6))
895         for (src_pg, dst_pg), path in zip(args, qt1.paths):
896             if src_pg:
897                 self.assertEqual(str(path.src_ep.addr), self.make_addr(
898                     src_pg.sw_if_index, i, is_v6))
899             if dst_pg:
900                 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
901                     dst_pg.sw_if_index, i, is_v6))
902
903     def config_ips(self, rng, is_add=1, is_v6=False):
904         for pg, i in product(self.pg_interfaces, rng):
905             self.vapi.sw_interface_add_del_address(
906                 sw_if_index=pg.sw_if_index,
907                 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
908                 is_add=is_add)
909
910     def test_dhcp_v4(self):
911         self.create_pg_interfaces(range(5))
912         for i in self.pg_interfaces:
913             i.admin_up()
914         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
915         t1 = self.create_translation(*pglist)
916         self.config_ips([0])
917         self.check_resolved(t1, *pglist)
918         self.config_ips([1])
919         self.config_ips([0], is_add=0)
920         self.check_resolved(t1, *pglist, i=1)
921         self.config_ips([1], is_add=0)
922         t1.remove_vpp_config()
923
924     def test_dhcp_v6(self):
925         self.create_pg_interfaces(range(5))
926         for i in self.pg_interfaces:
927             i.admin_up()
928         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
929         t1 = self.create_translation(*pglist, is_v6=True)
930         self.config_ips([0], is_v6=True)
931         self.check_resolved(t1, *pglist, is_v6=True)
932         self.config_ips([1], is_v6=True)
933         self.config_ips([0], is_add=0, is_v6=True)
934         self.check_resolved(t1, *pglist, i=1, is_v6=True)
935         self.config_ips([1], is_add=0, is_v6=True)
936         t1.remove_vpp_config()
937
938     def test_dhcp_snat(self):
939         self.create_pg_interfaces(range(1))
940         for i in self.pg_interfaces:
941             i.admin_up()
942         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
943         self.config_ips([0], is_v6=False)
944         self.config_ips([0], is_v6=True)
945         r = self.vapi.cnat_get_snat_addresses()
946         self.assertEqual(str(r.snat_ip4), self.make_addr(
947             self.pg0.sw_if_index, 0, False))
948         self.assertEqual(str(r.snat_ip6), self.make_addr(
949             self.pg0.sw_if_index, 0, True))
950         self.config_ips([1], is_v6=False)
951         self.config_ips([1], is_v6=True)
952         self.config_ips([0], is_add=0, is_v6=False)
953         self.config_ips([0], is_add=0, is_v6=True)
954         r = self.vapi.cnat_get_snat_addresses()
955         self.assertEqual(str(r.snat_ip4), self.make_addr(
956             self.pg0.sw_if_index, 1, False))
957         self.assertEqual(str(r.snat_ip6), self.make_addr(
958             self.pg0.sw_if_index, 1, True))
959         self.config_ips([1], is_add=0, is_v6=False)
960         self.config_ips([1], is_add=0, is_v6=True)
961
962
963 if __name__ == '__main__':
964     unittest.main(testRunner=VppTestRunner)