cnat: Ip ICMP error support
[vpp.git] / src / plugins / cnat / test / test_cnat.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6 from vpp_ip import DpoProto
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether
10 from scapy.layers.inet import IP, UDP, TCP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
13
14 import struct
15
16 from ipaddress import ip_address, ip_network, \
17     IPv4Address, IPv6Address, IPv4Network, IPv6Network
18
19 from vpp_object import VppObject
20 from vpp_papi import VppEnum
21
# Number of copies of a packet sent in each send/expect step of the tests
# below (packets are built once and multiplied by this value).
N_PKTS = 15
23
24
def find_cnat_translation(test, id):
    """ Tell whether VPP reports a cnat translation with the given id

    :param test: object whose ``vapi.cnat_translation_dump()`` is queried
    :param id: translation id to look for
    :returns: True if a dumped entry has ``translation.id`` equal to
              ``id``, False otherwise
    """
    dumped = test.vapi.cnat_translation_dump()
    return any(id == entry.translation.id for entry in dumped)
31
32
class Ep(object):
    """ CNat endpoint: an address, a port and the L4 protocol layer class """

    def __init__(self, ip, port, l4p=TCP):
        # ip is stored as given; isV6 below relies on it supporting the
        # ``in`` test for ":" (the tests in this file pass strings).
        self.ip, self.port, self.l4p = ip, port, l4p

    def encode(self):
        """ Return the endpoint as a dict with 'addr' and 'port' keys

        The L4 protocol is not part of the encoded form.
        """
        return dict(addr=self.ip, port=self.port)

    @property
    def isV6(self):
        """ True when the address contains a ':' """
        has_colon = ":" in self.ip
        return has_colon

    def __str__(self):
        """ Render as '<ip>:<port>' """
        return "%s:%d" % (self.ip, self.port)
51
52
class EpTuple(object):
    """ CNat endpoint tuple: a source endpoint paired with a destination """

    def __init__(self, src, dst):
        # Both members are expected to offer encode() (used by encode()
        # below) and a string form (used by __str__).
        self.src, self.dst = src, dst

    def encode(self):
        """ Return a dict holding both encoded endpoints

        :returns: {'src_ep': <src.encode()>, 'dst_ep': <dst.encode()>}
        """
        encoded_src = self.src.encode()
        encoded_dst = self.dst.encode()
        return dict(src_ep=encoded_src, dst_ep=encoded_dst)

    def __str__(self):
        """ Render as '<src>-><dst>' """
        return "->".join((str(self.src), str(self.dst)))
66
67
class VppCNatTranslation(VppObject):
    """ A cnat translation (VIP plus paths) configured through the VPP API """

    def __init__(self, test, iproto, vip, paths):
        """
        :param test: test case providing vapi, registry, logger, statistics
        :param iproto: scapy L4 layer class, TCP or UDP
        :param vip: endpoint being translated; must offer encode()
        :param paths: iterable of objects offering encode()
        """
        self._test = test
        self.vip = vip
        self.iproto = iproto
        self._set_paths(paths)

    def _set_paths(self, paths):
        """ Remember the paths together with their encoded form """
        self.paths = paths
        self.encoded_paths = [p.encode() for p in self.paths]

    def _translation_payload(self):
        """ Build the dict handed to cnat_translation_update """
        return {'vip': self.vip.encode(),
                'ip_proto': self.vl4_proto,
                'n_paths': len(self.paths),
                'paths': self.encoded_paths}

    @property
    def vl4_proto(self):
        """ API ip_proto enum value matching self.iproto

        Raises KeyError when iproto is neither UDP nor TCP.
        """
        api_protos = VppEnum.vl_api_ip_proto_t
        by_layer = {
            TCP: api_protos.IP_API_PROTO_TCP,
            UDP: api_protos.IP_API_PROTO_UDP,
        }
        return by_layer[self.iproto]

    def delete(self):
        """ Delete the translation by id, passing the id by keyword """
        self._test.vapi.cnat_translation_del(id=self.id)

    def add_vpp_config(self):
        """ Create the translation, register it and store the returned id """
        reply = self._test.vapi.cnat_translation_update(
            self._translation_payload())
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id

    def modify_vpp_config(self, paths):
        """ Replace the paths and send the update for the same VIP

        The id stored by add_vpp_config() is left untouched.
        """
        self._set_paths(paths)
        self._test.vapi.cnat_translation_update(
            self._translation_payload())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """ Delete the translation by id, passing the id positionally """
        self._test.vapi.cnat_translation_del(self.id)

    def query_vpp_config(self):
        """ True while the translation id shows up in the API dump """
        return find_cnat_translation(self._test, self.id)

    def object_id(self):
        """ Name of this object, derived from the VIP's string form """
        return "cnat-translation-%s" % (self.vip)

    def get_stats(self):
        """ Return this translation's entry of /net/cnat-translation

        The entry is read from the first element of the counter and
        indexed by the translation id.
        """
        counters = self._test.statistics.get_counter("/net/cnat-translation")
        first = counters[0]
        return first[self.id]
124
125
class VppCNATSourceNat(VppObject):
    """ CNat source NAT address with an optional list of excluded prefixes """

    def __init__(self, test, address, exclude_subnets=None):
        """
        :param test: test case providing the vapi used for configuration
        :param address: source NAT address, IPv4 or IPv6, in a form
                        accepted by ipaddress.ip_address()
        :param exclude_subnets: prefixes to exclude from source NAT when
                                the config is added; defaults to none
        """
        self._test = test
        self.address = address
        # A literal [] default would be one list shared by every instance
        # created without the argument, so build a fresh list here.
        if exclude_subnets is None:
            exclude_subnets = []
        self.exclude_subnets = exclude_subnets

    def add_vpp_config(self):
        """ Set the SNAT address for its family, then add the exclusions """
        a = ip_address(self.address)
        if 4 == a.version:
            self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
        else:
            self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
        for subnet in self.exclude_subnets:
            self.cnat_exclude_subnet(subnet, True)

    def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
        """ Add or remove one prefix from the SNAT exclusion list

        :param exclude_subnet: prefix passed to cnat_add_del_snat_prefix
        :param isAdd: truthy to add the prefix, falsy to remove it
        """
        add = 1 if isAdd else 0
        self._test.vapi.cnat_add_del_snat_prefix(
            prefix=exclude_subnet, is_add=add)

    def query_vpp_config(self):
        """ Always False: the configuration is never looked up in VPP """
        return False

    def remove_vpp_config(self):
        """ No-op that returns False; nothing is removed from VPP here """
        return False
152
153
class TestCNatTranslation(VppTestCase):
    """ CNat Translation """
    # Translation tests over three pg interfaces: clients sit behind pg0,
    # the initial backends behind pg1, and pg2 provides the backend that
    # translations are moved to (and the third-party sender in test_icmp).

    # cnat settings handed to the framework; presumably appended to the VPP
    # start-up configuration - the consumer is not visible in this file.
    extra_vpp_punt_config = ["cnat", "{",
                             "session-db-buckets", "64",
                             "session-cleanup-timeout", "0.1",
                             "session-max-age", "1",
                             "tcp-max-age", "1",
                             "scanner", "off", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        """ Create pg0..pg2, bring them up and configure/resolve v4 + v6 """
        super(TestCNatTranslation, self).setUp()

        self.create_pg_interfaces(range(3))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """ Remove the addresses and bring the pg interfaces down """
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    @staticmethod
    def _l4_pkt(dmac, smac, ip_class, src_ip, dst_ip, l4p, sport, dport):
        """ Build Ether / IP-or-IPv6 / TCP-or-UDP / Raw (empty payload) """
        return (Ether(dst=dmac, src=smac) /
                ip_class(src=src_ip, dst=dst_ip) /
                l4p(sport=sport, dport=dport) /
                Raw())

    def _verify_icmp_errors(self, rxs, layers, outer_src, client_addr,
                            vip, sport):
        """ Check received ICMP errors and the packet they quote

        :param rxs: received packets
        :param layers: (IP46, ICMP46, IP46error, TCPUDPError) scapy classes
        :param outer_src: expected source address of the outer IP header
        :param client_addr: expected source address of the quoted packet
        :param vip: Ep whose ip/port the quoted packet must be destined to
        :param sport: expected source port of the quoted packet
        """
        IP46, ICMP46, IP46error, TCPUDPError = layers
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(rx[IP46].src, outer_src)
            quoted = rx[ICMP46][IP46error]
            self.assertEqual(quoted.src, client_addr)
            self.assertEqual(quoted.dst, vip.ip)
            self.assertEqual(quoted[TCPUDPError].sport, sport)
            self.assertEqual(quoted[TCPUDPError].dport, vip.port)

    def cnat_create_translation(self, vip, nbr):
        """ Create a translation from vip to the nbr-th host behind pg1

        The backend is pg1.remote_hosts[nbr] on port 4000 + nbr, the source
        endpoint is the all-zero address with port 0, and the same path is
        installed twice.

        :param vip: Ep giving the virtual address, port and L4 protocol
        :param nbr: backend host index; also offsets the backend port
        :returns: the configured VppCNatTranslation
        """
        ip_v = "ip6" if vip.isV6 else "ip4"
        dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
        sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0)
        t1 = VppCNatTranslation(
            self, vip.l4p, vip,
            [EpTuple(sep, dep), EpTuple(sep, dep)])
        t1.add_vpp_config()
        return t1

    def cnat_test_translation(self, t1, nbr, sports, isV6=False):
        """ Send traffic through one translation and check the rewrites

        For every pg0 host and source port: client->VIP packets must come
        out of pg1 rewritten to the backend, backend->client packets must
        come out of pg0 rewritten to the VIP, packets to the VIP on an
        unknown port must be dropped and backend packets from an unknown
        port must still be forwarded. Finally the translation's packet
        counter must equal the number of client->VIP packets sent.

        :param t1: translation created by cnat_create_translation
        :param nbr: backend index that was used to create t1
        :param sports: client source ports to use
        :param isV6: use the IPv6 addresses and headers
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        vip = t1.vip
        l4p = vip.l4p
        backend_ip = getattr(self.pg1.remote_hosts[nbr], ip_v)
        backend_port = 4000 + nbr

        #
        # Flows
        #
        for src in self.pg0.remote_hosts:
            client_ip = getattr(src, ip_v)
            for sport in sports:
                # from client to vip
                p1 = self._l4_pkt(self.pg0.local_mac, src.mac, ip_class,
                                  client_ip, vip.ip, l4p, sport, vip.port)

                self.vapi.cli("trace add pg-input 1")
                rxs = self.send_and_expect(self.pg0,
                                           p1 * N_PKTS,
                                           self.pg1)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(rx[ip_class].dst, backend_ip)
                    self.assertEqual(rx[l4p].dport, backend_port)
                    self.assertEqual(rx[ip_class].src, client_ip)
                    self.assertEqual(rx[l4p].sport, sport)

                # from vip to client
                p1 = self._l4_pkt(self.pg1.local_mac, self.pg1.remote_mac,
                                  ip_class, backend_ip, client_ip,
                                  l4p, backend_port, sport)

                rxs = self.send_and_expect(self.pg1,
                                           p1 * N_PKTS,
                                           self.pg0)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(rx[ip_class].dst, client_ip)
                    self.assertEqual(rx[l4p].dport, sport)
                    self.assertEqual(rx[ip_class].src, vip.ip)
                    self.assertEqual(rx[l4p].sport, vip.port)

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                p1 = self._l4_pkt(self.pg0.local_mac, src.mac, ip_class,
                                  client_ip, vip.ip, l4p, sport, 6666)

                self.send_and_assert_no_replies(self.pg0,
                                                p1 * N_PKTS,
                                                self.pg1)

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                p1 = self._l4_pkt(self.pg1.local_mac, self.pg1.remote_mac,
                                  ip_class, backend_ip, client_ip,
                                  l4p, 6666, sport)

                self.send_and_expect(self.pg1,
                                     p1 * N_PKTS,
                                     self.pg0)

        # only the first send of each (host, sport) pair is expected to be
        # counted against the translation
        self.assertEqual(t1.get_stats()['packets'],
                         N_PKTS *
                         len(sports) *
                         len(self.pg0.remote_hosts))

    def cnat_test_translation_update(self, t1, sports, isV6=False):
        """ Re-point a translation at pg2 and check old and new flows

        Flows using the given source ports must keep arriving on pg1,
        while flows from source port 9999 must arrive on pg2.

        :param t1: translation to modify
        :param sports: source ports of the already established flows
        :param isV6: use the IPv6 addresses and headers
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        vip = t1.vip

        #
        # modify the translation to use a different backend
        #
        dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
        sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
        t1.modify_vpp_config([EpTuple(sep, dep)])

        #
        # existing flows follow the old path
        #
        for src in self.pg0.remote_hosts:
            for sport in sports:
                # from client to vip
                p1 = self._l4_pkt(self.pg0.local_mac, src.mac, ip_class,
                                  getattr(src, ip_v), vip.ip,
                                  vip.l4p, sport, vip.port)

                self.send_and_expect(self.pg0,
                                     p1 * N_PKTS,
                                     self.pg1)

        #
        # new flows go to the new backend
        #
        for src in self.pg0.remote_hosts:
            p1 = self._l4_pkt(self.pg0.local_mac, src.mac, ip_class,
                              getattr(src, ip_v), vip.ip,
                              vip.l4p, 9999, vip.port)

            self.send_and_expect(self.pg0,
                                 p1 * N_PKTS,
                                 self.pg2)

    def cnat_translation(self, vips, isV6=False):
        """ CNat Translation

        Creates one translation per VIP, runs the translation and update
        checks on each, waits with the scanner on until the session dump
        is empty, then reloads flows, deletes the translations and checks
        that a purge empties the session dump.

        :param vips: list of Ep describing the virtual endpoints
        :param isV6: use the IPv6 addresses and headers
        """
        ip_class = IPv6 if isV6 else IP
        ip_v = "ip6" if isV6 else "ip4"
        sports = [1234, 1233]

        #
        # turn the scanner off whilst testing otherwise sessions
        # will time out
        #
        self.vapi.cli("test cnat scanner off")

        trs = [self.cnat_create_translation(vip, nbr)
               for nbr, vip in enumerate(vips)]

        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        #
        # translations
        #
        for nbr, tr in enumerate(trs):
            self.cnat_test_translation(tr, nbr, sports, isV6=isV6)
            self.cnat_test_translation_update(tr, sports, isV6=isV6)
            if isV6:
                self.logger.info(self.vapi.cli(
                    "sh ip6 fib %s" % self.pg0.remote_ip6))
            else:
                self.logger.info(self.vapi.cli(
                    "sh ip fib %s" % self.pg0.remote_ip4))
            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")

        n_tries = 0
        sessions = self.vapi.cnat_session_dump()
        while sessions and n_tries < 100:
            n_tries += 1
            sessions = self.vapi.cnat_session_dump()
            self.sleep(2)
            # logged rather than printed, like every other CLI dump here
            self.logger.info(self.vapi.cli("show cnat session verbose"))

        self.assertTrue(n_tries < 100)
        self.vapi.cli("test cnat scanner off")

        #
        # load some flows again and purge
        #
        for vip in vips:
            for src in self.pg0.remote_hosts:
                for sport in sports:
                    # from client to vip; the translations now point at pg2
                    p1 = self._l4_pkt(self.pg0.local_mac, src.mac,
                                      ip_class, getattr(src, ip_v),
                                      vip.ip, vip.l4p, sport, vip.port)
                    self.send_and_expect(self.pg0,
                                         p1 * N_PKTS,
                                         self.pg2)

        for tr in trs:
            tr.delete()

        # sessions outlive their translation until explicitly purged
        self.assertTrue(self.vapi.cnat_session_dump())
        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

    def test_icmp(self):
        # ICMP errors quoting a translated packet: an error sent by the
        # backend must reach the client with the VIP restored in both the
        # outer source and the quoted packet; an error from another host
        # must keep its own outer source address.
        vips = [
            Ep("30.0.0.1", 5555),
            Ep("30.0.0.2", 5554),
            Ep("30.0.0.2", 5553, UDP),
            Ep("30::1", 6666),
            Ep("30::2", 5553, UDP),
        ]
        sport = 1234

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv6_neighbors()
        self.pg0.configure_ipv4_neighbors()

        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv6_neighbors()
        self.pg1.configure_ipv4_neighbors()

        self.vapi.cli("test cnat scanner off")
        for nbr, vip in enumerate(vips):
            self.cnat_create_translation(vip, nbr)

        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        for nbr, vip in enumerate(vips):
            if vip.isV6:
                ip_v = "ip6"
                IP46, IP46error = IPv6, IPerror6
                ICMP46 = ICMPv6DestUnreach
                ICMPelem = ICMPv6DestUnreach(code=1)
            else:
                ip_v = "ip4"
                IP46, IP46error = IP, IPerror
                ICMP46 = ICMP
                ICMPelem = ICMP(type=11)
            TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
            layers = (IP46, ICMP46, IP46error, TCPUDPError)

            client_addr = getattr(self.pg0.remote_hosts[0], ip_v)
            remote_addr = getattr(self.pg1.remote_hosts[nbr], ip_v)
            remote2_addr = getattr(self.pg2.remote_hosts[0], ip_v)

            # from client to vip
            p1 = self._l4_pkt(self.pg0.local_mac,
                              self.pg0.remote_hosts[0].mac,
                              IP46, client_addr, vip.ip,
                              vip.l4p, sport, vip.port)

            rxs = self.send_and_expect(self.pg0,
                                       p1 * N_PKTS,
                                       self.pg1)

            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[IP46].dst, remote_addr)
                self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
                self.assertEqual(rx[IP46].src, client_addr)
                self.assertEqual(rx[vip.l4p].sport, sport)

            # the translated packet, as the backend saw it, is what the
            # ICMP errors below quote
            InnerIP = rxs[0][IP46]

            # from vip to client, ICMP error
            p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                  IP46(src=remote_addr, dst=client_addr) /
                  ICMPelem / InnerIP)

            rxs = self.send_and_expect(self.pg1,
                                       p1 * N_PKTS,
                                       self.pg0)

            self._verify_icmp_errors(rxs, layers, vip.ip,
                                     client_addr, vip, sport)

            # from other remote to client, ICMP error
            # outside shouldn't be NAT-ed
            p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
                  IP46(src=remote2_addr, dst=client_addr) /
                  ICMPelem / InnerIP)

            # NOTE(review): the packet carries pg2's MAC and address but is
            # injected on pg1 - confirm whether pg2 was intended here.
            rxs = self.send_and_expect(self.pg1,
                                       p1 * N_PKTS,
                                       self.pg0)

            self._verify_icmp_errors(rxs, layers, remote2_addr,
                                     client_addr, vip, sport)

        self.vapi.cnat_session_purge()

    def test_cnat6(self):
        # CNat Translation ipv6
        vips = [
            Ep("30::1", 5555),
            Ep("30::2", 5554),
            Ep("30::2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv6_neighbors()

        self.cnat_translation(vips, isV6=True)

    def test_cnat4(self):
        # CNat Translation ipv4
        vips = [
            Ep("30.0.0.1", 5555),
            Ep("30.0.0.2", 5554),
            Ep("30.0.0.2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv4_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv4_neighbors()

        self.cnat_translation(vips)
543
544
class TestCNatSourceNAT(VppTestCase):
    """ CNat Source NAT """
    # Source NAT tests: the client sits behind pg0, the outside hosts
    # behind pg1, and an address taken from pg2 serves as the SNAT address.

    # cnat settings handed to the framework; presumably appended to the VPP
    # start-up configuration - the consumer is not visible in this file.
    extra_vpp_punt_config = ["cnat", "{",
                             "session-max-age", "1",
                             "tcp-max-age", "1", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def setUp(self):
        """ Create pg0..pg2, bring them up and configure/resolve v4 + v6 """
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """ Remove the addresses and bring the pg interfaces down """
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
        """ Configure the SNAT address and enable the snat feature

        :param srcNatAddr: address to source-NAT to
        :param interface: interface on whose unicast arc the cnat snat
                          feature is enabled
        :param isV6: select the ip6 arc/feature names instead of ip4
        :returns: the VppCNATSourceNat object that was configured
        """
        snat = VppCNATSourceNat(self, srcNatAddr)
        snat.add_vpp_config()
        if isV6:
            arc, feature = "ip6-unicast", "ip6-cnat-snat"
        else:
            arc, feature = "ip4-unicast", "ip4-cnat-snat"
        self.vapi.feature_enable_disable(
            enable=1,
            arc_name=arc,
            feature_name=feature,
            sw_if_index=interface.sw_if_index)

        return snat

    def _expect_outbound(self, pkt, IP46, l4p, dst_addr, dport, src_addr):
        """ Send pkt from pg0 to pg1 and check what arrives

        Every received packet must have valid checksums, the given
        destination address and port, and the given source address.

        :returns: the received packets
        """
        received = self.send_and_expect(
            self.pg0,
            pkt * N_PKTS,
            self.pg1)
        for rx in received:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(rx[IP46].dst, dst_addr)
            self.assertEqual(rx[l4p].dport, dport)
            self.assertEqual(rx[IP46].src, src_addr)
        return received

    def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
        """ Exercise source NAT towards every host behind pg1

        For each pg1 host: outbound traffic must leave with srcNatAddr as
        source, the reply to the NAT-ed port must reach the client with
        the original ports, traffic to an excluded prefix must keep the
        client's source address, and removing the exclusion must restore
        the NAT.

        :param srcNatAddr: address to source-NAT to
        :param l4p: scapy L4 layer class used for the flows
        :param isV6: use the IPv6 addresses and headers
        """
        if isV6:
            IP46, ip_v, subnet_mask = IPv6, "ip6", 100
        else:
            IP46, ip_v, subnet_mask = IP, "ip4", 16
        sports = [1234, 1235, 1236]
        dports = [6661, 6662, 6663]

        self.pg0.generate_remote_hosts(1)
        self.pg0.configure_ipv4_neighbors()
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(sports))
        self.pg1.configure_ipv4_neighbors()
        self.pg1.configure_ipv6_neighbors()

        self.vapi.cli("test cnat scanner on")
        t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)

        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
            client = self.pg0.remote_hosts[0]
            client_addr = getattr(client, ip_v)
            remote_addr = getattr(remote_host, ip_v)
            client_port = sports[nbr]
            server_port = dports[nbr]

            # from pods to outside network
            p1 = (
                Ether(dst=self.pg0.local_mac, src=client.mac) /
                IP46(src=client_addr, dst=remote_addr) /
                l4p(sport=client_port, dport=server_port) /
                Raw())

            rxs = self._expect_outbound(p1, IP46, l4p, remote_addr,
                                        server_port, srcNatAddr)
            # source port chosen by the NAT, taken from the last packet
            sport = rxs[-1][l4p].sport

            # from outside to pods
            p2 = (
                Ether(dst=self.pg1.local_mac, src=remote_host.mac) /
                IP46(src=remote_addr, dst=srcNatAddr) /
                l4p(sport=server_port, dport=sport) /
                Raw())

            rxs = self.send_and_expect(
                self.pg1,
                p2 * N_PKTS,
                self.pg0)

            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[IP46].dst, client_addr)
                self.assertEqual(rx[l4p].dport, client_port)
                self.assertEqual(rx[l4p].sport, server_port)
                self.assertEqual(rx[IP46].src, remote_addr)

            # add remote host to exclude list
            subnet = "%s/%d" % (remote_addr, subnet_mask)
            exclude_subnet = ip_network(subnet, strict=False)

            t1.cnat_exclude_subnet(exclude_subnet)
            self.vapi.cnat_session_purge()

            # excluded destination: the client's own address is kept
            self._expect_outbound(p1, IP46, l4p, remote_addr,
                                  server_port, client_addr)

            # remove remote host from exclude list
            t1.cnat_exclude_subnet(exclude_subnet, isAdd=False)
            self.vapi.cnat_session_purge()

            # exclusion gone: source NAT applies again
            self._expect_outbound(p1, IP46, l4p, remote_addr,
                                  server_port, srcNatAddr)

    def test_cnat6_sourcenat(self):
        # CNat Source Nat ipv6
        for l4p in (TCP, UDP):
            self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, l4p, True)

    def test_cnat4_sourcenat(self):
        # CNat Source Nat ipv4
        for l4p in (TCP, UDP):
            self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, l4p)
695
696
# When executed as a script, run the tests in this module with the
# framework's VppTestRunner instead of the default unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)