cnat: Add support for SNat ICMP
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
13 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
14
15 import struct
16
17 from ipaddress import ip_address, ip_network, \
18     IPv4Address, IPv6Address, IPv4Network, IPv6Network
19
20 from vpp_object import VppObject
21 from vpp_papi import VppEnum
22
# Number of copies of a packet injected in each send_and_expect() /
# send_and_assert_no_replies() burst (used as ``pkt * N_PKTS``).
N_PKTS = 15
24
25
def find_cnat_translation(test, id):
    """ Tell whether the translation dump contains the given id

    :param test: test case exposing the ``vapi`` API provider
    :param id: translation id to search for
    :returns: True when a dumped translation carries this id, else False
    """
    return any(id == entry.translation.id
               for entry in test.vapi.cnat_translation_dump())
32
33
class Ep(object):
    """ CNat endpoint: an address, an L4 port and the L4 protocol class """

    def __init__(self, ip, port, l4p=TCP):
        # ip is kept as given; isV6 below tests it for a ':' character
        self.ip, self.port, self.l4p = ip, port, l4p

    def encode(self):
        """ Return the endpoint as an {'addr', 'port'} dictionary """
        return dict(addr=self.ip, port=self.port)

    @property
    def isV6(self):
        """ True when the address contains a colon """
        return ":" in self.ip

    def __str__(self):
        return "%s:%d" % (self.ip, self.port)
52
53
class EpTuple(object):
    """ CNat endpoint pair: a source endpoint and a destination endpoint """

    def __init__(self, src, dst):
        self.src, self.dst = src, dst

    def encode(self):
        """ Return {'src_ep', 'dst_ep'} built from each endpoint's encode() """
        return dict(src_ep=self.src.encode(), dst_ep=self.dst.encode())

    def __str__(self):
        return "%s->%s" % (self.src, self.dst)
67
68
class VppCNatTranslation(VppObject):
    """ CNat translation: a VIP endpoint mapped onto a list of paths

    :param test: owning test case (provides vapi, registry, logger and
                 statistics)
    :param iproto: L4 protocol class of the translation (TCP or UDP)
    :param vip: endpoint being translated
    :param paths: objects with an encode() method, one per path
    """

    def __init__(self, test, iproto, vip, paths):
        self._test = test
        self.vip = vip
        self.iproto = iproto
        self.paths = paths
        self.encoded_paths = [path.encode() for path in self.paths]

    @property
    def vl4_proto(self):
        """ API enum value for self.iproto; KeyError if neither TCP nor UDP """
        ip_proto = VppEnum.vl_api_ip_proto_t
        return {
            UDP: ip_proto.IP_API_PROTO_UDP,
            TCP: ip_proto.IP_API_PROTO_TCP,
        }[self.iproto]

    def _update_and_register(self):
        """ Send the current vip/paths to VPP, register self, return reply """
        reply = self._test.vapi.cnat_translation_update(
            {'vip': self.vip.encode(),
             'ip_proto': self.vl4_proto,
             'n_paths': len(self.paths),
             'paths': self.encoded_paths})
        self._test.registry.register(self, self._test.logger)
        return reply

    def delete(self):
        """ Delete the translation from VPP by id """
        self._test.vapi.cnat_translation_del(id=self.id)

    def add_vpp_config(self):
        """ Create the translation and remember the id VPP assigned """
        self.id = self._update_and_register().id

    def modify_vpp_config(self, paths):
        """ Replace the translation's paths and push the update to VPP """
        self.paths = paths
        self.encoded_paths = [path.encode() for path in self.paths]
        self._update_and_register()

    def remove_vpp_config(self):
        # same API call as delete(); keyword form used consistently
        self.delete()

    def query_vpp_config(self):
        """ True while the translation id is present in VPP's dump """
        return find_cnat_translation(self._test, self.id)

    def object_id(self):
        return ("cnat-translation-%s" % (self.vip))

    def get_stats(self):
        """ Return this translation's entry of /net/cnat-translation """
        c = self._test.statistics.get_counter("/net/cnat-translation")
        # NOTE(review): index 0 is presumably the thread index - confirm
        return c[0][self.id]
125
126
127 class TestCNatTranslation(VppTestCase):
128     """ CNat Translation """
129     extra_vpp_punt_config = ["cnat", "{",
130                              "session-db-buckets", "64",
131                              "session-cleanup-timeout", "0.1",
132                              "session-max-age", "1",
133                              "tcp-max-age", "1",
134                              "scanner", "off", "}"]
135
136     @classmethod
137     def setUpClass(cls):
138         super(TestCNatTranslation, cls).setUpClass()
139
140     @classmethod
141     def tearDownClass(cls):
142         super(TestCNatTranslation, cls).tearDownClass()
143
144     def setUp(self):
145         super(TestCNatTranslation, self).setUp()
146
147         self.create_pg_interfaces(range(3))
148
149         for i in self.pg_interfaces:
150             i.admin_up()
151             i.config_ip4()
152             i.resolve_arp()
153             i.config_ip6()
154             i.resolve_ndp()
155
156     def tearDown(self):
157         for i in self.pg_interfaces:
158             i.unconfig_ip4()
159             i.unconfig_ip6()
160             i.admin_down()
161         super(TestCNatTranslation, self).tearDown()
162
163     def cnat_create_translation(self, vip, nbr):
164         ip_v = "ip6" if vip.isV6 else "ip4"
165         dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
166         sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
167         t1 = VppCNatTranslation(
168             self, vip.l4p, vip,
169             [EpTuple(sep, dep), EpTuple(sep, dep)])
170         t1.add_vpp_config()
171         return t1
172
173     def cnat_test_translation(self, t1, nbr, sports, isV6=False):
174         ip_v = "ip6" if isV6 else "ip4"
175         ip_class = IPv6 if isV6 else IP
176         vip = t1.vip
177
178         #
179         # Flows
180         #
181         for src in self.pg0.remote_hosts:
182             for sport in sports:
183                 # from client to vip
184                 p1 = (Ether(dst=self.pg0.local_mac,
185                             src=src.mac) /
186                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
187                       vip.l4p(sport=sport, dport=vip.port) /
188                       Raw())
189
190                 self.vapi.cli("trace add pg-input 1")
191                 rxs = self.send_and_expect(self.pg0,
192                                            p1 * N_PKTS,
193                                            self.pg1)
194
195                 for rx in rxs:
196                     self.assert_packet_checksums_valid(rx)
197                     self.assertEqual(
198                         rx[ip_class].dst,
199                         getattr(self.pg1.remote_hosts[nbr], ip_v))
200                     self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
201                     self.assertEqual(
202                         rx[ip_class].src,
203                         getattr(src, ip_v))
204                     self.assertEqual(rx[vip.l4p].sport, sport)
205
206                 # from vip to client
207                 p1 = (Ether(dst=self.pg1.local_mac,
208                             src=self.pg1.remote_mac) /
209                       ip_class(src=getattr(
210                           self.pg1.remote_hosts[nbr],
211                           ip_v),
212                           dst=getattr(src, ip_v)) /
213                       vip.l4p(sport=4000 + nbr, dport=sport) /
214                       Raw())
215
216                 rxs = self.send_and_expect(self.pg1,
217                                            p1 * N_PKTS,
218                                            self.pg0)
219
220                 for rx in rxs:
221                     self.assert_packet_checksums_valid(rx)
222                     self.assertEqual(
223                         rx[ip_class].dst,
224                         getattr(src, ip_v))
225                     self.assertEqual(rx[vip.l4p].dport, sport)
226                     self.assertEqual(rx[ip_class].src, vip.ip)
227                     self.assertEqual(rx[vip.l4p].sport, vip.port)
228
229                 #
230                 # packets to the VIP that do not match a
231                 # translation are dropped
232                 #
233                 p1 = (Ether(dst=self.pg0.local_mac,
234                             src=src.mac) /
235                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
236                       vip.l4p(sport=sport, dport=6666) /
237                       Raw())
238
239                 self.send_and_assert_no_replies(self.pg0,
240                                                 p1 * N_PKTS,
241                                                 self.pg1)
242
243                 #
244                 # packets from the VIP that do not match a
245                 # session are forwarded
246                 #
247                 p1 = (Ether(dst=self.pg1.local_mac,
248                             src=self.pg1.remote_mac) /
249                       ip_class(src=getattr(
250                           self.pg1.remote_hosts[nbr],
251                           ip_v),
252                           dst=getattr(src, ip_v)) /
253                       vip.l4p(sport=6666, dport=sport) /
254                       Raw())
255
256                 rxs = self.send_and_expect(self.pg1,
257                                            p1 * N_PKTS,
258                                            self.pg0)
259
260         self.assertEqual(t1.get_stats()['packets'],
261                          N_PKTS *
262                          len(sports) *
263                          len(self.pg0.remote_hosts))
264
265     def cnat_test_translation_update(self, t1, sports, isV6=False):
266         ip_v = "ip6" if isV6 else "ip4"
267         ip_class = IPv6 if isV6 else IP
268         vip = t1.vip
269
270         #
271         # modify the translation to use a different backend
272         #
273         dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
274         sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
275         t1.modify_vpp_config([EpTuple(sep, dep)])
276
277         #
278         # existing flows follow the old path
279         #
280         for src in self.pg0.remote_hosts:
281             for sport in sports:
282                 # from client to vip
283                 p1 = (Ether(dst=self.pg0.local_mac,
284                             src=src.mac) /
285                       ip_class(src=getattr(src, ip_v), dst=vip.ip) /
286                       vip.l4p(sport=sport, dport=vip.port) /
287                       Raw())
288
289                 rxs = self.send_and_expect(self.pg0,
290                                            p1 * N_PKTS,
291                                            self.pg1)
292
293         #
294         # new flows go to the new backend
295         #
296         for src in self.pg0.remote_hosts:
297             p1 = (Ether(dst=self.pg0.local_mac,
298                         src=src.mac) /
299                   ip_class(src=getattr(src, ip_v), dst=vip.ip) /
300                   vip.l4p(sport=9999, dport=vip.port) /
301                   Raw())
302
303             rxs = self.send_and_expect(self.pg0,
304                                        p1 * N_PKTS,
305                                        self.pg2)
306
307     def cnat_translation(self, vips, isV6=False):
308         """ CNat Translation """
309
310         ip_class = IPv6 if isV6 else IP
311         ip_v = "ip6" if isV6 else "ip4"
312         sports = [1234, 1233]
313
314         #
315         # turn the scanner off whilst testing otherwise sessions
316         # will time out
317         #
318         self.vapi.cli("test cnat scanner off")
319
320         sessions = self.vapi.cnat_session_dump()
321
322         trs = []
323         for nbr, vip in enumerate(vips):
324             trs.append(self.cnat_create_translation(vip, nbr))
325
326         self.logger.info(self.vapi.cli("sh cnat client"))
327         self.logger.info(self.vapi.cli("sh cnat translation"))
328
329         #
330         # translations
331         #
332         for nbr, vip in enumerate(vips):
333             self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6)
334             self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6)
335             if isV6:
336                 self.logger.info(self.vapi.cli(
337                     "sh ip6 fib %s" % self.pg0.remote_ip6))
338             else:
339                 self.logger.info(self.vapi.cli(
340                     "sh ip fib %s" % self.pg0.remote_ip4))
341             self.logger.info(self.vapi.cli("sh cnat session verbose"))
342
343         #
344         # turn the scanner back on and wait until the sessions
345         # all disapper
346         #
347         self.vapi.cli("test cnat scanner on")
348
349         n_tries = 0
350         sessions = self.vapi.cnat_session_dump()
351         while (len(sessions) and n_tries < 100):
352             n_tries += 1
353             sessions = self.vapi.cnat_session_dump()
354             self.sleep(2)
355             print(self.vapi.cli("show cnat session verbose"))
356
357         self.assertTrue(n_tries < 100)
358         self.vapi.cli("test cnat scanner off")
359
360         #
361         # load some flows again and purge
362         #
363         for vip in vips:
364             for src in self.pg0.remote_hosts:
365                 for sport in sports:
366                     # from client to vip
367                     p1 = (Ether(dst=self.pg0.local_mac,
368                                 src=src.mac) /
369                           ip_class(src=getattr(src, ip_v), dst=vip.ip) /
370                           vip.l4p(sport=sport, dport=vip.port) /
371                           Raw())
372                     self.send_and_expect(self.pg0,
373                                          p1 * N_PKTS,
374                                          self.pg2)
375
376         for tr in trs:
377             tr.delete()
378
379         self.assertTrue(self.vapi.cnat_session_dump())
380         self.vapi.cnat_session_purge()
381         self.assertFalse(self.vapi.cnat_session_dump())
382
383     def test_icmp(self):
384         vips = [
385             Ep("30.0.0.1", 5555),
386             Ep("30.0.0.2", 5554),
387             Ep("30.0.0.2", 5553, UDP),
388             Ep("30::1", 6666),
389             Ep("30::2", 5553, UDP),
390         ]
391         sport = 1234
392
393         self.pg0.generate_remote_hosts(len(vips))
394         self.pg0.configure_ipv6_neighbors()
395         self.pg0.configure_ipv4_neighbors()
396
397         self.pg1.generate_remote_hosts(len(vips))
398         self.pg1.configure_ipv6_neighbors()
399         self.pg1.configure_ipv4_neighbors()
400
401         self.vapi.cli("test cnat scanner off")
402         trs = []
403         for nbr, vip in enumerate(vips):
404             trs.append(self.cnat_create_translation(vip, nbr))
405
406         self.logger.info(self.vapi.cli("sh cnat client"))
407         self.logger.info(self.vapi.cli("sh cnat translation"))
408
409         for nbr, vip in enumerate(vips):
410             if vip.isV6:
411                 client_addr = self.pg0.remote_hosts[0].ip6
412                 remote_addr = self.pg1.remote_hosts[nbr].ip6
413                 remote2_addr = self.pg2.remote_hosts[0].ip6
414             else:
415                 client_addr = self.pg0.remote_hosts[0].ip4
416                 remote_addr = self.pg1.remote_hosts[nbr].ip4
417                 remote2_addr = self.pg2.remote_hosts[0].ip4
418             IP46 = IPv6 if vip.isV6 else IP
419             # from client to vip
420             p1 = (Ether(dst=self.pg0.local_mac,
421                         src=self.pg0.remote_hosts[0].mac) /
422                   IP46(src=client_addr, dst=vip.ip) /
423                   vip.l4p(sport=sport, dport=vip.port) /
424                   Raw())
425
426             rxs = self.send_and_expect(self.pg0,
427                                        p1 * N_PKTS,
428                                        self.pg1)
429
430             for rx in rxs:
431                 self.assert_packet_checksums_valid(rx)
432                 self.assertEqual(rx[IP46].dst, remote_addr)
433                 self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
434                 self.assertEqual(rx[IP46].src, client_addr)
435                 self.assertEqual(rx[vip.l4p].sport, sport)
436
437             InnerIP = rxs[0][IP46]
438
439             ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
440             ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
441             # from vip to client, ICMP error
442             p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
443                   IP46(src=remote_addr, dst=client_addr) /
444                   ICMPelem / InnerIP)
445
446             rxs = self.send_and_expect(self.pg1,
447                                        p1 * N_PKTS,
448                                        self.pg0)
449
450             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
451             IP46error = IPerror6 if vip.isV6 else IPerror
452             for rx in rxs:
453                 self.assert_packet_checksums_valid(rx)
454                 self.assertEqual(rx[IP46].src, vip.ip)
455                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
456                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
457                 self.assertEqual(rx[ICMP46][IP46error]
458                                  [TCPUDPError].sport, sport)
459                 self.assertEqual(rx[ICMP46][IP46error]
460                                  [TCPUDPError].dport, vip.port)
461
462             # from other remote to client, ICMP error
463             # outside shouldn't be NAT-ed
464             p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
465                   IP46(src=remote2_addr, dst=client_addr) /
466                   ICMPelem / InnerIP)
467
468             rxs = self.send_and_expect(self.pg1,
469                                        p1 * N_PKTS,
470                                        self.pg0)
471
472             TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
473             IP46error = IPerror6 if vip.isV6 else IPerror
474             for rx in rxs:
475                 self.assert_packet_checksums_valid(rx)
476                 self.assertEqual(rx[IP46].src, remote2_addr)
477                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
478                 self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
479                 self.assertEqual(rx[ICMP46][IP46error]
480                                  [TCPUDPError].sport, sport)
481                 self.assertEqual(rx[ICMP46][IP46error]
482                                  [TCPUDPError].dport, vip.port)
483
484         self.vapi.cnat_session_purge()
485
486     def test_cnat6(self):
487         # """ CNat Translation ipv6 """
488         vips = [
489             Ep("30::1", 5555),
490             Ep("30::2", 5554),
491             Ep("30::2", 5553, UDP),
492         ]
493
494         self.pg0.generate_remote_hosts(len(vips))
495         self.pg0.configure_ipv6_neighbors()
496         self.pg1.generate_remote_hosts(len(vips))
497         self.pg1.configure_ipv6_neighbors()
498
499         self.cnat_translation(vips, isV6=True)
500
501     def test_cnat4(self):
502         # """ CNat Translation ipv4 """
503
504         vips = [
505             Ep("30.0.0.1", 5555),
506             Ep("30.0.0.2", 5554),
507             Ep("30.0.0.2", 5553, UDP),
508         ]
509
510         self.pg0.generate_remote_hosts(len(vips))
511         self.pg0.configure_ipv4_neighbors()
512         self.pg1.generate_remote_hosts(len(vips))
513         self.pg1.configure_ipv4_neighbors()
514
515         self.cnat_translation(vips)
516
517
518 class TestCNatSourceNAT(VppTestCase):
519     """ CNat Source NAT """
520     extra_vpp_punt_config = ["cnat", "{",
521                              "session-max-age", "1",
522                              "tcp-max-age", "1", "}"]
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestCNatSourceNAT, cls).setUpClass()
527
528     @classmethod
529     def tearDownClass(cls):
530         super(TestCNatSourceNAT, cls).tearDownClass()
531
532     def setUp(self):
533         super(TestCNatSourceNAT, self).setUp()
534
535         self.create_pg_interfaces(range(3))
536
537         for i in self.pg_interfaces:
538             i.admin_up()
539             i.config_ip4()
540             i.resolve_arp()
541             i.config_ip6()
542             i.resolve_ndp()
543
544         self.pg0.configure_ipv6_neighbors()
545         self.pg0.configure_ipv4_neighbors()
546         self.pg1.generate_remote_hosts(2)
547         self.pg1.configure_ipv4_neighbors()
548         self.pg1.configure_ipv6_neighbors()
549
550         self.vapi.cli("test cnat scanner off")
551         self.vapi.cnat_set_snat_addresses(
552             snat_ip4=self.pg2.remote_hosts[0].ip4,
553             snat_ip6=self.pg2.remote_hosts[0].ip6)
554         self.vapi.feature_enable_disable(
555             enable=1,
556             arc_name="ip6-unicast",
557             feature_name="ip6-cnat-snat",
558             sw_if_index=self.pg0.sw_if_index)
559         self.vapi.feature_enable_disable(
560             enable=1,
561             arc_name="ip4-unicast",
562             feature_name="ip4-cnat-snat",
563             sw_if_index=self.pg0.sw_if_index)
564
565     def tearDown(self):
566         self.vapi.cnat_session_purge()
567         for i in self.pg_interfaces:
568             i.unconfig_ip4()
569             i.unconfig_ip6()
570             i.admin_down()
571         super(TestCNatSourceNAT, self).tearDown()
572
573     def test_snat_v6(self):
574         # """ CNat Source Nat v6 """
575         self.sourcenat_test_tcp_udp_conf(TCP, isV6=True)
576         self.sourcenat_test_tcp_udp_conf(UDP, isV6=True)
577         self.sourcenat_test_icmp_err_conf(isV6=True)
578         self.sourcenat_test_icmp_echo6_conf()
579
580     def test_snat_v4(self):
581         # """ CNat Source Nat v4 """
582         self.sourcenat_test_tcp_udp_conf(TCP)
583         self.sourcenat_test_tcp_udp_conf(UDP)
584         self.sourcenat_test_icmp_err_conf()
585         self.sourcenat_test_icmp_echo4_conf()
586
587     def sourcenat_test_icmp_echo6_conf(self):
588         sports = [1234, 1235]
589         dports = [6661, 6662]
590
591         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
592             client_addr = self.pg0.remote_hosts[0].ip6
593             remote_addr = self.pg1.remote_hosts[nbr].ip6
594             src_nat_addr = self.pg2.remote_hosts[0].ip6
595
596             # ping from pods to outside network
597             p1 = (
598                 Ether(dst=self.pg0.local_mac,
599                       src=self.pg0.remote_hosts[0].mac) /
600                 IPv6(src=client_addr, dst=remote_addr) /
601                 ICMPv6EchoRequest(id=0xfeed) /
602                 Raw())
603
604             rxs = self.send_and_expect(
605                 self.pg0,
606                 p1 * N_PKTS,
607                 self.pg1)
608
609             for rx in rxs:
610                 self.assertEqual(rx[IPv6].src, src_nat_addr)
611                 self.assert_packet_checksums_valid(rx)
612
613             received_id = rx[0][ICMPv6EchoRequest].id
614             # ping reply from outside to pods
615             p2 = (
616                 Ether(dst=self.pg1.local_mac,
617                       src=self.pg1.remote_hosts[nbr].mac) /
618                 IPv6(src=remote_addr, dst=src_nat_addr) /
619                 ICMPv6EchoReply(id=received_id))
620             rxs = self.send_and_expect(
621                 self.pg1,
622                 p2 * N_PKTS,
623                 self.pg0)
624
625             for rx in rxs:
626                 self.assert_packet_checksums_valid(rx)
627                 self.assertEqual(rx[IPv6].src, remote_addr)
628                 self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed)
629
630     def sourcenat_test_icmp_echo4_conf(self):
631         sports = [1234, 1235]
632         dports = [6661, 6662]
633
634         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
635             IP46 = IP
636             client_addr = self.pg0.remote_hosts[0].ip4
637             remote_addr = self.pg1.remote_hosts[nbr].ip4
638             src_nat_addr = self.pg2.remote_hosts[0].ip4
639
640             # ping from pods to outside network
641             p1 = (
642                 Ether(dst=self.pg0.local_mac,
643                       src=self.pg0.remote_hosts[0].mac) /
644                 IP46(src=client_addr, dst=remote_addr) /
645                 ICMP(type=8, id=0xfeed) /
646                 Raw())
647
648             rxs = self.send_and_expect(
649                 self.pg0,
650                 p1 * N_PKTS,
651                 self.pg1)
652
653             for rx in rxs:
654                 self.assertEqual(rx[IP46].src, src_nat_addr)
655                 self.assert_packet_checksums_valid(rx)
656
657             received_id = rx[0][ICMP].id
658             # ping reply from outside to pods
659             p2 = (
660                 Ether(dst=self.pg1.local_mac,
661                       src=self.pg1.remote_hosts[nbr].mac) /
662                 IP46(src=remote_addr, dst=src_nat_addr) /
663                 ICMP(type=0, id=received_id))
664             rxs = self.send_and_expect(
665                 self.pg1,
666                 p2 * N_PKTS,
667                 self.pg0)
668
669             for rx in rxs:
670                 self.assert_packet_checksums_valid(rx)
671                 self.assertEqual(rx[IP46].src, remote_addr)
672                 self.assertEqual(rx[ICMP].id, 0xfeed)
673
674     def sourcenat_test_icmp_err_conf(self, isV6=False):
675         sports = [1234, 1235]
676         dports = [6661, 6662]
677
678         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
679             if isV6:
680                 IP46 = IPv6
681                 client_addr = self.pg0.remote_hosts[0].ip6
682                 remote_addr = self.pg1.remote_hosts[nbr].ip6
683                 src_nat_addr = self.pg2.remote_hosts[0].ip6
684                 ICMP46 = ICMPv6DestUnreach
685                 ICMPelem = ICMPv6DestUnreach(code=1)
686                 IP46error = IPerror6
687             else:
688                 IP46 = IP
689                 client_addr = self.pg0.remote_hosts[0].ip4
690                 remote_addr = self.pg1.remote_hosts[nbr].ip4
691                 src_nat_addr = self.pg2.remote_hosts[0].ip4
692                 IP46error = IPerror
693                 ICMP46 = ICMP
694                 ICMPelem = ICMP(type=11)
695
696             # from pods to outside network
697             p1 = (
698                 Ether(dst=self.pg0.local_mac,
699                       src=self.pg0.remote_hosts[0].mac) /
700                 IP46(src=client_addr, dst=remote_addr) /
701                 TCP(sport=sports[nbr], dport=dports[nbr]) /
702                 Raw())
703
704             rxs = self.send_and_expect(
705                 self.pg0,
706                 p1 * N_PKTS,
707                 self.pg1)
708             for rx in rxs:
709                 self.assert_packet_checksums_valid(rx)
710                 self.assertEqual(rx[IP46].dst, remote_addr)
711                 self.assertEqual(rx[TCP].dport, dports[nbr])
712                 self.assertEqual(rx[IP46].src, src_nat_addr)
713                 sport = rx[TCP].sport
714
715             InnerIP = rxs[0][IP46]
716             # from outside to pods, ICMP error
717             p2 = (
718                 Ether(dst=self.pg1.local_mac,
719                       src=self.pg1.remote_hosts[nbr].mac) /
720                 IP46(src=remote_addr, dst=src_nat_addr) /
721                 ICMPelem / InnerIP)
722
723             rxs = self.send_and_expect(
724                 self.pg1,
725                 p2 * N_PKTS,
726                 self.pg0)
727
728             for rx in rxs:
729                 self.assert_packet_checksums_valid(rx)
730                 self.assertEqual(rx[IP46].src, remote_addr)
731                 self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
732                 self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr)
733                 self.assertEqual(rx[ICMP46][IP46error]
734                                  [TCPerror].sport, sports[nbr])
735                 self.assertEqual(rx[ICMP46][IP46error]
736                                  [TCPerror].dport, dports[nbr])
737
738     def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False):
739         sports = [1234, 1235]
740         dports = [6661, 6662]
741
742         for nbr, remote_host in enumerate(self.pg1.remote_hosts):
743             if isV6:
744                 IP46 = IPv6
745                 client_addr = self.pg0.remote_hosts[0].ip6
746                 remote_addr = self.pg1.remote_hosts[nbr].ip6
747                 src_nat_addr = self.pg2.remote_hosts[0].ip6
748                 exclude_prefix = ip_network(
749                     "%s/100" % remote_addr, strict=False)
750             else:
751                 IP46 = IP
752                 client_addr = self.pg0.remote_hosts[0].ip4
753                 remote_addr = self.pg1.remote_hosts[nbr].ip4
754                 src_nat_addr = self.pg2.remote_hosts[0].ip4
755                 exclude_prefix = ip_network(
756                     "%s/16" % remote_addr, strict=False)
757             # from pods to outside network
758             p1 = (
759                 Ether(dst=self.pg0.local_mac,
760                       src=self.pg0.remote_hosts[0].mac) /
761                 IP46(src=client_addr, dst=remote_addr) /
762                 l4p(sport=sports[nbr], dport=dports[nbr]) /
763                 Raw())
764
765             rxs = self.send_and_expect(
766                 self.pg0,
767                 p1 * N_PKTS,
768                 self.pg1)
769             for rx in rxs:
770                 self.assert_packet_checksums_valid(rx)
771                 self.assertEqual(rx[IP46].dst, remote_addr)
772                 self.assertEqual(rx[l4p].dport, dports[nbr])
773                 self.assertEqual(rx[IP46].src, src_nat_addr)
774                 sport = rx[l4p].sport
775
776             # from outside to pods
777             p2 = (
778                 Ether(dst=self.pg1.local_mac,
779                       src=self.pg1.remote_hosts[nbr].mac) /
780                 IP46(src=remote_addr, dst=src_nat_addr) /
781                 l4p(sport=dports[nbr], dport=sport) /
782                 Raw())
783
784             rxs = self.send_and_expect(
785                 self.pg1,
786                 p2 * N_PKTS,
787                 self.pg0)
788
789             for rx in rxs:
790                 self.assert_packet_checksums_valid(rx)
791                 self.assertEqual(rx[IP46].dst, client_addr)
792                 self.assertEqual(rx[l4p].dport, sports[nbr])
793                 self.assertEqual(rx[l4p].sport, dports[nbr])
794                 self.assertEqual(rx[IP46].src, remote_addr)
795
796             # add remote host to exclude list
797             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1)
798             self.vapi.cnat_session_purge()
799
800             rxs = self.send_and_expect(
801                 self.pg0,
802                 p1 * N_PKTS,
803                 self.pg1)
804             for rx in rxs:
805                 self.assert_packet_checksums_valid(rx)
806                 self.assertEqual(rx[IP46].dst, remote_addr)
807                 self.assertEqual(rx[l4p].dport, dports[nbr])
808                 self.assertEqual(rx[IP46].src, client_addr)
809
810             # remove remote host from exclude list
811             self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0)
812             self.vapi.cnat_session_purge()
813
814             rxs = self.send_and_expect(
815                 self.pg0,
816                 p1 * N_PKTS,
817                 self.pg1)
818
819             for rx in rxs:
820                 self.assert_packet_checksums_valid(rx)
821                 self.assertEqual(rx[IP46].dst, remote_addr)
822                 self.assertEqual(rx[l4p].dport, dports[nbr])
823                 self.assertEqual(rx[IP46].src, src_nat_addr)
824
825             self.vapi.cnat_session_purge()
826
827
# Allow running this file directly: hand the tests to the VPP runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)