Fix coverity issue
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        """ Run the class-level setup of the parent test case """
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        """ Run the per-test teardown of the parent test case """
        super(MethodHolder, self).tearDown()

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        The stream holds one TCP, one UDP and one ICMP echo-request packet,
        in that order.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with the three generated packets
        """
        l4_headers = (TCP(sport=self.tcp_port_in, dport=20),
                      UDP(sport=self.udp_port_in, dport=20),
                      ICMP(id=self.icmp_id_in, type='echo-request'))
        return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4 for l4 in l4_headers]

    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
        """
        Create IPv6 packet stream for inside network

        The destination is the outside host's IPv4 address appended to the
        '64:ff9b::' prefix. The stream holds one TCP, one UDP and one
        ICMPv6 echo-request packet, in that order.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param hlim: Hop Limit of generated packets
        :returns: List with the three generated packets
        """
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
        l4_headers = (TCP(sport=self.tcp_port_in, dport=20),
                      UDP(sport=self.udp_port_in, dport=20),
                      ICMPv6EchoRequest(id=self.icmp_id_in))
        return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
                l4 for l4 in l4_headers]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        The stream holds one TCP, one UDP and one ICMP echo-reply packet,
        in that order, addressed to the current outside ports / ICMP ID.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List with the three generated packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        l4_headers = (TCP(dport=self.tcp_port_out, sport=20),
                      UDP(dport=self.udp_port_out, sport=20),
                      ICMP(id=self.icmp_id_out, type='echo-reply'))
        return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4 for l4 in l4_headers]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3, dst_ip=None):
        """
        Verify captured packets on outside network

        As a side effect the observed source port / ICMP ID of every packet
        is stored in tcp_port_out, udp_port_out or icmp_id_out.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        :param dst_ip: Destination IP address (Default do not verify)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        # same_port: the inside port/ID must be seen unchanged; otherwise
        # it must differ from the inside value.
        check_port = self.assertEqual if same_port else self.assertNotEqual
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if dst_ip is not None:
                    self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    check_port(packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    check_port(packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    check_port(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
        """
        Verify captured IPv6 packets on inside network

        :param capture: Captured packets
        :param src_ip: Source IP
        :param dst_ip: Destination IP address
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, src_ip)
                self.assertEqual(packet[IPv6].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMPv6EchoReply].id,
                                     self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        Addresses and source ports / ICMP ID must equal the values that
        were sent. The number of captured packets is not checked here.

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        The packet embedded in each ICMP error must carry the current
        outside destination port / ICMP ID.

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        The packet embedded in each ICMP error must carry the inside
        source port / ICMP ID.

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Exactly six records are expected: three with natEvent 4 and three
        with natEvent 5, counted as session create and session delete.

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            nat_event = ord(record[230])
            self.assertIn(nat_event, [4, 5])
            if nat_event == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            proto = ord(record[4])
            if proto == IP_PROTOS.icmp:
                port_in, port_out = self.icmp_id_in, self.icmp_id_out
            elif proto == IP_PROTOS.tcp:
                port_in, port_out = self.tcp_port_in, self.tcp_port_out
            elif proto == IP_PROTOS.udp:
                port_in, port_out = self.udp_port_in, self.udp_port_out
            else:
                self.fail("Invalid protocol")
            self.assertEqual(struct.pack("!H", port_in), record[7])
            self.assertEqual(struct.pack("!H", port_out), record[227])
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        Exactly one record with natEvent 3 and natPoolID 0 is expected.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
360
361
362 class TestSNAT(MethodHolder):
363     """ SNAT Test Cases """
364
365     @classmethod
366     def setUpClass(cls):
367         super(TestSNAT, cls).setUpClass()
368
369         try:
370             cls.tcp_port_in = 6303
371             cls.tcp_port_out = 6303
372             cls.udp_port_in = 6304
373             cls.udp_port_out = 6304
374             cls.icmp_id_in = 6305
375             cls.icmp_id_out = 6305
376             cls.snat_addr = '10.0.0.3'
377             cls.ipfix_src_port = 4739
378             cls.ipfix_domain_id = 1
379
380             cls.create_pg_interfaces(range(9))
381             cls.interfaces = list(cls.pg_interfaces[0:4])
382
383             for i in cls.interfaces:
384                 i.admin_up()
385                 i.config_ip4()
386                 i.resolve_arp()
387
388             cls.pg0.generate_remote_hosts(3)
389             cls.pg0.configure_ipv4_neighbors()
390
391             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
392
393             cls.pg4._local_ip4 = "172.16.255.1"
394             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
395             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
396             cls.pg4.set_table_ip4(10)
397             cls.pg5._local_ip4 = "172.16.255.3"
398             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
399             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
400             cls.pg5.set_table_ip4(10)
401             cls.pg6._local_ip4 = "172.16.255.1"
402             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
403             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
404             cls.pg6.set_table_ip4(20)
405             for i in cls.overlapping_interfaces:
406                 i.config_ip4()
407                 i.admin_up()
408                 i.resolve_arp()
409
410             cls.pg7.admin_up()
411             cls.pg8.admin_up()
412
413         except Exception:
414             super(TestSNAT, cls).tearDownClass()
415             raise
416
417     def clear_snat(self):
418         """
419         Clear SNAT configuration.
420         """
421         # I found no elegant way to do this
422         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
423                                    dst_address_length=32,
424                                    next_hop_address=self.pg7.remote_ip4n,
425                                    next_hop_sw_if_index=self.pg7.sw_if_index,
426                                    is_add=0)
427         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
428                                    dst_address_length=32,
429                                    next_hop_address=self.pg8.remote_ip4n,
430                                    next_hop_sw_if_index=self.pg8.sw_if_index,
431                                    is_add=0)
432
433         for intf in [self.pg7, self.pg8]:
434             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
435             for n in neighbors:
436                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
437                                               n.mac_address,
438                                               n.ip_address,
439                                               is_add=0)
440
441         if self.pg7.has_ip4_config:
442             self.pg7.unconfig_ip4()
443
444         interfaces = self.vapi.snat_interface_addr_dump()
445         for intf in interfaces:
446             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
447
448         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
449                              domain_id=self.ipfix_domain_id)
450         self.ipfix_src_port = 4739
451         self.ipfix_domain_id = 1
452
453         interfaces = self.vapi.snat_interface_dump()
454         for intf in interfaces:
455             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
456                                                      intf.is_inside,
457                                                      is_add=0)
458
459         static_mappings = self.vapi.snat_static_mapping_dump()
460         for sm in static_mappings:
461             self.vapi.snat_add_static_mapping(sm.local_ip_address,
462                                               sm.external_ip_address,
463                                               local_port=sm.local_port,
464                                               external_port=sm.external_port,
465                                               addr_only=sm.addr_only,
466                                               vrf_id=sm.vrf_id,
467                                               protocol=sm.protocol,
468                                               is_add=0)
469
470         adresses = self.vapi.snat_address_dump()
471         for addr in adresses:
472             self.vapi.snat_add_address_range(addr.ip_address,
473                                              addr.ip_address,
474                                              is_add=0)
475
476     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
477                                 local_port=0, external_port=0, vrf_id=0,
478                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
479                                 proto=0):
480         """
481         Add/delete S-NAT static mapping
482
483         :param local_ip: Local IP address
484         :param external_ip: External IP address
485         :param local_port: Local port number (Optional)
486         :param external_port: External port number (Optional)
487         :param vrf_id: VRF ID (Default 0)
488         :param is_add: 1 if add, 0 if delete (Default add)
489         :param external_sw_if_index: External interface instead of IP address
490         :param proto: IP protocol (Mandatory if port specified)
491         """
492         addr_only = 1
493         if local_port and external_port:
494             addr_only = 0
495         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
496         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
497         self.vapi.snat_add_static_mapping(
498             l_ip,
499             e_ip,
500             external_sw_if_index,
501             local_port,
502             external_port,
503             addr_only,
504             vrf_id,
505             proto,
506             is_add)
507
508     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
509         """
510         Add/delete S-NAT address
511
512         :param ip: IP address
513         :param is_add: 1 if add, 0 if delete (Default add)
514         """
515         snat_addr = socket.inet_pton(socket.AF_INET, ip)
516         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
517                                          vrf_id=vrf_id)
518
519     def test_dynamic(self):
520         """ SNAT dynamic translation test """
521
522         self.snat_add_address(self.snat_addr)
523         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
524         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
525                                                  is_inside=0)
526
527         # in2out
528         pkts = self.create_stream_in(self.pg0, self.pg1)
529         self.pg0.add_stream(pkts)
530         self.pg_enable_capture(self.pg_interfaces)
531         self.pg_start()
532         capture = self.pg1.get_capture(len(pkts))
533         self.verify_capture_out(capture)
534
535         # out2in
536         pkts = self.create_stream_out(self.pg1)
537         self.pg1.add_stream(pkts)
538         self.pg_enable_capture(self.pg_interfaces)
539         self.pg_start()
540         capture = self.pg0.get_capture(len(pkts))
541         self.verify_capture_in(capture, self.pg0)
542
543     def test_dynamic_icmp_errors_in2out_ttl_1(self):
544         """ SNAT handling of client packets with TTL=1 """
545
546         self.snat_add_address(self.snat_addr)
547         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
548         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
549                                                  is_inside=0)
550
551         # Client side - generate traffic
552         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
553         self.pg0.add_stream(pkts)
554         self.pg_enable_capture(self.pg_interfaces)
555         self.pg_start()
556
557         # Client side - verify ICMP type 11 packets
558         capture = self.pg0.get_capture(len(pkts))
559         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
560
561     def test_dynamic_icmp_errors_out2in_ttl_1(self):
562         """ SNAT handling of server packets with TTL=1 """
563
564         self.snat_add_address(self.snat_addr)
565         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
566         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
567                                                  is_inside=0)
568
569         # Client side - create sessions
570         pkts = self.create_stream_in(self.pg0, self.pg1)
571         self.pg0.add_stream(pkts)
572         self.pg_enable_capture(self.pg_interfaces)
573         self.pg_start()
574
575         # Server side - generate traffic
576         capture = self.pg1.get_capture(len(pkts))
577         self.verify_capture_out(capture)
578         pkts = self.create_stream_out(self.pg1, ttl=1)
579         self.pg1.add_stream(pkts)
580         self.pg_enable_capture(self.pg_interfaces)
581         self.pg_start()
582
583         # Server side - verify ICMP type 11 packets
584         capture = self.pg1.get_capture(len(pkts))
585         self.verify_capture_out_with_icmp_errors(capture,
586                                                  src_ip=self.pg1.local_ip4)
587
588     def test_dynamic_icmp_errors_in2out_ttl_2(self):
589         """ SNAT handling of error responses to client packets with TTL=2 """
590
591         self.snat_add_address(self.snat_addr)
592         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
593         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
594                                                  is_inside=0)
595
596         # Client side - generate traffic
597         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
598         self.pg0.add_stream(pkts)
599         self.pg_enable_capture(self.pg_interfaces)
600         self.pg_start()
601
602         # Server side - simulate ICMP type 11 response
603         capture = self.pg1.get_capture(len(pkts))
604         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
605                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
606                 ICMP(type=11) / packet[IP] for packet in capture]
607         self.pg1.add_stream(pkts)
608         self.pg_enable_capture(self.pg_interfaces)
609         self.pg_start()
610
611         # Client side - verify ICMP type 11 packets
612         capture = self.pg0.get_capture(len(pkts))
613         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
614
615     def test_dynamic_icmp_errors_out2in_ttl_2(self):
616         """ SNAT handling of error responses to server packets with TTL=2 """
617
618         self.snat_add_address(self.snat_addr)
619         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
620         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
621                                                  is_inside=0)
622
623         # Client side - create sessions
624         pkts = self.create_stream_in(self.pg0, self.pg1)
625         self.pg0.add_stream(pkts)
626         self.pg_enable_capture(self.pg_interfaces)
627         self.pg_start()
628
629         # Server side - generate traffic
630         capture = self.pg1.get_capture(len(pkts))
631         self.verify_capture_out(capture)
632         pkts = self.create_stream_out(self.pg1, ttl=2)
633         self.pg1.add_stream(pkts)
634         self.pg_enable_capture(self.pg_interfaces)
635         self.pg_start()
636
637         # Client side - simulate ICMP type 11 response
638         capture = self.pg0.get_capture(len(pkts))
639         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
640                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
641                 ICMP(type=11) / packet[IP] for packet in capture]
642         self.pg0.add_stream(pkts)
643         self.pg_enable_capture(self.pg_interfaces)
644         self.pg_start()
645
646         # Server side - verify ICMP type 11 packets
647         capture = self.pg1.get_capture(len(pkts))
648         self.verify_capture_out_with_icmp_errors(capture)
649
650     def test_ping_out_interface_from_outside(self):
651         """ Ping SNAT out interface from outside network """
652
653         self.snat_add_address(self.snat_addr)
654         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
655         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
656                                                  is_inside=0)
657
658         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
659              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
660              ICMP(id=self.icmp_id_out, type='echo-request'))
661         pkts = [p]
662         self.pg1.add_stream(pkts)
663         self.pg_enable_capture(self.pg_interfaces)
664         self.pg_start()
665         capture = self.pg1.get_capture(len(pkts))
666         self.assertEqual(1, len(capture))
667         packet = capture[0]
668         try:
669             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
670             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
671             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
672             self.assertEqual(packet[ICMP].type, 0)  # echo reply
673         except:
674             self.logger.error(ppp("Unexpected or invalid packet "
675                                   "(outside network):", packet))
676             raise
677
678     def test_ping_internal_host_from_outside(self):
679         """ Ping internal host from outside network """
680
681         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
682         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
684                                                  is_inside=0)
685
686         # out2in
687         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
688                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
689                ICMP(id=self.icmp_id_out, type='echo-request'))
690         self.pg1.add_stream(pkt)
691         self.pg_enable_capture(self.pg_interfaces)
692         self.pg_start()
693         capture = self.pg0.get_capture(1)
694         self.verify_capture_in(capture, self.pg0, packet_num=1)
695         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
696
697         # in2out
698         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
699                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
700                ICMP(id=self.icmp_id_in, type='echo-reply'))
701         self.pg0.add_stream(pkt)
702         self.pg_enable_capture(self.pg_interfaces)
703         self.pg_start()
704         capture = self.pg1.get_capture(1)
705         self.verify_capture_out(capture, same_port=True, packet_num=1)
706         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
707
708     def test_static_in(self):
709         """ SNAT 1:1 NAT initialized from inside network """
710
711         nat_ip = "10.0.0.10"
712         self.tcp_port_out = 6303
713         self.udp_port_out = 6304
714         self.icmp_id_out = 6305
715
716         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
717         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
718         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
719                                                  is_inside=0)
720
721         # in2out
722         pkts = self.create_stream_in(self.pg0, self.pg1)
723         self.pg0.add_stream(pkts)
724         self.pg_enable_capture(self.pg_interfaces)
725         self.pg_start()
726         capture = self.pg1.get_capture(len(pkts))
727         self.verify_capture_out(capture, nat_ip, True)
728
729         # out2in
730         pkts = self.create_stream_out(self.pg1, nat_ip)
731         self.pg1.add_stream(pkts)
732         self.pg_enable_capture(self.pg_interfaces)
733         self.pg_start()
734         capture = self.pg0.get_capture(len(pkts))
735         self.verify_capture_in(capture, self.pg0)
736
737     def test_static_out(self):
738         """ SNAT 1:1 NAT initialized from outside network """
739
740         nat_ip = "10.0.0.20"
741         self.tcp_port_out = 6303
742         self.udp_port_out = 6304
743         self.icmp_id_out = 6305
744
745         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
746         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
747         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
748                                                  is_inside=0)
749
750         # out2in
751         pkts = self.create_stream_out(self.pg1, nat_ip)
752         self.pg1.add_stream(pkts)
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pg_start()
755         capture = self.pg0.get_capture(len(pkts))
756         self.verify_capture_in(capture, self.pg0)
757
758         # in2out
759         pkts = self.create_stream_in(self.pg0, self.pg1)
760         self.pg0.add_stream(pkts)
761         self.pg_enable_capture(self.pg_interfaces)
762         self.pg_start()
763         capture = self.pg1.get_capture(len(pkts))
764         self.verify_capture_out(capture, nat_ip, True)
765
766     def test_static_with_port_in(self):
767         """ SNAT 1:1 NAT with port initialized from inside network """
768
769         self.tcp_port_out = 3606
770         self.udp_port_out = 3607
771         self.icmp_id_out = 3608
772
773         self.snat_add_address(self.snat_addr)
774         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
775                                      self.tcp_port_in, self.tcp_port_out,
776                                      proto=IP_PROTOS.tcp)
777         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
778                                      self.udp_port_in, self.udp_port_out,
779                                      proto=IP_PROTOS.udp)
780         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
781                                      self.icmp_id_in, self.icmp_id_out,
782                                      proto=IP_PROTOS.icmp)
783         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
784         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
785                                                  is_inside=0)
786
787         # in2out
788         pkts = self.create_stream_in(self.pg0, self.pg1)
789         self.pg0.add_stream(pkts)
790         self.pg_enable_capture(self.pg_interfaces)
791         self.pg_start()
792         capture = self.pg1.get_capture(len(pkts))
793         self.verify_capture_out(capture)
794
795         # out2in
796         pkts = self.create_stream_out(self.pg1)
797         self.pg1.add_stream(pkts)
798         self.pg_enable_capture(self.pg_interfaces)
799         self.pg_start()
800         capture = self.pg0.get_capture(len(pkts))
801         self.verify_capture_in(capture, self.pg0)
802
803     def test_static_with_port_out(self):
804         """ SNAT 1:1 NAT with port initialized from outside network """
805
806         self.tcp_port_out = 30606
807         self.udp_port_out = 30607
808         self.icmp_id_out = 30608
809
810         self.snat_add_address(self.snat_addr)
811         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
812                                      self.tcp_port_in, self.tcp_port_out,
813                                      proto=IP_PROTOS.tcp)
814         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
815                                      self.udp_port_in, self.udp_port_out,
816                                      proto=IP_PROTOS.udp)
817         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
818                                      self.icmp_id_in, self.icmp_id_out,
819                                      proto=IP_PROTOS.icmp)
820         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
821         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
822                                                  is_inside=0)
823
824         # out2in
825         pkts = self.create_stream_out(self.pg1)
826         self.pg1.add_stream(pkts)
827         self.pg_enable_capture(self.pg_interfaces)
828         self.pg_start()
829         capture = self.pg0.get_capture(len(pkts))
830         self.verify_capture_in(capture, self.pg0)
831
832         # in2out
833         pkts = self.create_stream_in(self.pg0, self.pg1)
834         self.pg0.add_stream(pkts)
835         self.pg_enable_capture(self.pg_interfaces)
836         self.pg_start()
837         capture = self.pg1.get_capture(len(pkts))
838         self.verify_capture_out(capture)
839
840     def test_static_vrf_aware(self):
841         """ SNAT 1:1 NAT VRF awareness """
842
843         nat_ip1 = "10.0.0.30"
844         nat_ip2 = "10.0.0.40"
845         self.tcp_port_out = 6303
846         self.udp_port_out = 6304
847         self.icmp_id_out = 6305
848
849         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
850                                      vrf_id=10)
851         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
852                                      vrf_id=10)
853         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
854                                                  is_inside=0)
855         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
856         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
857
858         # inside interface VRF match SNAT static mapping VRF
859         pkts = self.create_stream_in(self.pg4, self.pg3)
860         self.pg4.add_stream(pkts)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg3.get_capture(len(pkts))
864         self.verify_capture_out(capture, nat_ip1, True)
865
866         # inside interface VRF don't match SNAT static mapping VRF (packets
867         # are dropped)
868         pkts = self.create_stream_in(self.pg0, self.pg3)
869         self.pg0.add_stream(pkts)
870         self.pg_enable_capture(self.pg_interfaces)
871         self.pg_start()
872         self.pg3.assert_nothing_captured()
873
874     def test_multiple_inside_interfaces(self):
875         """ SNAT multiple inside interfaces (non-overlapping address space) """
876
877         self.snat_add_address(self.snat_addr)
878         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
879         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
880         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
881                                                  is_inside=0)
882
883         # between two S-NAT inside interfaces (no translation)
884         pkts = self.create_stream_in(self.pg0, self.pg1)
885         self.pg0.add_stream(pkts)
886         self.pg_enable_capture(self.pg_interfaces)
887         self.pg_start()
888         capture = self.pg1.get_capture(len(pkts))
889         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
890
891         # from S-NAT inside to interface without S-NAT feature (no translation)
892         pkts = self.create_stream_in(self.pg0, self.pg2)
893         self.pg0.add_stream(pkts)
894         self.pg_enable_capture(self.pg_interfaces)
895         self.pg_start()
896         capture = self.pg2.get_capture(len(pkts))
897         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
898
899         # in2out 1st interface
900         pkts = self.create_stream_in(self.pg0, self.pg3)
901         self.pg0.add_stream(pkts)
902         self.pg_enable_capture(self.pg_interfaces)
903         self.pg_start()
904         capture = self.pg3.get_capture(len(pkts))
905         self.verify_capture_out(capture)
906
907         # out2in 1st interface
908         pkts = self.create_stream_out(self.pg3)
909         self.pg3.add_stream(pkts)
910         self.pg_enable_capture(self.pg_interfaces)
911         self.pg_start()
912         capture = self.pg0.get_capture(len(pkts))
913         self.verify_capture_in(capture, self.pg0)
914
915         # in2out 2nd interface
916         pkts = self.create_stream_in(self.pg1, self.pg3)
917         self.pg1.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg3.get_capture(len(pkts))
921         self.verify_capture_out(capture)
922
923         # out2in 2nd interface
924         pkts = self.create_stream_out(self.pg3)
925         self.pg3.add_stream(pkts)
926         self.pg_enable_capture(self.pg_interfaces)
927         self.pg_start()
928         capture = self.pg1.get_capture(len(pkts))
929         self.verify_capture_in(capture, self.pg1)
930
931     def test_inside_overlapping_interfaces(self):
932         """ SNAT multiple inside interfaces with overlapping address space """
933
934         static_nat_ip = "10.0.0.10"
935         self.snat_add_address(self.snat_addr)
936         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
937                                                  is_inside=0)
938         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
939         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
940         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
941         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
942                                      vrf_id=20)
943
944         # between S-NAT inside interfaces with same VRF (no translation)
945         pkts = self.create_stream_in(self.pg4, self.pg5)
946         self.pg4.add_stream(pkts)
947         self.pg_enable_capture(self.pg_interfaces)
948         self.pg_start()
949         capture = self.pg5.get_capture(len(pkts))
950         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
951
952         # between S-NAT inside interfaces with different VRF (hairpinning)
953         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
954              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
955              TCP(sport=1234, dport=5678))
956         self.pg4.add_stream(p)
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pg_start()
959         capture = self.pg6.get_capture(1)
960         p = capture[0]
961         try:
962             ip = p[IP]
963             tcp = p[TCP]
964             self.assertEqual(ip.src, self.snat_addr)
965             self.assertEqual(ip.dst, self.pg6.remote_ip4)
966             self.assertNotEqual(tcp.sport, 1234)
967             self.assertEqual(tcp.dport, 5678)
968         except:
969             self.logger.error(ppp("Unexpected or invalid packet:", p))
970             raise
971
972         # in2out 1st interface
973         pkts = self.create_stream_in(self.pg4, self.pg3)
974         self.pg4.add_stream(pkts)
975         self.pg_enable_capture(self.pg_interfaces)
976         self.pg_start()
977         capture = self.pg3.get_capture(len(pkts))
978         self.verify_capture_out(capture)
979
980         # out2in 1st interface
981         pkts = self.create_stream_out(self.pg3)
982         self.pg3.add_stream(pkts)
983         self.pg_enable_capture(self.pg_interfaces)
984         self.pg_start()
985         capture = self.pg4.get_capture(len(pkts))
986         self.verify_capture_in(capture, self.pg4)
987
988         # in2out 2nd interface
989         pkts = self.create_stream_in(self.pg5, self.pg3)
990         self.pg5.add_stream(pkts)
991         self.pg_enable_capture(self.pg_interfaces)
992         self.pg_start()
993         capture = self.pg3.get_capture(len(pkts))
994         self.verify_capture_out(capture)
995
996         # out2in 2nd interface
997         pkts = self.create_stream_out(self.pg3)
998         self.pg3.add_stream(pkts)
999         self.pg_enable_capture(self.pg_interfaces)
1000         self.pg_start()
1001         capture = self.pg5.get_capture(len(pkts))
1002         self.verify_capture_in(capture, self.pg5)
1003
1004         # pg5 session dump
1005         addresses = self.vapi.snat_address_dump()
1006         self.assertEqual(len(addresses), 1)
1007         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1008         self.assertEqual(len(sessions), 3)
1009         for session in sessions:
1010             self.assertFalse(session.is_static)
1011             self.assertEqual(session.inside_ip_address[0:4],
1012                              self.pg5.remote_ip4n)
1013             self.assertEqual(session.outside_ip_address,
1014                              addresses[0].ip_address)
1015         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1016         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1017         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1018         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1019         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1020         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1021         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1022         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1023         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1024
1025         # in2out 3rd interface
1026         pkts = self.create_stream_in(self.pg6, self.pg3)
1027         self.pg6.add_stream(pkts)
1028         self.pg_enable_capture(self.pg_interfaces)
1029         self.pg_start()
1030         capture = self.pg3.get_capture(len(pkts))
1031         self.verify_capture_out(capture, static_nat_ip, True)
1032
1033         # out2in 3rd interface
1034         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1035         self.pg3.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg6.get_capture(len(pkts))
1039         self.verify_capture_in(capture, self.pg6)
1040
1041         # general user and session dump verifications
1042         users = self.vapi.snat_user_dump()
1043         self.assertTrue(len(users) >= 3)
1044         addresses = self.vapi.snat_address_dump()
1045         self.assertEqual(len(addresses), 1)
1046         for user in users:
1047             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1048                                                         user.vrf_id)
1049             for session in sessions:
1050                 self.assertEqual(user.ip_address, session.inside_ip_address)
1051                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1052                 self.assertTrue(session.protocol in
1053                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1054                                  IP_PROTOS.icmp])
1055
1056         # pg4 session dump
1057         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1058         self.assertTrue(len(sessions) >= 4)
1059         for session in sessions:
1060             self.assertFalse(session.is_static)
1061             self.assertEqual(session.inside_ip_address[0:4],
1062                              self.pg4.remote_ip4n)
1063             self.assertEqual(session.outside_ip_address,
1064                              addresses[0].ip_address)
1065
1066         # pg6 session dump
1067         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1068         self.assertTrue(len(sessions) >= 3)
1069         for session in sessions:
1070             self.assertTrue(session.is_static)
1071             self.assertEqual(session.inside_ip_address[0:4],
1072                              self.pg6.remote_ip4n)
1073             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1074                              map(int, static_nat_ip.split('.')))
1075             self.assertTrue(session.inside_port in
1076                             [self.tcp_port_in, self.udp_port_in,
1077                              self.icmp_id_in])
1078
1079     def test_hairpinning(self):
1080         """ SNAT hairpinning - 1:1 NAT with port"""
1081
1082         host = self.pg0.remote_hosts[0]
1083         server = self.pg0.remote_hosts[1]
1084         host_in_port = 1234
1085         host_out_port = 0
1086         server_in_port = 5678
1087         server_out_port = 8765
1088
1089         self.snat_add_address(self.snat_addr)
1090         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1091         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1092                                                  is_inside=0)
1093         # add static mapping for server
1094         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1095                                      server_in_port, server_out_port,
1096                                      proto=IP_PROTOS.tcp)
1097
1098         # send packet from host to server
1099         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1100              IP(src=host.ip4, dst=self.snat_addr) /
1101              TCP(sport=host_in_port, dport=server_out_port))
1102         self.pg0.add_stream(p)
1103         self.pg_enable_capture(self.pg_interfaces)
1104         self.pg_start()
1105         capture = self.pg0.get_capture(1)
1106         p = capture[0]
1107         try:
1108             ip = p[IP]
1109             tcp = p[TCP]
1110             self.assertEqual(ip.src, self.snat_addr)
1111             self.assertEqual(ip.dst, server.ip4)
1112             self.assertNotEqual(tcp.sport, host_in_port)
1113             self.assertEqual(tcp.dport, server_in_port)
1114             host_out_port = tcp.sport
1115         except:
1116             self.logger.error(ppp("Unexpected or invalid packet:", p))
1117             raise
1118
1119         # send reply from server to host
1120         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1121              IP(src=server.ip4, dst=self.snat_addr) /
1122              TCP(sport=server_in_port, dport=host_out_port))
1123         self.pg0.add_stream(p)
1124         self.pg_enable_capture(self.pg_interfaces)
1125         self.pg_start()
1126         capture = self.pg0.get_capture(1)
1127         p = capture[0]
1128         try:
1129             ip = p[IP]
1130             tcp = p[TCP]
1131             self.assertEqual(ip.src, self.snat_addr)
1132             self.assertEqual(ip.dst, host.ip4)
1133             self.assertEqual(tcp.sport, server_out_port)
1134             self.assertEqual(tcp.dport, host_in_port)
1135         except:
1136             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1137             raise
1138
1139     def test_hairpinning2(self):
1140         """ SNAT hairpinning - 1:1 NAT"""
1141
1142         server1_nat_ip = "10.0.0.10"
1143         server2_nat_ip = "10.0.0.11"
1144         host = self.pg0.remote_hosts[0]
1145         server1 = self.pg0.remote_hosts[1]
1146         server2 = self.pg0.remote_hosts[2]
1147         server_tcp_port = 22
1148         server_udp_port = 20
1149
1150         self.snat_add_address(self.snat_addr)
1151         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1152         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1153                                                  is_inside=0)
1154
1155         # add static mapping for servers
1156         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1157         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1158
1159         # host to server1
1160         pkts = []
1161         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1162              IP(src=host.ip4, dst=server1_nat_ip) /
1163              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1164         pkts.append(p)
1165         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1166              IP(src=host.ip4, dst=server1_nat_ip) /
1167              UDP(sport=self.udp_port_in, dport=server_udp_port))
1168         pkts.append(p)
1169         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1170              IP(src=host.ip4, dst=server1_nat_ip) /
1171              ICMP(id=self.icmp_id_in, type='echo-request'))
1172         pkts.append(p)
1173         self.pg0.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176         capture = self.pg0.get_capture(len(pkts))
1177         for packet in capture:
1178             try:
1179                 self.assertEqual(packet[IP].src, self.snat_addr)
1180                 self.assertEqual(packet[IP].dst, server1.ip4)
1181                 if packet.haslayer(TCP):
1182                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1183                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1184                     self.tcp_port_out = packet[TCP].sport
1185                 elif packet.haslayer(UDP):
1186                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1187                     self.assertEqual(packet[UDP].dport, server_udp_port)
1188                     self.udp_port_out = packet[UDP].sport
1189                 else:
1190                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1191                     self.icmp_id_out = packet[ICMP].id
1192             except:
1193                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1194                 raise
1195
1196         # server1 to host
1197         pkts = []
1198         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1199              IP(src=server1.ip4, dst=self.snat_addr) /
1200              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1201         pkts.append(p)
1202         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1203              IP(src=server1.ip4, dst=self.snat_addr) /
1204              UDP(sport=server_udp_port, dport=self.udp_port_out))
1205         pkts.append(p)
1206         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1207              IP(src=server1.ip4, dst=self.snat_addr) /
1208              ICMP(id=self.icmp_id_out, type='echo-reply'))
1209         pkts.append(p)
1210         self.pg0.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213         capture = self.pg0.get_capture(len(pkts))
1214         for packet in capture:
1215             try:
1216                 self.assertEqual(packet[IP].src, server1_nat_ip)
1217                 self.assertEqual(packet[IP].dst, host.ip4)
1218                 if packet.haslayer(TCP):
1219                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1220                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1221                 elif packet.haslayer(UDP):
1222                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1223                     self.assertEqual(packet[UDP].sport, server_udp_port)
1224                 else:
1225                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1226             except:
1227                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1228                 raise
1229
1230         # server2 to server1
1231         pkts = []
1232         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1233              IP(src=server2.ip4, dst=server1_nat_ip) /
1234              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1235         pkts.append(p)
1236         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1237              IP(src=server2.ip4, dst=server1_nat_ip) /
1238              UDP(sport=self.udp_port_in, dport=server_udp_port))
1239         pkts.append(p)
1240         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1241              IP(src=server2.ip4, dst=server1_nat_ip) /
1242              ICMP(id=self.icmp_id_in, type='echo-request'))
1243         pkts.append(p)
1244         self.pg0.add_stream(pkts)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg0.get_capture(len(pkts))
1248         for packet in capture:
1249             try:
1250                 self.assertEqual(packet[IP].src, server2_nat_ip)
1251                 self.assertEqual(packet[IP].dst, server1.ip4)
1252                 if packet.haslayer(TCP):
1253                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1254                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1255                     self.tcp_port_out = packet[TCP].sport
1256                 elif packet.haslayer(UDP):
1257                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1258                     self.assertEqual(packet[UDP].dport, server_udp_port)
1259                     self.udp_port_out = packet[UDP].sport
1260                 else:
1261                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1262                     self.icmp_id_out = packet[ICMP].id
1263             except:
1264                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1265                 raise
1266
1267         # server1 to server2
1268         pkts = []
1269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1270              IP(src=server1.ip4, dst=server2_nat_ip) /
1271              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1272         pkts.append(p)
1273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274              IP(src=server1.ip4, dst=server2_nat_ip) /
1275              UDP(sport=server_udp_port, dport=self.udp_port_out))
1276         pkts.append(p)
1277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278              IP(src=server1.ip4, dst=server2_nat_ip) /
1279              ICMP(id=self.icmp_id_out, type='echo-reply'))
1280         pkts.append(p)
1281         self.pg0.add_stream(pkts)
1282         self.pg_enable_capture(self.pg_interfaces)
1283         self.pg_start()
1284         capture = self.pg0.get_capture(len(pkts))
1285         for packet in capture:
1286             try:
1287                 self.assertEqual(packet[IP].src, server1_nat_ip)
1288                 self.assertEqual(packet[IP].dst, server2.ip4)
1289                 if packet.haslayer(TCP):
1290                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1291                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1292                 elif packet.haslayer(UDP):
1293                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1294                     self.assertEqual(packet[UDP].sport, server_udp_port)
1295                 else:
1296                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1297             except:
1298                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1299                 raise
1300
1301     def test_max_translations_per_user(self):
1302         """ MAX translations per user - recycle the least recently used """
1303
1304         self.snat_add_address(self.snat_addr)
1305         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1306         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1307                                                  is_inside=0)
1308
1309         # get maximum number of translations per user
1310         snat_config = self.vapi.snat_show_config()
1311
1312         # send more than maximum number of translations per user packets
1313         pkts_num = snat_config.max_translations_per_user + 5
1314         pkts = []
1315         for port in range(0, pkts_num):
1316             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1317                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1318                  TCP(sport=1025 + port))
1319             pkts.append(p)
1320         self.pg0.add_stream(pkts)
1321         self.pg_enable_capture(self.pg_interfaces)
1322         self.pg_start()
1323
1324         # verify number of translated packet
1325         self.pg1.get_capture(pkts_num)
1326
1327     def test_interface_addr(self):
1328         """ Acquire SNAT addresses from interface """
1329         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1330
1331         # no address in NAT pool
1332         adresses = self.vapi.snat_address_dump()
1333         self.assertEqual(0, len(adresses))
1334
1335         # configure interface address and check NAT address pool
1336         self.pg7.config_ip4()
1337         adresses = self.vapi.snat_address_dump()
1338         self.assertEqual(1, len(adresses))
1339         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1340
1341         # remove interface address and check NAT address pool
1342         self.pg7.unconfig_ip4()
1343         adresses = self.vapi.snat_address_dump()
1344         self.assertEqual(0, len(adresses))
1345
1346     def test_interface_addr_static_mapping(self):
1347         """ Static mapping with addresses from interface """
1348         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1349         self.snat_add_static_mapping('1.2.3.4',
1350                                      external_sw_if_index=self.pg7.sw_if_index)
1351
1352         # static mappings with external interface
1353         static_mappings = self.vapi.snat_static_mapping_dump()
1354         self.assertEqual(1, len(static_mappings))
1355         self.assertEqual(self.pg7.sw_if_index,
1356                          static_mappings[0].external_sw_if_index)
1357
1358         # configure interface address and check static mappings
1359         self.pg7.config_ip4()
1360         static_mappings = self.vapi.snat_static_mapping_dump()
1361         self.assertEqual(1, len(static_mappings))
1362         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1363                          self.pg7.local_ip4n)
1364         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1365
1366         # remove interface address and check static mappings
1367         self.pg7.unconfig_ip4()
1368         static_mappings = self.vapi.snat_static_mapping_dump()
1369         self.assertEqual(0, len(static_mappings))
1370
1371     def test_ipfix_nat44_sess(self):
1372         """ S-NAT IPFIX logging NAT44 session created/delted """
1373         self.ipfix_domain_id = 10
1374         self.ipfix_src_port = 20202
1375         colector_port = 30303
1376         bind_layers(UDP, IPFIX, dport=30303)
1377         self.snat_add_address(self.snat_addr)
1378         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1379         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1380                                                  is_inside=0)
1381         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1382                                      src_address=self.pg3.local_ip4n,
1383                                      path_mtu=512,
1384                                      template_interval=10,
1385                                      collector_port=colector_port)
1386         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1387                              src_port=self.ipfix_src_port)
1388
1389         pkts = self.create_stream_in(self.pg0, self.pg1)
1390         self.pg0.add_stream(pkts)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg1.get_capture(len(pkts))
1394         self.verify_capture_out(capture)
1395         self.snat_add_address(self.snat_addr, is_add=0)
1396         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1397         capture = self.pg3.get_capture(3)
1398         ipfix = IPFIXDecoder()
1399         # first load template
1400         for p in capture:
1401             self.assertTrue(p.haslayer(IPFIX))
1402             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1403             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1404             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1405             self.assertEqual(p[UDP].dport, colector_port)
1406             self.assertEqual(p[IPFIX].observationDomainID,
1407                              self.ipfix_domain_id)
1408             if p.haslayer(Template):
1409                 ipfix.add_template(p.getlayer(Template))
1410         # verify events in data set
1411         for p in capture:
1412             if p.haslayer(Data):
1413                 data = ipfix.decode_data_set(p.getlayer(Set))
1414                 self.verify_ipfix_nat44_ses(data)
1415
1416     def test_ipfix_addr_exhausted(self):
1417         """ S-NAT IPFIX logging NAT addresses exhausted """
1418         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1419         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1420                                                  is_inside=0)
1421         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1422                                      src_address=self.pg3.local_ip4n,
1423                                      path_mtu=512,
1424                                      template_interval=10)
1425         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1426                              src_port=self.ipfix_src_port)
1427
1428         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1429              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1430              TCP(sport=3025))
1431         self.pg0.add_stream(p)
1432         self.pg_enable_capture(self.pg_interfaces)
1433         self.pg_start()
1434         capture = self.pg1.get_capture(0)
1435         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1436         capture = self.pg3.get_capture(3)
1437         ipfix = IPFIXDecoder()
1438         # first load template
1439         for p in capture:
1440             self.assertTrue(p.haslayer(IPFIX))
1441             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1442             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1443             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1444             self.assertEqual(p[UDP].dport, 4739)
1445             self.assertEqual(p[IPFIX].observationDomainID,
1446                              self.ipfix_domain_id)
1447             if p.haslayer(Template):
1448                 ipfix.add_template(p.getlayer(Template))
1449         # verify events in data set
1450         for p in capture:
1451             if p.haslayer(Data):
1452                 data = ipfix.decode_data_set(p.getlayer(Set))
1453                 self.verify_ipfix_addr_exhausted(data)
1454
1455     def test_pool_addr_fib(self):
1456         """ S-NAT add pool addresses to FIB """
1457         static_addr = '10.0.0.10'
1458         self.snat_add_address(self.snat_addr)
1459         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1460         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1461                                                  is_inside=0)
1462         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1463
1464         # SNAT address
1465         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1466              ARP(op=ARP.who_has, pdst=self.snat_addr,
1467                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1468         self.pg1.add_stream(p)
1469         self.pg_enable_capture(self.pg_interfaces)
1470         self.pg_start()
1471         capture = self.pg1.get_capture(1)
1472         self.assertTrue(capture[0].haslayer(ARP))
1473         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1474
1475         # 1:1 NAT address
1476         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1477              ARP(op=ARP.who_has, pdst=static_addr,
1478                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1479         self.pg1.add_stream(p)
1480         self.pg_enable_capture(self.pg_interfaces)
1481         self.pg_start()
1482         capture = self.pg1.get_capture(1)
1483         self.assertTrue(capture[0].haslayer(ARP))
1484         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1485
1486         # send ARP to non-SNAT interface
1487         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1488              ARP(op=ARP.who_has, pdst=self.snat_addr,
1489                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1490         self.pg2.add_stream(p)
1491         self.pg_enable_capture(self.pg_interfaces)
1492         self.pg_start()
1493         capture = self.pg1.get_capture(0)
1494
1495         # remove addresses and verify
1496         self.snat_add_address(self.snat_addr, is_add=0)
1497         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1498                                      is_add=0)
1499
1500         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1501              ARP(op=ARP.who_has, pdst=self.snat_addr,
1502                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1503         self.pg1.add_stream(p)
1504         self.pg_enable_capture(self.pg_interfaces)
1505         self.pg_start()
1506         capture = self.pg1.get_capture(0)
1507
1508         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1509              ARP(op=ARP.who_has, pdst=static_addr,
1510                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1511         self.pg1.add_stream(p)
1512         self.pg_enable_capture(self.pg_interfaces)
1513         self.pg_start()
1514         capture = self.pg1.get_capture(0)
1515
1516     def test_vrf_mode(self):
1517         """ S-NAT tenant VRF aware address pool mode """
1518
1519         vrf_id1 = 1
1520         vrf_id2 = 2
1521         nat_ip1 = "10.0.0.10"
1522         nat_ip2 = "10.0.0.11"
1523
1524         self.pg0.unconfig_ip4()
1525         self.pg1.unconfig_ip4()
1526         self.pg0.set_table_ip4(vrf_id1)
1527         self.pg1.set_table_ip4(vrf_id2)
1528         self.pg0.config_ip4()
1529         self.pg1.config_ip4()
1530
1531         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1532         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1533         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1534         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1535         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1536                                                  is_inside=0)
1537
1538         # first VRF
1539         pkts = self.create_stream_in(self.pg0, self.pg2)
1540         self.pg0.add_stream(pkts)
1541         self.pg_enable_capture(self.pg_interfaces)
1542         self.pg_start()
1543         capture = self.pg2.get_capture(len(pkts))
1544         self.verify_capture_out(capture, nat_ip1)
1545
1546         # second VRF
1547         pkts = self.create_stream_in(self.pg1, self.pg2)
1548         self.pg1.add_stream(pkts)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg2.get_capture(len(pkts))
1552         self.verify_capture_out(capture, nat_ip2)
1553
1554     def test_vrf_feature_independent(self):
1555         """ S-NAT tenant VRF independent address pool mode """
1556
1557         nat_ip1 = "10.0.0.10"
1558         nat_ip2 = "10.0.0.11"
1559
1560         self.snat_add_address(nat_ip1)
1561         self.snat_add_address(nat_ip2)
1562         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1563         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1564         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1565                                                  is_inside=0)
1566
1567         # first VRF
1568         pkts = self.create_stream_in(self.pg0, self.pg2)
1569         self.pg0.add_stream(pkts)
1570         self.pg_enable_capture(self.pg_interfaces)
1571         self.pg_start()
1572         capture = self.pg2.get_capture(len(pkts))
1573         self.verify_capture_out(capture, nat_ip1)
1574
1575         # second VRF
1576         pkts = self.create_stream_in(self.pg1, self.pg2)
1577         self.pg1.add_stream(pkts)
1578         self.pg_enable_capture(self.pg_interfaces)
1579         self.pg_start()
1580         capture = self.pg2.get_capture(len(pkts))
1581         self.verify_capture_out(capture, nat_ip1)
1582
1583     def test_dynamic_ipless_interfaces(self):
1584         """ SNAT interfaces without configured ip dynamic map """
1585
1586         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1587                                       self.pg7.remote_mac,
1588                                       self.pg7.remote_ip4n,
1589                                       is_static=1)
1590         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1591                                       self.pg8.remote_mac,
1592                                       self.pg8.remote_ip4n,
1593                                       is_static=1)
1594
1595         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1596                                    dst_address_length=32,
1597                                    next_hop_address=self.pg7.remote_ip4n,
1598                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1599         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1600                                    dst_address_length=32,
1601                                    next_hop_address=self.pg8.remote_ip4n,
1602                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1603
1604         self.snat_add_address(self.snat_addr)
1605         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1606         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1607                                                  is_inside=0)
1608
1609         # in2out
1610         pkts = self.create_stream_in(self.pg7, self.pg8)
1611         self.pg7.add_stream(pkts)
1612         self.pg_enable_capture(self.pg_interfaces)
1613         self.pg_start()
1614         capture = self.pg8.get_capture(len(pkts))
1615         self.verify_capture_out(capture)
1616
1617         # out2in
1618         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1619         self.pg8.add_stream(pkts)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg7.get_capture(len(pkts))
1623         self.verify_capture_in(capture, self.pg7)
1624
1625     def test_static_ipless_interfaces(self):
1626         """ SNAT 1:1 NAT interfaces without configured ip """
1627
1628         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1629                                       self.pg7.remote_mac,
1630                                       self.pg7.remote_ip4n,
1631                                       is_static=1)
1632         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1633                                       self.pg8.remote_mac,
1634                                       self.pg8.remote_ip4n,
1635                                       is_static=1)
1636
1637         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1638                                    dst_address_length=32,
1639                                    next_hop_address=self.pg7.remote_ip4n,
1640                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1641         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1642                                    dst_address_length=32,
1643                                    next_hop_address=self.pg8.remote_ip4n,
1644                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1645
1646         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1647         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1648         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1649                                                  is_inside=0)
1650
1651         # out2in
1652         pkts = self.create_stream_out(self.pg8)
1653         self.pg8.add_stream(pkts)
1654         self.pg_enable_capture(self.pg_interfaces)
1655         self.pg_start()
1656         capture = self.pg7.get_capture(len(pkts))
1657         self.verify_capture_in(capture, self.pg7)
1658
1659         # in2out
1660         pkts = self.create_stream_in(self.pg7, self.pg8)
1661         self.pg7.add_stream(pkts)
1662         self.pg_enable_capture(self.pg_interfaces)
1663         self.pg_start()
1664         capture = self.pg8.get_capture(len(pkts))
1665         self.verify_capture_out(capture, self.snat_addr, True)
1666
1667     def test_static_with_port_ipless_interfaces(self):
1668         """ SNAT 1:1 NAT with port interfaces without configured ip """
1669
1670         self.tcp_port_out = 30606
1671         self.udp_port_out = 30607
1672         self.icmp_id_out = 30608
1673
1674         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1675                                       self.pg7.remote_mac,
1676                                       self.pg7.remote_ip4n,
1677                                       is_static=1)
1678         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1679                                       self.pg8.remote_mac,
1680                                       self.pg8.remote_ip4n,
1681                                       is_static=1)
1682
1683         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1684                                    dst_address_length=32,
1685                                    next_hop_address=self.pg7.remote_ip4n,
1686                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1687         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1688                                    dst_address_length=32,
1689                                    next_hop_address=self.pg8.remote_ip4n,
1690                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1691
1692         self.snat_add_address(self.snat_addr)
1693         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1694                                      self.tcp_port_in, self.tcp_port_out,
1695                                      proto=IP_PROTOS.tcp)
1696         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1697                                      self.udp_port_in, self.udp_port_out,
1698                                      proto=IP_PROTOS.udp)
1699         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1700                                      self.icmp_id_in, self.icmp_id_out,
1701                                      proto=IP_PROTOS.icmp)
1702         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1703         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1704                                                  is_inside=0)
1705
1706         # out2in
1707         pkts = self.create_stream_out(self.pg8)
1708         self.pg8.add_stream(pkts)
1709         self.pg_enable_capture(self.pg_interfaces)
1710         self.pg_start()
1711         capture = self.pg7.get_capture(len(pkts))
1712         self.verify_capture_in(capture, self.pg7)
1713
1714         # in2out
1715         pkts = self.create_stream_in(self.pg7, self.pg8)
1716         self.pg7.add_stream(pkts)
1717         self.pg_enable_capture(self.pg_interfaces)
1718         self.pg_start()
1719         capture = self.pg8.get_capture(len(pkts))
1720         self.verify_capture_out(capture)
1721
1722     def tearDown(self):
1723         super(TestSNAT, self).tearDown()
1724         if not self.vpp_dead:
1725             self.logger.info(self.vapi.cli("show snat verbose"))
1726             self.clear_snat()
1727
1728
1729 class TestDeterministicNAT(MethodHolder):
1730     """ Deterministic NAT Test Cases """
1731
1732     @classmethod
1733     def setUpConstants(cls):
1734         super(TestDeterministicNAT, cls).setUpConstants()
1735         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1736
1737     @classmethod
1738     def setUpClass(cls):
1739         super(TestDeterministicNAT, cls).setUpClass()
1740
1741         try:
1742             cls.tcp_port_in = 6303
1743             cls.tcp_external_port = 6303
1744             cls.udp_port_in = 6304
1745             cls.udp_external_port = 6304
1746             cls.icmp_id_in = 6305
1747             cls.snat_addr = '10.0.0.3'
1748
1749             cls.create_pg_interfaces(range(3))
1750             cls.interfaces = list(cls.pg_interfaces)
1751
1752             for i in cls.interfaces:
1753                 i.admin_up()
1754                 i.config_ip4()
1755                 i.resolve_arp()
1756
1757             cls.pg0.generate_remote_hosts(2)
1758             cls.pg0.configure_ipv4_neighbors()
1759
1760         except Exception:
1761             super(TestDeterministicNAT, cls).tearDownClass()
1762             raise
1763
1764     def create_stream_in(self, in_if, out_if, ttl=64):
1765         """
1766         Create packet stream for inside network
1767
1768         :param in_if: Inside interface
1769         :param out_if: Outside interface
1770         :param ttl: TTL of generated packets
1771         """
1772         pkts = []
1773         # TCP
1774         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1775              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1776              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1777         pkts.append(p)
1778
1779         # UDP
1780         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1781              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1782              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1783         pkts.append(p)
1784
1785         # ICMP
1786         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1787              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1788              ICMP(id=self.icmp_id_in, type='echo-request'))
1789         pkts.append(p)
1790
1791         return pkts
1792
1793     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1794         """
1795         Create packet stream for outside network
1796
1797         :param out_if: Outside interface
1798         :param dst_ip: Destination IP address (Default use global SNAT address)
1799         :param ttl: TTL of generated packets
1800         """
1801         if dst_ip is None:
1802             dst_ip = self.snat_addr
1803         pkts = []
1804         # TCP
1805         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1806              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1807              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1808         pkts.append(p)
1809
1810         # UDP
1811         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1812              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1813              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1814         pkts.append(p)
1815
1816         # ICMP
1817         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1818              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1819              ICMP(id=self.icmp_external_id, type='echo-reply'))
1820         pkts.append(p)
1821
1822         return pkts
1823
1824     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1825         """
1826         Verify captured packets on outside network
1827
1828         :param capture: Captured packets
1829         :param nat_ip: Translated IP address (Default use global SNAT address)
1830         :param same_port: Sorce port number is not translated (Default False)
1831         :param packet_num: Expected number of packets (Default 3)
1832         """
1833         if nat_ip is None:
1834             nat_ip = self.snat_addr
1835         self.assertEqual(packet_num, len(capture))
1836         for packet in capture:
1837             try:
1838                 self.assertEqual(packet[IP].src, nat_ip)
1839                 if packet.haslayer(TCP):
1840                     self.tcp_port_out = packet[TCP].sport
1841                 elif packet.haslayer(UDP):
1842                     self.udp_port_out = packet[UDP].sport
1843                 else:
1844                     self.icmp_external_id = packet[ICMP].id
1845             except:
1846                 self.logger.error(ppp("Unexpected or invalid packet "
1847                                       "(outside network):", packet))
1848                 raise
1849
1850     def initiate_tcp_session(self, in_if, out_if):
1851         """
1852         Initiates TCP session
1853
1854         :param in_if: Inside interface
1855         :param out_if: Outside interface
1856         """
1857         try:
1858             # SYN packet in->out
1859             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1860                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1861                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1862                      flags="S"))
1863             in_if.add_stream(p)
1864             self.pg_enable_capture(self.pg_interfaces)
1865             self.pg_start()
1866             capture = out_if.get_capture(1)
1867             p = capture[0]
1868             self.tcp_port_out = p[TCP].sport
1869
1870             # SYN + ACK packet out->in
1871             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1872                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1873                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1874                      flags="SA"))
1875             out_if.add_stream(p)
1876             self.pg_enable_capture(self.pg_interfaces)
1877             self.pg_start()
1878             in_if.get_capture(1)
1879
1880             # ACK packet in->out
1881             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1882                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1883                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1884                      flags="A"))
1885             in_if.add_stream(p)
1886             self.pg_enable_capture(self.pg_interfaces)
1887             self.pg_start()
1888             out_if.get_capture(1)
1889
1890         except:
1891             self.logger.error("TCP 3 way handshake failed")
1892             raise
1893
1894     def verify_ipfix_max_entries_per_user(self, data):
1895         """
1896         Verify IPFIX maximum entries per user exceeded event
1897
1898         :param data: Decoded IPFIX data records
1899         """
1900         self.assertEqual(1, len(data))
1901         record = data[0]
1902         # natEvent
1903         self.assertEqual(ord(record[230]), 13)
1904         # natQuotaExceededEvent
1905         self.assertEqual('\x03\x00\x00\x00', record[466])
1906         # sourceIPv4Address
1907         self.assertEqual(self.pg0.remote_ip4n, record[8])
1908
1909     def test_deterministic_mode(self):
1910         """ S-NAT run deterministic mode """
1911         in_addr = '172.16.255.0'
1912         out_addr = '172.17.255.50'
1913         in_addr_t = '172.16.255.20'
1914         in_addr_n = socket.inet_aton(in_addr)
1915         out_addr_n = socket.inet_aton(out_addr)
1916         in_addr_t_n = socket.inet_aton(in_addr_t)
1917         in_plen = 24
1918         out_plen = 32
1919
1920         snat_config = self.vapi.snat_show_config()
1921         self.assertEqual(1, snat_config.deterministic)
1922
1923         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1924
1925         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1926         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1927         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1928         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1929
1930         deterministic_mappings = self.vapi.snat_det_map_dump()
1931         self.assertEqual(len(deterministic_mappings), 1)
1932         dsm = deterministic_mappings[0]
1933         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1934         self.assertEqual(in_plen, dsm.in_plen)
1935         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1936         self.assertEqual(out_plen, dsm.out_plen)
1937
1938         self.clear_snat()
1939         deterministic_mappings = self.vapi.snat_det_map_dump()
1940         self.assertEqual(len(deterministic_mappings), 0)
1941
1942     def test_set_timeouts(self):
1943         """ Set deterministic NAT timeouts """
1944         timeouts_before = self.vapi.snat_det_get_timeouts()
1945
1946         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1947                                         timeouts_before.tcp_established + 10,
1948                                         timeouts_before.tcp_transitory + 10,
1949                                         timeouts_before.icmp + 10)
1950
1951         timeouts_after = self.vapi.snat_det_get_timeouts()
1952
1953         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1954         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1955         self.assertNotEqual(timeouts_before.tcp_established,
1956                             timeouts_after.tcp_established)
1957         self.assertNotEqual(timeouts_before.tcp_transitory,
1958                             timeouts_after.tcp_transitory)
1959
1960     def test_det_in(self):
1961         """ CGNAT translation test (TCP, UDP, ICMP) """
1962
1963         nat_ip = "10.0.0.10"
1964
1965         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1966                                    32,
1967                                    socket.inet_aton(nat_ip),
1968                                    32)
1969         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1970         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1971                                                  is_inside=0)
1972
1973         # in2out
1974         pkts = self.create_stream_in(self.pg0, self.pg1)
1975         self.pg0.add_stream(pkts)
1976         self.pg_enable_capture(self.pg_interfaces)
1977         self.pg_start()
1978         capture = self.pg1.get_capture(len(pkts))
1979         self.verify_capture_out(capture, nat_ip)
1980
1981         # out2in
1982         pkts = self.create_stream_out(self.pg1, nat_ip)
1983         self.pg1.add_stream(pkts)
1984         self.pg_enable_capture(self.pg_interfaces)
1985         self.pg_start()
1986         capture = self.pg0.get_capture(len(pkts))
1987         self.verify_capture_in(capture, self.pg0)
1988
1989         # session dump test
1990         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1991         self.assertEqual(len(sessions), 3)
1992
1993         # TCP session
1994         s = sessions[0]
1995         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1996         self.assertEqual(s.in_port, self.tcp_port_in)
1997         self.assertEqual(s.out_port, self.tcp_port_out)
1998         self.assertEqual(s.ext_port, self.tcp_external_port)
1999
2000         # UDP session
2001         s = sessions[1]
2002         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2003         self.assertEqual(s.in_port, self.udp_port_in)
2004         self.assertEqual(s.out_port, self.udp_port_out)
2005         self.assertEqual(s.ext_port, self.udp_external_port)
2006
2007         # ICMP session
2008         s = sessions[2]
2009         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2010         self.assertEqual(s.in_port, self.icmp_id_in)
2011         self.assertEqual(s.out_port, self.icmp_external_id)
2012
2013     def test_multiple_users(self):
2014         """ CGNAT multiple users """
2015
2016         nat_ip = "10.0.0.10"
2017         port_in = 80
2018         external_port = 6303
2019
2020         host0 = self.pg0.remote_hosts[0]
2021         host1 = self.pg0.remote_hosts[1]
2022
2023         self.vapi.snat_add_det_map(host0.ip4n,
2024                                    24,
2025                                    socket.inet_aton(nat_ip),
2026                                    32)
2027         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2028         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2029                                                  is_inside=0)
2030
2031         # host0 to out
2032         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2033              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2034              TCP(sport=port_in, dport=external_port))
2035         self.pg0.add_stream(p)
2036         self.pg_enable_capture(self.pg_interfaces)
2037         self.pg_start()
2038         capture = self.pg1.get_capture(1)
2039         p = capture[0]
2040         try:
2041             ip = p[IP]
2042             tcp = p[TCP]
2043             self.assertEqual(ip.src, nat_ip)
2044             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2045             self.assertEqual(tcp.dport, external_port)
2046             port_out0 = tcp.sport
2047         except:
2048             self.logger.error(ppp("Unexpected or invalid packet:", p))
2049             raise
2050
2051         # host1 to out
2052         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2053              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2054              TCP(sport=port_in, dport=external_port))
2055         self.pg0.add_stream(p)
2056         self.pg_enable_capture(self.pg_interfaces)
2057         self.pg_start()
2058         capture = self.pg1.get_capture(1)
2059         p = capture[0]
2060         try:
2061             ip = p[IP]
2062             tcp = p[TCP]
2063             self.assertEqual(ip.src, nat_ip)
2064             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2065             self.assertEqual(tcp.dport, external_port)
2066             port_out1 = tcp.sport
2067         except:
2068             self.logger.error(ppp("Unexpected or invalid packet:", p))
2069             raise
2070
2071         dms = self.vapi.snat_det_map_dump()
2072         self.assertEqual(1, len(dms))
2073         self.assertEqual(2, dms[0].ses_num)
2074
2075         # out to host0
2076         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2077              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2078              TCP(sport=external_port, dport=port_out0))
2079         self.pg1.add_stream(p)
2080         self.pg_enable_capture(self.pg_interfaces)
2081         self.pg_start()
2082         capture = self.pg0.get_capture(1)
2083         p = capture[0]
2084         try:
2085             ip = p[IP]
2086             tcp = p[TCP]
2087             self.assertEqual(ip.src, self.pg1.remote_ip4)
2088             self.assertEqual(ip.dst, host0.ip4)
2089             self.assertEqual(tcp.dport, port_in)
2090             self.assertEqual(tcp.sport, external_port)
2091         except:
2092             self.logger.error(ppp("Unexpected or invalid packet:", p))
2093             raise
2094
2095         # out to host1
2096         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2097              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2098              TCP(sport=external_port, dport=port_out1))
2099         self.pg1.add_stream(p)
2100         self.pg_enable_capture(self.pg_interfaces)
2101         self.pg_start()
2102         capture = self.pg0.get_capture(1)
2103         p = capture[0]
2104         try:
2105             ip = p[IP]
2106             tcp = p[TCP]
2107             self.assertEqual(ip.src, self.pg1.remote_ip4)
2108             self.assertEqual(ip.dst, host1.ip4)
2109             self.assertEqual(tcp.dport, port_in)
2110             self.assertEqual(tcp.sport, external_port)
2111         except:
2112             self.logger.error(ppp("Unexpected or invalid packet", p))
2113             raise
2114
2115         # session close api test
2116         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2117                                              port_out1,
2118                                              self.pg1.remote_ip4n,
2119                                              external_port)
2120         dms = self.vapi.snat_det_map_dump()
2121         self.assertEqual(dms[0].ses_num, 1)
2122
2123         self.vapi.snat_det_close_session_in(host0.ip4n,
2124                                             port_in,
2125                                             self.pg1.remote_ip4n,
2126                                             external_port)
2127         dms = self.vapi.snat_det_map_dump()
2128         self.assertEqual(dms[0].ses_num, 0)
2129
2130     def test_tcp_session_close_detection_in(self):
2131         """ CGNAT TCP session close initiated from inside network """
2132         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2133                                    32,
2134                                    socket.inet_aton(self.snat_addr),
2135                                    32)
2136         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2137         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2138                                                  is_inside=0)
2139
2140         self.initiate_tcp_session(self.pg0, self.pg1)
2141
2142         # close the session from inside
2143         try:
2144             # FIN packet in -> out
2145             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2146                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2147                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2148                      flags="F"))
2149             self.pg0.add_stream(p)
2150             self.pg_enable_capture(self.pg_interfaces)
2151             self.pg_start()
2152             self.pg1.get_capture(1)
2153
2154             pkts = []
2155
2156             # ACK packet out -> in
2157             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2158                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2159                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2160                      flags="A"))
2161             pkts.append(p)
2162
2163             # FIN packet out -> in
2164             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2165                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2166                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2167                      flags="F"))
2168             pkts.append(p)
2169
2170             self.pg1.add_stream(pkts)
2171             self.pg_enable_capture(self.pg_interfaces)
2172             self.pg_start()
2173             self.pg0.get_capture(2)
2174
2175             # ACK packet in -> out
2176             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2177                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2178                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2179                      flags="A"))
2180             self.pg0.add_stream(p)
2181             self.pg_enable_capture(self.pg_interfaces)
2182             self.pg_start()
2183             self.pg1.get_capture(1)
2184
2185             # Check if snat closed the session
2186             dms = self.vapi.snat_det_map_dump()
2187             self.assertEqual(0, dms[0].ses_num)
2188         except:
2189             self.logger.error("TCP session termination failed")
2190             raise
2191
2192     def test_tcp_session_close_detection_out(self):
2193         """ CGNAT TCP session close initiated from outside network """
2194         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2195                                    32,
2196                                    socket.inet_aton(self.snat_addr),
2197                                    32)
2198         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2199         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2200                                                  is_inside=0)
2201
2202         self.initiate_tcp_session(self.pg0, self.pg1)
2203
2204         # close the session from outside
2205         try:
2206             # FIN packet out -> in
2207             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2208                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2209                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2210                      flags="F"))
2211             self.pg1.add_stream(p)
2212             self.pg_enable_capture(self.pg_interfaces)
2213             self.pg_start()
2214             self.pg0.get_capture(1)
2215
2216             pkts = []
2217
2218             # ACK packet in -> out
2219             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2220                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2221                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2222                      flags="A"))
2223             pkts.append(p)
2224
2225             # ACK packet in -> out
2226             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2227                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2228                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2229                      flags="F"))
2230             pkts.append(p)
2231
2232             self.pg0.add_stream(pkts)
2233             self.pg_enable_capture(self.pg_interfaces)
2234             self.pg_start()
2235             self.pg1.get_capture(2)
2236
2237             # ACK packet out -> in
2238             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2239                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2240                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2241                      flags="A"))
2242             self.pg1.add_stream(p)
2243             self.pg_enable_capture(self.pg_interfaces)
2244             self.pg_start()
2245             self.pg0.get_capture(1)
2246
2247             # Check if snat closed the session
2248             dms = self.vapi.snat_det_map_dump()
2249             self.assertEqual(0, dms[0].ses_num)
2250         except:
2251             self.logger.error("TCP session termination failed")
2252             raise
2253
2254     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2255     def test_session_timeout(self):
2256         """ CGNAT session timeouts """
2257         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2258                                    32,
2259                                    socket.inet_aton(self.snat_addr),
2260                                    32)
2261         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2262         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2263                                                  is_inside=0)
2264
2265         self.initiate_tcp_session(self.pg0, self.pg1)
2266         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2267         pkts = self.create_stream_in(self.pg0, self.pg1)
2268         self.pg0.add_stream(pkts)
2269         self.pg_enable_capture(self.pg_interfaces)
2270         self.pg_start()
2271         capture = self.pg1.get_capture(len(pkts))
2272         sleep(15)
2273
2274         dms = self.vapi.snat_det_map_dump()
2275         self.assertEqual(0, dms[0].ses_num)
2276
2277     def test_session_limit_per_user(self):
2278         """ CGNAT maximum 1000 sessions per user should be created """
2279         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2280                                    32,
2281                                    socket.inet_aton(self.snat_addr),
2282                                    32)
2283         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2284         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2285                                                  is_inside=0)
2286         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2287                                      src_address=self.pg2.local_ip4n,
2288                                      path_mtu=512,
2289                                      template_interval=10)
2290         self.vapi.snat_ipfix()
2291
2292         pkts = []
2293         for port in range(1025, 2025):
2294             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2295                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2296                  UDP(sport=port, dport=port))
2297             pkts.append(p)
2298
2299         self.pg0.add_stream(pkts)
2300         self.pg_enable_capture(self.pg_interfaces)
2301         self.pg_start()
2302         capture = self.pg1.get_capture(len(pkts))
2303
2304         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2305              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2306              UDP(sport=3001, dport=3002))
2307         self.pg0.add_stream(p)
2308         self.pg_enable_capture(self.pg_interfaces)
2309         self.pg_start()
2310         capture = self.pg1.assert_nothing_captured()
2311
2312         # verify ICMP error packet
2313         capture = self.pg0.get_capture(1)
2314         p = capture[0]
2315         self.assertTrue(p.haslayer(ICMP))
2316         icmp = p[ICMP]
2317         self.assertEqual(icmp.type, 3)
2318         self.assertEqual(icmp.code, 1)
2319         self.assertTrue(icmp.haslayer(IPerror))
2320         inner_ip = icmp[IPerror]
2321         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2322         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2323
2324         dms = self.vapi.snat_det_map_dump()
2325
2326         self.assertEqual(1000, dms[0].ses_num)
2327
2328         # verify IPFIX logging
2329         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2330         capture = self.pg2.get_capture(2)
2331         ipfix = IPFIXDecoder()
2332         # first load template
2333         for p in capture:
2334             self.assertTrue(p.haslayer(IPFIX))
2335             if p.haslayer(Template):
2336                 ipfix.add_template(p.getlayer(Template))
2337         # verify events in data set
2338         for p in capture:
2339             if p.haslayer(Data):
2340                 data = ipfix.decode_data_set(p.getlayer(Set))
2341                 self.verify_ipfix_max_entries_per_user(data)
2342
2343     def clear_snat(self):
2344         """
2345         Clear SNAT configuration.
2346         """
2347         self.vapi.snat_ipfix(enable=0)
2348         self.vapi.snat_det_set_timeouts()
2349         deterministic_mappings = self.vapi.snat_det_map_dump()
2350         for dsm in deterministic_mappings:
2351             self.vapi.snat_add_det_map(dsm.in_addr,
2352                                        dsm.in_plen,
2353                                        dsm.out_addr,
2354                                        dsm.out_plen,
2355                                        is_add=0)
2356
2357         interfaces = self.vapi.snat_interface_dump()
2358         for intf in interfaces:
2359             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2360                                                      intf.is_inside,
2361                                                      is_add=0)
2362
2363     def tearDown(self):
2364         super(TestDeterministicNAT, self).tearDown()
2365         if not self.vpp_dead:
2366             self.logger.info(self.vapi.cli("show snat detail"))
2367             self.clear_snat()
2368
2369
class TestNAT64(MethodHolder):
    """ NAT64 Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Define the default ports, ICMP ids and NAT address shared by the
        tests, create two pg interfaces and bring the first one up for IPv6
        and the second one up for IPv4.
        """
        super(TestNAT64, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))
            cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
            cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

            for i in cls.ip6_interfaces:
                i.admin_up()
                i.config_ip6()
                i.resolve_ndp()

            for i in cls.ip4_interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

        except Exception:
            # undo whatever the base class set up before propagating
            super(TestNAT64, cls).tearDownClass()
            raise

    def test_pool(self):
        """ Add/delete address to NAT64 pool """
        nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

        # a range whose start equals its end adds a single address
        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 1)
        self.assertEqual(addresses[0].address, nat_addr)

        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 0)

    def test_interface(self):
        """ Enable/disable NAT64 feature on the interface """
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # both interfaces must be reported, each with the right direction
        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 2)
        pg0_found = False
        pg1_found = False
        for intf in interfaces:
            if intf.sw_if_index == self.pg0.sw_if_index:
                self.assertEqual(intf.is_inside, 1)
                pg0_found = True
            elif intf.sw_if_index == self.pg1.sw_if_index:
                self.assertEqual(intf.is_inside, 0)
                pg1_found = True
        self.assertTrue(pg0_found)
        self.assertTrue(pg1_found)

        # the matching graph features must show up in the CLI output
        features = self.vapi.cli("show interface features pg0")
        self.assertIn('nat64-in2out', features)
        features = self.vapi.cli("show interface features pg1")
        self.assertIn('nat64-out2in', features)

        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 0)

    def test_static_bib(self):
        """ Add/delete static BIB entry """
        in_addr = socket.inet_pton(socket.AF_INET6,
                                   '2001:db8:85a3::8a2e:370:7334')
        out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
        in_port = 1234
        out_port = 5678
        proto = IP_PROTOS.tcp

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto)
        # exactly one static entry, carrying the values just configured
        bib = self.vapi.nat64_bib_dump(proto)
        static_bib_num = 0
        for bibe in bib:
            if bibe.is_static:
                static_bib_num += 1
                self.assertEqual(bibe.i_addr, in_addr)
                self.assertEqual(bibe.o_addr, out_addr)
                self.assertEqual(bibe.i_port, in_port)
                self.assertEqual(bibe.o_port, out_port)
        self.assertEqual(static_bib_num, 1)

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto,
                                           is_add=0)
        # no static entry may be left after the delete
        bib = self.vapi.nat64_bib_dump(proto)
        static_bib_num = sum(1 for bibe in bib if bibe.is_static)
        self.assertEqual(static_bib_num, 0)

    def test_set_timeouts(self):
        """ Set NAT64 timeouts """
        # verify default values
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 300)
        self.assertEqual(timeouts.icmp, 60)
        self.assertEqual(timeouts.tcp_trans, 240)
        self.assertEqual(timeouts.tcp_est, 7440)
        self.assertEqual(timeouts.tcp_incoming_syn, 6)

        # set and verify custom values
        self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
                                     tcp_est=7450, tcp_incoming_syn=10)
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 200)
        self.assertEqual(timeouts.icmp, 30)
        self.assertEqual(timeouts.tcp_trans, 250)
        self.assertEqual(timeouts.tcp_est, 7450)
        self.assertEqual(timeouts.tcp_incoming_syn, 10)

    def test_dynamic(self):
        """ NAT64 dynamic translation test """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # Two identical in2out/out2in rounds: the session count checked at
        # the end stays at three, so the second round must not add sessions.
        for _ in range(2):
            # in2out
            pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(3)
            self.verify_capture_out(capture, packet_num=3,
                                    nat_ip=self.nat_addr,
                                    dst_ip=self.pg1.remote_ip4)

            # out2in
            pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(3)
            # expected IPv6 source: the IPv4 peer embedded in 64:ff9b::,
            # read back through scapy's IPv6 layer
            ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
            self.verify_capture_in_ip6(capture, ip[IPv6].src,
                                       self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        self.assertEqual(ses_num_end - ses_num_start, 3)

    def test_static(self):
        """ NAT64 static translation test """
        self.tcp_port_in = 60303
        self.udp_port_in = 60304
        self.icmp_id_in = 60305
        self.tcp_port_out = 60303
        self.udp_port_out = 60304
        self.icmp_id_out = 60305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # one static BIB entry per protocol, mapping each inside port/id to
        # the same outside port/id
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.tcp_port_in,
                                           self.tcp_port_out,
                                           IP_PROTOS.tcp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.udp_port_in,
                                           self.udp_port_out,
                                           IP_PROTOS.udp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.icmp_id_in,
                                           self.icmp_id_out,
                                           IP_PROTOS.icmp)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(3)
        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4, same_port=True)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(3)
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        self.assertEqual(ses_num_end - ses_num_start, 3)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ NAT64 session timeout """
        self.icmp_id_in = 1234
        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        # only the ICMP timeout is shortened
        self.vapi.nat64_set_timeouts(icmp=5)

        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # only the packet count matters; the capture is not inspected
        self.pg1.get_capture(3)

        ses_num_before_timeout = self.nat64_get_ses_num()

        sleep(15)

        # ICMP session after timeout
        ses_num_after_timeout = self.nat64_get_ses_num()
        self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)

    def test_icmp_error(self):
        """ NAT64 ICMP Error message translation """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # send some packets to create sessions
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip4 = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture_ip4, packet_num=3,
                                nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip6 = self.pg0.get_capture(len(pkts))
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                                   self.pg0.remote_ip6)

        # in2out: wrap every IPv6 packet received above in an ICMPv6
        # destination-unreachable error sent back by the inside host
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
                ICMPv6DestUnreach(code=1) /
                packet[IPv6] for packet in capture_ip6]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, self.nat_addr)
                self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
                # expected as an ICMPv4 destination unreachable (type 3)
                self.assertEqual(packet[ICMP].type, 3)
                self.assertEqual(packet[ICMP].code, 13)
                # the quoted packet must be translated as well
                inner = packet[IPerror]
                self.assertEqual(inner.src, self.pg1.remote_ip4)
                self.assertEqual(inner.dst, self.nat_addr)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
                else:
                    self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # out2in: wrap every IPv4 packet captured earlier in an ICMPv4
        # destination-unreachable error sent by the outside host
        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                ICMP(type=3, code=13) /
                packet[IP] for packet in capture_ip4]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, ip.src)
                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
                icmp = packet[ICMPv6DestUnreach]
                self.assertEqual(icmp.code, 1)
                # the quoted packet must be translated as well
                inner = icmp[IPerror6]
                self.assertEqual(inner.src, self.pg0.remote_ip6)
                self.assertEqual(inner.dst, ip.src)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
                else:
                    self.assertEqual(inner[ICMPv6EchoRequest].id,
                                     self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def nat64_get_ses_num(self):
        """
        Return number of active NAT64 sessions.

        The value is the combined length of the TCP, UDP and ICMP session
        table dumps.
        """
        protos = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)
        return sum(len(self.vapi.nat64_st_dump(proto)) for proto in protos)

    def clear_nat64(self):
        """
        Clear NAT64 configuration.

        Calls nat64_set_timeouts() without arguments, then removes every
        NAT64 interface, every static BIB entry (TCP, UDP and ICMP) and
        every pool address reported by the dump calls.
        """
        self.vapi.nat64_set_timeouts()

        interfaces = self.vapi.nat64_interface_dump()
        for intf in interfaces:
            self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                              intf.is_inside,
                                              is_add=0)

        # same removal for each protocol table; only static entries are
        # deleted here
        for proto in (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp):
            for bibe in self.vapi.nat64_bib_dump(proto):
                if bibe.is_static:
                    self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                                       bibe.o_addr,
                                                       bibe.i_port,
                                                       bibe.o_port,
                                                       bibe.proto,
                                                       bibe.vrf_id,
                                                       is_add=0)

        addresses = self.vapi.nat64_pool_addr_dump()
        for addr in addresses:
            self.vapi.nat64_add_del_pool_addr_range(addr.address,
                                                    addr.address,
                                                    is_add=0)

    def tearDown(self):
        """ Run the base tearDown, then log and clear the NAT64 state """
        super(TestNAT64, self).tearDown()
        if not self.vpp_dead:
            # log the NAT64 state for debugging before it is wiped
            for command in ("show nat64 pool",
                            "show nat64 interfaces",
                            "show nat64 bib tcp",
                            "show nat64 bib udp",
                            "show nat64 bib icmp",
                            "show nat64 session table tcp",
                            "show nat64 session table udp",
                            "show nat64 session table icmp"):
                self.logger.info(self.vapi.cli(command))
            self.clear_nat64()
2804
# Entry point when the module is executed directly: hand the test cases
# defined above to unittest, using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)