cnat: Fix session with deleted tr
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto, INVALID_INDEX
7 from itertools import product
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP, TCP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
14 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
15
16 import struct
17
18 from ipaddress import ip_address, ip_network, \
19     IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21 from vpp_object import VppObject
22 from vpp_papi import VppEnum
23
24 N_PKTS = 15
25
26
27 class Ep(object):
28     """ CNat endpoint """
29
30     def __init__(self, ip=None, port=0, l4p=TCP,
31                  sw_if_index=INVALID_INDEX, is_v6=False):
32         self.ip = ip
33         if ip is None:
34             self.ip = "::" if is_v6 else "0.0.0.0"
35         self.port = port
36         self.l4p = l4p
37         self.sw_if_index = sw_if_index
38         if is_v6:
39             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
40         else:
41             self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
42
43     def encode(self):
44         return {'addr': self.ip,
45                 'port': self.port,
46                 'sw_if_index': self.sw_if_index,
47                 'if_af': self.if_af}
48
49     @classmethod
50     def from_pg(cls, pg, is_v6=False):
51         if pg is None:
52             return cls(is_v6=is_v6)
53         else:
54             return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6)
55
56     @property
57     def isV6(self):
58         return ":" in self.ip
59
60     def __str__(self):
61         return ("%s:%d" % (self.ip, self.port))
62
63
64 class EpTuple(object):
65     """ CNat endpoint """
66
67     def __init__(self, src, dst):
68         self.src = src
69         self.dst = dst
70
71     def encode(self):
72         return {'src_ep': self.src.encode(),
73                 'dst_ep': self.dst.encode()}
74
75     def __str__(self):
76         return ("%s->%s" % (self.src, self.dst))
77
78
79 class VppCNatTranslation(VppObject):
80
81     def __init__(self, test, iproto, vip, paths):
82         self._test = test
83         self.vip = vip
84         self.iproto = iproto
85         self.paths = paths
86         self.encoded_paths = []
87         for path in self.paths:
88             self.encoded_paths.append(path.encode())
89
90     def __str__(self):
91         return ("%s %s %s" % (self.vip, self.iproto, self.paths))
92
93     @property
94     def vl4_proto(self):
95         ip_proto = VppEnum.vl_api_ip_proto_t
96         return {
97             UDP: ip_proto.IP_API_PROTO_UDP,
98             TCP: ip_proto.IP_API_PROTO_TCP,
99         }[self.iproto]
100
101     def add_vpp_config(self):
102         r = self._test.vapi.cnat_translation_update(
103             {'vip': self.vip.encode(),
104              'ip_proto': self.vl4_proto,
105              'n_paths': len(self.paths),
106              'paths': self.encoded_paths})
107         self._test.registry.register(self, self._test.logger)
108         self.id = r.id
109
110     def modify_vpp_config(self, paths):
111         self.paths = paths
112         self.encoded_paths = []
113         for path in self.paths:
114             self.encoded_paths.append(path.encode())
115
116         r = self._test.vapi.cnat_translation_update(
117             {'vip': self.vip.encode(),
118              'ip_proto': self.vl4_proto,
119              'n_paths': len(self.paths),
120              'paths': self.encoded_paths})
121         self._test.registry.register(self, self._test.logger)
122
123     def remove_vpp_config(self):
124         self._test.vapi.cnat_translation_del(id=self.id)
125
126     def query_vpp_config(self):
127         for t in self._test.vapi.cnat_translation_dump():
128             if self.id == t.translation.id:
129                 return t.translation
130         return None
131
132     def object_id(self):
133         return ("cnat-translation-%s" % (self.vip))
134
135     def get_stats(self):
136         c = self._test.statistics.get_counter("/net/cnat-translation")
137         return c[0][self.id]
138
139
140 class TestCNatTranslation(VppTestCase):
141     """ CNat Translation """
142     extra_vpp_punt_config = ["cnat", "{",
143                              "session-db-buckets", "64",
144                              "session-cleanup-timeout", "0.1",
145                              "session-max-age", "1",
146                              "tcp-max-age", "1",
147                              "scanner", "off", "}"]
148
149     @classmethod
150     def setUpClass(cls):
151         super(TestCNatTranslation, cls).setUpClass()
152
153     @classmethod
154     def tearDownClass(cls):
155         super(TestCNatTranslation, cls).tearDownClass()
156
157     def setUp(self):
158         super(TestCNatTranslation, self).setUp()
159
160         self.create_pg_interfaces(range(3))
161
162         for i in self.pg_interfaces:
163             i.admin_up()
164             i.config_ip4()
165             i.resolve_arp()
166             i.config_ip6()
167             i.resolve_ndp()
168
169     def tearDown(self):
170         for i in self.pg_interfaces:
171             i.unconfig_ip4()
172             i.unconfig_ip6()
173             i.admin_down()
174         super(TestCNatTranslation, self).tearDown()
175
176     def cnat_create_translation(self, vip, nbr):
177         ip_v = "ip6" if vip.isV6 else "ip4"
178         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
179         sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
180         t1 = VppCNatTranslation(
181             self, vip.l4p, vip,
182             [EpTuple(sep, dep), EpTuple(sep, dep)])
183         t1.add_vpp_config()
184         return t1
185
186     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
187         ip_v = "ip6" if isV6 else "ip4"
188         ip_class = IPv6 if isV6 else IP
189         vip = t1.vip
190
191         #
192         # Flows
193         #
194         for src in self.pg0.remote_hosts:
195             for sport in sports:
196                 # from client to vip
197                 p1 = (Ether(dst=self.pg0.local_mac,
198                             src=src.mac) /
199                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
200                       vip.l4p(sport=sport, dport=vip.port) /
201                       Raw())
202
203                 self.vapi.cli("trace add pg-input 1")
204                 rxs = self.send_and_expect(self.pg0,
205                                            p1 * N_PKTS,
206                                            self.pg1)
207                 self.logger.info(self.vapi.cli("show trace max 1"))
208
209                 for rx in rxs:
210                     self.assert_packet_checksums_valid(rx)
211                     self.assertEqual(
212                         rx[ip_class].dst,
213                         getattr(self.pg1.remote_hosts[nbr], ip_v))
214                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
215                     self.assertEqual(
216                         rx[ip_class].src,
217                         getattr(src, ip_v))
218                     self.assertEqual(rx[vip.l4p].sport, sport)
219
220                 # from vip to client
221                 p1 = (Ether(dst=self.pg1.local_mac,
222                             src=self.pg1.remote_mac) /
223                       ip_class(src=getattr(
224                           self.pg1.remote_hosts[nbr],
225                           ip_v),
226                           dst=getattr(src, ip_v)) /
227                       vip.l4p(sport=4000 + nbr, dport=sport) /
228                       Raw())
229
230                 rxs = self.send_and_expect(self.pg1,
231                                            p1 * N_PKTS,
232                                            self.pg0)
233
234                 for rx in rxs:
235                     self.assert_packet_checksums_valid(rx)
236                     self.assertEqual(
237                         rx[ip_class].dst,
238                         getattr(src, ip_v))
239                     self.assertEqual(rx[vip.l4p].dport, sport)
240                     self.assertEqual(rx[ip_class].src, vip.ip)
241                     self.assertEqual(rx[vip.l4p].sport, vip.port)
242
243                 #
244                 # packets to the VIP that do not match a
245                 # translation are dropped
246                 #
247                 p1 = (Ether(dst=self.pg0.local_mac,
248                             src=src.mac) /
249                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
250                       vip.l4p(sport=sport, dport=6666) /
251                       Raw())
252
253                 self.send_and_assert_no_replies(self.pg0,
254                                                 p1 * N_PKTS,
255                                                 self.pg1)
256
257                 #
258                 # packets from the VIP that do not match a
259                 # session are forwarded
260                 #
261                 p1 = (Ether(dst=self.pg1.local_mac,
262                             src=self.pg1.remote_mac) /
263                       ip_class(src=getattr(
264                           self.pg1.remote_hosts[nbr],
265                           ip_v),
266                           dst=getattr(src, ip_v)) /
267                       vip.l4p(sport=6666, dport=sport) /
268                       Raw())
269
270                 rxs = self.send_and_expect(self.pg1,
271                                            p1 * N_PKTS,
272                                            self.pg0)
273
274     def cnat_test_translation_update(self, t1, sports, isV6=False):
275         ip_v = "ip6" if isV6 else "ip4"
276         ip_class = IPv6 if isV6 else IP
277         vip = t1.vip
278
279         #
280         # modify the translation to use a different backend
281         #
282         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
283         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
284         t1.modify_vpp_config([EpTuple(sep, dep)])
285
286         #
287         # existing flows follow the old path
288         #
289         for src in self.pg0.remote_hosts:
290             for sport in sports:
291                 # from client to vip
292                 p1 = (Ether(dst=self.pg0.local_mac,
293                             src=src.mac) /
294                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
295                       vip.l4p(sport=sport, dport=vip.port) /
296                       Raw())
297
298                 rxs = self.send_and_expect(self.pg0,
299                                            p1 * N_PKTS,
300                                            self.pg1)
301
302         #
303         # new flows go to the new backend
304         #
305         for src in self.pg0.remote_hosts:
306             p1 = (Ether(dst=self.pg0.local_mac,
307                         src=src.mac) /
308                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
309                   vip.l4p(sport=9999, dport=vip.port) /
310                   Raw())
311
312             rxs = self.send_and_expect(self.pg0,
313                                        p1 * N_PKTS,
314                                        self.pg2)
315
316     def cnat_translation(self, vips, isV6=False):
317         """ CNat Translation """
318
319         ip_class = IPv6 if isV6 else IP
320         ip_v = "ip6" if isV6 else "ip4"
321         sports = [1234, 1233]
322
323         #
324         # turn the scanner off whilst testing otherwise sessions
325         # will time out
326         #
327         self.vapi.cli("test cnat scanner off")
328
329         sessions = self.vapi.cnat_session_dump()
330
331         trs = []
332         for nbr, vip in enumerate(vips):
333             trs.append(self.cnat_create_translation(vip, nbr))
334
335         self.logger.info(self.vapi.cli("sh cnat client"))
336         self.logger.info(self.vapi.cli("sh cnat translation"))
337
338         #
339         # translations
340         #
341         for nbr, vip in enumerate(vips):
342             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
343             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
344             if isV6:
345                 self.logger.info(self.vapi.cli(
346                     "sh ip6 fib %s" % self.pg0.remote_ip6))
347             else:
348                 self.logger.info(self.vapi.cli(
349                     "sh ip fib %s" % self.pg0.remote_ip4))
350             self.logger.info(self.vapi.cli("sh cnat session verbose"))
351
352         #
353         # turn the scanner back on and wait until the sessions
354         # all disapper
355         #
356         self.vapi.cli("test cnat scanner on")
357
358         n_tries = 0
359         sessions = self.vapi.cnat_session_dump()
360         while (len(sessions) and n_tries < 100):
361             n_tries += 1
362             sessions = self.vapi.cnat_session_dump()
363             self.sleep(2)
364             self.logger.info(self.vapi.cli("show cnat session verbose"))
365
366         self.assertTrue(n_tries < 100)
367         self.vapi.cli("test cnat scanner off")
368
369         #
370         # load some flows again and purge
371         #
372         for vip in vips:
373             for src in self.pg0.remote_hosts:
374                 for sport in sports:
375                     # from client to vip
376                     p1 = (Ether(dst=self.pg0.local_mac,
377                                 src=src.mac) /
378                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
379                           vip.l4p(sport=sport, dport=vip.port) /
380                           Raw())
381                     self.send_and_expect(self.pg0,
382                                          p1 * N_PKTS,
383                                          self.pg2)
384
385         for tr in trs:
386             tr.remove_vpp_config()
387
388         self.assertTrue(self.vapi.cnat_session_dump())
389         self.vapi.cnat_session_purge()
390         self.assertFalse(self.vapi.cnat_session_dump())
391
392     def test_icmp(self):
393         vips = [
394             Ep("30.0.0.1", 5555),
395             Ep("30.0.0.2", 5554),
396             Ep("30.0.0.2", 5553, UDP),
397             Ep("30::1", 6666),
398             Ep("30::2", 5553, UDP),
399         ]
400         sport = 1234
401
402         self.pg0.generate_remote_hosts(len(vips))
403         self.pg0.configure_ipv6_neighbors()
404         self.pg0.configure_ipv4_neighbors()
405
406         self.pg1.generate_remote_hosts(len(vips))
407         self.pg1.configure_ipv6_neighbors()
408         self.pg1.configure_ipv4_neighbors()
409
410         self.vapi.cli("test cnat scanner off")
411         trs = []
412         for nbr, vip in enumerate(vips):
413             trs.append(self.cnat_create_translation(vip, nbr))
414
415         self.logger.info(self.vapi.cli("sh cnat client"))
416         self.logger.info(self.vapi.cli("sh cnat translation"))
417
418         for nbr, vip in enumerate(vips):
419             if vip.isV6:
420                 client_addr = self.pg0.remote_hosts[0].ip6
421                 remote_addr = self.pg1.remote_hosts[nbr].ip6
422                 remote2_addr = self.pg2.remote_hosts[0].ip6
423             else:
424                 client_addr = self.pg0.remote_hosts[0].ip4
425                 remote_addr = self.pg1.remote_hosts[nbr].ip4
426                 remote2_addr = self.pg2.remote_hosts[0].ip4
427             IP46 = IPv6 if vip.isV6 else IP
428             # from client to vip
429             p1 = (Ether(dst=self.pg0.local_mac,
430                         src=self.pg0.remote_hosts[0].mac) /
431                   IP46(src=client_addr, dst=vip.ip) /
432                   vip.l4p(sport=sport, dport=vip.port) /
433                   Raw())
434
435             rxs = self.send_and_expect(self.pg0,
436                                        p1 * N_PKTS,
437                                        self.pg1)
438
439             for rx in rxs:
440                 self.assert_packet_checksums_valid(rx)
441                 self.assertEqual(rx[IP46].dst, remote_addr)
442                 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
443                 self.assertEqual(rx[IP46].src, client_addr)
444                 self.assertEqual(rx[vip.l4p].sport, sport)
445
446             InnerIP = rxs[0][IP46]
447
448             ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
449             ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
450             # from vip to client, ICMP error
451             p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
452                   IP46(src=remote_addr, dst=client_addr) /
453                   ICMPelem / InnerIP)
454
455             rxs = self.send_and_expect(self.pg1,
456                                        p1 * N_PKTS,
457                                        self.pg0)
458
459             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
460             IP46error = IPerror6 if vip.isV6 else IPerror
461             for rx in rxs:
462                 self.assert_packet_checksums_valid(rx)
463                 self.assertEqual(rx[IP46].src, vip.ip)
464                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
465                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
466                 self.assertEqual(rx[ICMP46][IP46error]
467                                  [TCPUDPError].sport, sport)
468                 self.assertEqual(rx[ICMP46][IP46error]
469                                  [TCPUDPError].dport, vip.port)
470
471             # from other remote to client, ICMP error
472             # outside shouldn't be NAT-ed
473             p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
474                   IP46(src=remote2_addr, dst=client_addr) /
475                   ICMPelem / InnerIP)
476
477             rxs = self.send_and_expect(self.pg1,
478                                        p1 * N_PKTS,
479                                        self.pg0)
480
481             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
482             IP46error = IPerror6 if vip.isV6 else IPerror
483             for rx in rxs:
484                 self.assert_packet_checksums_valid(rx)
485                 self.assertEqual(rx[IP46].src, remote2_addr)
486                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
487                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
488                 self.assertEqual(rx[ICMP46][IP46error]
489                                  [TCPUDPError].sport, sport)
490                 self.assertEqual(rx[ICMP46][IP46error]
491                                  [TCPUDPError].dport, vip.port)
492
493         self.vapi.cnat_session_purge()
494
495     def test_cnat6(self):
496         # """ CNat Translation ipv6 """
497         vips = [
498             Ep("30::1", 5555),
499             Ep("30::2", 5554),
500             Ep("30::2", 5553, UDP),
501         ]
502
503         self.pg0.generate_remote_hosts(len(vips))
504         self.pg0.configure_ipv6_neighbors()
505         self.pg1.generate_remote_hosts(len(vips))
506         self.pg1.configure_ipv6_neighbors()
507
508         self.cnat_translation(vips, isV6=True)
509
510     def test_cnat4(self):
511         # """ CNat Translation ipv4 """
512
513         vips = [
514             Ep("30.0.0.1", 5555),
515             Ep("30.0.0.2", 5554),
516             Ep("30.0.0.2", 5553, UDP),
517         ]
518
519         self.pg0.generate_remote_hosts(len(vips))
520         self.pg0.configure_ipv4_neighbors()
521         self.pg1.generate_remote_hosts(len(vips))
522         self.pg1.configure_ipv4_neighbors()
523
524         self.cnat_translation(vips)
525
526
527 class TestCNatSourceNAT(VppTestCase):
528     """ CNat Source NAT """
529     extra_vpp_punt_config = ["cnat", "{",
530                              "session-max-age", "1",
531                              "tcp-max-age", "1", "}"]
532
533     @classmethod
534     def setUpClass(cls):
535         super(TestCNatSourceNAT, cls).setUpClass()
536
537     @classmethod
538     def tearDownClass(cls):
539         super(TestCNatSourceNAT, cls).tearDownClass()
540
541     def setUp(self):
542         super(TestCNatSourceNAT, self).setUp()
543
544         self.create_pg_interfaces(range(3))
545
546         for i in self.pg_interfaces:
547             i.admin_up()
548             i.config_ip4()
549             i.resolve_arp()
550             i.config_ip6()
551             i.resolve_ndp()
552
553         self.pg0.configure_ipv6_neighbors()
554         self.pg0.configure_ipv4_neighbors()
555         self.pg1.generate_remote_hosts(2)
556         self.pg1.configure_ipv4_neighbors()
557         self.pg1.configure_ipv6_neighbors()
558
559         self.vapi.cli("test cnat scanner off")
560         self.vapi.cnat_set_snat_addresses(
561             snat_ip4=self.pg2.remote_hosts[0].ip4,
562             snat_ip6=self.pg2.remote_hosts[0].ip6)
563         self.vapi.feature_enable_disable(
564             enable=1,
565             arc_name="ip6-unicast",
566             feature_name="ip6-cnat-snat",
567             sw_if_index=self.pg0.sw_if_index)
568         self.vapi.feature_enable_disable(
569             enable=1,
570             arc_name="ip4-unicast",
571             feature_name="ip4-cnat-snat",
572             sw_if_index=self.pg0.sw_if_index)
573
574     def tearDown(self):
575         self.vapi.cnat_session_purge()
576         for i in self.pg_interfaces:
577             i.unconfig_ip4()
578             i.unconfig_ip6()
579             i.admin_down()
580         super(TestCNatSourceNAT, self).tearDown()
581
582     def test_snat_v6(self):
583         # """ CNat Source Nat v6 """
584         self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
585         self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
586         self.sourcenat_test_icmp_err_conf(isV6=True)
587         self.sourcenat_test_icmp_echo6_conf()
588
589     def test_snat_v4(self):
590         # """ CNat Source Nat v4 """
591         self.sourcenat_test_tcp_udp_conf(TCP)
592         self.sourcenat_test_tcp_udp_conf(UDP)
593         self.sourcenat_test_icmp_err_conf()
594         self.sourcenat_test_icmp_echo4_conf()
595
596     def sourcenat_test_icmp_echo6_conf(self):
597         sports = [1234, 1235]
598         dports = [6661, 6662]
599
600         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
601             client_addr = self.pg0.remote_hosts[0].ip6
602             remote_addr = self.pg1.remote_hosts[nbr].ip6
603             src_nat_addr = self.pg2.remote_hosts[0].ip6
604
605             # ping from pods to outside network
606             p1 = (
607                 Ether(dst=self.pg0.local_mac,
608                       src=self.pg0.remote_hosts[0].mac) /
609                 IPv6(src=client_addr, dst=remote_addr) /
610                 ICMPv6EchoRequest(id=0xfeed) /
611                 Raw())
612
613             rxs = self.send_and_expect(
614                 self.pg0,
615                 p1 * N_PKTS,
616                 self.pg1)
617
618             for rx in rxs:
619                 self.assertEqual(rx[IPv6].src, src_nat_addr)
620                 self.assert_packet_checksums_valid(rx)
621
622             received_id = rx[0][ICMPv6EchoRequest].id
623             # ping reply from outside to pods
624             p2 = (
625                 Ether(dst=self.pg1.local_mac,
626                       src=self.pg1.remote_hosts[nbr].mac) /
627                 IPv6(src=remote_addr, dst=src_nat_addr) /
628                 ICMPv6EchoReply(id=received_id))
629             rxs = self.send_and_expect(
630                 self.pg1,
631                 p2 * N_PKTS,
632                 self.pg0)
633
634             for rx in rxs:
635                 self.assert_packet_checksums_valid(rx)
636                 self.assertEqual(rx[IPv6].src, remote_addr)
637                 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
638
639     def sourcenat_test_icmp_echo4_conf(self):
640         sports = [1234, 1235]
641         dports = [6661, 6662]
642
643         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
644             IP46 = IP
645             client_addr = self.pg0.remote_hosts[0].ip4
646             remote_addr = self.pg1.remote_hosts[nbr].ip4
647             src_nat_addr = self.pg2.remote_hosts[0].ip4
648
649             # ping from pods to outside network
650             p1 = (
651                 Ether(dst=self.pg0.local_mac,
652                       src=self.pg0.remote_hosts[0].mac) /
653                 IP46(src=client_addr, dst=remote_addr) /
654                 ICMP(type=8, id=0xfeed) /
655                 Raw())
656
657             rxs = self.send_and_expect(
658                 self.pg0,
659                 p1 * N_PKTS,
660                 self.pg1)
661
662             for rx in rxs:
663                 self.assertEqual(rx[IP46].src, src_nat_addr)
664                 self.assert_packet_checksums_valid(rx)
665
666             received_id = rx[0][ICMP].id
667             # ping reply from outside to pods
668             p2 = (
669                 Ether(dst=self.pg1.local_mac,
670                       src=self.pg1.remote_hosts[nbr].mac) /
671                 IP46(src=remote_addr, dst=src_nat_addr) /
672                 ICMP(type=0, id=received_id))
673             rxs = self.send_and_expect(
674                 self.pg1,
675                 p2 * N_PKTS,
676                 self.pg0)
677
678             for rx in rxs:
679                 self.assert_packet_checksums_valid(rx)
680                 self.assertEqual(rx[IP46].src, remote_addr)
681                 self.assertEqual(rx[ICMP].id, 0xfeed)
682
683     def sourcenat_test_icmp_err_conf(self, isV6=False):
684         sports = [1234, 1235]
685         dports = [6661, 6662]
686
687         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
688             if isV6:
689                 IP46 = IPv6
690                 client_addr = self.pg0.remote_hosts[0].ip6
691                 remote_addr = self.pg1.remote_hosts[nbr].ip6
692                 src_nat_addr = self.pg2.remote_hosts[0].ip6
693                 ICMP46 = ICMPv6DestUnreach
694                 ICMPelem = ICMPv6DestUnreach(code=1)
695                 IP46error = IPerror6
696             else:
697                 IP46 = IP
698                 client_addr = self.pg0.remote_hosts[0].ip4
699                 remote_addr = self.pg1.remote_hosts[nbr].ip4
700                 src_nat_addr = self.pg2.remote_hosts[0].ip4
701                 IP46error = IPerror
702                 ICMP46 = ICMP
703                 ICMPelem = ICMP(type=11)
704
705             # from pods to outside network
706             p1 = (
707                 Ether(dst=self.pg0.local_mac,
708                       src=self.pg0.remote_hosts[0].mac) /
709                 IP46(src=client_addr, dst=remote_addr) /
710                 TCP(sport=sports[nbr], dport=dports[nbr]) /
711                 Raw())
712
713             rxs = self.send_and_expect(
714                 self.pg0,
715                 p1 * N_PKTS,
716                 self.pg1)
717             for rx in rxs:
718                 self.assert_packet_checksums_valid(rx)
719                 self.assertEqual(rx[IP46].dst, remote_addr)
720                 self.assertEqual(rx[TCP].dport, dports[nbr])
721                 self.assertEqual(rx[IP46].src, src_nat_addr)
722                 sport = rx[TCP].sport
723
724             InnerIP = rxs[0][IP46]
725             # from outside to pods, ICMP error
726             p2 = (
727                 Ether(dst=self.pg1.local_mac,
728                       src=self.pg1.remote_hosts[nbr].mac) /
729                 IP46(src=remote_addr, dst=src_nat_addr) /
730                 ICMPelem / InnerIP)
731
732             rxs = self.send_and_expect(
733                 self.pg1,
734                 p2 * N_PKTS,
735                 self.pg0)
736
737             for rx in rxs:
738                 self.assert_packet_checksums_valid(rx)
739                 self.assertEqual(rx[IP46].src, remote_addr)
740                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
741                 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
742                 self.assertEqual(rx[ICMP46][IP46error]
743                                  [TCPerror].sport, sports[nbr])
744                 self.assertEqual(rx[ICMP46][IP46error]
745                                  [TCPerror].dport, dports[nbr])
746
747     def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
748         sports = [1234, 1235]
749         dports = [6661, 6662]
750
751         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
752             if isV6:
753                 IP46 = IPv6
754                 client_addr = self.pg0.remote_hosts[0].ip6
755                 remote_addr = self.pg1.remote_hosts[nbr].ip6
756                 src_nat_addr = self.pg2.remote_hosts[0].ip6
757                 exclude_prefix = ip_network(
758                     "%s/100" % remote_addr, strict=False)
759             else:
760                 IP46 = IP
761                 client_addr = self.pg0.remote_hosts[0].ip4
762                 remote_addr = self.pg1.remote_hosts[nbr].ip4
763                 src_nat_addr = self.pg2.remote_hosts[0].ip4
764                 exclude_prefix = ip_network(
765                     "%s/16" % remote_addr, strict=False)
766             # from pods to outside network
767             p1 = (
768                 Ether(dst=self.pg0.local_mac,
769                       src=self.pg0.remote_hosts[0].mac) /
770                 IP46(src=client_addr, dst=remote_addr) /
771                 l4p(sport=sports[nbr], dport=dports[nbr]) /
772                 Raw())
773
774             self.vapi.cli("trace add pg-input 1")
775             rxs = self.send_and_expect(
776                 self.pg0,
777                 p1 * N_PKTS,
778                 self.pg1)
779             self.logger.info(self.vapi.cli("show trace max 1"))
780
781             for rx in rxs:
782                 self.assert_packet_checksums_valid(rx)
783                 self.assertEqual(rx[IP46].dst, remote_addr)
784                 self.assertEqual(rx[l4p].dport, dports[nbr])
785                 self.assertEqual(rx[IP46].src, src_nat_addr)
786                 sport = rx[l4p].sport
787
788             # from outside to pods
789             p2 = (
790                 Ether(dst=self.pg1.local_mac,
791                       src=self.pg1.remote_hosts[nbr].mac) /
792                 IP46(src=remote_addr, dst=src_nat_addr) /
793                 l4p(sport=dports[nbr], dport=sport) /
794                 Raw())
795
796             rxs = self.send_and_expect(
797                 self.pg1,
798                 p2 * N_PKTS,
799                 self.pg0)
800
801             for rx in rxs:
802                 self.assert_packet_checksums_valid(rx)
803                 self.assertEqual(rx[IP46].dst, client_addr)
804                 self.assertEqual(rx[l4p].dport, sports[nbr])
805                 self.assertEqual(rx[l4p].sport, dports[nbr])
806                 self.assertEqual(rx[IP46].src, remote_addr)
807
808             # add remote host to exclude list
809             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
810             self.vapi.cnat_session_purge()
811
812             rxs = self.send_and_expect(
813                 self.pg0,
814                 p1 * N_PKTS,
815                 self.pg1)
816             for rx in rxs:
817                 self.assert_packet_checksums_valid(rx)
818                 self.assertEqual(rx[IP46].dst, remote_addr)
819                 self.assertEqual(rx[l4p].dport, dports[nbr])
820                 self.assertEqual(rx[IP46].src, client_addr)
821
822             # remove remote host from exclude list
823             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
824             self.vapi.cnat_session_purge()
825
826             rxs = self.send_and_expect(
827                 self.pg0,
828                 p1 * N_PKTS,
829                 self.pg1)
830
831             for rx in rxs:
832                 self.assert_packet_checksums_valid(rx)
833                 self.assertEqual(rx[IP46].dst, remote_addr)
834                 self.assertEqual(rx[l4p].dport, dports[nbr])
835                 self.assertEqual(rx[IP46].src, src_nat_addr)
836
837             self.vapi.cnat_session_purge()
838
839
840 class TestCNatDHCP(VppTestCase):
841     """ CNat Translation """
842     extra_vpp_punt_config = ["cnat", "{",
843                              "session-db-buckets", "64",
844                              "session-cleanup-timeout", "0.1",
845                              "session-max-age", "1",
846                              "tcp-max-age", "1",
847                              "scanner", "off", "}"]
848
849     @classmethod
850     def setUpClass(cls):
851         super(TestCNatDHCP, cls).setUpClass()
852
853     @classmethod
854     def tearDownClass(cls):
855         super(TestCNatDHCP, cls).tearDownClass()
856
857     def tearDown(self):
858         for i in self.pg_interfaces:
859             i.admin_down()
860         super(TestCNatDHCP, self).tearDown()
861
862     def create_translation(self, vip_pg, *args, is_v6=False):
863         vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6)
864         paths = []
865         for (src_pg, dst_pg) in args:
866             paths.append(EpTuple(
867                 Ep.from_pg(src_pg, is_v6=is_v6),
868                 Ep.from_pg(dst_pg, is_v6=is_v6)
869             ))
870         t1 = VppCNatTranslation(self, TCP, vip, paths)
871         t1.add_vpp_config()
872         return t1
873
874     def make_addr(self, sw_if_index, i, is_v6):
875         if is_v6:
876             return "fd01:%x::%u" % (sw_if_index, i + 1)
877         else:
878             return "172.16.%u.%u" % (sw_if_index, i)
879
880     def make_prefix(self, sw_if_index, i, is_v6):
881         if is_v6:
882             return "%s/128" % self.make_addr(sw_if_index, i, is_v6)
883         else:
884             return "%s/32" % self.make_addr(sw_if_index, i, is_v6)
885
886     def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False):
887         qt1 = tr.query_vpp_config()
888         self.assertEqual(str(qt1.vip.addr), self.make_addr(
889             vip_pg.sw_if_index, i, is_v6))
890         for (src_pg, dst_pg), path in zip(args, qt1.paths):
891             if src_pg:
892                 self.assertEqual(str(path.src_ep.addr), self.make_addr(
893                     src_pg.sw_if_index, i, is_v6))
894             if dst_pg:
895                 self.assertEqual(str(path.dst_ep.addr), self.make_addr(
896                     dst_pg.sw_if_index, i, is_v6))
897
898     def config_ips(self, rng, is_add=1, is_v6=False):
899         for pg, i in product(self.pg_interfaces, rng):
900             self.vapi.sw_interface_add_del_address(
901                 sw_if_index=pg.sw_if_index,
902                 prefix=self.make_prefix(pg.sw_if_index, i, is_v6),
903                 is_add=is_add)
904
905     def test_dhcp_v4(self):
906         self.create_pg_interfaces(range(5))
907         for i in self.pg_interfaces:
908             i.admin_up()
909         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
910         t1 = self.create_translation(*pglist)
911         self.config_ips([0])
912         self.check_resolved(t1, *pglist)
913         self.config_ips([1])
914         self.config_ips([0], is_add=0)
915         self.check_resolved(t1, *pglist, i=1)
916         self.config_ips([1], is_add=0)
917         t1.remove_vpp_config()
918
919     def test_dhcp_v6(self):
920         self.create_pg_interfaces(range(5))
921         for i in self.pg_interfaces:
922             i.admin_up()
923         pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4))
924         t1 = self.create_translation(*pglist, is_v6=True)
925         self.config_ips([0], is_v6=True)
926         self.check_resolved(t1, *pglist, is_v6=True)
927         self.config_ips([1], is_v6=True)
928         self.config_ips([0], is_add=0, is_v6=True)
929         self.check_resolved(t1, *pglist, i=1, is_v6=True)
930         self.config_ips([1], is_add=0, is_v6=True)
931         t1.remove_vpp_config()
932
933     def test_dhcp_snat(self):
934         self.create_pg_interfaces(range(1))
935         for i in self.pg_interfaces:
936             i.admin_up()
937         self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
938         self.config_ips([0], is_v6=False)
939         self.config_ips([0], is_v6=True)
940         r = self.vapi.cnat_get_snat_addresses()
941         self.assertEqual(str(r.snat_ip4), self.make_addr(
942             self.pg0.sw_if_index, 0, False))
943         self.assertEqual(str(r.snat_ip6), self.make_addr(
944             self.pg0.sw_if_index, 0, True))
945         self.config_ips([1], is_v6=False)
946         self.config_ips([1], is_v6=True)
947         self.config_ips([0], is_add=0, is_v6=False)
948         self.config_ips([0], is_add=0, is_v6=True)
949         r = self.vapi.cnat_get_snat_addresses()
950         self.assertEqual(str(r.snat_ip4), self.make_addr(
951             self.pg0.sw_if_index, 1, False))
952         self.assertEqual(str(r.snat_ip6), self.make_addr(
953             self.pg0.sw_if_index, 1, True))
954         self.config_ips([1], is_add=0, is_v6=False)
955         self.config_ips([1], is_add=0, is_v6=True)
956
957
958 if __name__ == '__main__':
959     unittest.main(testRunner=VppTestRunner)