e148fbae56c0ae6b95a061089149e0f502c0545c
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def create_stream_in_ip6(self, in_if, out_if, hlim=64):
161         """
162         Create IPv6 packet stream for inside network
163
164         :param in_if: Inside interface
165         :param out_if: Outside interface
166         :param ttl: Hop Limit of generated packets
167         """
168         pkts = []
169         dst = ''.join(['64:ff9b::', out_if.remote_ip4])
170         # TCP
171         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
172              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
173              TCP(sport=self.tcp_port_in, dport=20))
174         pkts.append(p)
175
176         # UDP
177         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
178              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
179              UDP(sport=self.udp_port_in, dport=20))
180         pkts.append(p)
181
182         # ICMP
183         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
184              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
185              ICMPv6EchoRequest(id=self.icmp_id_in))
186         pkts.append(p)
187
188         return pkts
189
190     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
191         """
192         Create packet stream for outside network
193
194         :param out_if: Outside interface
195         :param dst_ip: Destination IP address (Default use global SNAT address)
196         :param ttl: TTL of generated packets
197         """
198         if dst_ip is None:
199             dst_ip = self.snat_addr
200         pkts = []
201         # TCP
202         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
203              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
204              TCP(dport=self.tcp_port_out, sport=20))
205         pkts.append(p)
206
207         # UDP
208         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
209              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
210              UDP(dport=self.udp_port_out, sport=20))
211         pkts.append(p)
212
213         # ICMP
214         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
215              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
216              ICMP(id=self.icmp_id_out, type='echo-reply'))
217         pkts.append(p)
218
219         return pkts
220
221     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
222                            packet_num=3, dst_ip=None):
223         """
224         Verify captured packets on outside network
225
226         :param capture: Captured packets
227         :param nat_ip: Translated IP address (Default use global SNAT address)
228         :param same_port: Sorce port number is not translated (Default False)
229         :param packet_num: Expected number of packets (Default 3)
230         :param dst_ip: Destination IP address (Default do not verify)
231         """
232         if nat_ip is None:
233             nat_ip = self.snat_addr
234         self.assertEqual(packet_num, len(capture))
235         for packet in capture:
236             try:
237                 self.check_ip_checksum(packet)
238                 self.assertEqual(packet[IP].src, nat_ip)
239                 if dst_ip is not None:
240                     self.assertEqual(packet[IP].dst, dst_ip)
241                 if packet.haslayer(TCP):
242                     if same_port:
243                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
244                     else:
245                         self.assertNotEqual(
246                             packet[TCP].sport, self.tcp_port_in)
247                     self.tcp_port_out = packet[TCP].sport
248                     self.check_tcp_checksum(packet)
249                 elif packet.haslayer(UDP):
250                     if same_port:
251                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
252                     else:
253                         self.assertNotEqual(
254                             packet[UDP].sport, self.udp_port_in)
255                     self.udp_port_out = packet[UDP].sport
256                 else:
257                     if same_port:
258                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
259                     else:
260                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
261                     self.icmp_id_out = packet[ICMP].id
262                     self.check_icmp_checksum(packet)
263             except:
264                 self.logger.error(ppp("Unexpected or invalid packet "
265                                       "(outside network):", packet))
266                 raise
267
268     def verify_capture_in(self, capture, in_if, packet_num=3):
269         """
270         Verify captured packets on inside network
271
272         :param capture: Captured packets
273         :param in_if: Inside interface
274         :param packet_num: Expected number of packets (Default 3)
275         """
276         self.assertEqual(packet_num, len(capture))
277         for packet in capture:
278             try:
279                 self.check_ip_checksum(packet)
280                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
281                 if packet.haslayer(TCP):
282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
283                     self.check_tcp_checksum(packet)
284                 elif packet.haslayer(UDP):
285                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
286                 else:
287                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
288                     self.check_icmp_checksum(packet)
289             except:
290                 self.logger.error(ppp("Unexpected or invalid packet "
291                                       "(inside network):", packet))
292                 raise
293
294     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
295         """
296         Verify captured IPv6 packets on inside network
297
298         :param capture: Captured packets
299         :param src_ip: Source IP
300         :param dst_ip: Destination IP address
301         :param packet_num: Expected number of packets (Default 3)
302         """
303         self.assertEqual(packet_num, len(capture))
304         for packet in capture:
305             try:
306                 self.assertEqual(packet[IPv6].src, src_ip)
307                 self.assertEqual(packet[IPv6].dst, dst_ip)
308                 if packet.haslayer(TCP):
309                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
310                     self.check_tcp_checksum(packet)
311                 elif packet.haslayer(UDP):
312                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
313                     self.check_udp_checksum(packet)
314                 else:
315                     self.assertEqual(packet[ICMPv6EchoReply].id,
316                                      self.icmp_id_in)
317                     self.check_icmpv6_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(inside network):", packet))
321                 raise
322
323     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
324         """
325         Verify captured packet that don't have to be translated
326
327         :param capture: Captured packets
328         :param ingress_if: Ingress interface
329         :param egress_if: Egress interface
330         """
331         for packet in capture:
332             try:
333                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
334                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
335                 if packet.haslayer(TCP):
336                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
337                 elif packet.haslayer(UDP):
338                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
339                 else:
340                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
341             except:
342                 self.logger.error(ppp("Unexpected or invalid packet "
343                                       "(inside network):", packet))
344                 raise
345
346     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
347                                             packet_num=3, icmp_type=11):
348         """
349         Verify captured packets with ICMP errors on outside network
350
351         :param capture: Captured packets
352         :param src_ip: Translated IP address or IP address of VPP
353                        (Default use global SNAT address)
354         :param packet_num: Expected number of packets (Default 3)
355         :param icmp_type: Type of error ICMP packet
356                           we are expecting (Default 11)
357         """
358         if src_ip is None:
359             src_ip = self.snat_addr
360         self.assertEqual(packet_num, len(capture))
361         for packet in capture:
362             try:
363                 self.assertEqual(packet[IP].src, src_ip)
364                 self.assertTrue(packet.haslayer(ICMP))
365                 icmp = packet[ICMP]
366                 self.assertEqual(icmp.type, icmp_type)
367                 self.assertTrue(icmp.haslayer(IPerror))
368                 inner_ip = icmp[IPerror]
369                 if inner_ip.haslayer(TCPerror):
370                     self.assertEqual(inner_ip[TCPerror].dport,
371                                      self.tcp_port_out)
372                 elif inner_ip.haslayer(UDPerror):
373                     self.assertEqual(inner_ip[UDPerror].dport,
374                                      self.udp_port_out)
375                 else:
376                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
377             except:
378                 self.logger.error(ppp("Unexpected or invalid packet "
379                                       "(outside network):", packet))
380                 raise
381
382     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
383                                            icmp_type=11):
384         """
385         Verify captured packets with ICMP errors on inside network
386
387         :param capture: Captured packets
388         :param in_if: Inside interface
389         :param packet_num: Expected number of packets (Default 3)
390         :param icmp_type: Type of error ICMP packet
391                           we are expecting (Default 11)
392         """
393         self.assertEqual(packet_num, len(capture))
394         for packet in capture:
395             try:
396                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
397                 self.assertTrue(packet.haslayer(ICMP))
398                 icmp = packet[ICMP]
399                 self.assertEqual(icmp.type, icmp_type)
400                 self.assertTrue(icmp.haslayer(IPerror))
401                 inner_ip = icmp[IPerror]
402                 if inner_ip.haslayer(TCPerror):
403                     self.assertEqual(inner_ip[TCPerror].sport,
404                                      self.tcp_port_in)
405                 elif inner_ip.haslayer(UDPerror):
406                     self.assertEqual(inner_ip[UDPerror].sport,
407                                      self.udp_port_in)
408                 else:
409                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
410             except:
411                 self.logger.error(ppp("Unexpected or invalid packet "
412                                       "(inside network):", packet))
413                 raise
414
415     def verify_ipfix_nat44_ses(self, data):
416         """
417         Verify IPFIX NAT44 session create/delete event
418
419         :param data: Decoded IPFIX data records
420         """
421         nat44_ses_create_num = 0
422         nat44_ses_delete_num = 0
423         self.assertEqual(6, len(data))
424         for record in data:
425             # natEvent
426             self.assertIn(ord(record[230]), [4, 5])
427             if ord(record[230]) == 4:
428                 nat44_ses_create_num += 1
429             else:
430                 nat44_ses_delete_num += 1
431             # sourceIPv4Address
432             self.assertEqual(self.pg0.remote_ip4n, record[8])
433             # postNATSourceIPv4Address
434             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
435                              record[225])
436             # ingressVRFID
437             self.assertEqual(struct.pack("!I", 0), record[234])
438             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
439             if IP_PROTOS.icmp == ord(record[4]):
440                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
441                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
442                                  record[227])
443             elif IP_PROTOS.tcp == ord(record[4]):
444                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
445                                  record[7])
446                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
447                                  record[227])
448             elif IP_PROTOS.udp == ord(record[4]):
449                 self.assertEqual(struct.pack("!H", self.udp_port_in),
450                                  record[7])
451                 self.assertEqual(struct.pack("!H", self.udp_port_out),
452                                  record[227])
453             else:
454                 self.fail("Invalid protocol")
455         self.assertEqual(3, nat44_ses_create_num)
456         self.assertEqual(3, nat44_ses_delete_num)
457
458     def verify_ipfix_addr_exhausted(self, data):
459         """
460         Verify IPFIX NAT addresses event
461
462         :param data: Decoded IPFIX data records
463         """
464         self.assertEqual(1, len(data))
465         record = data[0]
466         # natEvent
467         self.assertEqual(ord(record[230]), 3)
468         # natPoolID
469         self.assertEqual(struct.pack("!I", 0), record[283])
470
471
472 class TestSNAT(MethodHolder):
473     """ SNAT Test Cases """
474
475     @classmethod
476     def setUpClass(cls):
477         super(TestSNAT, cls).setUpClass()
478
479         try:
480             cls.tcp_port_in = 6303
481             cls.tcp_port_out = 6303
482             cls.udp_port_in = 6304
483             cls.udp_port_out = 6304
484             cls.icmp_id_in = 6305
485             cls.icmp_id_out = 6305
486             cls.snat_addr = '10.0.0.3'
487             cls.ipfix_src_port = 4739
488             cls.ipfix_domain_id = 1
489
490             cls.create_pg_interfaces(range(9))
491             cls.interfaces = list(cls.pg_interfaces[0:4])
492
493             for i in cls.interfaces:
494                 i.admin_up()
495                 i.config_ip4()
496                 i.resolve_arp()
497
498             cls.pg0.generate_remote_hosts(3)
499             cls.pg0.configure_ipv4_neighbors()
500
501             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
502
503             cls.pg4._local_ip4 = "172.16.255.1"
504             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506             cls.pg4.set_table_ip4(10)
507             cls.pg5._local_ip4 = "172.16.255.3"
508             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510             cls.pg5.set_table_ip4(10)
511             cls.pg6._local_ip4 = "172.16.255.1"
512             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514             cls.pg6.set_table_ip4(20)
515             for i in cls.overlapping_interfaces:
516                 i.config_ip4()
517                 i.admin_up()
518                 i.resolve_arp()
519
520             cls.pg7.admin_up()
521             cls.pg8.admin_up()
522
523         except Exception:
524             super(TestSNAT, cls).tearDownClass()
525             raise
526
527     def clear_snat(self):
528         """
529         Clear SNAT configuration.
530         """
531         # I found no elegant way to do this
532         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533                                    dst_address_length=32,
534                                    next_hop_address=self.pg7.remote_ip4n,
535                                    next_hop_sw_if_index=self.pg7.sw_if_index,
536                                    is_add=0)
537         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538                                    dst_address_length=32,
539                                    next_hop_address=self.pg8.remote_ip4n,
540                                    next_hop_sw_if_index=self.pg8.sw_if_index,
541                                    is_add=0)
542
543         for intf in [self.pg7, self.pg8]:
544             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
545             for n in neighbors:
546                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
547                                               n.mac_address,
548                                               n.ip_address,
549                                               is_add=0)
550
551         if self.pg7.has_ip4_config:
552             self.pg7.unconfig_ip4()
553
554         interfaces = self.vapi.snat_interface_addr_dump()
555         for intf in interfaces:
556             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
557
558         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559                              domain_id=self.ipfix_domain_id)
560         self.ipfix_src_port = 4739
561         self.ipfix_domain_id = 1
562
563         interfaces = self.vapi.snat_interface_dump()
564         for intf in interfaces:
565             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
566                                                      intf.is_inside,
567                                                      is_add=0)
568
569         static_mappings = self.vapi.snat_static_mapping_dump()
570         for sm in static_mappings:
571             self.vapi.snat_add_static_mapping(sm.local_ip_address,
572                                               sm.external_ip_address,
573                                               local_port=sm.local_port,
574                                               external_port=sm.external_port,
575                                               addr_only=sm.addr_only,
576                                               vrf_id=sm.vrf_id,
577                                               protocol=sm.protocol,
578                                               is_add=0)
579
580         adresses = self.vapi.snat_address_dump()
581         for addr in adresses:
582             self.vapi.snat_add_address_range(addr.ip_address,
583                                              addr.ip_address,
584                                              is_add=0)
585
586     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587                                 local_port=0, external_port=0, vrf_id=0,
588                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
589                                 proto=0):
590         """
591         Add/delete S-NAT static mapping
592
593         :param local_ip: Local IP address
594         :param external_ip: External IP address
595         :param local_port: Local port number (Optional)
596         :param external_port: External port number (Optional)
597         :param vrf_id: VRF ID (Default 0)
598         :param is_add: 1 if add, 0 if delete (Default add)
599         :param external_sw_if_index: External interface instead of IP address
600         :param proto: IP protocol (Mandatory if port specified)
601         """
602         addr_only = 1
603         if local_port and external_port:
604             addr_only = 0
605         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607         self.vapi.snat_add_static_mapping(
608             l_ip,
609             e_ip,
610             external_sw_if_index,
611             local_port,
612             external_port,
613             addr_only,
614             vrf_id,
615             proto,
616             is_add)
617
618     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
619         """
620         Add/delete S-NAT address
621
622         :param ip: IP address
623         :param is_add: 1 if add, 0 if delete (Default add)
624         """
625         snat_addr = socket.inet_pton(socket.AF_INET, ip)
626         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
627                                          vrf_id=vrf_id)
628
629     def test_dynamic(self):
630         """ SNAT dynamic translation test """
631
632         self.snat_add_address(self.snat_addr)
633         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
635                                                  is_inside=0)
636
637         # in2out
638         pkts = self.create_stream_in(self.pg0, self.pg1)
639         self.pg0.add_stream(pkts)
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pg_start()
642         capture = self.pg1.get_capture(len(pkts))
643         self.verify_capture_out(capture)
644
645         # out2in
646         pkts = self.create_stream_out(self.pg1)
647         self.pg1.add_stream(pkts)
648         self.pg_enable_capture(self.pg_interfaces)
649         self.pg_start()
650         capture = self.pg0.get_capture(len(pkts))
651         self.verify_capture_in(capture, self.pg0)
652
653     def test_dynamic_icmp_errors_in2out_ttl_1(self):
654         """ SNAT handling of client packets with TTL=1 """
655
656         self.snat_add_address(self.snat_addr)
657         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
659                                                  is_inside=0)
660
661         # Client side - generate traffic
662         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663         self.pg0.add_stream(pkts)
664         self.pg_enable_capture(self.pg_interfaces)
665         self.pg_start()
666
667         # Client side - verify ICMP type 11 packets
668         capture = self.pg0.get_capture(len(pkts))
669         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
670
671     def test_dynamic_icmp_errors_out2in_ttl_1(self):
672         """ SNAT handling of server packets with TTL=1 """
673
674         self.snat_add_address(self.snat_addr)
675         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
677                                                  is_inside=0)
678
679         # Client side - create sessions
680         pkts = self.create_stream_in(self.pg0, self.pg1)
681         self.pg0.add_stream(pkts)
682         self.pg_enable_capture(self.pg_interfaces)
683         self.pg_start()
684
685         # Server side - generate traffic
686         capture = self.pg1.get_capture(len(pkts))
687         self.verify_capture_out(capture)
688         pkts = self.create_stream_out(self.pg1, ttl=1)
689         self.pg1.add_stream(pkts)
690         self.pg_enable_capture(self.pg_interfaces)
691         self.pg_start()
692
693         # Server side - verify ICMP type 11 packets
694         capture = self.pg1.get_capture(len(pkts))
695         self.verify_capture_out_with_icmp_errors(capture,
696                                                  src_ip=self.pg1.local_ip4)
697
698     def test_dynamic_icmp_errors_in2out_ttl_2(self):
699         """ SNAT handling of error responses to client packets with TTL=2 """
700
701         self.snat_add_address(self.snat_addr)
702         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
704                                                  is_inside=0)
705
706         # Client side - generate traffic
707         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708         self.pg0.add_stream(pkts)
709         self.pg_enable_capture(self.pg_interfaces)
710         self.pg_start()
711
712         # Server side - simulate ICMP type 11 response
713         capture = self.pg1.get_capture(len(pkts))
714         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716                 ICMP(type=11) / packet[IP] for packet in capture]
717         self.pg1.add_stream(pkts)
718         self.pg_enable_capture(self.pg_interfaces)
719         self.pg_start()
720
721         # Client side - verify ICMP type 11 packets
722         capture = self.pg0.get_capture(len(pkts))
723         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
724
725     def test_dynamic_icmp_errors_out2in_ttl_2(self):
726         """ SNAT handling of error responses to server packets with TTL=2 """
727
728         self.snat_add_address(self.snat_addr)
729         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
731                                                  is_inside=0)
732
733         # Client side - create sessions
734         pkts = self.create_stream_in(self.pg0, self.pg1)
735         self.pg0.add_stream(pkts)
736         self.pg_enable_capture(self.pg_interfaces)
737         self.pg_start()
738
739         # Server side - generate traffic
740         capture = self.pg1.get_capture(len(pkts))
741         self.verify_capture_out(capture)
742         pkts = self.create_stream_out(self.pg1, ttl=2)
743         self.pg1.add_stream(pkts)
744         self.pg_enable_capture(self.pg_interfaces)
745         self.pg_start()
746
747         # Client side - simulate ICMP type 11 response
748         capture = self.pg0.get_capture(len(pkts))
749         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751                 ICMP(type=11) / packet[IP] for packet in capture]
752         self.pg0.add_stream(pkts)
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pg_start()
755
756         # Server side - verify ICMP type 11 packets
757         capture = self.pg1.get_capture(len(pkts))
758         self.verify_capture_out_with_icmp_errors(capture)
759
760     def test_ping_out_interface_from_outside(self):
761         """ Ping SNAT out interface from outside network """
762
763         self.snat_add_address(self.snat_addr)
764         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
766                                                  is_inside=0)
767
768         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770              ICMP(id=self.icmp_id_out, type='echo-request'))
771         pkts = [p]
772         self.pg1.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775         capture = self.pg1.get_capture(len(pkts))
776         self.assertEqual(1, len(capture))
777         packet = capture[0]
778         try:
779             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
781             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782             self.assertEqual(packet[ICMP].type, 0)  # echo reply
783         except:
784             self.logger.error(ppp("Unexpected or invalid packet "
785                                   "(outside network):", packet))
786             raise
787
788     def test_ping_internal_host_from_outside(self):
789         """ Ping internal host from outside network """
790
791         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
794                                                  is_inside=0)
795
796         # out2in
797         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799                ICMP(id=self.icmp_id_out, type='echo-request'))
800         self.pg1.add_stream(pkt)
801         self.pg_enable_capture(self.pg_interfaces)
802         self.pg_start()
803         capture = self.pg0.get_capture(1)
804         self.verify_capture_in(capture, self.pg0, packet_num=1)
805         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
806
807         # in2out
808         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810                ICMP(id=self.icmp_id_in, type='echo-reply'))
811         self.pg0.add_stream(pkt)
812         self.pg_enable_capture(self.pg_interfaces)
813         self.pg_start()
814         capture = self.pg1.get_capture(1)
815         self.verify_capture_out(capture, same_port=True, packet_num=1)
816         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
817
818     def test_static_in(self):
819         """ SNAT 1:1 NAT initialized from inside network """
820
821         nat_ip = "10.0.0.10"
822         self.tcp_port_out = 6303
823         self.udp_port_out = 6304
824         self.icmp_id_out = 6305
825
826         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
829                                                  is_inside=0)
830
831         # in2out
832         pkts = self.create_stream_in(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture, nat_ip, True)
838
839         # out2in
840         pkts = self.create_stream_out(self.pg1, nat_ip)
841         self.pg1.add_stream(pkts)
842         self.pg_enable_capture(self.pg_interfaces)
843         self.pg_start()
844         capture = self.pg0.get_capture(len(pkts))
845         self.verify_capture_in(capture, self.pg0)
846
847     def test_static_out(self):
848         """ SNAT 1:1 NAT initialized from outside network """
849
850         nat_ip = "10.0.0.20"
851         self.tcp_port_out = 6303
852         self.udp_port_out = 6304
853         self.icmp_id_out = 6305
854
855         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
858                                                  is_inside=0)
859
860         # out2in
861         pkts = self.create_stream_out(self.pg1, nat_ip)
862         self.pg1.add_stream(pkts)
863         self.pg_enable_capture(self.pg_interfaces)
864         self.pg_start()
865         capture = self.pg0.get_capture(len(pkts))
866         self.verify_capture_in(capture, self.pg0)
867
868         # in2out
869         pkts = self.create_stream_in(self.pg0, self.pg1)
870         self.pg0.add_stream(pkts)
871         self.pg_enable_capture(self.pg_interfaces)
872         self.pg_start()
873         capture = self.pg1.get_capture(len(pkts))
874         self.verify_capture_out(capture, nat_ip, True)
875
876     def test_static_with_port_in(self):
877         """ SNAT 1:1 NAT with port initialized from inside network """
878
879         self.tcp_port_out = 3606
880         self.udp_port_out = 3607
881         self.icmp_id_out = 3608
882
883         self.snat_add_address(self.snat_addr)
884         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885                                      self.tcp_port_in, self.tcp_port_out,
886                                      proto=IP_PROTOS.tcp)
887         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888                                      self.udp_port_in, self.udp_port_out,
889                                      proto=IP_PROTOS.udp)
890         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891                                      self.icmp_id_in, self.icmp_id_out,
892                                      proto=IP_PROTOS.icmp)
893         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
895                                                  is_inside=0)
896
897         # in2out
898         pkts = self.create_stream_in(self.pg0, self.pg1)
899         self.pg0.add_stream(pkts)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(len(pkts))
903         self.verify_capture_out(capture)
904
905         # out2in
906         pkts = self.create_stream_out(self.pg1)
907         self.pg1.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg0.get_capture(len(pkts))
911         self.verify_capture_in(capture, self.pg0)
912
913     def test_static_with_port_out(self):
914         """ SNAT 1:1 NAT with port initialized from outside network """
915
916         self.tcp_port_out = 30606
917         self.udp_port_out = 30607
918         self.icmp_id_out = 30608
919
920         self.snat_add_address(self.snat_addr)
921         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922                                      self.tcp_port_in, self.tcp_port_out,
923                                      proto=IP_PROTOS.tcp)
924         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925                                      self.udp_port_in, self.udp_port_out,
926                                      proto=IP_PROTOS.udp)
927         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928                                      self.icmp_id_in, self.icmp_id_out,
929                                      proto=IP_PROTOS.icmp)
930         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
932                                                  is_inside=0)
933
934         # out2in
935         pkts = self.create_stream_out(self.pg1)
936         self.pg1.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg0.get_capture(len(pkts))
940         self.verify_capture_in(capture, self.pg0)
941
942         # in2out
943         pkts = self.create_stream_in(self.pg0, self.pg1)
944         self.pg0.add_stream(pkts)
945         self.pg_enable_capture(self.pg_interfaces)
946         self.pg_start()
947         capture = self.pg1.get_capture(len(pkts))
948         self.verify_capture_out(capture)
949
950     def test_static_vrf_aware(self):
951         """ SNAT 1:1 NAT VRF awareness """
952
953         nat_ip1 = "10.0.0.30"
954         nat_ip2 = "10.0.0.40"
955         self.tcp_port_out = 6303
956         self.udp_port_out = 6304
957         self.icmp_id_out = 6305
958
959         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
960                                      vrf_id=10)
961         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
962                                      vrf_id=10)
963         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
964                                                  is_inside=0)
965         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
967
968         # inside interface VRF match SNAT static mapping VRF
969         pkts = self.create_stream_in(self.pg4, self.pg3)
970         self.pg4.add_stream(pkts)
971         self.pg_enable_capture(self.pg_interfaces)
972         self.pg_start()
973         capture = self.pg3.get_capture(len(pkts))
974         self.verify_capture_out(capture, nat_ip1, True)
975
976         # inside interface VRF don't match SNAT static mapping VRF (packets
977         # are dropped)
978         pkts = self.create_stream_in(self.pg0, self.pg3)
979         self.pg0.add_stream(pkts)
980         self.pg_enable_capture(self.pg_interfaces)
981         self.pg_start()
982         self.pg3.assert_nothing_captured()
983
984     def test_multiple_inside_interfaces(self):
985         """ SNAT multiple inside interfaces (non-overlapping address space) """
986
987         self.snat_add_address(self.snat_addr)
988         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
991                                                  is_inside=0)
992
993         # between two S-NAT inside interfaces (no translation)
994         pkts = self.create_stream_in(self.pg0, self.pg1)
995         self.pg0.add_stream(pkts)
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pg_start()
998         capture = self.pg1.get_capture(len(pkts))
999         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1000
1001         # from S-NAT inside to interface without S-NAT feature (no translation)
1002         pkts = self.create_stream_in(self.pg0, self.pg2)
1003         self.pg0.add_stream(pkts)
1004         self.pg_enable_capture(self.pg_interfaces)
1005         self.pg_start()
1006         capture = self.pg2.get_capture(len(pkts))
1007         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1008
1009         # in2out 1st interface
1010         pkts = self.create_stream_in(self.pg0, self.pg3)
1011         self.pg0.add_stream(pkts)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg3.get_capture(len(pkts))
1015         self.verify_capture_out(capture)
1016
1017         # out2in 1st interface
1018         pkts = self.create_stream_out(self.pg3)
1019         self.pg3.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg0.get_capture(len(pkts))
1023         self.verify_capture_in(capture, self.pg0)
1024
1025         # in2out 2nd interface
1026         pkts = self.create_stream_in(self.pg1, self.pg3)
1027         self.pg1.add_stream(pkts)
1028         self.pg_enable_capture(self.pg_interfaces)
1029         self.pg_start()
1030         capture = self.pg3.get_capture(len(pkts))
1031         self.verify_capture_out(capture)
1032
1033         # out2in 2nd interface
1034         pkts = self.create_stream_out(self.pg3)
1035         self.pg3.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg1.get_capture(len(pkts))
1039         self.verify_capture_in(capture, self.pg1)
1040
1041     def test_inside_overlapping_interfaces(self):
1042         """ SNAT multiple inside interfaces with overlapping address space """
1043
1044         static_nat_ip = "10.0.0.10"
1045         self.snat_add_address(self.snat_addr)
1046         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1047                                                  is_inside=0)
1048         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1049         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1050         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1051         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1052                                      vrf_id=20)
1053
1054         # between S-NAT inside interfaces with same VRF (no translation)
1055         pkts = self.create_stream_in(self.pg4, self.pg5)
1056         self.pg4.add_stream(pkts)
1057         self.pg_enable_capture(self.pg_interfaces)
1058         self.pg_start()
1059         capture = self.pg5.get_capture(len(pkts))
1060         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1061
1062         # between S-NAT inside interfaces with different VRF (hairpinning)
1063         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1064              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1065              TCP(sport=1234, dport=5678))
1066         self.pg4.add_stream(p)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         capture = self.pg6.get_capture(1)
1070         p = capture[0]
1071         try:
1072             ip = p[IP]
1073             tcp = p[TCP]
1074             self.assertEqual(ip.src, self.snat_addr)
1075             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1076             self.assertNotEqual(tcp.sport, 1234)
1077             self.assertEqual(tcp.dport, 5678)
1078         except:
1079             self.logger.error(ppp("Unexpected or invalid packet:", p))
1080             raise
1081
1082         # in2out 1st interface
1083         pkts = self.create_stream_in(self.pg4, self.pg3)
1084         self.pg4.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg3.get_capture(len(pkts))
1088         self.verify_capture_out(capture)
1089
1090         # out2in 1st interface
1091         pkts = self.create_stream_out(self.pg3)
1092         self.pg3.add_stream(pkts)
1093         self.pg_enable_capture(self.pg_interfaces)
1094         self.pg_start()
1095         capture = self.pg4.get_capture(len(pkts))
1096         self.verify_capture_in(capture, self.pg4)
1097
1098         # in2out 2nd interface
1099         pkts = self.create_stream_in(self.pg5, self.pg3)
1100         self.pg5.add_stream(pkts)
1101         self.pg_enable_capture(self.pg_interfaces)
1102         self.pg_start()
1103         capture = self.pg3.get_capture(len(pkts))
1104         self.verify_capture_out(capture)
1105
1106         # out2in 2nd interface
1107         pkts = self.create_stream_out(self.pg3)
1108         self.pg3.add_stream(pkts)
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111         capture = self.pg5.get_capture(len(pkts))
1112         self.verify_capture_in(capture, self.pg5)
1113
1114         # pg5 session dump
1115         addresses = self.vapi.snat_address_dump()
1116         self.assertEqual(len(addresses), 1)
1117         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1118         self.assertEqual(len(sessions), 3)
1119         for session in sessions:
1120             self.assertFalse(session.is_static)
1121             self.assertEqual(session.inside_ip_address[0:4],
1122                              self.pg5.remote_ip4n)
1123             self.assertEqual(session.outside_ip_address,
1124                              addresses[0].ip_address)
1125         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1126         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1127         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1128         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1129         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1130         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1131         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1132         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1133         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1134
1135         # in2out 3rd interface
1136         pkts = self.create_stream_in(self.pg6, self.pg3)
1137         self.pg6.add_stream(pkts)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg3.get_capture(len(pkts))
1141         self.verify_capture_out(capture, static_nat_ip, True)
1142
1143         # out2in 3rd interface
1144         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1145         self.pg3.add_stream(pkts)
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148         capture = self.pg6.get_capture(len(pkts))
1149         self.verify_capture_in(capture, self.pg6)
1150
1151         # general user and session dump verifications
1152         users = self.vapi.snat_user_dump()
1153         self.assertTrue(len(users) >= 3)
1154         addresses = self.vapi.snat_address_dump()
1155         self.assertEqual(len(addresses), 1)
1156         for user in users:
1157             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1158                                                         user.vrf_id)
1159             for session in sessions:
1160                 self.assertEqual(user.ip_address, session.inside_ip_address)
1161                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1162                 self.assertTrue(session.protocol in
1163                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1164                                  IP_PROTOS.icmp])
1165
1166         # pg4 session dump
1167         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1168         self.assertTrue(len(sessions) >= 4)
1169         for session in sessions:
1170             self.assertFalse(session.is_static)
1171             self.assertEqual(session.inside_ip_address[0:4],
1172                              self.pg4.remote_ip4n)
1173             self.assertEqual(session.outside_ip_address,
1174                              addresses[0].ip_address)
1175
1176         # pg6 session dump
1177         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1178         self.assertTrue(len(sessions) >= 3)
1179         for session in sessions:
1180             self.assertTrue(session.is_static)
1181             self.assertEqual(session.inside_ip_address[0:4],
1182                              self.pg6.remote_ip4n)
1183             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1184                              map(int, static_nat_ip.split('.')))
1185             self.assertTrue(session.inside_port in
1186                             [self.tcp_port_in, self.udp_port_in,
1187                              self.icmp_id_in])
1188
1189     def test_hairpinning(self):
1190         """ SNAT hairpinning - 1:1 NAT with port"""
1191
1192         host = self.pg0.remote_hosts[0]
1193         server = self.pg0.remote_hosts[1]
1194         host_in_port = 1234
1195         host_out_port = 0
1196         server_in_port = 5678
1197         server_out_port = 8765
1198
1199         self.snat_add_address(self.snat_addr)
1200         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1201         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1202                                                  is_inside=0)
1203         # add static mapping for server
1204         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1205                                      server_in_port, server_out_port,
1206                                      proto=IP_PROTOS.tcp)
1207
1208         # send packet from host to server
1209         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1210              IP(src=host.ip4, dst=self.snat_addr) /
1211              TCP(sport=host_in_port, dport=server_out_port))
1212         self.pg0.add_stream(p)
1213         self.pg_enable_capture(self.pg_interfaces)
1214         self.pg_start()
1215         capture = self.pg0.get_capture(1)
1216         p = capture[0]
1217         try:
1218             ip = p[IP]
1219             tcp = p[TCP]
1220             self.assertEqual(ip.src, self.snat_addr)
1221             self.assertEqual(ip.dst, server.ip4)
1222             self.assertNotEqual(tcp.sport, host_in_port)
1223             self.assertEqual(tcp.dport, server_in_port)
1224             self.check_tcp_checksum(p)
1225             host_out_port = tcp.sport
1226         except:
1227             self.logger.error(ppp("Unexpected or invalid packet:", p))
1228             raise
1229
1230         # send reply from server to host
1231         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1232              IP(src=server.ip4, dst=self.snat_addr) /
1233              TCP(sport=server_in_port, dport=host_out_port))
1234         self.pg0.add_stream(p)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg0.get_capture(1)
1238         p = capture[0]
1239         try:
1240             ip = p[IP]
1241             tcp = p[TCP]
1242             self.assertEqual(ip.src, self.snat_addr)
1243             self.assertEqual(ip.dst, host.ip4)
1244             self.assertEqual(tcp.sport, server_out_port)
1245             self.assertEqual(tcp.dport, host_in_port)
1246             self.check_tcp_checksum(p)
1247         except:
1248             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1249             raise
1250
1251     def test_hairpinning2(self):
1252         """ SNAT hairpinning - 1:1 NAT"""
1253
1254         server1_nat_ip = "10.0.0.10"
1255         server2_nat_ip = "10.0.0.11"
1256         host = self.pg0.remote_hosts[0]
1257         server1 = self.pg0.remote_hosts[1]
1258         server2 = self.pg0.remote_hosts[2]
1259         server_tcp_port = 22
1260         server_udp_port = 20
1261
1262         self.snat_add_address(self.snat_addr)
1263         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1265                                                  is_inside=0)
1266
1267         # add static mapping for servers
1268         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1269         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1270
1271         # host to server1
1272         pkts = []
1273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274              IP(src=host.ip4, dst=server1_nat_ip) /
1275              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1276         pkts.append(p)
1277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278              IP(src=host.ip4, dst=server1_nat_ip) /
1279              UDP(sport=self.udp_port_in, dport=server_udp_port))
1280         pkts.append(p)
1281         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1282              IP(src=host.ip4, dst=server1_nat_ip) /
1283              ICMP(id=self.icmp_id_in, type='echo-request'))
1284         pkts.append(p)
1285         self.pg0.add_stream(pkts)
1286         self.pg_enable_capture(self.pg_interfaces)
1287         self.pg_start()
1288         capture = self.pg0.get_capture(len(pkts))
1289         for packet in capture:
1290             try:
1291                 self.assertEqual(packet[IP].src, self.snat_addr)
1292                 self.assertEqual(packet[IP].dst, server1.ip4)
1293                 if packet.haslayer(TCP):
1294                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1295                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1296                     self.tcp_port_out = packet[TCP].sport
1297                     self.check_tcp_checksum(packet)
1298                 elif packet.haslayer(UDP):
1299                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1300                     self.assertEqual(packet[UDP].dport, server_udp_port)
1301                     self.udp_port_out = packet[UDP].sport
1302                 else:
1303                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1304                     self.icmp_id_out = packet[ICMP].id
1305             except:
1306                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1307                 raise
1308
1309         # server1 to host
1310         pkts = []
1311         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1312              IP(src=server1.ip4, dst=self.snat_addr) /
1313              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1314         pkts.append(p)
1315         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316              IP(src=server1.ip4, dst=self.snat_addr) /
1317              UDP(sport=server_udp_port, dport=self.udp_port_out))
1318         pkts.append(p)
1319         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1320              IP(src=server1.ip4, dst=self.snat_addr) /
1321              ICMP(id=self.icmp_id_out, type='echo-reply'))
1322         pkts.append(p)
1323         self.pg0.add_stream(pkts)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         capture = self.pg0.get_capture(len(pkts))
1327         for packet in capture:
1328             try:
1329                 self.assertEqual(packet[IP].src, server1_nat_ip)
1330                 self.assertEqual(packet[IP].dst, host.ip4)
1331                 if packet.haslayer(TCP):
1332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1333                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1334                     self.check_tcp_checksum(packet)
1335                 elif packet.haslayer(UDP):
1336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1337                     self.assertEqual(packet[UDP].sport, server_udp_port)
1338                 else:
1339                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1340             except:
1341                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1342                 raise
1343
1344         # server2 to server1
1345         pkts = []
1346         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1347              IP(src=server2.ip4, dst=server1_nat_ip) /
1348              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1349         pkts.append(p)
1350         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1351              IP(src=server2.ip4, dst=server1_nat_ip) /
1352              UDP(sport=self.udp_port_in, dport=server_udp_port))
1353         pkts.append(p)
1354         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1355              IP(src=server2.ip4, dst=server1_nat_ip) /
1356              ICMP(id=self.icmp_id_in, type='echo-request'))
1357         pkts.append(p)
1358         self.pg0.add_stream(pkts)
1359         self.pg_enable_capture(self.pg_interfaces)
1360         self.pg_start()
1361         capture = self.pg0.get_capture(len(pkts))
1362         for packet in capture:
1363             try:
1364                 self.assertEqual(packet[IP].src, server2_nat_ip)
1365                 self.assertEqual(packet[IP].dst, server1.ip4)
1366                 if packet.haslayer(TCP):
1367                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1368                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1369                     self.tcp_port_out = packet[TCP].sport
1370                     self.check_tcp_checksum(packet)
1371                 elif packet.haslayer(UDP):
1372                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1373                     self.assertEqual(packet[UDP].dport, server_udp_port)
1374                     self.udp_port_out = packet[UDP].sport
1375                 else:
1376                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1377                     self.icmp_id_out = packet[ICMP].id
1378             except:
1379                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1380                 raise
1381
1382         # server1 to server2
1383         pkts = []
1384         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1385              IP(src=server1.ip4, dst=server2_nat_ip) /
1386              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1387         pkts.append(p)
1388         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1389              IP(src=server1.ip4, dst=server2_nat_ip) /
1390              UDP(sport=server_udp_port, dport=self.udp_port_out))
1391         pkts.append(p)
1392         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1393              IP(src=server1.ip4, dst=server2_nat_ip) /
1394              ICMP(id=self.icmp_id_out, type='echo-reply'))
1395         pkts.append(p)
1396         self.pg0.add_stream(pkts)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg0.get_capture(len(pkts))
1400         for packet in capture:
1401             try:
1402                 self.assertEqual(packet[IP].src, server1_nat_ip)
1403                 self.assertEqual(packet[IP].dst, server2.ip4)
1404                 if packet.haslayer(TCP):
1405                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1406                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1407                     self.check_tcp_checksum(packet)
1408                 elif packet.haslayer(UDP):
1409                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1410                     self.assertEqual(packet[UDP].sport, server_udp_port)
1411                 else:
1412                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1413             except:
1414                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1415                 raise
1416
1417     def test_max_translations_per_user(self):
1418         """ MAX translations per user - recycle the least recently used """
1419
1420         self.snat_add_address(self.snat_addr)
1421         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1422         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1423                                                  is_inside=0)
1424
1425         # get maximum number of translations per user
1426         snat_config = self.vapi.snat_show_config()
1427
1428         # send more than maximum number of translations per user packets
1429         pkts_num = snat_config.max_translations_per_user + 5
1430         pkts = []
1431         for port in range(0, pkts_num):
1432             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1434                  TCP(sport=1025 + port))
1435             pkts.append(p)
1436         self.pg0.add_stream(pkts)
1437         self.pg_enable_capture(self.pg_interfaces)
1438         self.pg_start()
1439
1440         # verify number of translated packet
1441         self.pg1.get_capture(pkts_num)
1442
1443     def test_interface_addr(self):
1444         """ Acquire SNAT addresses from interface """
1445         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1446
1447         # no address in NAT pool
1448         adresses = self.vapi.snat_address_dump()
1449         self.assertEqual(0, len(adresses))
1450
1451         # configure interface address and check NAT address pool
1452         self.pg7.config_ip4()
1453         adresses = self.vapi.snat_address_dump()
1454         self.assertEqual(1, len(adresses))
1455         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1456
1457         # remove interface address and check NAT address pool
1458         self.pg7.unconfig_ip4()
1459         adresses = self.vapi.snat_address_dump()
1460         self.assertEqual(0, len(adresses))
1461
1462     def test_interface_addr_static_mapping(self):
1463         """ Static mapping with addresses from interface """
1464         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1465         self.snat_add_static_mapping('1.2.3.4',
1466                                      external_sw_if_index=self.pg7.sw_if_index)
1467
1468         # static mappings with external interface
1469         static_mappings = self.vapi.snat_static_mapping_dump()
1470         self.assertEqual(1, len(static_mappings))
1471         self.assertEqual(self.pg7.sw_if_index,
1472                          static_mappings[0].external_sw_if_index)
1473
1474         # configure interface address and check static mappings
1475         self.pg7.config_ip4()
1476         static_mappings = self.vapi.snat_static_mapping_dump()
1477         self.assertEqual(1, len(static_mappings))
1478         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1479                          self.pg7.local_ip4n)
1480         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1481
1482         # remove interface address and check static mappings
1483         self.pg7.unconfig_ip4()
1484         static_mappings = self.vapi.snat_static_mapping_dump()
1485         self.assertEqual(0, len(static_mappings))
1486
1487     def test_ipfix_nat44_sess(self):
1488         """ S-NAT IPFIX logging NAT44 session created/delted """
1489         self.ipfix_domain_id = 10
1490         self.ipfix_src_port = 20202
1491         colector_port = 30303
1492         bind_layers(UDP, IPFIX, dport=30303)
1493         self.snat_add_address(self.snat_addr)
1494         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1495         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1496                                                  is_inside=0)
1497         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1498                                      src_address=self.pg3.local_ip4n,
1499                                      path_mtu=512,
1500                                      template_interval=10,
1501                                      collector_port=colector_port)
1502         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1503                              src_port=self.ipfix_src_port)
1504
1505         pkts = self.create_stream_in(self.pg0, self.pg1)
1506         self.pg0.add_stream(pkts)
1507         self.pg_enable_capture(self.pg_interfaces)
1508         self.pg_start()
1509         capture = self.pg1.get_capture(len(pkts))
1510         self.verify_capture_out(capture)
1511         self.snat_add_address(self.snat_addr, is_add=0)
1512         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1513         capture = self.pg3.get_capture(3)
1514         ipfix = IPFIXDecoder()
1515         # first load template
1516         for p in capture:
1517             self.assertTrue(p.haslayer(IPFIX))
1518             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1519             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1520             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1521             self.assertEqual(p[UDP].dport, colector_port)
1522             self.assertEqual(p[IPFIX].observationDomainID,
1523                              self.ipfix_domain_id)
1524             if p.haslayer(Template):
1525                 ipfix.add_template(p.getlayer(Template))
1526         # verify events in data set
1527         for p in capture:
1528             if p.haslayer(Data):
1529                 data = ipfix.decode_data_set(p.getlayer(Set))
1530                 self.verify_ipfix_nat44_ses(data)
1531
1532     def test_ipfix_addr_exhausted(self):
1533         """ S-NAT IPFIX logging NAT addresses exhausted """
1534         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1535         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1536                                                  is_inside=0)
1537         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1538                                      src_address=self.pg3.local_ip4n,
1539                                      path_mtu=512,
1540                                      template_interval=10)
1541         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1542                              src_port=self.ipfix_src_port)
1543
1544         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1545              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1546              TCP(sport=3025))
1547         self.pg0.add_stream(p)
1548         self.pg_enable_capture(self.pg_interfaces)
1549         self.pg_start()
1550         capture = self.pg1.get_capture(0)
1551         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1552         capture = self.pg3.get_capture(3)
1553         ipfix = IPFIXDecoder()
1554         # first load template
1555         for p in capture:
1556             self.assertTrue(p.haslayer(IPFIX))
1557             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1558             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1559             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1560             self.assertEqual(p[UDP].dport, 4739)
1561             self.assertEqual(p[IPFIX].observationDomainID,
1562                              self.ipfix_domain_id)
1563             if p.haslayer(Template):
1564                 ipfix.add_template(p.getlayer(Template))
1565         # verify events in data set
1566         for p in capture:
1567             if p.haslayer(Data):
1568                 data = ipfix.decode_data_set(p.getlayer(Set))
1569                 self.verify_ipfix_addr_exhausted(data)
1570
1571     def test_pool_addr_fib(self):
1572         """ S-NAT add pool addresses to FIB """
1573         static_addr = '10.0.0.10'
1574         self.snat_add_address(self.snat_addr)
1575         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1576         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1577                                                  is_inside=0)
1578         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1579
1580         # SNAT address
1581         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1582              ARP(op=ARP.who_has, pdst=self.snat_addr,
1583                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1584         self.pg1.add_stream(p)
1585         self.pg_enable_capture(self.pg_interfaces)
1586         self.pg_start()
1587         capture = self.pg1.get_capture(1)
1588         self.assertTrue(capture[0].haslayer(ARP))
1589         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1590
1591         # 1:1 NAT address
1592         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1593              ARP(op=ARP.who_has, pdst=static_addr,
1594                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1595         self.pg1.add_stream(p)
1596         self.pg_enable_capture(self.pg_interfaces)
1597         self.pg_start()
1598         capture = self.pg1.get_capture(1)
1599         self.assertTrue(capture[0].haslayer(ARP))
1600         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1601
1602         # send ARP to non-SNAT interface
1603         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1604              ARP(op=ARP.who_has, pdst=self.snat_addr,
1605                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1606         self.pg2.add_stream(p)
1607         self.pg_enable_capture(self.pg_interfaces)
1608         self.pg_start()
1609         capture = self.pg1.get_capture(0)
1610
1611         # remove addresses and verify
1612         self.snat_add_address(self.snat_addr, is_add=0)
1613         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1614                                      is_add=0)
1615
1616         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1617              ARP(op=ARP.who_has, pdst=self.snat_addr,
1618                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1619         self.pg1.add_stream(p)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg1.get_capture(0)
1623
1624         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1625              ARP(op=ARP.who_has, pdst=static_addr,
1626                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1627         self.pg1.add_stream(p)
1628         self.pg_enable_capture(self.pg_interfaces)
1629         self.pg_start()
1630         capture = self.pg1.get_capture(0)
1631
1632     def test_vrf_mode(self):
1633         """ S-NAT tenant VRF aware address pool mode """
1634
1635         vrf_id1 = 1
1636         vrf_id2 = 2
1637         nat_ip1 = "10.0.0.10"
1638         nat_ip2 = "10.0.0.11"
1639
1640         self.pg0.unconfig_ip4()
1641         self.pg1.unconfig_ip4()
1642         self.pg0.set_table_ip4(vrf_id1)
1643         self.pg1.set_table_ip4(vrf_id2)
1644         self.pg0.config_ip4()
1645         self.pg1.config_ip4()
1646
1647         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1648         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1649         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1650         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1651         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1652                                                  is_inside=0)
1653
1654         # first VRF
1655         pkts = self.create_stream_in(self.pg0, self.pg2)
1656         self.pg0.add_stream(pkts)
1657         self.pg_enable_capture(self.pg_interfaces)
1658         self.pg_start()
1659         capture = self.pg2.get_capture(len(pkts))
1660         self.verify_capture_out(capture, nat_ip1)
1661
1662         # second VRF
1663         pkts = self.create_stream_in(self.pg1, self.pg2)
1664         self.pg1.add_stream(pkts)
1665         self.pg_enable_capture(self.pg_interfaces)
1666         self.pg_start()
1667         capture = self.pg2.get_capture(len(pkts))
1668         self.verify_capture_out(capture, nat_ip2)
1669
1670     def test_vrf_feature_independent(self):
1671         """ S-NAT tenant VRF independent address pool mode """
1672
1673         nat_ip1 = "10.0.0.10"
1674         nat_ip2 = "10.0.0.11"
1675
1676         self.snat_add_address(nat_ip1)
1677         self.snat_add_address(nat_ip2)
1678         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1679         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1680         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1681                                                  is_inside=0)
1682
1683         # first VRF
1684         pkts = self.create_stream_in(self.pg0, self.pg2)
1685         self.pg0.add_stream(pkts)
1686         self.pg_enable_capture(self.pg_interfaces)
1687         self.pg_start()
1688         capture = self.pg2.get_capture(len(pkts))
1689         self.verify_capture_out(capture, nat_ip1)
1690
1691         # second VRF
1692         pkts = self.create_stream_in(self.pg1, self.pg2)
1693         self.pg1.add_stream(pkts)
1694         self.pg_enable_capture(self.pg_interfaces)
1695         self.pg_start()
1696         capture = self.pg2.get_capture(len(pkts))
1697         self.verify_capture_out(capture, nat_ip1)
1698
1699     def test_dynamic_ipless_interfaces(self):
1700         """ SNAT interfaces without configured ip dynamic map """
1701
1702         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1703                                       self.pg7.remote_mac,
1704                                       self.pg7.remote_ip4n,
1705                                       is_static=1)
1706         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1707                                       self.pg8.remote_mac,
1708                                       self.pg8.remote_ip4n,
1709                                       is_static=1)
1710
1711         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1712                                    dst_address_length=32,
1713                                    next_hop_address=self.pg7.remote_ip4n,
1714                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1715         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1716                                    dst_address_length=32,
1717                                    next_hop_address=self.pg8.remote_ip4n,
1718                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1719
1720         self.snat_add_address(self.snat_addr)
1721         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1722         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1723                                                  is_inside=0)
1724
1725         # in2out
1726         pkts = self.create_stream_in(self.pg7, self.pg8)
1727         self.pg7.add_stream(pkts)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg8.get_capture(len(pkts))
1731         self.verify_capture_out(capture)
1732
1733         # out2in
1734         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1735         self.pg8.add_stream(pkts)
1736         self.pg_enable_capture(self.pg_interfaces)
1737         self.pg_start()
1738         capture = self.pg7.get_capture(len(pkts))
1739         self.verify_capture_in(capture, self.pg7)
1740
1741     def test_static_ipless_interfaces(self):
1742         """ SNAT 1:1 NAT interfaces without configured ip """
1743
1744         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1745                                       self.pg7.remote_mac,
1746                                       self.pg7.remote_ip4n,
1747                                       is_static=1)
1748         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1749                                       self.pg8.remote_mac,
1750                                       self.pg8.remote_ip4n,
1751                                       is_static=1)
1752
1753         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1754                                    dst_address_length=32,
1755                                    next_hop_address=self.pg7.remote_ip4n,
1756                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1757         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1758                                    dst_address_length=32,
1759                                    next_hop_address=self.pg8.remote_ip4n,
1760                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1761
1762         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1763         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1764         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1765                                                  is_inside=0)
1766
1767         # out2in
1768         pkts = self.create_stream_out(self.pg8)
1769         self.pg8.add_stream(pkts)
1770         self.pg_enable_capture(self.pg_interfaces)
1771         self.pg_start()
1772         capture = self.pg7.get_capture(len(pkts))
1773         self.verify_capture_in(capture, self.pg7)
1774
1775         # in2out
1776         pkts = self.create_stream_in(self.pg7, self.pg8)
1777         self.pg7.add_stream(pkts)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg8.get_capture(len(pkts))
1781         self.verify_capture_out(capture, self.snat_addr, True)
1782
1783     def test_static_with_port_ipless_interfaces(self):
1784         """ SNAT 1:1 NAT with port interfaces without configured ip """
1785
1786         self.tcp_port_out = 30606
1787         self.udp_port_out = 30607
1788         self.icmp_id_out = 30608
1789
1790         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1791                                       self.pg7.remote_mac,
1792                                       self.pg7.remote_ip4n,
1793                                       is_static=1)
1794         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1795                                       self.pg8.remote_mac,
1796                                       self.pg8.remote_ip4n,
1797                                       is_static=1)
1798
1799         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1800                                    dst_address_length=32,
1801                                    next_hop_address=self.pg7.remote_ip4n,
1802                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1803         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1804                                    dst_address_length=32,
1805                                    next_hop_address=self.pg8.remote_ip4n,
1806                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1807
1808         self.snat_add_address(self.snat_addr)
1809         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1810                                      self.tcp_port_in, self.tcp_port_out,
1811                                      proto=IP_PROTOS.tcp)
1812         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1813                                      self.udp_port_in, self.udp_port_out,
1814                                      proto=IP_PROTOS.udp)
1815         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1816                                      self.icmp_id_in, self.icmp_id_out,
1817                                      proto=IP_PROTOS.icmp)
1818         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1820                                                  is_inside=0)
1821
1822         # out2in
1823         pkts = self.create_stream_out(self.pg8)
1824         self.pg8.add_stream(pkts)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg7.get_capture(len(pkts))
1828         self.verify_capture_in(capture, self.pg7)
1829
1830         # in2out
1831         pkts = self.create_stream_in(self.pg7, self.pg8)
1832         self.pg7.add_stream(pkts)
1833         self.pg_enable_capture(self.pg_interfaces)
1834         self.pg_start()
1835         capture = self.pg8.get_capture(len(pkts))
1836         self.verify_capture_out(capture)
1837
1838     def test_static_unknown_proto(self):
1839         """ 1:1 NAT translate packet with unknown protocol """
1840         nat_ip = "10.0.0.10"
1841         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1842         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1843         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1844                                                  is_inside=0)
1845
1846         # in2out
1847         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1848              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1849              GRE() /
1850              IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
1851              TCP(sport=1234, dport=1234))
1852         self.pg0.add_stream(p)
1853         self.pg_enable_capture(self.pg_interfaces)
1854         self.pg_start()
1855         p = self.pg1.get_capture(1)
1856         packet = p[0]
1857         try:
1858             self.assertEqual(packet[IP].src, nat_ip)
1859             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1860             self.assertTrue(packet.haslayer(GRE))
1861             self.check_ip_checksum(packet)
1862         except:
1863             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1864             raise
1865
1866         # out2in
1867         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1868              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1869              GRE() /
1870              IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
1871              TCP(sport=1234, dport=1234))
1872         self.pg1.add_stream(p)
1873         self.pg_enable_capture(self.pg_interfaces)
1874         self.pg_start()
1875         p = self.pg0.get_capture(1)
1876         packet = p[0]
1877         try:
1878             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1879             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1880             self.assertTrue(packet.haslayer(GRE))
1881             self.check_ip_checksum(packet)
1882         except:
1883             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1884             raise
1885
1886     def tearDown(self):
1887         super(TestSNAT, self).tearDown()
1888         if not self.vpp_dead:
1889             self.logger.info(self.vapi.cli("show snat verbose"))
1890             self.clear_snat()
1891
1892
1893 class TestDeterministicNAT(MethodHolder):
1894     """ Deterministic NAT Test Cases """
1895
1896     @classmethod
1897     def setUpConstants(cls):
1898         super(TestDeterministicNAT, cls).setUpConstants()
1899         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1900
1901     @classmethod
1902     def setUpClass(cls):
1903         super(TestDeterministicNAT, cls).setUpClass()
1904
1905         try:
1906             cls.tcp_port_in = 6303
1907             cls.tcp_external_port = 6303
1908             cls.udp_port_in = 6304
1909             cls.udp_external_port = 6304
1910             cls.icmp_id_in = 6305
1911             cls.snat_addr = '10.0.0.3'
1912
1913             cls.create_pg_interfaces(range(3))
1914             cls.interfaces = list(cls.pg_interfaces)
1915
1916             for i in cls.interfaces:
1917                 i.admin_up()
1918                 i.config_ip4()
1919                 i.resolve_arp()
1920
1921             cls.pg0.generate_remote_hosts(2)
1922             cls.pg0.configure_ipv4_neighbors()
1923
1924         except Exception:
1925             super(TestDeterministicNAT, cls).tearDownClass()
1926             raise
1927
1928     def create_stream_in(self, in_if, out_if, ttl=64):
1929         """
1930         Create packet stream for inside network
1931
1932         :param in_if: Inside interface
1933         :param out_if: Outside interface
1934         :param ttl: TTL of generated packets
1935         """
1936         pkts = []
1937         # TCP
1938         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1939              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1940              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1941         pkts.append(p)
1942
1943         # UDP
1944         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1945              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1946              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1947         pkts.append(p)
1948
1949         # ICMP
1950         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1951              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1952              ICMP(id=self.icmp_id_in, type='echo-request'))
1953         pkts.append(p)
1954
1955         return pkts
1956
1957     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1958         """
1959         Create packet stream for outside network
1960
1961         :param out_if: Outside interface
1962         :param dst_ip: Destination IP address (Default use global SNAT address)
1963         :param ttl: TTL of generated packets
1964         """
1965         if dst_ip is None:
1966             dst_ip = self.snat_addr
1967         pkts = []
1968         # TCP
1969         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1970              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1971              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1972         pkts.append(p)
1973
1974         # UDP
1975         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1976              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1977              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1978         pkts.append(p)
1979
1980         # ICMP
1981         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1982              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1983              ICMP(id=self.icmp_external_id, type='echo-reply'))
1984         pkts.append(p)
1985
1986         return pkts
1987
1988     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1989         """
1990         Verify captured packets on outside network
1991
1992         :param capture: Captured packets
1993         :param nat_ip: Translated IP address (Default use global SNAT address)
1994         :param same_port: Sorce port number is not translated (Default False)
1995         :param packet_num: Expected number of packets (Default 3)
1996         """
1997         if nat_ip is None:
1998             nat_ip = self.snat_addr
1999         self.assertEqual(packet_num, len(capture))
2000         for packet in capture:
2001             try:
2002                 self.assertEqual(packet[IP].src, nat_ip)
2003                 if packet.haslayer(TCP):
2004                     self.tcp_port_out = packet[TCP].sport
2005                 elif packet.haslayer(UDP):
2006                     self.udp_port_out = packet[UDP].sport
2007                 else:
2008                     self.icmp_external_id = packet[ICMP].id
2009             except:
2010                 self.logger.error(ppp("Unexpected or invalid packet "
2011                                       "(outside network):", packet))
2012                 raise
2013
2014     def initiate_tcp_session(self, in_if, out_if):
2015         """
2016         Initiates TCP session
2017
2018         :param in_if: Inside interface
2019         :param out_if: Outside interface
2020         """
2021         try:
2022             # SYN packet in->out
2023             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2024                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2025                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2026                      flags="S"))
2027             in_if.add_stream(p)
2028             self.pg_enable_capture(self.pg_interfaces)
2029             self.pg_start()
2030             capture = out_if.get_capture(1)
2031             p = capture[0]
2032             self.tcp_port_out = p[TCP].sport
2033
2034             # SYN + ACK packet out->in
2035             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2036                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2037                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2038                      flags="SA"))
2039             out_if.add_stream(p)
2040             self.pg_enable_capture(self.pg_interfaces)
2041             self.pg_start()
2042             in_if.get_capture(1)
2043
2044             # ACK packet in->out
2045             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2046                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2047                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2048                      flags="A"))
2049             in_if.add_stream(p)
2050             self.pg_enable_capture(self.pg_interfaces)
2051             self.pg_start()
2052             out_if.get_capture(1)
2053
2054         except:
2055             self.logger.error("TCP 3 way handshake failed")
2056             raise
2057
2058     def verify_ipfix_max_entries_per_user(self, data):
2059         """
2060         Verify IPFIX maximum entries per user exceeded event
2061
2062         :param data: Decoded IPFIX data records
2063         """
2064         self.assertEqual(1, len(data))
2065         record = data[0]
2066         # natEvent
2067         self.assertEqual(ord(record[230]), 13)
2068         # natQuotaExceededEvent
2069         self.assertEqual('\x03\x00\x00\x00', record[466])
2070         # sourceIPv4Address
2071         self.assertEqual(self.pg0.remote_ip4n, record[8])
2072
2073     def test_deterministic_mode(self):
2074         """ S-NAT run deterministic mode """
2075         in_addr = '172.16.255.0'
2076         out_addr = '172.17.255.50'
2077         in_addr_t = '172.16.255.20'
2078         in_addr_n = socket.inet_aton(in_addr)
2079         out_addr_n = socket.inet_aton(out_addr)
2080         in_addr_t_n = socket.inet_aton(in_addr_t)
2081         in_plen = 24
2082         out_plen = 32
2083
2084         snat_config = self.vapi.snat_show_config()
2085         self.assertEqual(1, snat_config.deterministic)
2086
2087         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2088
2089         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2090         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2091         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2092         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2093
2094         deterministic_mappings = self.vapi.snat_det_map_dump()
2095         self.assertEqual(len(deterministic_mappings), 1)
2096         dsm = deterministic_mappings[0]
2097         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2098         self.assertEqual(in_plen, dsm.in_plen)
2099         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2100         self.assertEqual(out_plen, dsm.out_plen)
2101
2102         self.clear_snat()
2103         deterministic_mappings = self.vapi.snat_det_map_dump()
2104         self.assertEqual(len(deterministic_mappings), 0)
2105
2106     def test_set_timeouts(self):
2107         """ Set deterministic NAT timeouts """
2108         timeouts_before = self.vapi.snat_det_get_timeouts()
2109
2110         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2111                                         timeouts_before.tcp_established + 10,
2112                                         timeouts_before.tcp_transitory + 10,
2113                                         timeouts_before.icmp + 10)
2114
2115         timeouts_after = self.vapi.snat_det_get_timeouts()
2116
2117         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2118         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2119         self.assertNotEqual(timeouts_before.tcp_established,
2120                             timeouts_after.tcp_established)
2121         self.assertNotEqual(timeouts_before.tcp_transitory,
2122                             timeouts_after.tcp_transitory)
2123
2124     def test_det_in(self):
2125         """ CGNAT translation test (TCP, UDP, ICMP) """
2126
2127         nat_ip = "10.0.0.10"
2128
2129         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2130                                    32,
2131                                    socket.inet_aton(nat_ip),
2132                                    32)
2133         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2134         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2135                                                  is_inside=0)
2136
2137         # in2out
2138         pkts = self.create_stream_in(self.pg0, self.pg1)
2139         self.pg0.add_stream(pkts)
2140         self.pg_enable_capture(self.pg_interfaces)
2141         self.pg_start()
2142         capture = self.pg1.get_capture(len(pkts))
2143         self.verify_capture_out(capture, nat_ip)
2144
2145         # out2in
2146         pkts = self.create_stream_out(self.pg1, nat_ip)
2147         self.pg1.add_stream(pkts)
2148         self.pg_enable_capture(self.pg_interfaces)
2149         self.pg_start()
2150         capture = self.pg0.get_capture(len(pkts))
2151         self.verify_capture_in(capture, self.pg0)
2152
2153         # session dump test
2154         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2155         self.assertEqual(len(sessions), 3)
2156
2157         # TCP session
2158         s = sessions[0]
2159         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2160         self.assertEqual(s.in_port, self.tcp_port_in)
2161         self.assertEqual(s.out_port, self.tcp_port_out)
2162         self.assertEqual(s.ext_port, self.tcp_external_port)
2163
2164         # UDP session
2165         s = sessions[1]
2166         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2167         self.assertEqual(s.in_port, self.udp_port_in)
2168         self.assertEqual(s.out_port, self.udp_port_out)
2169         self.assertEqual(s.ext_port, self.udp_external_port)
2170
2171         # ICMP session
2172         s = sessions[2]
2173         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2174         self.assertEqual(s.in_port, self.icmp_id_in)
2175         self.assertEqual(s.out_port, self.icmp_external_id)
2176
2177     def test_multiple_users(self):
2178         """ CGNAT multiple users """
2179
2180         nat_ip = "10.0.0.10"
2181         port_in = 80
2182         external_port = 6303
2183
2184         host0 = self.pg0.remote_hosts[0]
2185         host1 = self.pg0.remote_hosts[1]
2186
2187         self.vapi.snat_add_det_map(host0.ip4n,
2188                                    24,
2189                                    socket.inet_aton(nat_ip),
2190                                    32)
2191         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2192         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2193                                                  is_inside=0)
2194
2195         # host0 to out
2196         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2197              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2198              TCP(sport=port_in, dport=external_port))
2199         self.pg0.add_stream(p)
2200         self.pg_enable_capture(self.pg_interfaces)
2201         self.pg_start()
2202         capture = self.pg1.get_capture(1)
2203         p = capture[0]
2204         try:
2205             ip = p[IP]
2206             tcp = p[TCP]
2207             self.assertEqual(ip.src, nat_ip)
2208             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2209             self.assertEqual(tcp.dport, external_port)
2210             port_out0 = tcp.sport
2211         except:
2212             self.logger.error(ppp("Unexpected or invalid packet:", p))
2213             raise
2214
2215         # host1 to out
2216         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2217              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2218              TCP(sport=port_in, dport=external_port))
2219         self.pg0.add_stream(p)
2220         self.pg_enable_capture(self.pg_interfaces)
2221         self.pg_start()
2222         capture = self.pg1.get_capture(1)
2223         p = capture[0]
2224         try:
2225             ip = p[IP]
2226             tcp = p[TCP]
2227             self.assertEqual(ip.src, nat_ip)
2228             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2229             self.assertEqual(tcp.dport, external_port)
2230             port_out1 = tcp.sport
2231         except:
2232             self.logger.error(ppp("Unexpected or invalid packet:", p))
2233             raise
2234
2235         dms = self.vapi.snat_det_map_dump()
2236         self.assertEqual(1, len(dms))
2237         self.assertEqual(2, dms[0].ses_num)
2238
2239         # out to host0
2240         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2241              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2242              TCP(sport=external_port, dport=port_out0))
2243         self.pg1.add_stream(p)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         capture = self.pg0.get_capture(1)
2247         p = capture[0]
2248         try:
2249             ip = p[IP]
2250             tcp = p[TCP]
2251             self.assertEqual(ip.src, self.pg1.remote_ip4)
2252             self.assertEqual(ip.dst, host0.ip4)
2253             self.assertEqual(tcp.dport, port_in)
2254             self.assertEqual(tcp.sport, external_port)
2255         except:
2256             self.logger.error(ppp("Unexpected or invalid packet:", p))
2257             raise
2258
2259         # out to host1
2260         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2261              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2262              TCP(sport=external_port, dport=port_out1))
2263         self.pg1.add_stream(p)
2264         self.pg_enable_capture(self.pg_interfaces)
2265         self.pg_start()
2266         capture = self.pg0.get_capture(1)
2267         p = capture[0]
2268         try:
2269             ip = p[IP]
2270             tcp = p[TCP]
2271             self.assertEqual(ip.src, self.pg1.remote_ip4)
2272             self.assertEqual(ip.dst, host1.ip4)
2273             self.assertEqual(tcp.dport, port_in)
2274             self.assertEqual(tcp.sport, external_port)
2275         except:
2276             self.logger.error(ppp("Unexpected or invalid packet", p))
2277             raise
2278
2279         # session close api test
2280         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2281                                              port_out1,
2282                                              self.pg1.remote_ip4n,
2283                                              external_port)
2284         dms = self.vapi.snat_det_map_dump()
2285         self.assertEqual(dms[0].ses_num, 1)
2286
2287         self.vapi.snat_det_close_session_in(host0.ip4n,
2288                                             port_in,
2289                                             self.pg1.remote_ip4n,
2290                                             external_port)
2291         dms = self.vapi.snat_det_map_dump()
2292         self.assertEqual(dms[0].ses_num, 0)
2293
2294     def test_tcp_session_close_detection_in(self):
2295         """ CGNAT TCP session close initiated from inside network """
2296         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2297                                    32,
2298                                    socket.inet_aton(self.snat_addr),
2299                                    32)
2300         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2301         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2302                                                  is_inside=0)
2303
2304         self.initiate_tcp_session(self.pg0, self.pg1)
2305
2306         # close the session from inside
2307         try:
2308             # FIN packet in -> out
2309             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2310                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2312                      flags="F"))
2313             self.pg0.add_stream(p)
2314             self.pg_enable_capture(self.pg_interfaces)
2315             self.pg_start()
2316             self.pg1.get_capture(1)
2317
2318             pkts = []
2319
2320             # ACK packet out -> in
2321             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2322                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2323                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2324                      flags="A"))
2325             pkts.append(p)
2326
2327             # FIN packet out -> in
2328             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2329                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2330                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2331                      flags="F"))
2332             pkts.append(p)
2333
2334             self.pg1.add_stream(pkts)
2335             self.pg_enable_capture(self.pg_interfaces)
2336             self.pg_start()
2337             self.pg0.get_capture(2)
2338
2339             # ACK packet in -> out
2340             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2341                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2342                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2343                      flags="A"))
2344             self.pg0.add_stream(p)
2345             self.pg_enable_capture(self.pg_interfaces)
2346             self.pg_start()
2347             self.pg1.get_capture(1)
2348
2349             # Check if snat closed the session
2350             dms = self.vapi.snat_det_map_dump()
2351             self.assertEqual(0, dms[0].ses_num)
2352         except:
2353             self.logger.error("TCP session termination failed")
2354             raise
2355
2356     def test_tcp_session_close_detection_out(self):
2357         """ CGNAT TCP session close initiated from outside network """
2358         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2359                                    32,
2360                                    socket.inet_aton(self.snat_addr),
2361                                    32)
2362         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2363         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2364                                                  is_inside=0)
2365
2366         self.initiate_tcp_session(self.pg0, self.pg1)
2367
2368         # close the session from outside
2369         try:
2370             # FIN packet out -> in
2371             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2372                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2373                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2374                      flags="F"))
2375             self.pg1.add_stream(p)
2376             self.pg_enable_capture(self.pg_interfaces)
2377             self.pg_start()
2378             self.pg0.get_capture(1)
2379
2380             pkts = []
2381
2382             # ACK packet in -> out
2383             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2384                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2385                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2386                      flags="A"))
2387             pkts.append(p)
2388
2389             # ACK packet in -> out
2390             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2391                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2393                      flags="F"))
2394             pkts.append(p)
2395
2396             self.pg0.add_stream(pkts)
2397             self.pg_enable_capture(self.pg_interfaces)
2398             self.pg_start()
2399             self.pg1.get_capture(2)
2400
2401             # ACK packet out -> in
2402             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2403                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2404                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2405                      flags="A"))
2406             self.pg1.add_stream(p)
2407             self.pg_enable_capture(self.pg_interfaces)
2408             self.pg_start()
2409             self.pg0.get_capture(1)
2410
2411             # Check if snat closed the session
2412             dms = self.vapi.snat_det_map_dump()
2413             self.assertEqual(0, dms[0].ses_num)
2414         except:
2415             self.logger.error("TCP session termination failed")
2416             raise
2417
2418     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2419     def test_session_timeout(self):
2420         """ CGNAT session timeouts """
2421         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2422                                    32,
2423                                    socket.inet_aton(self.snat_addr),
2424                                    32)
2425         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2426         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2427                                                  is_inside=0)
2428
2429         self.initiate_tcp_session(self.pg0, self.pg1)
2430         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2431         pkts = self.create_stream_in(self.pg0, self.pg1)
2432         self.pg0.add_stream(pkts)
2433         self.pg_enable_capture(self.pg_interfaces)
2434         self.pg_start()
2435         capture = self.pg1.get_capture(len(pkts))
2436         sleep(15)
2437
2438         dms = self.vapi.snat_det_map_dump()
2439         self.assertEqual(0, dms[0].ses_num)
2440
2441     def test_session_limit_per_user(self):
2442         """ CGNAT maximum 1000 sessions per user should be created """
2443         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2444                                    32,
2445                                    socket.inet_aton(self.snat_addr),
2446                                    32)
2447         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2448         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2449                                                  is_inside=0)
2450         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2451                                      src_address=self.pg2.local_ip4n,
2452                                      path_mtu=512,
2453                                      template_interval=10)
2454         self.vapi.snat_ipfix()
2455
2456         pkts = []
2457         for port in range(1025, 2025):
2458             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2459                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2460                  UDP(sport=port, dport=port))
2461             pkts.append(p)
2462
2463         self.pg0.add_stream(pkts)
2464         self.pg_enable_capture(self.pg_interfaces)
2465         self.pg_start()
2466         capture = self.pg1.get_capture(len(pkts))
2467
2468         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2469              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2470              UDP(sport=3001, dport=3002))
2471         self.pg0.add_stream(p)
2472         self.pg_enable_capture(self.pg_interfaces)
2473         self.pg_start()
2474         capture = self.pg1.assert_nothing_captured()
2475
2476         # verify ICMP error packet
2477         capture = self.pg0.get_capture(1)
2478         p = capture[0]
2479         self.assertTrue(p.haslayer(ICMP))
2480         icmp = p[ICMP]
2481         self.assertEqual(icmp.type, 3)
2482         self.assertEqual(icmp.code, 1)
2483         self.assertTrue(icmp.haslayer(IPerror))
2484         inner_ip = icmp[IPerror]
2485         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2486         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2487
2488         dms = self.vapi.snat_det_map_dump()
2489
2490         self.assertEqual(1000, dms[0].ses_num)
2491
2492         # verify IPFIX logging
2493         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2494         capture = self.pg2.get_capture(2)
2495         ipfix = IPFIXDecoder()
2496         # first load template
2497         for p in capture:
2498             self.assertTrue(p.haslayer(IPFIX))
2499             if p.haslayer(Template):
2500                 ipfix.add_template(p.getlayer(Template))
2501         # verify events in data set
2502         for p in capture:
2503             if p.haslayer(Data):
2504                 data = ipfix.decode_data_set(p.getlayer(Set))
2505                 self.verify_ipfix_max_entries_per_user(data)
2506
2507     def clear_snat(self):
2508         """
2509         Clear SNAT configuration.
2510         """
2511         self.vapi.snat_ipfix(enable=0)
2512         self.vapi.snat_det_set_timeouts()
2513         deterministic_mappings = self.vapi.snat_det_map_dump()
2514         for dsm in deterministic_mappings:
2515             self.vapi.snat_add_det_map(dsm.in_addr,
2516                                        dsm.in_plen,
2517                                        dsm.out_addr,
2518                                        dsm.out_plen,
2519                                        is_add=0)
2520
2521         interfaces = self.vapi.snat_interface_dump()
2522         for intf in interfaces:
2523             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2524                                                      intf.is_inside,
2525                                                      is_add=0)
2526
2527     def tearDown(self):
2528         super(TestDeterministicNAT, self).tearDown()
2529         if not self.vpp_dead:
2530             self.logger.info(self.vapi.cli("show snat detail"))
2531             self.clear_snat()
2532
2533
2534 class TestNAT64(MethodHolder):
2535     """ NAT64 Test Cases """
2536
2537     @classmethod
2538     def setUpClass(cls):
2539         super(TestNAT64, cls).setUpClass()
2540
2541         try:
2542             cls.tcp_port_in = 6303
2543             cls.tcp_port_out = 6303
2544             cls.udp_port_in = 6304
2545             cls.udp_port_out = 6304
2546             cls.icmp_id_in = 6305
2547             cls.icmp_id_out = 6305
2548             cls.nat_addr = '10.0.0.3'
2549             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2550             cls.vrf1_id = 10
2551             cls.vrf1_nat_addr = '10.0.10.3'
2552             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2553                                                    cls.vrf1_nat_addr)
2554
2555             cls.create_pg_interfaces(range(3))
2556             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2557             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2558             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2559
2560             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2561
2562             cls.pg0.generate_remote_hosts(2)
2563
2564             for i in cls.ip6_interfaces:
2565                 i.admin_up()
2566                 i.config_ip6()
2567                 i.configure_ipv6_neighbors()
2568
2569             for i in cls.ip4_interfaces:
2570                 i.admin_up()
2571                 i.config_ip4()
2572                 i.resolve_arp()
2573
2574         except Exception:
2575             super(TestNAT64, cls).tearDownClass()
2576             raise
2577
2578     def test_pool(self):
2579         """ Add/delete address to NAT64 pool """
2580         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2581
2582         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2583
2584         addresses = self.vapi.nat64_pool_addr_dump()
2585         self.assertEqual(len(addresses), 1)
2586         self.assertEqual(addresses[0].address, nat_addr)
2587
2588         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2589
2590         addresses = self.vapi.nat64_pool_addr_dump()
2591         self.assertEqual(len(addresses), 0)
2592
2593     def test_interface(self):
2594         """ Enable/disable NAT64 feature on the interface """
2595         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2596         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2597
2598         interfaces = self.vapi.nat64_interface_dump()
2599         self.assertEqual(len(interfaces), 2)
2600         pg0_found = False
2601         pg1_found = False
2602         for intf in interfaces:
2603             if intf.sw_if_index == self.pg0.sw_if_index:
2604                 self.assertEqual(intf.is_inside, 1)
2605                 pg0_found = True
2606             elif intf.sw_if_index == self.pg1.sw_if_index:
2607                 self.assertEqual(intf.is_inside, 0)
2608                 pg1_found = True
2609         self.assertTrue(pg0_found)
2610         self.assertTrue(pg1_found)
2611
2612         features = self.vapi.cli("show interface features pg0")
2613         self.assertNotEqual(features.find('nat64-in2out'), -1)
2614         features = self.vapi.cli("show interface features pg1")
2615         self.assertNotEqual(features.find('nat64-out2in'), -1)
2616
2617         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2618         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2619
2620         interfaces = self.vapi.nat64_interface_dump()
2621         self.assertEqual(len(interfaces), 0)
2622
2623     def test_static_bib(self):
2624         """ Add/delete static BIB entry """
2625         in_addr = socket.inet_pton(socket.AF_INET6,
2626                                    '2001:db8:85a3::8a2e:370:7334')
2627         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2628         in_port = 1234
2629         out_port = 5678
2630         proto = IP_PROTOS.tcp
2631
2632         self.vapi.nat64_add_del_static_bib(in_addr,
2633                                            out_addr,
2634                                            in_port,
2635                                            out_port,
2636                                            proto)
2637         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2638         static_bib_num = 0
2639         for bibe in bib:
2640             if bibe.is_static:
2641                 static_bib_num += 1
2642                 self.assertEqual(bibe.i_addr, in_addr)
2643                 self.assertEqual(bibe.o_addr, out_addr)
2644                 self.assertEqual(bibe.i_port, in_port)
2645                 self.assertEqual(bibe.o_port, out_port)
2646         self.assertEqual(static_bib_num, 1)
2647
2648         self.vapi.nat64_add_del_static_bib(in_addr,
2649                                            out_addr,
2650                                            in_port,
2651                                            out_port,
2652                                            proto,
2653                                            is_add=0)
2654         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2655         static_bib_num = 0
2656         for bibe in bib:
2657             if bibe.is_static:
2658                 static_bib_num += 1
2659         self.assertEqual(static_bib_num, 0)
2660
2661     def test_set_timeouts(self):
2662         """ Set NAT64 timeouts """
2663         # verify default values
2664         timeouts = self.vapi.nat64_get_timeouts()
2665         self.assertEqual(timeouts.udp, 300)
2666         self.assertEqual(timeouts.icmp, 60)
2667         self.assertEqual(timeouts.tcp_trans, 240)
2668         self.assertEqual(timeouts.tcp_est, 7440)
2669         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2670
2671         # set and verify custom values
2672         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2673                                      tcp_est=7450, tcp_incoming_syn=10)
2674         timeouts = self.vapi.nat64_get_timeouts()
2675         self.assertEqual(timeouts.udp, 200)
2676         self.assertEqual(timeouts.icmp, 30)
2677         self.assertEqual(timeouts.tcp_trans, 250)
2678         self.assertEqual(timeouts.tcp_est, 7450)
2679         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2680
2681     def test_dynamic(self):
2682         """ NAT64 dynamic translation test """
2683         self.tcp_port_in = 6303
2684         self.udp_port_in = 6304
2685         self.icmp_id_in = 6305
2686
2687         ses_num_start = self.nat64_get_ses_num()
2688
2689         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2690                                                 self.nat_addr_n)
2691         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2692         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2693
2694         # in2out
2695         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2696         self.pg0.add_stream(pkts)
2697         self.pg_enable_capture(self.pg_interfaces)
2698         self.pg_start()
2699         capture = self.pg1.get_capture(len(pkts))
2700         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2701                                 dst_ip=self.pg1.remote_ip4)
2702
2703         # out2in
2704         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2705         self.pg1.add_stream(pkts)
2706         self.pg_enable_capture(self.pg_interfaces)
2707         self.pg_start()
2708         capture = self.pg0.get_capture(len(pkts))
2709         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2710         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2711
2712         # in2out
2713         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2714         self.pg0.add_stream(pkts)
2715         self.pg_enable_capture(self.pg_interfaces)
2716         self.pg_start()
2717         capture = self.pg1.get_capture(len(pkts))
2718         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2719                                 dst_ip=self.pg1.remote_ip4)
2720
2721         # out2in
2722         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2723         self.pg1.add_stream(pkts)
2724         self.pg_enable_capture(self.pg_interfaces)
2725         self.pg_start()
2726         capture = self.pg0.get_capture(len(pkts))
2727         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2728
2729         ses_num_end = self.nat64_get_ses_num()
2730
2731         self.assertEqual(ses_num_end - ses_num_start, 3)
2732
2733         # tenant with specific VRF
2734         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2735                                                 self.vrf1_nat_addr_n,
2736                                                 vrf_id=self.vrf1_id)
2737         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2738
2739         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2740         self.pg2.add_stream(pkts)
2741         self.pg_enable_capture(self.pg_interfaces)
2742         self.pg_start()
2743         capture = self.pg1.get_capture(len(pkts))
2744         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2745                                 dst_ip=self.pg1.remote_ip4)
2746
2747         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2748         self.pg1.add_stream(pkts)
2749         self.pg_enable_capture(self.pg_interfaces)
2750         self.pg_start()
2751         capture = self.pg2.get_capture(len(pkts))
2752         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2753
2754     def test_static(self):
2755         """ NAT64 static translation test """
2756         self.tcp_port_in = 60303
2757         self.udp_port_in = 60304
2758         self.icmp_id_in = 60305
2759         self.tcp_port_out = 60303
2760         self.udp_port_out = 60304
2761         self.icmp_id_out = 60305
2762
2763         ses_num_start = self.nat64_get_ses_num()
2764
2765         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2766                                                 self.nat_addr_n)
2767         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2768         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2769
2770         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2771                                            self.nat_addr_n,
2772                                            self.tcp_port_in,
2773                                            self.tcp_port_out,
2774                                            IP_PROTOS.tcp)
2775         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2776                                            self.nat_addr_n,
2777                                            self.udp_port_in,
2778                                            self.udp_port_out,
2779                                            IP_PROTOS.udp)
2780         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2781                                            self.nat_addr_n,
2782                                            self.icmp_id_in,
2783                                            self.icmp_id_out,
2784                                            IP_PROTOS.icmp)
2785
2786         # in2out
2787         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2788         self.pg0.add_stream(pkts)
2789         self.pg_enable_capture(self.pg_interfaces)
2790         self.pg_start()
2791         capture = self.pg1.get_capture(len(pkts))
2792         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2793                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2794
2795         # out2in
2796         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2797         self.pg1.add_stream(pkts)
2798         self.pg_enable_capture(self.pg_interfaces)
2799         self.pg_start()
2800         capture = self.pg0.get_capture(len(pkts))
2801         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2802         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2803
2804         ses_num_end = self.nat64_get_ses_num()
2805
2806         self.assertEqual(ses_num_end - ses_num_start, 3)
2807
2808     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2809     def test_session_timeout(self):
2810         """ NAT64 session timeout """
2811         self.icmp_id_in = 1234
2812         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2813                                                 self.nat_addr_n)
2814         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2815         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2816         self.vapi.nat64_set_timeouts(icmp=5)
2817
2818         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2819         self.pg0.add_stream(pkts)
2820         self.pg_enable_capture(self.pg_interfaces)
2821         self.pg_start()
2822         capture = self.pg1.get_capture(len(pkts))
2823
2824         ses_num_before_timeout = self.nat64_get_ses_num()
2825
2826         sleep(15)
2827
2828         # ICMP session after timeout
2829         ses_num_after_timeout = self.nat64_get_ses_num()
2830         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2831
2832     def test_icmp_error(self):
2833         """ NAT64 ICMP Error message translation """
2834         self.tcp_port_in = 6303
2835         self.udp_port_in = 6304
2836         self.icmp_id_in = 6305
2837
2838         ses_num_start = self.nat64_get_ses_num()
2839
2840         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2841                                                 self.nat_addr_n)
2842         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2843         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2844
2845         # send some packets to create sessions
2846         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2847         self.pg0.add_stream(pkts)
2848         self.pg_enable_capture(self.pg_interfaces)
2849         self.pg_start()
2850         capture_ip4 = self.pg1.get_capture(len(pkts))
2851         self.verify_capture_out(capture_ip4,
2852                                 nat_ip=self.nat_addr,
2853                                 dst_ip=self.pg1.remote_ip4)
2854
2855         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2856         self.pg1.add_stream(pkts)
2857         self.pg_enable_capture(self.pg_interfaces)
2858         self.pg_start()
2859         capture_ip6 = self.pg0.get_capture(len(pkts))
2860         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2861         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2862                                    self.pg0.remote_ip6)
2863
2864         # in2out
2865         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2866                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2867                 ICMPv6DestUnreach(code=1) /
2868                 packet[IPv6] for packet in capture_ip6]
2869         self.pg0.add_stream(pkts)
2870         self.pg_enable_capture(self.pg_interfaces)
2871         self.pg_start()
2872         capture = self.pg1.get_capture(len(pkts))
2873         for packet in capture:
2874             try:
2875                 self.assertEqual(packet[IP].src, self.nat_addr)
2876                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2877                 self.assertEqual(packet[ICMP].type, 3)
2878                 self.assertEqual(packet[ICMP].code, 13)
2879                 inner = packet[IPerror]
2880                 self.assertEqual(inner.src, self.pg1.remote_ip4)
2881                 self.assertEqual(inner.dst, self.nat_addr)
2882                 self.check_icmp_checksum(packet)
2883                 if inner.haslayer(TCPerror):
2884                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2885                 elif inner.haslayer(UDPerror):
2886                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2887                 else:
2888                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2889             except:
2890                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2891                 raise
2892
2893         # out2in
2894         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2895                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2896                 ICMP(type=3, code=13) /
2897                 packet[IP] for packet in capture_ip4]
2898         self.pg1.add_stream(pkts)
2899         self.pg_enable_capture(self.pg_interfaces)
2900         self.pg_start()
2901         capture = self.pg0.get_capture(len(pkts))
2902         for packet in capture:
2903             try:
2904                 self.assertEqual(packet[IPv6].src, ip.src)
2905                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2906                 icmp = packet[ICMPv6DestUnreach]
2907                 self.assertEqual(icmp.code, 1)
2908                 inner = icmp[IPerror6]
2909                 self.assertEqual(inner.src, self.pg0.remote_ip6)
2910                 self.assertEqual(inner.dst, ip.src)
2911                 self.check_icmpv6_checksum(packet)
2912                 if inner.haslayer(TCPerror):
2913                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2914                 elif inner.haslayer(UDPerror):
2915                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2916                 else:
2917                     self.assertEqual(inner[ICMPv6EchoRequest].id,
2918                                      self.icmp_id_in)
2919             except:
2920                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2921                 raise
2922
2923     def test_hairpinning(self):
2924         """ NAT64 hairpinning """
2925
2926         client = self.pg0.remote_hosts[0]
2927         server = self.pg0.remote_hosts[1]
2928         server_tcp_in_port = 22
2929         server_tcp_out_port = 4022
2930         server_udp_in_port = 23
2931         server_udp_out_port = 4023
2932         client_tcp_in_port = 1234
2933         client_udp_in_port = 1235
2934         client_tcp_out_port = 0
2935         client_udp_out_port = 0
2936         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2937         nat_addr_ip6 = ip.src
2938
2939         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2940                                                 self.nat_addr_n)
2941         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2942         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2943
2944         self.vapi.nat64_add_del_static_bib(server.ip6n,
2945                                            self.nat_addr_n,
2946                                            server_tcp_in_port,
2947                                            server_tcp_out_port,
2948                                            IP_PROTOS.tcp)
2949         self.vapi.nat64_add_del_static_bib(server.ip6n,
2950                                            self.nat_addr_n,
2951                                            server_udp_in_port,
2952                                            server_udp_out_port,
2953                                            IP_PROTOS.udp)
2954
2955         # client to server
2956         pkts = []
2957         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2958              IPv6(src=client.ip6, dst=nat_addr_ip6) /
2959              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
2960         pkts.append(p)
2961         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2962              IPv6(src=client.ip6, dst=nat_addr_ip6) /
2963              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
2964         pkts.append(p)
2965         self.pg0.add_stream(pkts)
2966         self.pg_enable_capture(self.pg_interfaces)
2967         self.pg_start()
2968         capture = self.pg0.get_capture(len(pkts))
2969         for packet in capture:
2970             try:
2971                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
2972                 self.assertEqual(packet[IPv6].dst, server.ip6)
2973                 if packet.haslayer(TCP):
2974                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
2975                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
2976                     self.check_tcp_checksum(packet)
2977                     client_tcp_out_port = packet[TCP].sport
2978                 else:
2979                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
2980                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
2981                     self.check_udp_checksum(packet)
2982                     client_udp_out_port = packet[UDP].sport
2983             except:
2984                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2985                 raise
2986
2987         # server to client
2988         pkts = []
2989         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2990              IPv6(src=server.ip6, dst=nat_addr_ip6) /
2991              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
2992         pkts.append(p)
2993         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2994              IPv6(src=server.ip6, dst=nat_addr_ip6) /
2995              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
2996         pkts.append(p)
2997         self.pg0.add_stream(pkts)
2998         self.pg_enable_capture(self.pg_interfaces)
2999         self.pg_start()
3000         capture = self.pg0.get_capture(len(pkts))
3001         for packet in capture:
3002             try:
3003                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3004                 self.assertEqual(packet[IPv6].dst, client.ip6)
3005                 if packet.haslayer(TCP):
3006                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3007                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3008                     self.check_tcp_checksum(packet)
3009                 else:
3010                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3011                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3012                     self.check_udp_checksum(packet)
3013             except:
3014                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3015                 raise
3016
3017         # ICMP error
3018         pkts = []
3019         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3020                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3021                 ICMPv6DestUnreach(code=1) /
3022                 packet[IPv6] for packet in capture]
3023         self.pg0.add_stream(pkts)
3024         self.pg_enable_capture(self.pg_interfaces)
3025         self.pg_start()
3026         capture = self.pg0.get_capture(len(pkts))
3027         for packet in capture:
3028             try:
3029                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3030                 self.assertEqual(packet[IPv6].dst, server.ip6)
3031                 icmp = packet[ICMPv6DestUnreach]
3032                 self.assertEqual(icmp.code, 1)
3033                 inner = icmp[IPerror6]
3034                 self.assertEqual(inner.src, server.ip6)
3035                 self.assertEqual(inner.dst, nat_addr_ip6)
3036                 self.check_icmpv6_checksum(packet)
3037                 if inner.haslayer(TCPerror):
3038                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3039                     self.assertEqual(inner[TCPerror].dport,
3040                                      client_tcp_out_port)
3041                 else:
3042                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3043                     self.assertEqual(inner[UDPerror].dport,
3044                                      client_udp_out_port)
3045             except:
3046                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3047                 raise
3048
3049     def nat64_get_ses_num(self):
3050         """
3051         Return number of active NAT64 sessions.
3052         """
3053         ses_num = 0
3054         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3055         ses_num += len(st)
3056         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3057         ses_num += len(st)
3058         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3059         ses_num += len(st)
3060         return ses_num
3061
3062     def clear_nat64(self):
3063         """
3064         Clear NAT64 configuration.
3065         """
3066         self.vapi.nat64_set_timeouts()
3067
3068         interfaces = self.vapi.nat64_interface_dump()
3069         for intf in interfaces:
3070             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3071                                               intf.is_inside,
3072                                               is_add=0)
3073
3074         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3075         for bibe in bib:
3076             if bibe.is_static:
3077                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3078                                                    bibe.o_addr,
3079                                                    bibe.i_port,
3080                                                    bibe.o_port,
3081                                                    bibe.proto,
3082                                                    bibe.vrf_id,
3083                                                    is_add=0)
3084
3085         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3086         for bibe in bib:
3087             if bibe.is_static:
3088                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3089                                                    bibe.o_addr,
3090                                                    bibe.i_port,
3091                                                    bibe.o_port,
3092                                                    bibe.proto,
3093                                                    bibe.vrf_id,
3094                                                    is_add=0)
3095
3096         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3097         for bibe in bib:
3098             if bibe.is_static:
3099                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3100                                                    bibe.o_addr,
3101                                                    bibe.i_port,
3102                                                    bibe.o_port,
3103                                                    bibe.proto,
3104                                                    bibe.vrf_id,
3105                                                    is_add=0)
3106
3107         adresses = self.vapi.nat64_pool_addr_dump()
3108         for addr in adresses:
3109             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3110                                                     addr.address,
3111                                                     vrf_id=addr.vrf_id,
3112                                                     is_add=0)
3113
3114     def tearDown(self):
3115         super(TestNAT64, self).tearDown()
3116         if not self.vpp_dead:
3117             self.logger.info(self.vapi.cli("show nat64 pool"))
3118             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3119             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3120             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3121             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3122             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3123             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3124             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3125             self.clear_nat64()
3126
3127 if __name__ == '__main__':
3128     unittest.main(testRunner=VppTestRunner)