acl-plugin: bihash-based ACL lookup
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def create_stream_in_ip6(self, in_if, out_if, hlim=64):
161         """
162         Create IPv6 packet stream for inside network
163
164         :param in_if: Inside interface
165         :param out_if: Outside interface
166         :param ttl: Hop Limit of generated packets
167         """
168         pkts = []
169         dst = ''.join(['64:ff9b::', out_if.remote_ip4])
170         # TCP
171         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
172              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
173              TCP(sport=self.tcp_port_in, dport=20))
174         pkts.append(p)
175
176         # UDP
177         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
178              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
179              UDP(sport=self.udp_port_in, dport=20))
180         pkts.append(p)
181
182         # ICMP
183         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
184              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
185              ICMPv6EchoRequest(id=self.icmp_id_in))
186         pkts.append(p)
187
188         return pkts
189
190     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
191         """
192         Create packet stream for outside network
193
194         :param out_if: Outside interface
195         :param dst_ip: Destination IP address (Default use global SNAT address)
196         :param ttl: TTL of generated packets
197         """
198         if dst_ip is None:
199             dst_ip = self.snat_addr
200         pkts = []
201         # TCP
202         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
203              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
204              TCP(dport=self.tcp_port_out, sport=20))
205         pkts.append(p)
206
207         # UDP
208         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
209              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
210              UDP(dport=self.udp_port_out, sport=20))
211         pkts.append(p)
212
213         # ICMP
214         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
215              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
216              ICMP(id=self.icmp_id_out, type='echo-reply'))
217         pkts.append(p)
218
219         return pkts
220
221     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
222                            packet_num=3, dst_ip=None):
223         """
224         Verify captured packets on outside network
225
226         :param capture: Captured packets
227         :param nat_ip: Translated IP address (Default use global SNAT address)
228         :param same_port: Sorce port number is not translated (Default False)
229         :param packet_num: Expected number of packets (Default 3)
230         :param dst_ip: Destination IP address (Default do not verify)
231         """
232         if nat_ip is None:
233             nat_ip = self.snat_addr
234         self.assertEqual(packet_num, len(capture))
235         for packet in capture:
236             try:
237                 self.check_ip_checksum(packet)
238                 self.assertEqual(packet[IP].src, nat_ip)
239                 if dst_ip is not None:
240                     self.assertEqual(packet[IP].dst, dst_ip)
241                 if packet.haslayer(TCP):
242                     if same_port:
243                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
244                     else:
245                         self.assertNotEqual(
246                             packet[TCP].sport, self.tcp_port_in)
247                     self.tcp_port_out = packet[TCP].sport
248                     self.check_tcp_checksum(packet)
249                 elif packet.haslayer(UDP):
250                     if same_port:
251                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
252                     else:
253                         self.assertNotEqual(
254                             packet[UDP].sport, self.udp_port_in)
255                     self.udp_port_out = packet[UDP].sport
256                 else:
257                     if same_port:
258                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
259                     else:
260                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
261                     self.icmp_id_out = packet[ICMP].id
262                     self.check_icmp_checksum(packet)
263             except:
264                 self.logger.error(ppp("Unexpected or invalid packet "
265                                       "(outside network):", packet))
266                 raise
267
268     def verify_capture_in(self, capture, in_if, packet_num=3):
269         """
270         Verify captured packets on inside network
271
272         :param capture: Captured packets
273         :param in_if: Inside interface
274         :param packet_num: Expected number of packets (Default 3)
275         """
276         self.assertEqual(packet_num, len(capture))
277         for packet in capture:
278             try:
279                 self.check_ip_checksum(packet)
280                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
281                 if packet.haslayer(TCP):
282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
283                     self.check_tcp_checksum(packet)
284                 elif packet.haslayer(UDP):
285                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
286                 else:
287                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
288                     self.check_icmp_checksum(packet)
289             except:
290                 self.logger.error(ppp("Unexpected or invalid packet "
291                                       "(inside network):", packet))
292                 raise
293
294     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
295         """
296         Verify captured IPv6 packets on inside network
297
298         :param capture: Captured packets
299         :param src_ip: Source IP
300         :param dst_ip: Destination IP address
301         :param packet_num: Expected number of packets (Default 3)
302         """
303         self.assertEqual(packet_num, len(capture))
304         for packet in capture:
305             try:
306                 self.assertEqual(packet[IPv6].src, src_ip)
307                 self.assertEqual(packet[IPv6].dst, dst_ip)
308                 if packet.haslayer(TCP):
309                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
310                     self.check_tcp_checksum(packet)
311                 elif packet.haslayer(UDP):
312                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
313                     self.check_udp_checksum(packet)
314                 else:
315                     self.assertEqual(packet[ICMPv6EchoReply].id,
316                                      self.icmp_id_in)
317                     self.check_icmpv6_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(inside network):", packet))
321                 raise
322
323     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
324         """
325         Verify captured packet that don't have to be translated
326
327         :param capture: Captured packets
328         :param ingress_if: Ingress interface
329         :param egress_if: Egress interface
330         """
331         for packet in capture:
332             try:
333                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
334                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
335                 if packet.haslayer(TCP):
336                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
337                 elif packet.haslayer(UDP):
338                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
339                 else:
340                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
341             except:
342                 self.logger.error(ppp("Unexpected or invalid packet "
343                                       "(inside network):", packet))
344                 raise
345
346     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
347                                             packet_num=3, icmp_type=11):
348         """
349         Verify captured packets with ICMP errors on outside network
350
351         :param capture: Captured packets
352         :param src_ip: Translated IP address or IP address of VPP
353                        (Default use global SNAT address)
354         :param packet_num: Expected number of packets (Default 3)
355         :param icmp_type: Type of error ICMP packet
356                           we are expecting (Default 11)
357         """
358         if src_ip is None:
359             src_ip = self.snat_addr
360         self.assertEqual(packet_num, len(capture))
361         for packet in capture:
362             try:
363                 self.assertEqual(packet[IP].src, src_ip)
364                 self.assertTrue(packet.haslayer(ICMP))
365                 icmp = packet[ICMP]
366                 self.assertEqual(icmp.type, icmp_type)
367                 self.assertTrue(icmp.haslayer(IPerror))
368                 inner_ip = icmp[IPerror]
369                 if inner_ip.haslayer(TCPerror):
370                     self.assertEqual(inner_ip[TCPerror].dport,
371                                      self.tcp_port_out)
372                 elif inner_ip.haslayer(UDPerror):
373                     self.assertEqual(inner_ip[UDPerror].dport,
374                                      self.udp_port_out)
375                 else:
376                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
377             except:
378                 self.logger.error(ppp("Unexpected or invalid packet "
379                                       "(outside network):", packet))
380                 raise
381
382     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
383                                            icmp_type=11):
384         """
385         Verify captured packets with ICMP errors on inside network
386
387         :param capture: Captured packets
388         :param in_if: Inside interface
389         :param packet_num: Expected number of packets (Default 3)
390         :param icmp_type: Type of error ICMP packet
391                           we are expecting (Default 11)
392         """
393         self.assertEqual(packet_num, len(capture))
394         for packet in capture:
395             try:
396                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
397                 self.assertTrue(packet.haslayer(ICMP))
398                 icmp = packet[ICMP]
399                 self.assertEqual(icmp.type, icmp_type)
400                 self.assertTrue(icmp.haslayer(IPerror))
401                 inner_ip = icmp[IPerror]
402                 if inner_ip.haslayer(TCPerror):
403                     self.assertEqual(inner_ip[TCPerror].sport,
404                                      self.tcp_port_in)
405                 elif inner_ip.haslayer(UDPerror):
406                     self.assertEqual(inner_ip[UDPerror].sport,
407                                      self.udp_port_in)
408                 else:
409                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
410             except:
411                 self.logger.error(ppp("Unexpected or invalid packet "
412                                       "(inside network):", packet))
413                 raise
414
415     def verify_ipfix_nat44_ses(self, data):
416         """
417         Verify IPFIX NAT44 session create/delete event
418
419         :param data: Decoded IPFIX data records
420         """
421         nat44_ses_create_num = 0
422         nat44_ses_delete_num = 0
423         self.assertEqual(6, len(data))
424         for record in data:
425             # natEvent
426             self.assertIn(ord(record[230]), [4, 5])
427             if ord(record[230]) == 4:
428                 nat44_ses_create_num += 1
429             else:
430                 nat44_ses_delete_num += 1
431             # sourceIPv4Address
432             self.assertEqual(self.pg0.remote_ip4n, record[8])
433             # postNATSourceIPv4Address
434             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
435                              record[225])
436             # ingressVRFID
437             self.assertEqual(struct.pack("!I", 0), record[234])
438             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
439             if IP_PROTOS.icmp == ord(record[4]):
440                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
441                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
442                                  record[227])
443             elif IP_PROTOS.tcp == ord(record[4]):
444                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
445                                  record[7])
446                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
447                                  record[227])
448             elif IP_PROTOS.udp == ord(record[4]):
449                 self.assertEqual(struct.pack("!H", self.udp_port_in),
450                                  record[7])
451                 self.assertEqual(struct.pack("!H", self.udp_port_out),
452                                  record[227])
453             else:
454                 self.fail("Invalid protocol")
455         self.assertEqual(3, nat44_ses_create_num)
456         self.assertEqual(3, nat44_ses_delete_num)
457
458     def verify_ipfix_addr_exhausted(self, data):
459         """
460         Verify IPFIX NAT addresses event
461
462         :param data: Decoded IPFIX data records
463         """
464         self.assertEqual(1, len(data))
465         record = data[0]
466         # natEvent
467         self.assertEqual(ord(record[230]), 3)
468         # natPoolID
469         self.assertEqual(struct.pack("!I", 0), record[283])
470
471
472 class TestSNAT(MethodHolder):
473     """ SNAT Test Cases """
474
475     @classmethod
476     def setUpClass(cls):
477         super(TestSNAT, cls).setUpClass()
478
479         try:
480             cls.tcp_port_in = 6303
481             cls.tcp_port_out = 6303
482             cls.udp_port_in = 6304
483             cls.udp_port_out = 6304
484             cls.icmp_id_in = 6305
485             cls.icmp_id_out = 6305
486             cls.snat_addr = '10.0.0.3'
487             cls.ipfix_src_port = 4739
488             cls.ipfix_domain_id = 1
489
490             cls.create_pg_interfaces(range(9))
491             cls.interfaces = list(cls.pg_interfaces[0:4])
492
493             for i in cls.interfaces:
494                 i.admin_up()
495                 i.config_ip4()
496                 i.resolve_arp()
497
498             cls.pg0.generate_remote_hosts(3)
499             cls.pg0.configure_ipv4_neighbors()
500
501             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
502
503             cls.pg4._local_ip4 = "172.16.255.1"
504             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506             cls.pg4.set_table_ip4(10)
507             cls.pg5._local_ip4 = "172.16.255.3"
508             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510             cls.pg5.set_table_ip4(10)
511             cls.pg6._local_ip4 = "172.16.255.1"
512             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514             cls.pg6.set_table_ip4(20)
515             for i in cls.overlapping_interfaces:
516                 i.config_ip4()
517                 i.admin_up()
518                 i.resolve_arp()
519
520             cls.pg7.admin_up()
521             cls.pg8.admin_up()
522
523         except Exception:
524             super(TestSNAT, cls).tearDownClass()
525             raise
526
527     def clear_snat(self):
528         """
529         Clear SNAT configuration.
530         """
531         # I found no elegant way to do this
532         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533                                    dst_address_length=32,
534                                    next_hop_address=self.pg7.remote_ip4n,
535                                    next_hop_sw_if_index=self.pg7.sw_if_index,
536                                    is_add=0)
537         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538                                    dst_address_length=32,
539                                    next_hop_address=self.pg8.remote_ip4n,
540                                    next_hop_sw_if_index=self.pg8.sw_if_index,
541                                    is_add=0)
542
543         for intf in [self.pg7, self.pg8]:
544             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
545             for n in neighbors:
546                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
547                                               n.mac_address,
548                                               n.ip_address,
549                                               is_add=0)
550
551         if self.pg7.has_ip4_config:
552             self.pg7.unconfig_ip4()
553
554         interfaces = self.vapi.snat_interface_addr_dump()
555         for intf in interfaces:
556             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
557
558         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559                              domain_id=self.ipfix_domain_id)
560         self.ipfix_src_port = 4739
561         self.ipfix_domain_id = 1
562
563         interfaces = self.vapi.snat_interface_dump()
564         for intf in interfaces:
565             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
566                                                      intf.is_inside,
567                                                      is_add=0)
568
569         static_mappings = self.vapi.snat_static_mapping_dump()
570         for sm in static_mappings:
571             self.vapi.snat_add_static_mapping(sm.local_ip_address,
572                                               sm.external_ip_address,
573                                               local_port=sm.local_port,
574                                               external_port=sm.external_port,
575                                               addr_only=sm.addr_only,
576                                               vrf_id=sm.vrf_id,
577                                               protocol=sm.protocol,
578                                               is_add=0)
579
580         adresses = self.vapi.snat_address_dump()
581         for addr in adresses:
582             self.vapi.snat_add_address_range(addr.ip_address,
583                                              addr.ip_address,
584                                              is_add=0)
585
586     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587                                 local_port=0, external_port=0, vrf_id=0,
588                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
589                                 proto=0):
590         """
591         Add/delete S-NAT static mapping
592
593         :param local_ip: Local IP address
594         :param external_ip: External IP address
595         :param local_port: Local port number (Optional)
596         :param external_port: External port number (Optional)
597         :param vrf_id: VRF ID (Default 0)
598         :param is_add: 1 if add, 0 if delete (Default add)
599         :param external_sw_if_index: External interface instead of IP address
600         :param proto: IP protocol (Mandatory if port specified)
601         """
602         addr_only = 1
603         if local_port and external_port:
604             addr_only = 0
605         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607         self.vapi.snat_add_static_mapping(
608             l_ip,
609             e_ip,
610             external_sw_if_index,
611             local_port,
612             external_port,
613             addr_only,
614             vrf_id,
615             proto,
616             is_add)
617
618     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
619         """
620         Add/delete S-NAT address
621
622         :param ip: IP address
623         :param is_add: 1 if add, 0 if delete (Default add)
624         """
625         snat_addr = socket.inet_pton(socket.AF_INET, ip)
626         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
627                                          vrf_id=vrf_id)
628
629     def test_dynamic(self):
630         """ SNAT dynamic translation test """
631
632         self.snat_add_address(self.snat_addr)
633         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
635                                                  is_inside=0)
636
637         # in2out
638         pkts = self.create_stream_in(self.pg0, self.pg1)
639         self.pg0.add_stream(pkts)
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pg_start()
642         capture = self.pg1.get_capture(len(pkts))
643         self.verify_capture_out(capture)
644
645         # out2in
646         pkts = self.create_stream_out(self.pg1)
647         self.pg1.add_stream(pkts)
648         self.pg_enable_capture(self.pg_interfaces)
649         self.pg_start()
650         capture = self.pg0.get_capture(len(pkts))
651         self.verify_capture_in(capture, self.pg0)
652
653     def test_dynamic_icmp_errors_in2out_ttl_1(self):
654         """ SNAT handling of client packets with TTL=1 """
655
656         self.snat_add_address(self.snat_addr)
657         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
659                                                  is_inside=0)
660
661         # Client side - generate traffic
662         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663         self.pg0.add_stream(pkts)
664         self.pg_enable_capture(self.pg_interfaces)
665         self.pg_start()
666
667         # Client side - verify ICMP type 11 packets
668         capture = self.pg0.get_capture(len(pkts))
669         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
670
671     def test_dynamic_icmp_errors_out2in_ttl_1(self):
672         """ SNAT handling of server packets with TTL=1 """
673
674         self.snat_add_address(self.snat_addr)
675         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
677                                                  is_inside=0)
678
679         # Client side - create sessions
680         pkts = self.create_stream_in(self.pg0, self.pg1)
681         self.pg0.add_stream(pkts)
682         self.pg_enable_capture(self.pg_interfaces)
683         self.pg_start()
684
685         # Server side - generate traffic
686         capture = self.pg1.get_capture(len(pkts))
687         self.verify_capture_out(capture)
688         pkts = self.create_stream_out(self.pg1, ttl=1)
689         self.pg1.add_stream(pkts)
690         self.pg_enable_capture(self.pg_interfaces)
691         self.pg_start()
692
693         # Server side - verify ICMP type 11 packets
694         capture = self.pg1.get_capture(len(pkts))
695         self.verify_capture_out_with_icmp_errors(capture,
696                                                  src_ip=self.pg1.local_ip4)
697
698     def test_dynamic_icmp_errors_in2out_ttl_2(self):
699         """ SNAT handling of error responses to client packets with TTL=2 """
700
701         self.snat_add_address(self.snat_addr)
702         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
704                                                  is_inside=0)
705
706         # Client side - generate traffic
707         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708         self.pg0.add_stream(pkts)
709         self.pg_enable_capture(self.pg_interfaces)
710         self.pg_start()
711
712         # Server side - simulate ICMP type 11 response
713         capture = self.pg1.get_capture(len(pkts))
714         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716                 ICMP(type=11) / packet[IP] for packet in capture]
717         self.pg1.add_stream(pkts)
718         self.pg_enable_capture(self.pg_interfaces)
719         self.pg_start()
720
721         # Client side - verify ICMP type 11 packets
722         capture = self.pg0.get_capture(len(pkts))
723         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
724
725     def test_dynamic_icmp_errors_out2in_ttl_2(self):
726         """ SNAT handling of error responses to server packets with TTL=2 """
727
728         self.snat_add_address(self.snat_addr)
729         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
731                                                  is_inside=0)
732
733         # Client side - create sessions
734         pkts = self.create_stream_in(self.pg0, self.pg1)
735         self.pg0.add_stream(pkts)
736         self.pg_enable_capture(self.pg_interfaces)
737         self.pg_start()
738
739         # Server side - generate traffic
740         capture = self.pg1.get_capture(len(pkts))
741         self.verify_capture_out(capture)
742         pkts = self.create_stream_out(self.pg1, ttl=2)
743         self.pg1.add_stream(pkts)
744         self.pg_enable_capture(self.pg_interfaces)
745         self.pg_start()
746
747         # Client side - simulate ICMP type 11 response
748         capture = self.pg0.get_capture(len(pkts))
749         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751                 ICMP(type=11) / packet[IP] for packet in capture]
752         self.pg0.add_stream(pkts)
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pg_start()
755
756         # Server side - verify ICMP type 11 packets
757         capture = self.pg1.get_capture(len(pkts))
758         self.verify_capture_out_with_icmp_errors(capture)
759
760     def test_ping_out_interface_from_outside(self):
761         """ Ping SNAT out interface from outside network """
762
763         self.snat_add_address(self.snat_addr)
764         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
766                                                  is_inside=0)
767
768         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770              ICMP(id=self.icmp_id_out, type='echo-request'))
771         pkts = [p]
772         self.pg1.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775         capture = self.pg1.get_capture(len(pkts))
776         self.assertEqual(1, len(capture))
777         packet = capture[0]
778         try:
779             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
781             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782             self.assertEqual(packet[ICMP].type, 0)  # echo reply
783         except:
784             self.logger.error(ppp("Unexpected or invalid packet "
785                                   "(outside network):", packet))
786             raise
787
788     def test_ping_internal_host_from_outside(self):
789         """ Ping internal host from outside network """
790
791         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
794                                                  is_inside=0)
795
796         # out2in
797         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799                ICMP(id=self.icmp_id_out, type='echo-request'))
800         self.pg1.add_stream(pkt)
801         self.pg_enable_capture(self.pg_interfaces)
802         self.pg_start()
803         capture = self.pg0.get_capture(1)
804         self.verify_capture_in(capture, self.pg0, packet_num=1)
805         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
806
807         # in2out
808         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810                ICMP(id=self.icmp_id_in, type='echo-reply'))
811         self.pg0.add_stream(pkt)
812         self.pg_enable_capture(self.pg_interfaces)
813         self.pg_start()
814         capture = self.pg1.get_capture(1)
815         self.verify_capture_out(capture, same_port=True, packet_num=1)
816         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
817
818     def test_static_in(self):
819         """ SNAT 1:1 NAT initialized from inside network """
820
821         nat_ip = "10.0.0.10"
822         self.tcp_port_out = 6303
823         self.udp_port_out = 6304
824         self.icmp_id_out = 6305
825
826         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
829                                                  is_inside=0)
830
831         # in2out
832         pkts = self.create_stream_in(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture, nat_ip, True)
838
839         # out2in
840         pkts = self.create_stream_out(self.pg1, nat_ip)
841         self.pg1.add_stream(pkts)
842         self.pg_enable_capture(self.pg_interfaces)
843         self.pg_start()
844         capture = self.pg0.get_capture(len(pkts))
845         self.verify_capture_in(capture, self.pg0)
846
847     def test_static_out(self):
848         """ SNAT 1:1 NAT initialized from outside network """
849
850         nat_ip = "10.0.0.20"
851         self.tcp_port_out = 6303
852         self.udp_port_out = 6304
853         self.icmp_id_out = 6305
854
855         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
858                                                  is_inside=0)
859
860         # out2in
861         pkts = self.create_stream_out(self.pg1, nat_ip)
862         self.pg1.add_stream(pkts)
863         self.pg_enable_capture(self.pg_interfaces)
864         self.pg_start()
865         capture = self.pg0.get_capture(len(pkts))
866         self.verify_capture_in(capture, self.pg0)
867
868         # in2out
869         pkts = self.create_stream_in(self.pg0, self.pg1)
870         self.pg0.add_stream(pkts)
871         self.pg_enable_capture(self.pg_interfaces)
872         self.pg_start()
873         capture = self.pg1.get_capture(len(pkts))
874         self.verify_capture_out(capture, nat_ip, True)
875
876     def test_static_with_port_in(self):
877         """ SNAT 1:1 NAT with port initialized from inside network """
878
879         self.tcp_port_out = 3606
880         self.udp_port_out = 3607
881         self.icmp_id_out = 3608
882
883         self.snat_add_address(self.snat_addr)
884         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885                                      self.tcp_port_in, self.tcp_port_out,
886                                      proto=IP_PROTOS.tcp)
887         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888                                      self.udp_port_in, self.udp_port_out,
889                                      proto=IP_PROTOS.udp)
890         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891                                      self.icmp_id_in, self.icmp_id_out,
892                                      proto=IP_PROTOS.icmp)
893         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
895                                                  is_inside=0)
896
897         # in2out
898         pkts = self.create_stream_in(self.pg0, self.pg1)
899         self.pg0.add_stream(pkts)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(len(pkts))
903         self.verify_capture_out(capture)
904
905         # out2in
906         pkts = self.create_stream_out(self.pg1)
907         self.pg1.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg0.get_capture(len(pkts))
911         self.verify_capture_in(capture, self.pg0)
912
913     def test_static_with_port_out(self):
914         """ SNAT 1:1 NAT with port initialized from outside network """
915
916         self.tcp_port_out = 30606
917         self.udp_port_out = 30607
918         self.icmp_id_out = 30608
919
920         self.snat_add_address(self.snat_addr)
921         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922                                      self.tcp_port_in, self.tcp_port_out,
923                                      proto=IP_PROTOS.tcp)
924         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925                                      self.udp_port_in, self.udp_port_out,
926                                      proto=IP_PROTOS.udp)
927         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928                                      self.icmp_id_in, self.icmp_id_out,
929                                      proto=IP_PROTOS.icmp)
930         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
932                                                  is_inside=0)
933
934         # out2in
935         pkts = self.create_stream_out(self.pg1)
936         self.pg1.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg0.get_capture(len(pkts))
940         self.verify_capture_in(capture, self.pg0)
941
942         # in2out
943         pkts = self.create_stream_in(self.pg0, self.pg1)
944         self.pg0.add_stream(pkts)
945         self.pg_enable_capture(self.pg_interfaces)
946         self.pg_start()
947         capture = self.pg1.get_capture(len(pkts))
948         self.verify_capture_out(capture)
949
950     def test_static_vrf_aware(self):
951         """ SNAT 1:1 NAT VRF awareness """
952
953         nat_ip1 = "10.0.0.30"
954         nat_ip2 = "10.0.0.40"
955         self.tcp_port_out = 6303
956         self.udp_port_out = 6304
957         self.icmp_id_out = 6305
958
959         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
960                                      vrf_id=10)
961         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
962                                      vrf_id=10)
963         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
964                                                  is_inside=0)
965         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
967
968         # inside interface VRF match SNAT static mapping VRF
969         pkts = self.create_stream_in(self.pg4, self.pg3)
970         self.pg4.add_stream(pkts)
971         self.pg_enable_capture(self.pg_interfaces)
972         self.pg_start()
973         capture = self.pg3.get_capture(len(pkts))
974         self.verify_capture_out(capture, nat_ip1, True)
975
976         # inside interface VRF don't match SNAT static mapping VRF (packets
977         # are dropped)
978         pkts = self.create_stream_in(self.pg0, self.pg3)
979         self.pg0.add_stream(pkts)
980         self.pg_enable_capture(self.pg_interfaces)
981         self.pg_start()
982         self.pg3.assert_nothing_captured()
983
984     def test_multiple_inside_interfaces(self):
985         """ SNAT multiple inside interfaces (non-overlapping address space) """
986
987         self.snat_add_address(self.snat_addr)
988         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
991                                                  is_inside=0)
992
993         # between two S-NAT inside interfaces (no translation)
994         pkts = self.create_stream_in(self.pg0, self.pg1)
995         self.pg0.add_stream(pkts)
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pg_start()
998         capture = self.pg1.get_capture(len(pkts))
999         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1000
1001         # from S-NAT inside to interface without S-NAT feature (no translation)
1002         pkts = self.create_stream_in(self.pg0, self.pg2)
1003         self.pg0.add_stream(pkts)
1004         self.pg_enable_capture(self.pg_interfaces)
1005         self.pg_start()
1006         capture = self.pg2.get_capture(len(pkts))
1007         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1008
1009         # in2out 1st interface
1010         pkts = self.create_stream_in(self.pg0, self.pg3)
1011         self.pg0.add_stream(pkts)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg3.get_capture(len(pkts))
1015         self.verify_capture_out(capture)
1016
1017         # out2in 1st interface
1018         pkts = self.create_stream_out(self.pg3)
1019         self.pg3.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg0.get_capture(len(pkts))
1023         self.verify_capture_in(capture, self.pg0)
1024
1025         # in2out 2nd interface
1026         pkts = self.create_stream_in(self.pg1, self.pg3)
1027         self.pg1.add_stream(pkts)
1028         self.pg_enable_capture(self.pg_interfaces)
1029         self.pg_start()
1030         capture = self.pg3.get_capture(len(pkts))
1031         self.verify_capture_out(capture)
1032
1033         # out2in 2nd interface
1034         pkts = self.create_stream_out(self.pg3)
1035         self.pg3.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg1.get_capture(len(pkts))
1039         self.verify_capture_in(capture, self.pg1)
1040
1041     def test_inside_overlapping_interfaces(self):
1042         """ SNAT multiple inside interfaces with overlapping address space """
1043
1044         static_nat_ip = "10.0.0.10"
1045         self.snat_add_address(self.snat_addr)
1046         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1047                                                  is_inside=0)
1048         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1049         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1050         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1051         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1052                                      vrf_id=20)
1053
1054         # between S-NAT inside interfaces with same VRF (no translation)
1055         pkts = self.create_stream_in(self.pg4, self.pg5)
1056         self.pg4.add_stream(pkts)
1057         self.pg_enable_capture(self.pg_interfaces)
1058         self.pg_start()
1059         capture = self.pg5.get_capture(len(pkts))
1060         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1061
1062         # between S-NAT inside interfaces with different VRF (hairpinning)
1063         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1064              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1065              TCP(sport=1234, dport=5678))
1066         self.pg4.add_stream(p)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         capture = self.pg6.get_capture(1)
1070         p = capture[0]
1071         try:
1072             ip = p[IP]
1073             tcp = p[TCP]
1074             self.assertEqual(ip.src, self.snat_addr)
1075             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1076             self.assertNotEqual(tcp.sport, 1234)
1077             self.assertEqual(tcp.dport, 5678)
1078         except:
1079             self.logger.error(ppp("Unexpected or invalid packet:", p))
1080             raise
1081
1082         # in2out 1st interface
1083         pkts = self.create_stream_in(self.pg4, self.pg3)
1084         self.pg4.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg3.get_capture(len(pkts))
1088         self.verify_capture_out(capture)
1089
1090         # out2in 1st interface
1091         pkts = self.create_stream_out(self.pg3)
1092         self.pg3.add_stream(pkts)
1093         self.pg_enable_capture(self.pg_interfaces)
1094         self.pg_start()
1095         capture = self.pg4.get_capture(len(pkts))
1096         self.verify_capture_in(capture, self.pg4)
1097
1098         # in2out 2nd interface
1099         pkts = self.create_stream_in(self.pg5, self.pg3)
1100         self.pg5.add_stream(pkts)
1101         self.pg_enable_capture(self.pg_interfaces)
1102         self.pg_start()
1103         capture = self.pg3.get_capture(len(pkts))
1104         self.verify_capture_out(capture)
1105
1106         # out2in 2nd interface
1107         pkts = self.create_stream_out(self.pg3)
1108         self.pg3.add_stream(pkts)
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111         capture = self.pg5.get_capture(len(pkts))
1112         self.verify_capture_in(capture, self.pg5)
1113
1114         # pg5 session dump
1115         addresses = self.vapi.snat_address_dump()
1116         self.assertEqual(len(addresses), 1)
1117         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1118         self.assertEqual(len(sessions), 3)
1119         for session in sessions:
1120             self.assertFalse(session.is_static)
1121             self.assertEqual(session.inside_ip_address[0:4],
1122                              self.pg5.remote_ip4n)
1123             self.assertEqual(session.outside_ip_address,
1124                              addresses[0].ip_address)
1125         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1126         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1127         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1128         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1129         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1130         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1131         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1132         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1133         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1134
1135         # in2out 3rd interface
1136         pkts = self.create_stream_in(self.pg6, self.pg3)
1137         self.pg6.add_stream(pkts)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg3.get_capture(len(pkts))
1141         self.verify_capture_out(capture, static_nat_ip, True)
1142
1143         # out2in 3rd interface
1144         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1145         self.pg3.add_stream(pkts)
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148         capture = self.pg6.get_capture(len(pkts))
1149         self.verify_capture_in(capture, self.pg6)
1150
1151         # general user and session dump verifications
1152         users = self.vapi.snat_user_dump()
1153         self.assertTrue(len(users) >= 3)
1154         addresses = self.vapi.snat_address_dump()
1155         self.assertEqual(len(addresses), 1)
1156         for user in users:
1157             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1158                                                         user.vrf_id)
1159             for session in sessions:
1160                 self.assertEqual(user.ip_address, session.inside_ip_address)
1161                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1162                 self.assertTrue(session.protocol in
1163                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1164                                  IP_PROTOS.icmp])
1165
1166         # pg4 session dump
1167         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1168         self.assertTrue(len(sessions) >= 4)
1169         for session in sessions:
1170             self.assertFalse(session.is_static)
1171             self.assertEqual(session.inside_ip_address[0:4],
1172                              self.pg4.remote_ip4n)
1173             self.assertEqual(session.outside_ip_address,
1174                              addresses[0].ip_address)
1175
1176         # pg6 session dump
1177         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1178         self.assertTrue(len(sessions) >= 3)
1179         for session in sessions:
1180             self.assertTrue(session.is_static)
1181             self.assertEqual(session.inside_ip_address[0:4],
1182                              self.pg6.remote_ip4n)
1183             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1184                              map(int, static_nat_ip.split('.')))
1185             self.assertTrue(session.inside_port in
1186                             [self.tcp_port_in, self.udp_port_in,
1187                              self.icmp_id_in])
1188
1189     def test_hairpinning(self):
1190         """ SNAT hairpinning - 1:1 NAT with port"""
1191
1192         host = self.pg0.remote_hosts[0]
1193         server = self.pg0.remote_hosts[1]
1194         host_in_port = 1234
1195         host_out_port = 0
1196         server_in_port = 5678
1197         server_out_port = 8765
1198
1199         self.snat_add_address(self.snat_addr)
1200         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1201         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1202                                                  is_inside=0)
1203         # add static mapping for server
1204         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1205                                      server_in_port, server_out_port,
1206                                      proto=IP_PROTOS.tcp)
1207
1208         # send packet from host to server
1209         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1210              IP(src=host.ip4, dst=self.snat_addr) /
1211              TCP(sport=host_in_port, dport=server_out_port))
1212         self.pg0.add_stream(p)
1213         self.pg_enable_capture(self.pg_interfaces)
1214         self.pg_start()
1215         capture = self.pg0.get_capture(1)
1216         p = capture[0]
1217         try:
1218             ip = p[IP]
1219             tcp = p[TCP]
1220             self.assertEqual(ip.src, self.snat_addr)
1221             self.assertEqual(ip.dst, server.ip4)
1222             self.assertNotEqual(tcp.sport, host_in_port)
1223             self.assertEqual(tcp.dport, server_in_port)
1224             self.check_tcp_checksum(p)
1225             host_out_port = tcp.sport
1226         except:
1227             self.logger.error(ppp("Unexpected or invalid packet:", p))
1228             raise
1229
1230         # send reply from server to host
1231         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1232              IP(src=server.ip4, dst=self.snat_addr) /
1233              TCP(sport=server_in_port, dport=host_out_port))
1234         self.pg0.add_stream(p)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg0.get_capture(1)
1238         p = capture[0]
1239         try:
1240             ip = p[IP]
1241             tcp = p[TCP]
1242             self.assertEqual(ip.src, self.snat_addr)
1243             self.assertEqual(ip.dst, host.ip4)
1244             self.assertEqual(tcp.sport, server_out_port)
1245             self.assertEqual(tcp.dport, host_in_port)
1246             self.check_tcp_checksum(p)
1247         except:
1248             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1249             raise
1250
1251     def test_hairpinning2(self):
1252         """ SNAT hairpinning - 1:1 NAT"""
1253
1254         server1_nat_ip = "10.0.0.10"
1255         server2_nat_ip = "10.0.0.11"
1256         host = self.pg0.remote_hosts[0]
1257         server1 = self.pg0.remote_hosts[1]
1258         server2 = self.pg0.remote_hosts[2]
1259         server_tcp_port = 22
1260         server_udp_port = 20
1261
1262         self.snat_add_address(self.snat_addr)
1263         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1265                                                  is_inside=0)
1266
1267         # add static mapping for servers
1268         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1269         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1270
1271         # host to server1
1272         pkts = []
1273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274              IP(src=host.ip4, dst=server1_nat_ip) /
1275              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1276         pkts.append(p)
1277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278              IP(src=host.ip4, dst=server1_nat_ip) /
1279              UDP(sport=self.udp_port_in, dport=server_udp_port))
1280         pkts.append(p)
1281         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1282              IP(src=host.ip4, dst=server1_nat_ip) /
1283              ICMP(id=self.icmp_id_in, type='echo-request'))
1284         pkts.append(p)
1285         self.pg0.add_stream(pkts)
1286         self.pg_enable_capture(self.pg_interfaces)
1287         self.pg_start()
1288         capture = self.pg0.get_capture(len(pkts))
1289         for packet in capture:
1290             try:
1291                 self.assertEqual(packet[IP].src, self.snat_addr)
1292                 self.assertEqual(packet[IP].dst, server1.ip4)
1293                 if packet.haslayer(TCP):
1294                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1295                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1296                     self.tcp_port_out = packet[TCP].sport
1297                     self.check_tcp_checksum(packet)
1298                 elif packet.haslayer(UDP):
1299                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1300                     self.assertEqual(packet[UDP].dport, server_udp_port)
1301                     self.udp_port_out = packet[UDP].sport
1302                 else:
1303                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1304                     self.icmp_id_out = packet[ICMP].id
1305             except:
1306                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1307                 raise
1308
1309         # server1 to host
1310         pkts = []
1311         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1312              IP(src=server1.ip4, dst=self.snat_addr) /
1313              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1314         pkts.append(p)
1315         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316              IP(src=server1.ip4, dst=self.snat_addr) /
1317              UDP(sport=server_udp_port, dport=self.udp_port_out))
1318         pkts.append(p)
1319         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1320              IP(src=server1.ip4, dst=self.snat_addr) /
1321              ICMP(id=self.icmp_id_out, type='echo-reply'))
1322         pkts.append(p)
1323         self.pg0.add_stream(pkts)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         capture = self.pg0.get_capture(len(pkts))
1327         for packet in capture:
1328             try:
1329                 self.assertEqual(packet[IP].src, server1_nat_ip)
1330                 self.assertEqual(packet[IP].dst, host.ip4)
1331                 if packet.haslayer(TCP):
1332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1333                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1334                     self.check_tcp_checksum(packet)
1335                 elif packet.haslayer(UDP):
1336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1337                     self.assertEqual(packet[UDP].sport, server_udp_port)
1338                 else:
1339                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1340             except:
1341                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1342                 raise
1343
1344         # server2 to server1
1345         pkts = []
1346         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1347              IP(src=server2.ip4, dst=server1_nat_ip) /
1348              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1349         pkts.append(p)
1350         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1351              IP(src=server2.ip4, dst=server1_nat_ip) /
1352              UDP(sport=self.udp_port_in, dport=server_udp_port))
1353         pkts.append(p)
1354         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1355              IP(src=server2.ip4, dst=server1_nat_ip) /
1356              ICMP(id=self.icmp_id_in, type='echo-request'))
1357         pkts.append(p)
1358         self.pg0.add_stream(pkts)
1359         self.pg_enable_capture(self.pg_interfaces)
1360         self.pg_start()
1361         capture = self.pg0.get_capture(len(pkts))
1362         for packet in capture:
1363             try:
1364                 self.assertEqual(packet[IP].src, server2_nat_ip)
1365                 self.assertEqual(packet[IP].dst, server1.ip4)
1366                 if packet.haslayer(TCP):
1367                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1368                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1369                     self.tcp_port_out = packet[TCP].sport
1370                     self.check_tcp_checksum(packet)
1371                 elif packet.haslayer(UDP):
1372                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1373                     self.assertEqual(packet[UDP].dport, server_udp_port)
1374                     self.udp_port_out = packet[UDP].sport
1375                 else:
1376                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1377                     self.icmp_id_out = packet[ICMP].id
1378             except:
1379                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1380                 raise
1381
1382         # server1 to server2
1383         pkts = []
1384         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1385              IP(src=server1.ip4, dst=server2_nat_ip) /
1386              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1387         pkts.append(p)
1388         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1389              IP(src=server1.ip4, dst=server2_nat_ip) /
1390              UDP(sport=server_udp_port, dport=self.udp_port_out))
1391         pkts.append(p)
1392         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1393              IP(src=server1.ip4, dst=server2_nat_ip) /
1394              ICMP(id=self.icmp_id_out, type='echo-reply'))
1395         pkts.append(p)
1396         self.pg0.add_stream(pkts)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg0.get_capture(len(pkts))
1400         for packet in capture:
1401             try:
1402                 self.assertEqual(packet[IP].src, server1_nat_ip)
1403                 self.assertEqual(packet[IP].dst, server2.ip4)
1404                 if packet.haslayer(TCP):
1405                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1406                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1407                     self.check_tcp_checksum(packet)
1408                 elif packet.haslayer(UDP):
1409                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1410                     self.assertEqual(packet[UDP].sport, server_udp_port)
1411                 else:
1412                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1413             except:
1414                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1415                 raise
1416
1417     def test_max_translations_per_user(self):
1418         """ MAX translations per user - recycle the least recently used """
1419
1420         self.snat_add_address(self.snat_addr)
1421         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1422         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1423                                                  is_inside=0)
1424
1425         # get maximum number of translations per user
1426         snat_config = self.vapi.snat_show_config()
1427
1428         # send more than maximum number of translations per user packets
1429         pkts_num = snat_config.max_translations_per_user + 5
1430         pkts = []
1431         for port in range(0, pkts_num):
1432             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1434                  TCP(sport=1025 + port))
1435             pkts.append(p)
1436         self.pg0.add_stream(pkts)
1437         self.pg_enable_capture(self.pg_interfaces)
1438         self.pg_start()
1439
1440         # verify number of translated packet
1441         self.pg1.get_capture(pkts_num)
1442
1443     def test_interface_addr(self):
1444         """ Acquire SNAT addresses from interface """
1445         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1446
1447         # no address in NAT pool
1448         adresses = self.vapi.snat_address_dump()
1449         self.assertEqual(0, len(adresses))
1450
1451         # configure interface address and check NAT address pool
1452         self.pg7.config_ip4()
1453         adresses = self.vapi.snat_address_dump()
1454         self.assertEqual(1, len(adresses))
1455         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1456
1457         # remove interface address and check NAT address pool
1458         self.pg7.unconfig_ip4()
1459         adresses = self.vapi.snat_address_dump()
1460         self.assertEqual(0, len(adresses))
1461
1462     def test_interface_addr_static_mapping(self):
1463         """ Static mapping with addresses from interface """
1464         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1465         self.snat_add_static_mapping('1.2.3.4',
1466                                      external_sw_if_index=self.pg7.sw_if_index)
1467
1468         # static mappings with external interface
1469         static_mappings = self.vapi.snat_static_mapping_dump()
1470         self.assertEqual(1, len(static_mappings))
1471         self.assertEqual(self.pg7.sw_if_index,
1472                          static_mappings[0].external_sw_if_index)
1473
1474         # configure interface address and check static mappings
1475         self.pg7.config_ip4()
1476         static_mappings = self.vapi.snat_static_mapping_dump()
1477         self.assertEqual(1, len(static_mappings))
1478         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1479                          self.pg7.local_ip4n)
1480         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1481
1482         # remove interface address and check static mappings
1483         self.pg7.unconfig_ip4()
1484         static_mappings = self.vapi.snat_static_mapping_dump()
1485         self.assertEqual(0, len(static_mappings))
1486
1487     def test_ipfix_nat44_sess(self):
1488         """ S-NAT IPFIX logging NAT44 session created/delted """
1489         self.ipfix_domain_id = 10
1490         self.ipfix_src_port = 20202
1491         colector_port = 30303
1492         bind_layers(UDP, IPFIX, dport=30303)
1493         self.snat_add_address(self.snat_addr)
1494         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1495         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1496                                                  is_inside=0)
1497         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1498                                      src_address=self.pg3.local_ip4n,
1499                                      path_mtu=512,
1500                                      template_interval=10,
1501                                      collector_port=colector_port)
1502         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1503                              src_port=self.ipfix_src_port)
1504
1505         pkts = self.create_stream_in(self.pg0, self.pg1)
1506         self.pg0.add_stream(pkts)
1507         self.pg_enable_capture(self.pg_interfaces)
1508         self.pg_start()
1509         capture = self.pg1.get_capture(len(pkts))
1510         self.verify_capture_out(capture)
1511         self.snat_add_address(self.snat_addr, is_add=0)
1512         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1513         capture = self.pg3.get_capture(3)
1514         ipfix = IPFIXDecoder()
1515         # first load template
1516         for p in capture:
1517             self.assertTrue(p.haslayer(IPFIX))
1518             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1519             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1520             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1521             self.assertEqual(p[UDP].dport, colector_port)
1522             self.assertEqual(p[IPFIX].observationDomainID,
1523                              self.ipfix_domain_id)
1524             if p.haslayer(Template):
1525                 ipfix.add_template(p.getlayer(Template))
1526         # verify events in data set
1527         for p in capture:
1528             if p.haslayer(Data):
1529                 data = ipfix.decode_data_set(p.getlayer(Set))
1530                 self.verify_ipfix_nat44_ses(data)
1531
1532     def test_ipfix_addr_exhausted(self):
1533         """ S-NAT IPFIX logging NAT addresses exhausted """
1534         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1535         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1536                                                  is_inside=0)
1537         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1538                                      src_address=self.pg3.local_ip4n,
1539                                      path_mtu=512,
1540                                      template_interval=10)
1541         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1542                              src_port=self.ipfix_src_port)
1543
1544         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1545              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1546              TCP(sport=3025))
1547         self.pg0.add_stream(p)
1548         self.pg_enable_capture(self.pg_interfaces)
1549         self.pg_start()
1550         capture = self.pg1.get_capture(0)
1551         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1552         capture = self.pg3.get_capture(3)
1553         ipfix = IPFIXDecoder()
1554         # first load template
1555         for p in capture:
1556             self.assertTrue(p.haslayer(IPFIX))
1557             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1558             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1559             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1560             self.assertEqual(p[UDP].dport, 4739)
1561             self.assertEqual(p[IPFIX].observationDomainID,
1562                              self.ipfix_domain_id)
1563             if p.haslayer(Template):
1564                 ipfix.add_template(p.getlayer(Template))
1565         # verify events in data set
1566         for p in capture:
1567             if p.haslayer(Data):
1568                 data = ipfix.decode_data_set(p.getlayer(Set))
1569                 self.verify_ipfix_addr_exhausted(data)
1570
1571     def test_pool_addr_fib(self):
1572         """ S-NAT add pool addresses to FIB """
1573         static_addr = '10.0.0.10'
1574         self.snat_add_address(self.snat_addr)
1575         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1576         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1577                                                  is_inside=0)
1578         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1579
1580         # SNAT address
1581         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1582              ARP(op=ARP.who_has, pdst=self.snat_addr,
1583                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1584         self.pg1.add_stream(p)
1585         self.pg_enable_capture(self.pg_interfaces)
1586         self.pg_start()
1587         capture = self.pg1.get_capture(1)
1588         self.assertTrue(capture[0].haslayer(ARP))
1589         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1590
1591         # 1:1 NAT address
1592         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1593              ARP(op=ARP.who_has, pdst=static_addr,
1594                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1595         self.pg1.add_stream(p)
1596         self.pg_enable_capture(self.pg_interfaces)
1597         self.pg_start()
1598         capture = self.pg1.get_capture(1)
1599         self.assertTrue(capture[0].haslayer(ARP))
1600         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1601
1602         # send ARP to non-SNAT interface
1603         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1604              ARP(op=ARP.who_has, pdst=self.snat_addr,
1605                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1606         self.pg2.add_stream(p)
1607         self.pg_enable_capture(self.pg_interfaces)
1608         self.pg_start()
1609         capture = self.pg1.get_capture(0)
1610
1611         # remove addresses and verify
1612         self.snat_add_address(self.snat_addr, is_add=0)
1613         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1614                                      is_add=0)
1615
1616         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1617              ARP(op=ARP.who_has, pdst=self.snat_addr,
1618                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1619         self.pg1.add_stream(p)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg1.get_capture(0)
1623
1624         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1625              ARP(op=ARP.who_has, pdst=static_addr,
1626                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1627         self.pg1.add_stream(p)
1628         self.pg_enable_capture(self.pg_interfaces)
1629         self.pg_start()
1630         capture = self.pg1.get_capture(0)
1631
1632     def test_vrf_mode(self):
1633         """ S-NAT tenant VRF aware address pool mode """
1634
1635         vrf_id1 = 1
1636         vrf_id2 = 2
1637         nat_ip1 = "10.0.0.10"
1638         nat_ip2 = "10.0.0.11"
1639
1640         self.pg0.unconfig_ip4()
1641         self.pg1.unconfig_ip4()
1642         self.pg0.set_table_ip4(vrf_id1)
1643         self.pg1.set_table_ip4(vrf_id2)
1644         self.pg0.config_ip4()
1645         self.pg1.config_ip4()
1646
1647         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1648         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1649         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1650         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1651         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1652                                                  is_inside=0)
1653
1654         # first VRF
1655         pkts = self.create_stream_in(self.pg0, self.pg2)
1656         self.pg0.add_stream(pkts)
1657         self.pg_enable_capture(self.pg_interfaces)
1658         self.pg_start()
1659         capture = self.pg2.get_capture(len(pkts))
1660         self.verify_capture_out(capture, nat_ip1)
1661
1662         # second VRF
1663         pkts = self.create_stream_in(self.pg1, self.pg2)
1664         self.pg1.add_stream(pkts)
1665         self.pg_enable_capture(self.pg_interfaces)
1666         self.pg_start()
1667         capture = self.pg2.get_capture(len(pkts))
1668         self.verify_capture_out(capture, nat_ip2)
1669
1670     def test_vrf_feature_independent(self):
1671         """ S-NAT tenant VRF independent address pool mode """
1672
1673         nat_ip1 = "10.0.0.10"
1674         nat_ip2 = "10.0.0.11"
1675
1676         self.snat_add_address(nat_ip1)
1677         self.snat_add_address(nat_ip2)
1678         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1679         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1680         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1681                                                  is_inside=0)
1682
1683         # first VRF
1684         pkts = self.create_stream_in(self.pg0, self.pg2)
1685         self.pg0.add_stream(pkts)
1686         self.pg_enable_capture(self.pg_interfaces)
1687         self.pg_start()
1688         capture = self.pg2.get_capture(len(pkts))
1689         self.verify_capture_out(capture, nat_ip1)
1690
1691         # second VRF
1692         pkts = self.create_stream_in(self.pg1, self.pg2)
1693         self.pg1.add_stream(pkts)
1694         self.pg_enable_capture(self.pg_interfaces)
1695         self.pg_start()
1696         capture = self.pg2.get_capture(len(pkts))
1697         self.verify_capture_out(capture, nat_ip1)
1698
1699     def test_dynamic_ipless_interfaces(self):
1700         """ SNAT interfaces without configured ip dynamic map """
1701
1702         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1703                                       self.pg7.remote_mac,
1704                                       self.pg7.remote_ip4n,
1705                                       is_static=1)
1706         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1707                                       self.pg8.remote_mac,
1708                                       self.pg8.remote_ip4n,
1709                                       is_static=1)
1710
1711         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1712                                    dst_address_length=32,
1713                                    next_hop_address=self.pg7.remote_ip4n,
1714                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1715         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1716                                    dst_address_length=32,
1717                                    next_hop_address=self.pg8.remote_ip4n,
1718                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1719
1720         self.snat_add_address(self.snat_addr)
1721         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1722         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1723                                                  is_inside=0)
1724
1725         # in2out
1726         pkts = self.create_stream_in(self.pg7, self.pg8)
1727         self.pg7.add_stream(pkts)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg8.get_capture(len(pkts))
1731         self.verify_capture_out(capture)
1732
1733         # out2in
1734         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1735         self.pg8.add_stream(pkts)
1736         self.pg_enable_capture(self.pg_interfaces)
1737         self.pg_start()
1738         capture = self.pg7.get_capture(len(pkts))
1739         self.verify_capture_in(capture, self.pg7)
1740
1741     def test_static_ipless_interfaces(self):
1742         """ SNAT 1:1 NAT interfaces without configured ip """
1743
1744         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1745                                       self.pg7.remote_mac,
1746                                       self.pg7.remote_ip4n,
1747                                       is_static=1)
1748         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1749                                       self.pg8.remote_mac,
1750                                       self.pg8.remote_ip4n,
1751                                       is_static=1)
1752
1753         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1754                                    dst_address_length=32,
1755                                    next_hop_address=self.pg7.remote_ip4n,
1756                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1757         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1758                                    dst_address_length=32,
1759                                    next_hop_address=self.pg8.remote_ip4n,
1760                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1761
1762         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1763         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1764         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1765                                                  is_inside=0)
1766
1767         # out2in
1768         pkts = self.create_stream_out(self.pg8)
1769         self.pg8.add_stream(pkts)
1770         self.pg_enable_capture(self.pg_interfaces)
1771         self.pg_start()
1772         capture = self.pg7.get_capture(len(pkts))
1773         self.verify_capture_in(capture, self.pg7)
1774
1775         # in2out
1776         pkts = self.create_stream_in(self.pg7, self.pg8)
1777         self.pg7.add_stream(pkts)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg8.get_capture(len(pkts))
1781         self.verify_capture_out(capture, self.snat_addr, True)
1782
1783     def test_static_with_port_ipless_interfaces(self):
1784         """ SNAT 1:1 NAT with port interfaces without configured ip """
1785
1786         self.tcp_port_out = 30606
1787         self.udp_port_out = 30607
1788         self.icmp_id_out = 30608
1789
1790         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1791                                       self.pg7.remote_mac,
1792                                       self.pg7.remote_ip4n,
1793                                       is_static=1)
1794         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1795                                       self.pg8.remote_mac,
1796                                       self.pg8.remote_ip4n,
1797                                       is_static=1)
1798
1799         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1800                                    dst_address_length=32,
1801                                    next_hop_address=self.pg7.remote_ip4n,
1802                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1803         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1804                                    dst_address_length=32,
1805                                    next_hop_address=self.pg8.remote_ip4n,
1806                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1807
1808         self.snat_add_address(self.snat_addr)
1809         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1810                                      self.tcp_port_in, self.tcp_port_out,
1811                                      proto=IP_PROTOS.tcp)
1812         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1813                                      self.udp_port_in, self.udp_port_out,
1814                                      proto=IP_PROTOS.udp)
1815         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1816                                      self.icmp_id_in, self.icmp_id_out,
1817                                      proto=IP_PROTOS.icmp)
1818         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1820                                                  is_inside=0)
1821
1822         # out2in
1823         pkts = self.create_stream_out(self.pg8)
1824         self.pg8.add_stream(pkts)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg7.get_capture(len(pkts))
1828         self.verify_capture_in(capture, self.pg7)
1829
1830         # in2out
1831         pkts = self.create_stream_in(self.pg7, self.pg8)
1832         self.pg7.add_stream(pkts)
1833         self.pg_enable_capture(self.pg_interfaces)
1834         self.pg_start()
1835         capture = self.pg8.get_capture(len(pkts))
1836         self.verify_capture_out(capture)
1837
1838     def tearDown(self):
1839         super(TestSNAT, self).tearDown()
1840         if not self.vpp_dead:
1841             self.logger.info(self.vapi.cli("show snat verbose"))
1842             self.clear_snat()
1843
1844
1845 class TestDeterministicNAT(MethodHolder):
1846     """ Deterministic NAT Test Cases """
1847
1848     @classmethod
1849     def setUpConstants(cls):
1850         super(TestDeterministicNAT, cls).setUpConstants()
1851         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1852
1853     @classmethod
1854     def setUpClass(cls):
1855         super(TestDeterministicNAT, cls).setUpClass()
1856
1857         try:
1858             cls.tcp_port_in = 6303
1859             cls.tcp_external_port = 6303
1860             cls.udp_port_in = 6304
1861             cls.udp_external_port = 6304
1862             cls.icmp_id_in = 6305
1863             cls.snat_addr = '10.0.0.3'
1864
1865             cls.create_pg_interfaces(range(3))
1866             cls.interfaces = list(cls.pg_interfaces)
1867
1868             for i in cls.interfaces:
1869                 i.admin_up()
1870                 i.config_ip4()
1871                 i.resolve_arp()
1872
1873             cls.pg0.generate_remote_hosts(2)
1874             cls.pg0.configure_ipv4_neighbors()
1875
1876         except Exception:
1877             super(TestDeterministicNAT, cls).tearDownClass()
1878             raise
1879
1880     def create_stream_in(self, in_if, out_if, ttl=64):
1881         """
1882         Create packet stream for inside network
1883
1884         :param in_if: Inside interface
1885         :param out_if: Outside interface
1886         :param ttl: TTL of generated packets
1887         """
1888         pkts = []
1889         # TCP
1890         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1891              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1892              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1893         pkts.append(p)
1894
1895         # UDP
1896         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1897              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1898              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1899         pkts.append(p)
1900
1901         # ICMP
1902         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1903              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1904              ICMP(id=self.icmp_id_in, type='echo-request'))
1905         pkts.append(p)
1906
1907         return pkts
1908
1909     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1910         """
1911         Create packet stream for outside network
1912
1913         :param out_if: Outside interface
1914         :param dst_ip: Destination IP address (Default use global SNAT address)
1915         :param ttl: TTL of generated packets
1916         """
1917         if dst_ip is None:
1918             dst_ip = self.snat_addr
1919         pkts = []
1920         # TCP
1921         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1922              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1923              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1924         pkts.append(p)
1925
1926         # UDP
1927         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1928              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1929              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1930         pkts.append(p)
1931
1932         # ICMP
1933         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1934              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1935              ICMP(id=self.icmp_external_id, type='echo-reply'))
1936         pkts.append(p)
1937
1938         return pkts
1939
1940     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1941         """
1942         Verify captured packets on outside network
1943
1944         :param capture: Captured packets
1945         :param nat_ip: Translated IP address (Default use global SNAT address)
1946         :param same_port: Sorce port number is not translated (Default False)
1947         :param packet_num: Expected number of packets (Default 3)
1948         """
1949         if nat_ip is None:
1950             nat_ip = self.snat_addr
1951         self.assertEqual(packet_num, len(capture))
1952         for packet in capture:
1953             try:
1954                 self.assertEqual(packet[IP].src, nat_ip)
1955                 if packet.haslayer(TCP):
1956                     self.tcp_port_out = packet[TCP].sport
1957                 elif packet.haslayer(UDP):
1958                     self.udp_port_out = packet[UDP].sport
1959                 else:
1960                     self.icmp_external_id = packet[ICMP].id
1961             except:
1962                 self.logger.error(ppp("Unexpected or invalid packet "
1963                                       "(outside network):", packet))
1964                 raise
1965
1966     def initiate_tcp_session(self, in_if, out_if):
1967         """
1968         Initiates TCP session
1969
1970         :param in_if: Inside interface
1971         :param out_if: Outside interface
1972         """
1973         try:
1974             # SYN packet in->out
1975             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1976                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1977                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1978                      flags="S"))
1979             in_if.add_stream(p)
1980             self.pg_enable_capture(self.pg_interfaces)
1981             self.pg_start()
1982             capture = out_if.get_capture(1)
1983             p = capture[0]
1984             self.tcp_port_out = p[TCP].sport
1985
1986             # SYN + ACK packet out->in
1987             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1988                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1989                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1990                      flags="SA"))
1991             out_if.add_stream(p)
1992             self.pg_enable_capture(self.pg_interfaces)
1993             self.pg_start()
1994             in_if.get_capture(1)
1995
1996             # ACK packet in->out
1997             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1998                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1999                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2000                      flags="A"))
2001             in_if.add_stream(p)
2002             self.pg_enable_capture(self.pg_interfaces)
2003             self.pg_start()
2004             out_if.get_capture(1)
2005
2006         except:
2007             self.logger.error("TCP 3 way handshake failed")
2008             raise
2009
2010     def verify_ipfix_max_entries_per_user(self, data):
2011         """
2012         Verify IPFIX maximum entries per user exceeded event
2013
2014         :param data: Decoded IPFIX data records
2015         """
2016         self.assertEqual(1, len(data))
2017         record = data[0]
2018         # natEvent
2019         self.assertEqual(ord(record[230]), 13)
2020         # natQuotaExceededEvent
2021         self.assertEqual('\x03\x00\x00\x00', record[466])
2022         # sourceIPv4Address
2023         self.assertEqual(self.pg0.remote_ip4n, record[8])
2024
2025     def test_deterministic_mode(self):
2026         """ S-NAT run deterministic mode """
2027         in_addr = '172.16.255.0'
2028         out_addr = '172.17.255.50'
2029         in_addr_t = '172.16.255.20'
2030         in_addr_n = socket.inet_aton(in_addr)
2031         out_addr_n = socket.inet_aton(out_addr)
2032         in_addr_t_n = socket.inet_aton(in_addr_t)
2033         in_plen = 24
2034         out_plen = 32
2035
2036         snat_config = self.vapi.snat_show_config()
2037         self.assertEqual(1, snat_config.deterministic)
2038
2039         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2040
2041         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2042         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2043         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2044         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2045
2046         deterministic_mappings = self.vapi.snat_det_map_dump()
2047         self.assertEqual(len(deterministic_mappings), 1)
2048         dsm = deterministic_mappings[0]
2049         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2050         self.assertEqual(in_plen, dsm.in_plen)
2051         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2052         self.assertEqual(out_plen, dsm.out_plen)
2053
2054         self.clear_snat()
2055         deterministic_mappings = self.vapi.snat_det_map_dump()
2056         self.assertEqual(len(deterministic_mappings), 0)
2057
2058     def test_set_timeouts(self):
2059         """ Set deterministic NAT timeouts """
2060         timeouts_before = self.vapi.snat_det_get_timeouts()
2061
2062         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2063                                         timeouts_before.tcp_established + 10,
2064                                         timeouts_before.tcp_transitory + 10,
2065                                         timeouts_before.icmp + 10)
2066
2067         timeouts_after = self.vapi.snat_det_get_timeouts()
2068
2069         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2070         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2071         self.assertNotEqual(timeouts_before.tcp_established,
2072                             timeouts_after.tcp_established)
2073         self.assertNotEqual(timeouts_before.tcp_transitory,
2074                             timeouts_after.tcp_transitory)
2075
2076     def test_det_in(self):
2077         """ CGNAT translation test (TCP, UDP, ICMP) """
2078
2079         nat_ip = "10.0.0.10"
2080
2081         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2082                                    32,
2083                                    socket.inet_aton(nat_ip),
2084                                    32)
2085         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2086         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2087                                                  is_inside=0)
2088
2089         # in2out
2090         pkts = self.create_stream_in(self.pg0, self.pg1)
2091         self.pg0.add_stream(pkts)
2092         self.pg_enable_capture(self.pg_interfaces)
2093         self.pg_start()
2094         capture = self.pg1.get_capture(len(pkts))
2095         self.verify_capture_out(capture, nat_ip)
2096
2097         # out2in
2098         pkts = self.create_stream_out(self.pg1, nat_ip)
2099         self.pg1.add_stream(pkts)
2100         self.pg_enable_capture(self.pg_interfaces)
2101         self.pg_start()
2102         capture = self.pg0.get_capture(len(pkts))
2103         self.verify_capture_in(capture, self.pg0)
2104
2105         # session dump test
2106         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2107         self.assertEqual(len(sessions), 3)
2108
2109         # TCP session
2110         s = sessions[0]
2111         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2112         self.assertEqual(s.in_port, self.tcp_port_in)
2113         self.assertEqual(s.out_port, self.tcp_port_out)
2114         self.assertEqual(s.ext_port, self.tcp_external_port)
2115
2116         # UDP session
2117         s = sessions[1]
2118         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2119         self.assertEqual(s.in_port, self.udp_port_in)
2120         self.assertEqual(s.out_port, self.udp_port_out)
2121         self.assertEqual(s.ext_port, self.udp_external_port)
2122
2123         # ICMP session
2124         s = sessions[2]
2125         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2126         self.assertEqual(s.in_port, self.icmp_id_in)
2127         self.assertEqual(s.out_port, self.icmp_external_id)
2128
2129     def test_multiple_users(self):
2130         """ CGNAT multiple users """
2131
2132         nat_ip = "10.0.0.10"
2133         port_in = 80
2134         external_port = 6303
2135
2136         host0 = self.pg0.remote_hosts[0]
2137         host1 = self.pg0.remote_hosts[1]
2138
2139         self.vapi.snat_add_det_map(host0.ip4n,
2140                                    24,
2141                                    socket.inet_aton(nat_ip),
2142                                    32)
2143         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2144         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2145                                                  is_inside=0)
2146
2147         # host0 to out
2148         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2149              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2150              TCP(sport=port_in, dport=external_port))
2151         self.pg0.add_stream(p)
2152         self.pg_enable_capture(self.pg_interfaces)
2153         self.pg_start()
2154         capture = self.pg1.get_capture(1)
2155         p = capture[0]
2156         try:
2157             ip = p[IP]
2158             tcp = p[TCP]
2159             self.assertEqual(ip.src, nat_ip)
2160             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2161             self.assertEqual(tcp.dport, external_port)
2162             port_out0 = tcp.sport
2163         except:
2164             self.logger.error(ppp("Unexpected or invalid packet:", p))
2165             raise
2166
2167         # host1 to out
2168         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2169              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2170              TCP(sport=port_in, dport=external_port))
2171         self.pg0.add_stream(p)
2172         self.pg_enable_capture(self.pg_interfaces)
2173         self.pg_start()
2174         capture = self.pg1.get_capture(1)
2175         p = capture[0]
2176         try:
2177             ip = p[IP]
2178             tcp = p[TCP]
2179             self.assertEqual(ip.src, nat_ip)
2180             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2181             self.assertEqual(tcp.dport, external_port)
2182             port_out1 = tcp.sport
2183         except:
2184             self.logger.error(ppp("Unexpected or invalid packet:", p))
2185             raise
2186
2187         dms = self.vapi.snat_det_map_dump()
2188         self.assertEqual(1, len(dms))
2189         self.assertEqual(2, dms[0].ses_num)
2190
2191         # out to host0
2192         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2193              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2194              TCP(sport=external_port, dport=port_out0))
2195         self.pg1.add_stream(p)
2196         self.pg_enable_capture(self.pg_interfaces)
2197         self.pg_start()
2198         capture = self.pg0.get_capture(1)
2199         p = capture[0]
2200         try:
2201             ip = p[IP]
2202             tcp = p[TCP]
2203             self.assertEqual(ip.src, self.pg1.remote_ip4)
2204             self.assertEqual(ip.dst, host0.ip4)
2205             self.assertEqual(tcp.dport, port_in)
2206             self.assertEqual(tcp.sport, external_port)
2207         except:
2208             self.logger.error(ppp("Unexpected or invalid packet:", p))
2209             raise
2210
2211         # out to host1
2212         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2213              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2214              TCP(sport=external_port, dport=port_out1))
2215         self.pg1.add_stream(p)
2216         self.pg_enable_capture(self.pg_interfaces)
2217         self.pg_start()
2218         capture = self.pg0.get_capture(1)
2219         p = capture[0]
2220         try:
2221             ip = p[IP]
2222             tcp = p[TCP]
2223             self.assertEqual(ip.src, self.pg1.remote_ip4)
2224             self.assertEqual(ip.dst, host1.ip4)
2225             self.assertEqual(tcp.dport, port_in)
2226             self.assertEqual(tcp.sport, external_port)
2227         except:
2228             self.logger.error(ppp("Unexpected or invalid packet", p))
2229             raise
2230
2231         # session close api test
2232         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2233                                              port_out1,
2234                                              self.pg1.remote_ip4n,
2235                                              external_port)
2236         dms = self.vapi.snat_det_map_dump()
2237         self.assertEqual(dms[0].ses_num, 1)
2238
2239         self.vapi.snat_det_close_session_in(host0.ip4n,
2240                                             port_in,
2241                                             self.pg1.remote_ip4n,
2242                                             external_port)
2243         dms = self.vapi.snat_det_map_dump()
2244         self.assertEqual(dms[0].ses_num, 0)
2245
2246     def test_tcp_session_close_detection_in(self):
2247         """ CGNAT TCP session close initiated from inside network """
2248         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2249                                    32,
2250                                    socket.inet_aton(self.snat_addr),
2251                                    32)
2252         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2253         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2254                                                  is_inside=0)
2255
2256         self.initiate_tcp_session(self.pg0, self.pg1)
2257
2258         # close the session from inside
2259         try:
2260             # FIN packet in -> out
2261             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2262                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2264                      flags="F"))
2265             self.pg0.add_stream(p)
2266             self.pg_enable_capture(self.pg_interfaces)
2267             self.pg_start()
2268             self.pg1.get_capture(1)
2269
2270             pkts = []
2271
2272             # ACK packet out -> in
2273             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2274                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2275                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2276                      flags="A"))
2277             pkts.append(p)
2278
2279             # FIN packet out -> in
2280             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2281                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2282                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2283                      flags="F"))
2284             pkts.append(p)
2285
2286             self.pg1.add_stream(pkts)
2287             self.pg_enable_capture(self.pg_interfaces)
2288             self.pg_start()
2289             self.pg0.get_capture(2)
2290
2291             # ACK packet in -> out
2292             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2293                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2294                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2295                      flags="A"))
2296             self.pg0.add_stream(p)
2297             self.pg_enable_capture(self.pg_interfaces)
2298             self.pg_start()
2299             self.pg1.get_capture(1)
2300
2301             # Check if snat closed the session
2302             dms = self.vapi.snat_det_map_dump()
2303             self.assertEqual(0, dms[0].ses_num)
2304         except:
2305             self.logger.error("TCP session termination failed")
2306             raise
2307
2308     def test_tcp_session_close_detection_out(self):
2309         """ CGNAT TCP session close initiated from outside network """
2310         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2311                                    32,
2312                                    socket.inet_aton(self.snat_addr),
2313                                    32)
2314         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2315         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2316                                                  is_inside=0)
2317
2318         self.initiate_tcp_session(self.pg0, self.pg1)
2319
2320         # close the session from outside
2321         try:
2322             # FIN packet out -> in
2323             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2324                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2325                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2326                      flags="F"))
2327             self.pg1.add_stream(p)
2328             self.pg_enable_capture(self.pg_interfaces)
2329             self.pg_start()
2330             self.pg0.get_capture(1)
2331
2332             pkts = []
2333
2334             # ACK packet in -> out
2335             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2336                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2337                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2338                      flags="A"))
2339             pkts.append(p)
2340
2341             # ACK packet in -> out
2342             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2343                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2344                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2345                      flags="F"))
2346             pkts.append(p)
2347
2348             self.pg0.add_stream(pkts)
2349             self.pg_enable_capture(self.pg_interfaces)
2350             self.pg_start()
2351             self.pg1.get_capture(2)
2352
2353             # ACK packet out -> in
2354             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2355                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2356                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2357                      flags="A"))
2358             self.pg1.add_stream(p)
2359             self.pg_enable_capture(self.pg_interfaces)
2360             self.pg_start()
2361             self.pg0.get_capture(1)
2362
2363             # Check if snat closed the session
2364             dms = self.vapi.snat_det_map_dump()
2365             self.assertEqual(0, dms[0].ses_num)
2366         except:
2367             self.logger.error("TCP session termination failed")
2368             raise
2369
2370     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2371     def test_session_timeout(self):
2372         """ CGNAT session timeouts """
2373         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2374                                    32,
2375                                    socket.inet_aton(self.snat_addr),
2376                                    32)
2377         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2378         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2379                                                  is_inside=0)
2380
2381         self.initiate_tcp_session(self.pg0, self.pg1)
2382         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2383         pkts = self.create_stream_in(self.pg0, self.pg1)
2384         self.pg0.add_stream(pkts)
2385         self.pg_enable_capture(self.pg_interfaces)
2386         self.pg_start()
2387         capture = self.pg1.get_capture(len(pkts))
2388         sleep(15)
2389
2390         dms = self.vapi.snat_det_map_dump()
2391         self.assertEqual(0, dms[0].ses_num)
2392
2393     def test_session_limit_per_user(self):
2394         """ CGNAT maximum 1000 sessions per user should be created """
2395         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2396                                    32,
2397                                    socket.inet_aton(self.snat_addr),
2398                                    32)
2399         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2400         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2401                                                  is_inside=0)
2402         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2403                                      src_address=self.pg2.local_ip4n,
2404                                      path_mtu=512,
2405                                      template_interval=10)
2406         self.vapi.snat_ipfix()
2407
2408         pkts = []
2409         for port in range(1025, 2025):
2410             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2411                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2412                  UDP(sport=port, dport=port))
2413             pkts.append(p)
2414
2415         self.pg0.add_stream(pkts)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         capture = self.pg1.get_capture(len(pkts))
2419
2420         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2421              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2422              UDP(sport=3001, dport=3002))
2423         self.pg0.add_stream(p)
2424         self.pg_enable_capture(self.pg_interfaces)
2425         self.pg_start()
2426         capture = self.pg1.assert_nothing_captured()
2427
2428         # verify ICMP error packet
2429         capture = self.pg0.get_capture(1)
2430         p = capture[0]
2431         self.assertTrue(p.haslayer(ICMP))
2432         icmp = p[ICMP]
2433         self.assertEqual(icmp.type, 3)
2434         self.assertEqual(icmp.code, 1)
2435         self.assertTrue(icmp.haslayer(IPerror))
2436         inner_ip = icmp[IPerror]
2437         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2438         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2439
2440         dms = self.vapi.snat_det_map_dump()
2441
2442         self.assertEqual(1000, dms[0].ses_num)
2443
2444         # verify IPFIX logging
2445         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2446         capture = self.pg2.get_capture(2)
2447         ipfix = IPFIXDecoder()
2448         # first load template
2449         for p in capture:
2450             self.assertTrue(p.haslayer(IPFIX))
2451             if p.haslayer(Template):
2452                 ipfix.add_template(p.getlayer(Template))
2453         # verify events in data set
2454         for p in capture:
2455             if p.haslayer(Data):
2456                 data = ipfix.decode_data_set(p.getlayer(Set))
2457                 self.verify_ipfix_max_entries_per_user(data)
2458
2459     def clear_snat(self):
2460         """
2461         Clear SNAT configuration.
2462         """
2463         self.vapi.snat_ipfix(enable=0)
2464         self.vapi.snat_det_set_timeouts()
2465         deterministic_mappings = self.vapi.snat_det_map_dump()
2466         for dsm in deterministic_mappings:
2467             self.vapi.snat_add_det_map(dsm.in_addr,
2468                                        dsm.in_plen,
2469                                        dsm.out_addr,
2470                                        dsm.out_plen,
2471                                        is_add=0)
2472
2473         interfaces = self.vapi.snat_interface_dump()
2474         for intf in interfaces:
2475             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2476                                                      intf.is_inside,
2477                                                      is_add=0)
2478
2479     def tearDown(self):
2480         super(TestDeterministicNAT, self).tearDown()
2481         if not self.vpp_dead:
2482             self.logger.info(self.vapi.cli("show snat detail"))
2483             self.clear_snat()
2484
2485
2486 class TestNAT64(MethodHolder):
2487     """ NAT64 Test Cases """
2488
2489     @classmethod
2490     def setUpClass(cls):
2491         super(TestNAT64, cls).setUpClass()
2492
2493         try:
2494             cls.tcp_port_in = 6303
2495             cls.tcp_port_out = 6303
2496             cls.udp_port_in = 6304
2497             cls.udp_port_out = 6304
2498             cls.icmp_id_in = 6305
2499             cls.icmp_id_out = 6305
2500             cls.nat_addr = '10.0.0.3'
2501             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2502             cls.vrf1_id = 10
2503             cls.vrf1_nat_addr = '10.0.10.3'
2504             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2505                                                    cls.vrf1_nat_addr)
2506
2507             cls.create_pg_interfaces(range(3))
2508             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2509             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2510             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2511
2512             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2513
2514             cls.pg0.generate_remote_hosts(2)
2515
2516             for i in cls.ip6_interfaces:
2517                 i.admin_up()
2518                 i.config_ip6()
2519                 i.configure_ipv6_neighbors()
2520
2521             for i in cls.ip4_interfaces:
2522                 i.admin_up()
2523                 i.config_ip4()
2524                 i.resolve_arp()
2525
2526         except Exception:
2527             super(TestNAT64, cls).tearDownClass()
2528             raise
2529
2530     def test_pool(self):
2531         """ Add/delete address to NAT64 pool """
2532         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2533
2534         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2535
2536         addresses = self.vapi.nat64_pool_addr_dump()
2537         self.assertEqual(len(addresses), 1)
2538         self.assertEqual(addresses[0].address, nat_addr)
2539
2540         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2541
2542         addresses = self.vapi.nat64_pool_addr_dump()
2543         self.assertEqual(len(addresses), 0)
2544
2545     def test_interface(self):
2546         """ Enable/disable NAT64 feature on the interface """
2547         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2548         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2549
2550         interfaces = self.vapi.nat64_interface_dump()
2551         self.assertEqual(len(interfaces), 2)
2552         pg0_found = False
2553         pg1_found = False
2554         for intf in interfaces:
2555             if intf.sw_if_index == self.pg0.sw_if_index:
2556                 self.assertEqual(intf.is_inside, 1)
2557                 pg0_found = True
2558             elif intf.sw_if_index == self.pg1.sw_if_index:
2559                 self.assertEqual(intf.is_inside, 0)
2560                 pg1_found = True
2561         self.assertTrue(pg0_found)
2562         self.assertTrue(pg1_found)
2563
2564         features = self.vapi.cli("show interface features pg0")
2565         self.assertNotEqual(features.find('nat64-in2out'), -1)
2566         features = self.vapi.cli("show interface features pg1")
2567         self.assertNotEqual(features.find('nat64-out2in'), -1)
2568
2569         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2570         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2571
2572         interfaces = self.vapi.nat64_interface_dump()
2573         self.assertEqual(len(interfaces), 0)
2574
2575     def test_static_bib(self):
2576         """ Add/delete static BIB entry """
2577         in_addr = socket.inet_pton(socket.AF_INET6,
2578                                    '2001:db8:85a3::8a2e:370:7334')
2579         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2580         in_port = 1234
2581         out_port = 5678
2582         proto = IP_PROTOS.tcp
2583
2584         self.vapi.nat64_add_del_static_bib(in_addr,
2585                                            out_addr,
2586                                            in_port,
2587                                            out_port,
2588                                            proto)
2589         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2590         static_bib_num = 0
2591         for bibe in bib:
2592             if bibe.is_static:
2593                 static_bib_num += 1
2594                 self.assertEqual(bibe.i_addr, in_addr)
2595                 self.assertEqual(bibe.o_addr, out_addr)
2596                 self.assertEqual(bibe.i_port, in_port)
2597                 self.assertEqual(bibe.o_port, out_port)
2598         self.assertEqual(static_bib_num, 1)
2599
2600         self.vapi.nat64_add_del_static_bib(in_addr,
2601                                            out_addr,
2602                                            in_port,
2603                                            out_port,
2604                                            proto,
2605                                            is_add=0)
2606         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2607         static_bib_num = 0
2608         for bibe in bib:
2609             if bibe.is_static:
2610                 static_bib_num += 1
2611         self.assertEqual(static_bib_num, 0)
2612
2613     def test_set_timeouts(self):
2614         """ Set NAT64 timeouts """
2615         # verify default values
2616         timeouts = self.vapi.nat64_get_timeouts()
2617         self.assertEqual(timeouts.udp, 300)
2618         self.assertEqual(timeouts.icmp, 60)
2619         self.assertEqual(timeouts.tcp_trans, 240)
2620         self.assertEqual(timeouts.tcp_est, 7440)
2621         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2622
2623         # set and verify custom values
2624         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2625                                      tcp_est=7450, tcp_incoming_syn=10)
2626         timeouts = self.vapi.nat64_get_timeouts()
2627         self.assertEqual(timeouts.udp, 200)
2628         self.assertEqual(timeouts.icmp, 30)
2629         self.assertEqual(timeouts.tcp_trans, 250)
2630         self.assertEqual(timeouts.tcp_est, 7450)
2631         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2632
2633     def test_dynamic(self):
2634         """ NAT64 dynamic translation test """
2635         self.tcp_port_in = 6303
2636         self.udp_port_in = 6304
2637         self.icmp_id_in = 6305
2638
2639         ses_num_start = self.nat64_get_ses_num()
2640
2641         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2642                                                 self.nat_addr_n)
2643         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2644         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2645
2646         # in2out
2647         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2648         self.pg0.add_stream(pkts)
2649         self.pg_enable_capture(self.pg_interfaces)
2650         self.pg_start()
2651         capture = self.pg1.get_capture(len(pkts))
2652         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2653                                 dst_ip=self.pg1.remote_ip4)
2654
2655         # out2in
2656         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2657         self.pg1.add_stream(pkts)
2658         self.pg_enable_capture(self.pg_interfaces)
2659         self.pg_start()
2660         capture = self.pg0.get_capture(len(pkts))
2661         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2662         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2663
2664         # in2out
2665         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2666         self.pg0.add_stream(pkts)
2667         self.pg_enable_capture(self.pg_interfaces)
2668         self.pg_start()
2669         capture = self.pg1.get_capture(len(pkts))
2670         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2671                                 dst_ip=self.pg1.remote_ip4)
2672
2673         # out2in
2674         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2675         self.pg1.add_stream(pkts)
2676         self.pg_enable_capture(self.pg_interfaces)
2677         self.pg_start()
2678         capture = self.pg0.get_capture(len(pkts))
2679         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2680
2681         ses_num_end = self.nat64_get_ses_num()
2682
2683         self.assertEqual(ses_num_end - ses_num_start, 3)
2684
2685         # tenant with specific VRF
2686         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2687                                                 self.vrf1_nat_addr_n,
2688                                                 vrf_id=self.vrf1_id)
2689         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2690
2691         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2692         self.pg2.add_stream(pkts)
2693         self.pg_enable_capture(self.pg_interfaces)
2694         self.pg_start()
2695         capture = self.pg1.get_capture(len(pkts))
2696         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2697                                 dst_ip=self.pg1.remote_ip4)
2698
2699         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2700         self.pg1.add_stream(pkts)
2701         self.pg_enable_capture(self.pg_interfaces)
2702         self.pg_start()
2703         capture = self.pg2.get_capture(len(pkts))
2704         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2705
2706     def test_static(self):
2707         """ NAT64 static translation test """
2708         self.tcp_port_in = 60303
2709         self.udp_port_in = 60304
2710         self.icmp_id_in = 60305
2711         self.tcp_port_out = 60303
2712         self.udp_port_out = 60304
2713         self.icmp_id_out = 60305
2714
2715         ses_num_start = self.nat64_get_ses_num()
2716
2717         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2718                                                 self.nat_addr_n)
2719         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2720         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2721
2722         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2723                                            self.nat_addr_n,
2724                                            self.tcp_port_in,
2725                                            self.tcp_port_out,
2726                                            IP_PROTOS.tcp)
2727         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2728                                            self.nat_addr_n,
2729                                            self.udp_port_in,
2730                                            self.udp_port_out,
2731                                            IP_PROTOS.udp)
2732         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2733                                            self.nat_addr_n,
2734                                            self.icmp_id_in,
2735                                            self.icmp_id_out,
2736                                            IP_PROTOS.icmp)
2737
2738         # in2out
2739         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2740         self.pg0.add_stream(pkts)
2741         self.pg_enable_capture(self.pg_interfaces)
2742         self.pg_start()
2743         capture = self.pg1.get_capture(len(pkts))
2744         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2745                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2746
2747         # out2in
2748         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2749         self.pg1.add_stream(pkts)
2750         self.pg_enable_capture(self.pg_interfaces)
2751         self.pg_start()
2752         capture = self.pg0.get_capture(len(pkts))
2753         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2754         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2755
2756         ses_num_end = self.nat64_get_ses_num()
2757
2758         self.assertEqual(ses_num_end - ses_num_start, 3)
2759
2760     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2761     def test_session_timeout(self):
2762         """ NAT64 session timeout """
2763         self.icmp_id_in = 1234
2764         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2765                                                 self.nat_addr_n)
2766         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2767         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2768         self.vapi.nat64_set_timeouts(icmp=5)
2769
2770         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2771         self.pg0.add_stream(pkts)
2772         self.pg_enable_capture(self.pg_interfaces)
2773         self.pg_start()
2774         capture = self.pg1.get_capture(len(pkts))
2775
2776         ses_num_before_timeout = self.nat64_get_ses_num()
2777
2778         sleep(15)
2779
2780         # ICMP session after timeout
2781         ses_num_after_timeout = self.nat64_get_ses_num()
2782         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2783
2784     def test_icmp_error(self):
2785         """ NAT64 ICMP Error message translation """
2786         self.tcp_port_in = 6303
2787         self.udp_port_in = 6304
2788         self.icmp_id_in = 6305
2789
2790         ses_num_start = self.nat64_get_ses_num()
2791
2792         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2793                                                 self.nat_addr_n)
2794         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2795         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2796
2797         # send some packets to create sessions
2798         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2799         self.pg0.add_stream(pkts)
2800         self.pg_enable_capture(self.pg_interfaces)
2801         self.pg_start()
2802         capture_ip4 = self.pg1.get_capture(len(pkts))
2803         self.verify_capture_out(capture_ip4,
2804                                 nat_ip=self.nat_addr,
2805                                 dst_ip=self.pg1.remote_ip4)
2806
2807         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2808         self.pg1.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         capture_ip6 = self.pg0.get_capture(len(pkts))
2812         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2813         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2814                                    self.pg0.remote_ip6)
2815
2816         # in2out
2817         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2818                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2819                 ICMPv6DestUnreach(code=1) /
2820                 packet[IPv6] for packet in capture_ip6]
2821         self.pg0.add_stream(pkts)
2822         self.pg_enable_capture(self.pg_interfaces)
2823         self.pg_start()
2824         capture = self.pg1.get_capture(len(pkts))
2825         for packet in capture:
2826             try:
2827                 self.assertEqual(packet[IP].src, self.nat_addr)
2828                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2829                 self.assertEqual(packet[ICMP].type, 3)
2830                 self.assertEqual(packet[ICMP].code, 13)
2831                 inner = packet[IPerror]
2832                 self.assertEqual(inner.src, self.pg1.remote_ip4)
2833                 self.assertEqual(inner.dst, self.nat_addr)
2834                 self.check_icmp_checksum(packet)
2835                 if inner.haslayer(TCPerror):
2836                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2837                 elif inner.haslayer(UDPerror):
2838                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2839                 else:
2840                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2841             except:
2842                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2843                 raise
2844
2845         # out2in
2846         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2847                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2848                 ICMP(type=3, code=13) /
2849                 packet[IP] for packet in capture_ip4]
2850         self.pg1.add_stream(pkts)
2851         self.pg_enable_capture(self.pg_interfaces)
2852         self.pg_start()
2853         capture = self.pg0.get_capture(len(pkts))
2854         for packet in capture:
2855             try:
2856                 self.assertEqual(packet[IPv6].src, ip.src)
2857                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2858                 icmp = packet[ICMPv6DestUnreach]
2859                 self.assertEqual(icmp.code, 1)
2860                 inner = icmp[IPerror6]
2861                 self.assertEqual(inner.src, self.pg0.remote_ip6)
2862                 self.assertEqual(inner.dst, ip.src)
2863                 self.check_icmpv6_checksum(packet)
2864                 if inner.haslayer(TCPerror):
2865                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2866                 elif inner.haslayer(UDPerror):
2867                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2868                 else:
2869                     self.assertEqual(inner[ICMPv6EchoRequest].id,
2870                                      self.icmp_id_in)
2871             except:
2872                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2873                 raise
2874
2875     def test_hairpinning(self):
2876         """ NAT64 hairpinning """
2877
2878         client = self.pg0.remote_hosts[0]
2879         server = self.pg0.remote_hosts[1]
2880         server_tcp_in_port = 22
2881         server_tcp_out_port = 4022
2882         server_udp_in_port = 23
2883         server_udp_out_port = 4023
2884         client_tcp_in_port = 1234
2885         client_udp_in_port = 1235
2886         client_tcp_out_port = 0
2887         client_udp_out_port = 0
2888         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2889         nat_addr_ip6 = ip.src
2890
2891         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2892                                                 self.nat_addr_n)
2893         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2894         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2895
2896         self.vapi.nat64_add_del_static_bib(server.ip6n,
2897                                            self.nat_addr_n,
2898                                            server_tcp_in_port,
2899                                            server_tcp_out_port,
2900                                            IP_PROTOS.tcp)
2901         self.vapi.nat64_add_del_static_bib(server.ip6n,
2902                                            self.nat_addr_n,
2903                                            server_udp_in_port,
2904                                            server_udp_out_port,
2905                                            IP_PROTOS.udp)
2906
2907         # client to server
2908         pkts = []
2909         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2910              IPv6(src=client.ip6, dst=nat_addr_ip6) /
2911              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
2912         pkts.append(p)
2913         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2914              IPv6(src=client.ip6, dst=nat_addr_ip6) /
2915              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
2916         pkts.append(p)
2917         self.pg0.add_stream(pkts)
2918         self.pg_enable_capture(self.pg_interfaces)
2919         self.pg_start()
2920         capture = self.pg0.get_capture(len(pkts))
2921         for packet in capture:
2922             try:
2923                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2924                 self.assertEqual(packet[IPv6].dst, server.ip6)
2925                 if packet.haslayer(TCP):
2926                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
2927                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
2928                     self.check_tcp_checksum(packet)
2929                     client_tcp_out_port = packet[TCP].sport
2930                 else:
2931                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
2932                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
2933                     self.check_udp_checksum(packet)
2934                     client_udp_out_port = packet[UDP].sport
2935             except:
2936                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2937                 raise
2938
2939         # server to client
2940         pkts = []
2941         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2942              IPv6(src=server.ip6, dst=nat_addr_ip6) /
2943              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
2944         pkts.append(p)
2945         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2946              IPv6(src=server.ip6, dst=nat_addr_ip6) /
2947              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
2948         pkts.append(p)
2949         self.pg0.add_stream(pkts)
2950         self.pg_enable_capture(self.pg_interfaces)
2951         self.pg_start()
2952         capture = self.pg0.get_capture(len(pkts))
2953         for packet in capture:
2954             try:
2955                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2956                 self.assertEqual(packet[IPv6].dst, client.ip6)
2957                 if packet.haslayer(TCP):
2958                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
2959                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
2960                     self.check_tcp_checksum(packet)
2961                 else:
2962                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
2963                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
2964                     self.check_udp_checksum(packet)
2965             except:
2966                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2967                 raise
2968
2969         # ICMP error
2970         pkts = []
2971         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2972                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
2973                 ICMPv6DestUnreach(code=1) /
2974                 packet[IPv6] for packet in capture]
2975         self.pg0.add_stream(pkts)
2976         self.pg_enable_capture(self.pg_interfaces)
2977         self.pg_start()
2978         capture = self.pg0.get_capture(len(pkts))
2979         for packet in capture:
2980             try:
2981                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2982                 self.assertEqual(packet[IPv6].dst, server.ip6)
2983                 icmp = packet[ICMPv6DestUnreach]
2984                 self.assertEqual(icmp.code, 1)
2985                 inner = icmp[IPerror6]
2986                 self.assertEqual(inner.src, server.ip6)
2987                 self.assertEqual(inner.dst, nat_addr_ip6)
2988                 self.check_icmpv6_checksum(packet)
2989                 if inner.haslayer(TCPerror):
2990                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
2991                     self.assertEqual(inner[TCPerror].dport,
2992                                      client_tcp_out_port)
2993                 else:
2994                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
2995                     self.assertEqual(inner[UDPerror].dport,
2996                                      client_udp_out_port)
2997             except:
2998                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2999                 raise
3000
3001     def nat64_get_ses_num(self):
3002         """
3003         Return number of active NAT64 sessions.
3004         """
3005         ses_num = 0
3006         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3007         ses_num += len(st)
3008         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3009         ses_num += len(st)
3010         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3011         ses_num += len(st)
3012         return ses_num
3013
3014     def clear_nat64(self):
3015         """
3016         Clear NAT64 configuration.
3017         """
3018         self.vapi.nat64_set_timeouts()
3019
3020         interfaces = self.vapi.nat64_interface_dump()
3021         for intf in interfaces:
3022             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3023                                               intf.is_inside,
3024                                               is_add=0)
3025
3026         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3027         for bibe in bib:
3028             if bibe.is_static:
3029                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3030                                                    bibe.o_addr,
3031                                                    bibe.i_port,
3032                                                    bibe.o_port,
3033                                                    bibe.proto,
3034                                                    bibe.vrf_id,
3035                                                    is_add=0)
3036
3037         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3038         for bibe in bib:
3039             if bibe.is_static:
3040                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3041                                                    bibe.o_addr,
3042                                                    bibe.i_port,
3043                                                    bibe.o_port,
3044                                                    bibe.proto,
3045                                                    bibe.vrf_id,
3046                                                    is_add=0)
3047
3048         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3049         for bibe in bib:
3050             if bibe.is_static:
3051                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3052                                                    bibe.o_addr,
3053                                                    bibe.i_port,
3054                                                    bibe.o_port,
3055                                                    bibe.proto,
3056                                                    bibe.vrf_id,
3057                                                    is_add=0)
3058
3059         adresses = self.vapi.nat64_pool_addr_dump()
3060         for addr in adresses:
3061             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3062                                                     addr.address,
3063                                                     vrf_id=addr.vrf_id,
3064                                                     is_add=0)
3065
3066     def tearDown(self):
3067         super(TestNAT64, self).tearDown()
3068         if not self.vpp_dead:
3069             self.logger.info(self.vapi.cli("show nat64 pool"))
3070             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3071             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3072             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3073             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3074             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3075             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3076             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3077             self.clear_nat64()
3078
# When executed as a script, run the tests in this module through unittest
# using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)