ARP: ignore non-connected routes and non-interface sources when determing if source...
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def create_stream_in_ip6(self, in_if, out_if, hlim=64):
161         """
162         Create IPv6 packet stream for inside network
163
164         :param in_if: Inside interface
165         :param out_if: Outside interface
166         :param ttl: Hop Limit of generated packets
167         """
168         pkts = []
169         dst = ''.join(['64:ff9b::', out_if.remote_ip4])
170         # TCP
171         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
172              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
173              TCP(sport=self.tcp_port_in, dport=20))
174         pkts.append(p)
175
176         # UDP
177         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
178              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
179              UDP(sport=self.udp_port_in, dport=20))
180         pkts.append(p)
181
182         # ICMP
183         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
184              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
185              ICMPv6EchoRequest(id=self.icmp_id_in))
186         pkts.append(p)
187
188         return pkts
189
190     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
191         """
192         Create packet stream for outside network
193
194         :param out_if: Outside interface
195         :param dst_ip: Destination IP address (Default use global SNAT address)
196         :param ttl: TTL of generated packets
197         """
198         if dst_ip is None:
199             dst_ip = self.snat_addr
200         pkts = []
201         # TCP
202         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
203              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
204              TCP(dport=self.tcp_port_out, sport=20))
205         pkts.append(p)
206
207         # UDP
208         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
209              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
210              UDP(dport=self.udp_port_out, sport=20))
211         pkts.append(p)
212
213         # ICMP
214         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
215              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
216              ICMP(id=self.icmp_id_out, type='echo-reply'))
217         pkts.append(p)
218
219         return pkts
220
221     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
222                            packet_num=3, dst_ip=None):
223         """
224         Verify captured packets on outside network
225
226         :param capture: Captured packets
227         :param nat_ip: Translated IP address (Default use global SNAT address)
228         :param same_port: Sorce port number is not translated (Default False)
229         :param packet_num: Expected number of packets (Default 3)
230         :param dst_ip: Destination IP address (Default do not verify)
231         """
232         if nat_ip is None:
233             nat_ip = self.snat_addr
234         self.assertEqual(packet_num, len(capture))
235         for packet in capture:
236             try:
237                 self.check_ip_checksum(packet)
238                 self.assertEqual(packet[IP].src, nat_ip)
239                 if dst_ip is not None:
240                     self.assertEqual(packet[IP].dst, dst_ip)
241                 if packet.haslayer(TCP):
242                     if same_port:
243                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
244                     else:
245                         self.assertNotEqual(
246                             packet[TCP].sport, self.tcp_port_in)
247                     self.tcp_port_out = packet[TCP].sport
248                     self.check_tcp_checksum(packet)
249                 elif packet.haslayer(UDP):
250                     if same_port:
251                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
252                     else:
253                         self.assertNotEqual(
254                             packet[UDP].sport, self.udp_port_in)
255                     self.udp_port_out = packet[UDP].sport
256                 else:
257                     if same_port:
258                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
259                     else:
260                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
261                     self.icmp_id_out = packet[ICMP].id
262                     self.check_icmp_checksum(packet)
263             except:
264                 self.logger.error(ppp("Unexpected or invalid packet "
265                                       "(outside network):", packet))
266                 raise
267
268     def verify_capture_in(self, capture, in_if, packet_num=3):
269         """
270         Verify captured packets on inside network
271
272         :param capture: Captured packets
273         :param in_if: Inside interface
274         :param packet_num: Expected number of packets (Default 3)
275         """
276         self.assertEqual(packet_num, len(capture))
277         for packet in capture:
278             try:
279                 self.check_ip_checksum(packet)
280                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
281                 if packet.haslayer(TCP):
282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
283                     self.check_tcp_checksum(packet)
284                 elif packet.haslayer(UDP):
285                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
286                 else:
287                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
288                     self.check_icmp_checksum(packet)
289             except:
290                 self.logger.error(ppp("Unexpected or invalid packet "
291                                       "(inside network):", packet))
292                 raise
293
294     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
295         """
296         Verify captured IPv6 packets on inside network
297
298         :param capture: Captured packets
299         :param src_ip: Source IP
300         :param dst_ip: Destination IP address
301         :param packet_num: Expected number of packets (Default 3)
302         """
303         self.assertEqual(packet_num, len(capture))
304         for packet in capture:
305             try:
306                 self.assertEqual(packet[IPv6].src, src_ip)
307                 self.assertEqual(packet[IPv6].dst, dst_ip)
308                 if packet.haslayer(TCP):
309                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
310                     self.check_tcp_checksum(packet)
311                 elif packet.haslayer(UDP):
312                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
313                     self.check_udp_checksum(packet)
314                 else:
315                     self.assertEqual(packet[ICMPv6EchoReply].id,
316                                      self.icmp_id_in)
317                     self.check_icmpv6_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(inside network):", packet))
321                 raise
322
323     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
324         """
325         Verify captured packet that don't have to be translated
326
327         :param capture: Captured packets
328         :param ingress_if: Ingress interface
329         :param egress_if: Egress interface
330         """
331         for packet in capture:
332             try:
333                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
334                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
335                 if packet.haslayer(TCP):
336                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
337                 elif packet.haslayer(UDP):
338                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
339                 else:
340                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
341             except:
342                 self.logger.error(ppp("Unexpected or invalid packet "
343                                       "(inside network):", packet))
344                 raise
345
346     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
347                                             packet_num=3, icmp_type=11):
348         """
349         Verify captured packets with ICMP errors on outside network
350
351         :param capture: Captured packets
352         :param src_ip: Translated IP address or IP address of VPP
353                        (Default use global SNAT address)
354         :param packet_num: Expected number of packets (Default 3)
355         :param icmp_type: Type of error ICMP packet
356                           we are expecting (Default 11)
357         """
358         if src_ip is None:
359             src_ip = self.snat_addr
360         self.assertEqual(packet_num, len(capture))
361         for packet in capture:
362             try:
363                 self.assertEqual(packet[IP].src, src_ip)
364                 self.assertTrue(packet.haslayer(ICMP))
365                 icmp = packet[ICMP]
366                 self.assertEqual(icmp.type, icmp_type)
367                 self.assertTrue(icmp.haslayer(IPerror))
368                 inner_ip = icmp[IPerror]
369                 if inner_ip.haslayer(TCPerror):
370                     self.assertEqual(inner_ip[TCPerror].dport,
371                                      self.tcp_port_out)
372                 elif inner_ip.haslayer(UDPerror):
373                     self.assertEqual(inner_ip[UDPerror].dport,
374                                      self.udp_port_out)
375                 else:
376                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
377             except:
378                 self.logger.error(ppp("Unexpected or invalid packet "
379                                       "(outside network):", packet))
380                 raise
381
382     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
383                                            icmp_type=11):
384         """
385         Verify captured packets with ICMP errors on inside network
386
387         :param capture: Captured packets
388         :param in_if: Inside interface
389         :param packet_num: Expected number of packets (Default 3)
390         :param icmp_type: Type of error ICMP packet
391                           we are expecting (Default 11)
392         """
393         self.assertEqual(packet_num, len(capture))
394         for packet in capture:
395             try:
396                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
397                 self.assertTrue(packet.haslayer(ICMP))
398                 icmp = packet[ICMP]
399                 self.assertEqual(icmp.type, icmp_type)
400                 self.assertTrue(icmp.haslayer(IPerror))
401                 inner_ip = icmp[IPerror]
402                 if inner_ip.haslayer(TCPerror):
403                     self.assertEqual(inner_ip[TCPerror].sport,
404                                      self.tcp_port_in)
405                 elif inner_ip.haslayer(UDPerror):
406                     self.assertEqual(inner_ip[UDPerror].sport,
407                                      self.udp_port_in)
408                 else:
409                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
410             except:
411                 self.logger.error(ppp("Unexpected or invalid packet "
412                                       "(inside network):", packet))
413                 raise
414
415     def verify_ipfix_nat44_ses(self, data):
416         """
417         Verify IPFIX NAT44 session create/delete event
418
419         :param data: Decoded IPFIX data records
420         """
421         nat44_ses_create_num = 0
422         nat44_ses_delete_num = 0
423         self.assertEqual(6, len(data))
424         for record in data:
425             # natEvent
426             self.assertIn(ord(record[230]), [4, 5])
427             if ord(record[230]) == 4:
428                 nat44_ses_create_num += 1
429             else:
430                 nat44_ses_delete_num += 1
431             # sourceIPv4Address
432             self.assertEqual(self.pg0.remote_ip4n, record[8])
433             # postNATSourceIPv4Address
434             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
435                              record[225])
436             # ingressVRFID
437             self.assertEqual(struct.pack("!I", 0), record[234])
438             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
439             if IP_PROTOS.icmp == ord(record[4]):
440                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
441                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
442                                  record[227])
443             elif IP_PROTOS.tcp == ord(record[4]):
444                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
445                                  record[7])
446                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
447                                  record[227])
448             elif IP_PROTOS.udp == ord(record[4]):
449                 self.assertEqual(struct.pack("!H", self.udp_port_in),
450                                  record[7])
451                 self.assertEqual(struct.pack("!H", self.udp_port_out),
452                                  record[227])
453             else:
454                 self.fail("Invalid protocol")
455         self.assertEqual(3, nat44_ses_create_num)
456         self.assertEqual(3, nat44_ses_delete_num)
457
458     def verify_ipfix_addr_exhausted(self, data):
459         """
460         Verify IPFIX NAT addresses event
461
462         :param data: Decoded IPFIX data records
463         """
464         self.assertEqual(1, len(data))
465         record = data[0]
466         # natEvent
467         self.assertEqual(ord(record[230]), 3)
468         # natPoolID
469         self.assertEqual(struct.pack("!I", 0), record[283])
470
471
472 class TestSNAT(MethodHolder):
473     """ SNAT Test Cases """
474
475     @classmethod
476     def setUpClass(cls):
477         super(TestSNAT, cls).setUpClass()
478
479         try:
480             cls.tcp_port_in = 6303
481             cls.tcp_port_out = 6303
482             cls.udp_port_in = 6304
483             cls.udp_port_out = 6304
484             cls.icmp_id_in = 6305
485             cls.icmp_id_out = 6305
486             cls.snat_addr = '10.0.0.3'
487             cls.ipfix_src_port = 4739
488             cls.ipfix_domain_id = 1
489
490             cls.create_pg_interfaces(range(9))
491             cls.interfaces = list(cls.pg_interfaces[0:4])
492
493             for i in cls.interfaces:
494                 i.admin_up()
495                 i.config_ip4()
496                 i.resolve_arp()
497
498             cls.pg0.generate_remote_hosts(3)
499             cls.pg0.configure_ipv4_neighbors()
500
501             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
502
503             cls.pg4._local_ip4 = "172.16.255.1"
504             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
505             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
506             cls.pg4.set_table_ip4(10)
507             cls.pg5._local_ip4 = "172.16.255.3"
508             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
509             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
510             cls.pg5.set_table_ip4(10)
511             cls.pg6._local_ip4 = "172.16.255.1"
512             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
513             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
514             cls.pg6.set_table_ip4(20)
515             for i in cls.overlapping_interfaces:
516                 i.config_ip4()
517                 i.admin_up()
518                 i.resolve_arp()
519
520             cls.pg7.admin_up()
521             cls.pg8.admin_up()
522
523         except Exception:
524             super(TestSNAT, cls).tearDownClass()
525             raise
526
527     def clear_snat(self):
528         """
529         Clear SNAT configuration.
530         """
531         # I found no elegant way to do this
532         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
533                                    dst_address_length=32,
534                                    next_hop_address=self.pg7.remote_ip4n,
535                                    next_hop_sw_if_index=self.pg7.sw_if_index,
536                                    is_add=0)
537         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
538                                    dst_address_length=32,
539                                    next_hop_address=self.pg8.remote_ip4n,
540                                    next_hop_sw_if_index=self.pg8.sw_if_index,
541                                    is_add=0)
542
543         for intf in [self.pg7, self.pg8]:
544             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
545             for n in neighbors:
546                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
547                                               n.mac_address,
548                                               n.ip_address,
549                                               is_add=0)
550
551         if self.pg7.has_ip4_config:
552             self.pg7.unconfig_ip4()
553
554         interfaces = self.vapi.snat_interface_addr_dump()
555         for intf in interfaces:
556             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
557
558         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
559                              domain_id=self.ipfix_domain_id)
560         self.ipfix_src_port = 4739
561         self.ipfix_domain_id = 1
562
563         interfaces = self.vapi.snat_interface_dump()
564         for intf in interfaces:
565             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
566                                                      intf.is_inside,
567                                                      is_add=0)
568
569         static_mappings = self.vapi.snat_static_mapping_dump()
570         for sm in static_mappings:
571             self.vapi.snat_add_static_mapping(sm.local_ip_address,
572                                               sm.external_ip_address,
573                                               local_port=sm.local_port,
574                                               external_port=sm.external_port,
575                                               addr_only=sm.addr_only,
576                                               vrf_id=sm.vrf_id,
577                                               protocol=sm.protocol,
578                                               is_add=0)
579
580         adresses = self.vapi.snat_address_dump()
581         for addr in adresses:
582             self.vapi.snat_add_address_range(addr.ip_address,
583                                              addr.ip_address,
584                                              is_add=0)
585
586     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
587                                 local_port=0, external_port=0, vrf_id=0,
588                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
589                                 proto=0):
590         """
591         Add/delete S-NAT static mapping
592
593         :param local_ip: Local IP address
594         :param external_ip: External IP address
595         :param local_port: Local port number (Optional)
596         :param external_port: External port number (Optional)
597         :param vrf_id: VRF ID (Default 0)
598         :param is_add: 1 if add, 0 if delete (Default add)
599         :param external_sw_if_index: External interface instead of IP address
600         :param proto: IP protocol (Mandatory if port specified)
601         """
602         addr_only = 1
603         if local_port and external_port:
604             addr_only = 0
605         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
606         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
607         self.vapi.snat_add_static_mapping(
608             l_ip,
609             e_ip,
610             external_sw_if_index,
611             local_port,
612             external_port,
613             addr_only,
614             vrf_id,
615             proto,
616             is_add)
617
618     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
619         """
620         Add/delete S-NAT address
621
622         :param ip: IP address
623         :param is_add: 1 if add, 0 if delete (Default add)
624         """
625         snat_addr = socket.inet_pton(socket.AF_INET, ip)
626         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
627                                          vrf_id=vrf_id)
628
629     def test_dynamic(self):
630         """ SNAT dynamic translation test """
631
632         self.snat_add_address(self.snat_addr)
633         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
634         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
635                                                  is_inside=0)
636
637         # in2out
638         pkts = self.create_stream_in(self.pg0, self.pg1)
639         self.pg0.add_stream(pkts)
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pg_start()
642         capture = self.pg1.get_capture(len(pkts))
643         self.verify_capture_out(capture)
644
645         # out2in
646         pkts = self.create_stream_out(self.pg1)
647         self.pg1.add_stream(pkts)
648         self.pg_enable_capture(self.pg_interfaces)
649         self.pg_start()
650         capture = self.pg0.get_capture(len(pkts))
651         self.verify_capture_in(capture, self.pg0)
652
653     def test_dynamic_icmp_errors_in2out_ttl_1(self):
654         """ SNAT handling of client packets with TTL=1 """
655
656         self.snat_add_address(self.snat_addr)
657         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
658         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
659                                                  is_inside=0)
660
661         # Client side - generate traffic
662         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
663         self.pg0.add_stream(pkts)
664         self.pg_enable_capture(self.pg_interfaces)
665         self.pg_start()
666
667         # Client side - verify ICMP type 11 packets
668         capture = self.pg0.get_capture(len(pkts))
669         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
670
671     def test_dynamic_icmp_errors_out2in_ttl_1(self):
672         """ SNAT handling of server packets with TTL=1 """
673
674         self.snat_add_address(self.snat_addr)
675         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
676         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
677                                                  is_inside=0)
678
679         # Client side - create sessions
680         pkts = self.create_stream_in(self.pg0, self.pg1)
681         self.pg0.add_stream(pkts)
682         self.pg_enable_capture(self.pg_interfaces)
683         self.pg_start()
684
685         # Server side - generate traffic
686         capture = self.pg1.get_capture(len(pkts))
687         self.verify_capture_out(capture)
688         pkts = self.create_stream_out(self.pg1, ttl=1)
689         self.pg1.add_stream(pkts)
690         self.pg_enable_capture(self.pg_interfaces)
691         self.pg_start()
692
693         # Server side - verify ICMP type 11 packets
694         capture = self.pg1.get_capture(len(pkts))
695         self.verify_capture_out_with_icmp_errors(capture,
696                                                  src_ip=self.pg1.local_ip4)
697
698     def test_dynamic_icmp_errors_in2out_ttl_2(self):
699         """ SNAT handling of error responses to client packets with TTL=2 """
700
701         self.snat_add_address(self.snat_addr)
702         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
704                                                  is_inside=0)
705
706         # Client side - generate traffic
707         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
708         self.pg0.add_stream(pkts)
709         self.pg_enable_capture(self.pg_interfaces)
710         self.pg_start()
711
712         # Server side - simulate ICMP type 11 response
713         capture = self.pg1.get_capture(len(pkts))
714         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
715                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
716                 ICMP(type=11) / packet[IP] for packet in capture]
717         self.pg1.add_stream(pkts)
718         self.pg_enable_capture(self.pg_interfaces)
719         self.pg_start()
720
721         # Client side - verify ICMP type 11 packets
722         capture = self.pg0.get_capture(len(pkts))
723         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
724
725     def test_dynamic_icmp_errors_out2in_ttl_2(self):
726         """ SNAT handling of error responses to server packets with TTL=2 """
727
728         self.snat_add_address(self.snat_addr)
729         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
730         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
731                                                  is_inside=0)
732
733         # Client side - create sessions
734         pkts = self.create_stream_in(self.pg0, self.pg1)
735         self.pg0.add_stream(pkts)
736         self.pg_enable_capture(self.pg_interfaces)
737         self.pg_start()
738
739         # Server side - generate traffic
740         capture = self.pg1.get_capture(len(pkts))
741         self.verify_capture_out(capture)
742         pkts = self.create_stream_out(self.pg1, ttl=2)
743         self.pg1.add_stream(pkts)
744         self.pg_enable_capture(self.pg_interfaces)
745         self.pg_start()
746
747         # Client side - simulate ICMP type 11 response
748         capture = self.pg0.get_capture(len(pkts))
749         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
751                 ICMP(type=11) / packet[IP] for packet in capture]
752         self.pg0.add_stream(pkts)
753         self.pg_enable_capture(self.pg_interfaces)
754         self.pg_start()
755
756         # Server side - verify ICMP type 11 packets
757         capture = self.pg1.get_capture(len(pkts))
758         self.verify_capture_out_with_icmp_errors(capture)
759
760     def test_ping_out_interface_from_outside(self):
761         """ Ping SNAT out interface from outside network """
762
763         self.snat_add_address(self.snat_addr)
764         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
765         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
766                                                  is_inside=0)
767
768         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
769              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
770              ICMP(id=self.icmp_id_out, type='echo-request'))
771         pkts = [p]
772         self.pg1.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775         capture = self.pg1.get_capture(len(pkts))
776         self.assertEqual(1, len(capture))
777         packet = capture[0]
778         try:
779             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
780             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
781             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
782             self.assertEqual(packet[ICMP].type, 0)  # echo reply
783         except:
784             self.logger.error(ppp("Unexpected or invalid packet "
785                                   "(outside network):", packet))
786             raise
787
788     def test_ping_internal_host_from_outside(self):
789         """ Ping internal host from outside network """
790
791         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
792         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
793         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
794                                                  is_inside=0)
795
796         # out2in
797         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
798                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
799                ICMP(id=self.icmp_id_out, type='echo-request'))
800         self.pg1.add_stream(pkt)
801         self.pg_enable_capture(self.pg_interfaces)
802         self.pg_start()
803         capture = self.pg0.get_capture(1)
804         self.verify_capture_in(capture, self.pg0, packet_num=1)
805         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
806
807         # in2out
808         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
809                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
810                ICMP(id=self.icmp_id_in, type='echo-reply'))
811         self.pg0.add_stream(pkt)
812         self.pg_enable_capture(self.pg_interfaces)
813         self.pg_start()
814         capture = self.pg1.get_capture(1)
815         self.verify_capture_out(capture, same_port=True, packet_num=1)
816         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
817
818     def test_static_in(self):
819         """ SNAT 1:1 NAT initialized from inside network """
820
821         nat_ip = "10.0.0.10"
822         self.tcp_port_out = 6303
823         self.udp_port_out = 6304
824         self.icmp_id_out = 6305
825
826         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
827         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
828         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
829                                                  is_inside=0)
830
831         # in2out
832         pkts = self.create_stream_in(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture, nat_ip, True)
838
839         # out2in
840         pkts = self.create_stream_out(self.pg1, nat_ip)
841         self.pg1.add_stream(pkts)
842         self.pg_enable_capture(self.pg_interfaces)
843         self.pg_start()
844         capture = self.pg0.get_capture(len(pkts))
845         self.verify_capture_in(capture, self.pg0)
846
847     def test_static_out(self):
848         """ SNAT 1:1 NAT initialized from outside network """
849
850         nat_ip = "10.0.0.20"
851         self.tcp_port_out = 6303
852         self.udp_port_out = 6304
853         self.icmp_id_out = 6305
854
855         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
856         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
857         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
858                                                  is_inside=0)
859
860         # out2in
861         pkts = self.create_stream_out(self.pg1, nat_ip)
862         self.pg1.add_stream(pkts)
863         self.pg_enable_capture(self.pg_interfaces)
864         self.pg_start()
865         capture = self.pg0.get_capture(len(pkts))
866         self.verify_capture_in(capture, self.pg0)
867
868         # in2out
869         pkts = self.create_stream_in(self.pg0, self.pg1)
870         self.pg0.add_stream(pkts)
871         self.pg_enable_capture(self.pg_interfaces)
872         self.pg_start()
873         capture = self.pg1.get_capture(len(pkts))
874         self.verify_capture_out(capture, nat_ip, True)
875
876     def test_static_with_port_in(self):
877         """ SNAT 1:1 NAT with port initialized from inside network """
878
879         self.tcp_port_out = 3606
880         self.udp_port_out = 3607
881         self.icmp_id_out = 3608
882
883         self.snat_add_address(self.snat_addr)
884         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
885                                      self.tcp_port_in, self.tcp_port_out,
886                                      proto=IP_PROTOS.tcp)
887         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
888                                      self.udp_port_in, self.udp_port_out,
889                                      proto=IP_PROTOS.udp)
890         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
891                                      self.icmp_id_in, self.icmp_id_out,
892                                      proto=IP_PROTOS.icmp)
893         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
894         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
895                                                  is_inside=0)
896
897         # in2out
898         pkts = self.create_stream_in(self.pg0, self.pg1)
899         self.pg0.add_stream(pkts)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(len(pkts))
903         self.verify_capture_out(capture)
904
905         # out2in
906         pkts = self.create_stream_out(self.pg1)
907         self.pg1.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg0.get_capture(len(pkts))
911         self.verify_capture_in(capture, self.pg0)
912
913     def test_static_with_port_out(self):
914         """ SNAT 1:1 NAT with port initialized from outside network """
915
916         self.tcp_port_out = 30606
917         self.udp_port_out = 30607
918         self.icmp_id_out = 30608
919
920         self.snat_add_address(self.snat_addr)
921         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
922                                      self.tcp_port_in, self.tcp_port_out,
923                                      proto=IP_PROTOS.tcp)
924         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
925                                      self.udp_port_in, self.udp_port_out,
926                                      proto=IP_PROTOS.udp)
927         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
928                                      self.icmp_id_in, self.icmp_id_out,
929                                      proto=IP_PROTOS.icmp)
930         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
931         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
932                                                  is_inside=0)
933
934         # out2in
935         pkts = self.create_stream_out(self.pg1)
936         self.pg1.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg0.get_capture(len(pkts))
940         self.verify_capture_in(capture, self.pg0)
941
942         # in2out
943         pkts = self.create_stream_in(self.pg0, self.pg1)
944         self.pg0.add_stream(pkts)
945         self.pg_enable_capture(self.pg_interfaces)
946         self.pg_start()
947         capture = self.pg1.get_capture(len(pkts))
948         self.verify_capture_out(capture)
949
950     def test_static_vrf_aware(self):
951         """ SNAT 1:1 NAT VRF awareness """
952
953         nat_ip1 = "10.0.0.30"
954         nat_ip2 = "10.0.0.40"
955         self.tcp_port_out = 6303
956         self.udp_port_out = 6304
957         self.icmp_id_out = 6305
958
959         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
960                                      vrf_id=10)
961         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
962                                      vrf_id=10)
963         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
964                                                  is_inside=0)
965         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
966         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
967
968         # inside interface VRF match SNAT static mapping VRF
969         pkts = self.create_stream_in(self.pg4, self.pg3)
970         self.pg4.add_stream(pkts)
971         self.pg_enable_capture(self.pg_interfaces)
972         self.pg_start()
973         capture = self.pg3.get_capture(len(pkts))
974         self.verify_capture_out(capture, nat_ip1, True)
975
976         # inside interface VRF don't match SNAT static mapping VRF (packets
977         # are dropped)
978         pkts = self.create_stream_in(self.pg0, self.pg3)
979         self.pg0.add_stream(pkts)
980         self.pg_enable_capture(self.pg_interfaces)
981         self.pg_start()
982         self.pg3.assert_nothing_captured()
983
984     def test_multiple_inside_interfaces(self):
985         """ SNAT multiple inside interfaces (non-overlapping address space) """
986
987         self.snat_add_address(self.snat_addr)
988         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
989         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
990         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
991                                                  is_inside=0)
992
993         # between two S-NAT inside interfaces (no translation)
994         pkts = self.create_stream_in(self.pg0, self.pg1)
995         self.pg0.add_stream(pkts)
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pg_start()
998         capture = self.pg1.get_capture(len(pkts))
999         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1000
1001         # from S-NAT inside to interface without S-NAT feature (no translation)
1002         pkts = self.create_stream_in(self.pg0, self.pg2)
1003         self.pg0.add_stream(pkts)
1004         self.pg_enable_capture(self.pg_interfaces)
1005         self.pg_start()
1006         capture = self.pg2.get_capture(len(pkts))
1007         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1008
1009         # in2out 1st interface
1010         pkts = self.create_stream_in(self.pg0, self.pg3)
1011         self.pg0.add_stream(pkts)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg3.get_capture(len(pkts))
1015         self.verify_capture_out(capture)
1016
1017         # out2in 1st interface
1018         pkts = self.create_stream_out(self.pg3)
1019         self.pg3.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg0.get_capture(len(pkts))
1023         self.verify_capture_in(capture, self.pg0)
1024
1025         # in2out 2nd interface
1026         pkts = self.create_stream_in(self.pg1, self.pg3)
1027         self.pg1.add_stream(pkts)
1028         self.pg_enable_capture(self.pg_interfaces)
1029         self.pg_start()
1030         capture = self.pg3.get_capture(len(pkts))
1031         self.verify_capture_out(capture)
1032
1033         # out2in 2nd interface
1034         pkts = self.create_stream_out(self.pg3)
1035         self.pg3.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg1.get_capture(len(pkts))
1039         self.verify_capture_in(capture, self.pg1)
1040
1041     def test_inside_overlapping_interfaces(self):
1042         """ SNAT multiple inside interfaces with overlapping address space """
1043
1044         static_nat_ip = "10.0.0.10"
1045         self.snat_add_address(self.snat_addr)
1046         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1047                                                  is_inside=0)
1048         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1049         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1050         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1051         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1052                                      vrf_id=20)
1053
1054         # between S-NAT inside interfaces with same VRF (no translation)
1055         pkts = self.create_stream_in(self.pg4, self.pg5)
1056         self.pg4.add_stream(pkts)
1057         self.pg_enable_capture(self.pg_interfaces)
1058         self.pg_start()
1059         capture = self.pg5.get_capture(len(pkts))
1060         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1061
1062         # between S-NAT inside interfaces with different VRF (hairpinning)
1063         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1064              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1065              TCP(sport=1234, dport=5678))
1066         self.pg4.add_stream(p)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         capture = self.pg6.get_capture(1)
1070         p = capture[0]
1071         try:
1072             ip = p[IP]
1073             tcp = p[TCP]
1074             self.assertEqual(ip.src, self.snat_addr)
1075             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1076             self.assertNotEqual(tcp.sport, 1234)
1077             self.assertEqual(tcp.dport, 5678)
1078         except:
1079             self.logger.error(ppp("Unexpected or invalid packet:", p))
1080             raise
1081
1082         # in2out 1st interface
1083         pkts = self.create_stream_in(self.pg4, self.pg3)
1084         self.pg4.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg3.get_capture(len(pkts))
1088         self.verify_capture_out(capture)
1089
1090         # out2in 1st interface
1091         pkts = self.create_stream_out(self.pg3)
1092         self.pg3.add_stream(pkts)
1093         self.pg_enable_capture(self.pg_interfaces)
1094         self.pg_start()
1095         capture = self.pg4.get_capture(len(pkts))
1096         self.verify_capture_in(capture, self.pg4)
1097
1098         # in2out 2nd interface
1099         pkts = self.create_stream_in(self.pg5, self.pg3)
1100         self.pg5.add_stream(pkts)
1101         self.pg_enable_capture(self.pg_interfaces)
1102         self.pg_start()
1103         capture = self.pg3.get_capture(len(pkts))
1104         self.verify_capture_out(capture)
1105
1106         # out2in 2nd interface
1107         pkts = self.create_stream_out(self.pg3)
1108         self.pg3.add_stream(pkts)
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111         capture = self.pg5.get_capture(len(pkts))
1112         self.verify_capture_in(capture, self.pg5)
1113
1114         # pg5 session dump
1115         addresses = self.vapi.snat_address_dump()
1116         self.assertEqual(len(addresses), 1)
1117         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1118         self.assertEqual(len(sessions), 3)
1119         for session in sessions:
1120             self.assertFalse(session.is_static)
1121             self.assertEqual(session.inside_ip_address[0:4],
1122                              self.pg5.remote_ip4n)
1123             self.assertEqual(session.outside_ip_address,
1124                              addresses[0].ip_address)
1125         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1126         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1127         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1128         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1129         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1130         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1131         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1132         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1133         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1134
1135         # in2out 3rd interface
1136         pkts = self.create_stream_in(self.pg6, self.pg3)
1137         self.pg6.add_stream(pkts)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg3.get_capture(len(pkts))
1141         self.verify_capture_out(capture, static_nat_ip, True)
1142
1143         # out2in 3rd interface
1144         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1145         self.pg3.add_stream(pkts)
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148         capture = self.pg6.get_capture(len(pkts))
1149         self.verify_capture_in(capture, self.pg6)
1150
1151         # general user and session dump verifications
1152         users = self.vapi.snat_user_dump()
1153         self.assertTrue(len(users) >= 3)
1154         addresses = self.vapi.snat_address_dump()
1155         self.assertEqual(len(addresses), 1)
1156         for user in users:
1157             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1158                                                         user.vrf_id)
1159             for session in sessions:
1160                 self.assertEqual(user.ip_address, session.inside_ip_address)
1161                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1162                 self.assertTrue(session.protocol in
1163                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1164                                  IP_PROTOS.icmp])
1165
1166         # pg4 session dump
1167         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1168         self.assertTrue(len(sessions) >= 4)
1169         for session in sessions:
1170             self.assertFalse(session.is_static)
1171             self.assertEqual(session.inside_ip_address[0:4],
1172                              self.pg4.remote_ip4n)
1173             self.assertEqual(session.outside_ip_address,
1174                              addresses[0].ip_address)
1175
1176         # pg6 session dump
1177         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1178         self.assertTrue(len(sessions) >= 3)
1179         for session in sessions:
1180             self.assertTrue(session.is_static)
1181             self.assertEqual(session.inside_ip_address[0:4],
1182                              self.pg6.remote_ip4n)
1183             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1184                              map(int, static_nat_ip.split('.')))
1185             self.assertTrue(session.inside_port in
1186                             [self.tcp_port_in, self.udp_port_in,
1187                              self.icmp_id_in])
1188
1189     def test_hairpinning(self):
1190         """ SNAT hairpinning - 1:1 NAT with port"""
1191
1192         host = self.pg0.remote_hosts[0]
1193         server = self.pg0.remote_hosts[1]
1194         host_in_port = 1234
1195         host_out_port = 0
1196         server_in_port = 5678
1197         server_out_port = 8765
1198
1199         self.snat_add_address(self.snat_addr)
1200         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1201         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1202                                                  is_inside=0)
1203         # add static mapping for server
1204         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1205                                      server_in_port, server_out_port,
1206                                      proto=IP_PROTOS.tcp)
1207
1208         # send packet from host to server
1209         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1210              IP(src=host.ip4, dst=self.snat_addr) /
1211              TCP(sport=host_in_port, dport=server_out_port))
1212         self.pg0.add_stream(p)
1213         self.pg_enable_capture(self.pg_interfaces)
1214         self.pg_start()
1215         capture = self.pg0.get_capture(1)
1216         p = capture[0]
1217         try:
1218             ip = p[IP]
1219             tcp = p[TCP]
1220             self.assertEqual(ip.src, self.snat_addr)
1221             self.assertEqual(ip.dst, server.ip4)
1222             self.assertNotEqual(tcp.sport, host_in_port)
1223             self.assertEqual(tcp.dport, server_in_port)
1224             self.check_tcp_checksum(p)
1225             host_out_port = tcp.sport
1226         except:
1227             self.logger.error(ppp("Unexpected or invalid packet:", p))
1228             raise
1229
1230         # send reply from server to host
1231         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1232              IP(src=server.ip4, dst=self.snat_addr) /
1233              TCP(sport=server_in_port, dport=host_out_port))
1234         self.pg0.add_stream(p)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg0.get_capture(1)
1238         p = capture[0]
1239         try:
1240             ip = p[IP]
1241             tcp = p[TCP]
1242             self.assertEqual(ip.src, self.snat_addr)
1243             self.assertEqual(ip.dst, host.ip4)
1244             self.assertEqual(tcp.sport, server_out_port)
1245             self.assertEqual(tcp.dport, host_in_port)
1246             self.check_tcp_checksum(p)
1247         except:
1248             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1249             raise
1250
1251     def test_hairpinning2(self):
1252         """ SNAT hairpinning - 1:1 NAT"""
1253
1254         server1_nat_ip = "10.0.0.10"
1255         server2_nat_ip = "10.0.0.11"
1256         host = self.pg0.remote_hosts[0]
1257         server1 = self.pg0.remote_hosts[1]
1258         server2 = self.pg0.remote_hosts[2]
1259         server_tcp_port = 22
1260         server_udp_port = 20
1261
1262         self.snat_add_address(self.snat_addr)
1263         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1265                                                  is_inside=0)
1266
1267         # add static mapping for servers
1268         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1269         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1270
1271         # host to server1
1272         pkts = []
1273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1274              IP(src=host.ip4, dst=server1_nat_ip) /
1275              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1276         pkts.append(p)
1277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1278              IP(src=host.ip4, dst=server1_nat_ip) /
1279              UDP(sport=self.udp_port_in, dport=server_udp_port))
1280         pkts.append(p)
1281         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1282              IP(src=host.ip4, dst=server1_nat_ip) /
1283              ICMP(id=self.icmp_id_in, type='echo-request'))
1284         pkts.append(p)
1285         self.pg0.add_stream(pkts)
1286         self.pg_enable_capture(self.pg_interfaces)
1287         self.pg_start()
1288         capture = self.pg0.get_capture(len(pkts))
1289         for packet in capture:
1290             try:
1291                 self.assertEqual(packet[IP].src, self.snat_addr)
1292                 self.assertEqual(packet[IP].dst, server1.ip4)
1293                 if packet.haslayer(TCP):
1294                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1295                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1296                     self.tcp_port_out = packet[TCP].sport
1297                     self.check_tcp_checksum(packet)
1298                 elif packet.haslayer(UDP):
1299                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1300                     self.assertEqual(packet[UDP].dport, server_udp_port)
1301                     self.udp_port_out = packet[UDP].sport
1302                 else:
1303                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1304                     self.icmp_id_out = packet[ICMP].id
1305             except:
1306                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1307                 raise
1308
1309         # server1 to host
1310         pkts = []
1311         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1312              IP(src=server1.ip4, dst=self.snat_addr) /
1313              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1314         pkts.append(p)
1315         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316              IP(src=server1.ip4, dst=self.snat_addr) /
1317              UDP(sport=server_udp_port, dport=self.udp_port_out))
1318         pkts.append(p)
1319         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1320              IP(src=server1.ip4, dst=self.snat_addr) /
1321              ICMP(id=self.icmp_id_out, type='echo-reply'))
1322         pkts.append(p)
1323         self.pg0.add_stream(pkts)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         capture = self.pg0.get_capture(len(pkts))
1327         for packet in capture:
1328             try:
1329                 self.assertEqual(packet[IP].src, server1_nat_ip)
1330                 self.assertEqual(packet[IP].dst, host.ip4)
1331                 if packet.haslayer(TCP):
1332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1333                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1334                     self.check_tcp_checksum(packet)
1335                 elif packet.haslayer(UDP):
1336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1337                     self.assertEqual(packet[UDP].sport, server_udp_port)
1338                 else:
1339                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1340             except:
1341                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1342                 raise
1343
1344         # server2 to server1
1345         pkts = []
1346         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1347              IP(src=server2.ip4, dst=server1_nat_ip) /
1348              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1349         pkts.append(p)
1350         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1351              IP(src=server2.ip4, dst=server1_nat_ip) /
1352              UDP(sport=self.udp_port_in, dport=server_udp_port))
1353         pkts.append(p)
1354         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1355              IP(src=server2.ip4, dst=server1_nat_ip) /
1356              ICMP(id=self.icmp_id_in, type='echo-request'))
1357         pkts.append(p)
1358         self.pg0.add_stream(pkts)
1359         self.pg_enable_capture(self.pg_interfaces)
1360         self.pg_start()
1361         capture = self.pg0.get_capture(len(pkts))
1362         for packet in capture:
1363             try:
1364                 self.assertEqual(packet[IP].src, server2_nat_ip)
1365                 self.assertEqual(packet[IP].dst, server1.ip4)
1366                 if packet.haslayer(TCP):
1367                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1368                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1369                     self.tcp_port_out = packet[TCP].sport
1370                     self.check_tcp_checksum(packet)
1371                 elif packet.haslayer(UDP):
1372                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1373                     self.assertEqual(packet[UDP].dport, server_udp_port)
1374                     self.udp_port_out = packet[UDP].sport
1375                 else:
1376                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1377                     self.icmp_id_out = packet[ICMP].id
1378             except:
1379                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1380                 raise
1381
1382         # server1 to server2
1383         pkts = []
1384         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1385              IP(src=server1.ip4, dst=server2_nat_ip) /
1386              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1387         pkts.append(p)
1388         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1389              IP(src=server1.ip4, dst=server2_nat_ip) /
1390              UDP(sport=server_udp_port, dport=self.udp_port_out))
1391         pkts.append(p)
1392         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1393              IP(src=server1.ip4, dst=server2_nat_ip) /
1394              ICMP(id=self.icmp_id_out, type='echo-reply'))
1395         pkts.append(p)
1396         self.pg0.add_stream(pkts)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg0.get_capture(len(pkts))
1400         for packet in capture:
1401             try:
1402                 self.assertEqual(packet[IP].src, server1_nat_ip)
1403                 self.assertEqual(packet[IP].dst, server2.ip4)
1404                 if packet.haslayer(TCP):
1405                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1406                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1407                     self.check_tcp_checksum(packet)
1408                 elif packet.haslayer(UDP):
1409                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1410                     self.assertEqual(packet[UDP].sport, server_udp_port)
1411                 else:
1412                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1413             except:
1414                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1415                 raise
1416
1417     def test_max_translations_per_user(self):
1418         """ MAX translations per user - recycle the least recently used """
1419
1420         self.snat_add_address(self.snat_addr)
1421         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1422         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1423                                                  is_inside=0)
1424
1425         # get maximum number of translations per user
1426         snat_config = self.vapi.snat_show_config()
1427
1428         # send more than maximum number of translations per user packets
1429         pkts_num = snat_config.max_translations_per_user + 5
1430         pkts = []
1431         for port in range(0, pkts_num):
1432             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1434                  TCP(sport=1025 + port))
1435             pkts.append(p)
1436         self.pg0.add_stream(pkts)
1437         self.pg_enable_capture(self.pg_interfaces)
1438         self.pg_start()
1439
1440         # verify number of translated packet
1441         self.pg1.get_capture(pkts_num)
1442
1443     def test_interface_addr(self):
1444         """ Acquire SNAT addresses from interface """
1445         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1446
1447         # no address in NAT pool
1448         adresses = self.vapi.snat_address_dump()
1449         self.assertEqual(0, len(adresses))
1450
1451         # configure interface address and check NAT address pool
1452         self.pg7.config_ip4()
1453         adresses = self.vapi.snat_address_dump()
1454         self.assertEqual(1, len(adresses))
1455         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1456
1457         # remove interface address and check NAT address pool
1458         self.pg7.unconfig_ip4()
1459         adresses = self.vapi.snat_address_dump()
1460         self.assertEqual(0, len(adresses))
1461
1462     def test_interface_addr_static_mapping(self):
1463         """ Static mapping with addresses from interface """
1464         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1465         self.snat_add_static_mapping('1.2.3.4',
1466                                      external_sw_if_index=self.pg7.sw_if_index)
1467
1468         # static mappings with external interface
1469         static_mappings = self.vapi.snat_static_mapping_dump()
1470         self.assertEqual(1, len(static_mappings))
1471         self.assertEqual(self.pg7.sw_if_index,
1472                          static_mappings[0].external_sw_if_index)
1473
1474         # configure interface address and check static mappings
1475         self.pg7.config_ip4()
1476         static_mappings = self.vapi.snat_static_mapping_dump()
1477         self.assertEqual(1, len(static_mappings))
1478         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1479                          self.pg7.local_ip4n)
1480         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1481
1482         # remove interface address and check static mappings
1483         self.pg7.unconfig_ip4()
1484         static_mappings = self.vapi.snat_static_mapping_dump()
1485         self.assertEqual(0, len(static_mappings))
1486
1487     def test_ipfix_nat44_sess(self):
1488         """ S-NAT IPFIX logging NAT44 session created/delted """
1489         self.ipfix_domain_id = 10
1490         self.ipfix_src_port = 20202
1491         colector_port = 30303
1492         bind_layers(UDP, IPFIX, dport=30303)
1493         self.snat_add_address(self.snat_addr)
1494         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1495         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1496                                                  is_inside=0)
1497         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1498                                      src_address=self.pg3.local_ip4n,
1499                                      path_mtu=512,
1500                                      template_interval=10,
1501                                      collector_port=colector_port)
1502         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1503                              src_port=self.ipfix_src_port)
1504
1505         pkts = self.create_stream_in(self.pg0, self.pg1)
1506         self.pg0.add_stream(pkts)
1507         self.pg_enable_capture(self.pg_interfaces)
1508         self.pg_start()
1509         capture = self.pg1.get_capture(len(pkts))
1510         self.verify_capture_out(capture)
1511         self.snat_add_address(self.snat_addr, is_add=0)
1512         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1513         capture = self.pg3.get_capture(3)
1514         ipfix = IPFIXDecoder()
1515         # first load template
1516         for p in capture:
1517             self.assertTrue(p.haslayer(IPFIX))
1518             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1519             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1520             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1521             self.assertEqual(p[UDP].dport, colector_port)
1522             self.assertEqual(p[IPFIX].observationDomainID,
1523                              self.ipfix_domain_id)
1524             if p.haslayer(Template):
1525                 ipfix.add_template(p.getlayer(Template))
1526         # verify events in data set
1527         for p in capture:
1528             if p.haslayer(Data):
1529                 data = ipfix.decode_data_set(p.getlayer(Set))
1530                 self.verify_ipfix_nat44_ses(data)
1531
1532     def test_ipfix_addr_exhausted(self):
1533         """ S-NAT IPFIX logging NAT addresses exhausted """
1534         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1535         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1536                                                  is_inside=0)
1537         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1538                                      src_address=self.pg3.local_ip4n,
1539                                      path_mtu=512,
1540                                      template_interval=10)
1541         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1542                              src_port=self.ipfix_src_port)
1543
1544         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1545              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1546              TCP(sport=3025))
1547         self.pg0.add_stream(p)
1548         self.pg_enable_capture(self.pg_interfaces)
1549         self.pg_start()
1550         capture = self.pg1.get_capture(0)
1551         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1552         capture = self.pg3.get_capture(3)
1553         ipfix = IPFIXDecoder()
1554         # first load template
1555         for p in capture:
1556             self.assertTrue(p.haslayer(IPFIX))
1557             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1558             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1559             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1560             self.assertEqual(p[UDP].dport, 4739)
1561             self.assertEqual(p[IPFIX].observationDomainID,
1562                              self.ipfix_domain_id)
1563             if p.haslayer(Template):
1564                 ipfix.add_template(p.getlayer(Template))
1565         # verify events in data set
1566         for p in capture:
1567             if p.haslayer(Data):
1568                 data = ipfix.decode_data_set(p.getlayer(Set))
1569                 self.verify_ipfix_addr_exhausted(data)
1570
1571     def test_pool_addr_fib(self):
1572         """ S-NAT add pool addresses to FIB """
1573         static_addr = '10.0.0.10'
1574         self.snat_add_address(self.snat_addr)
1575         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1576         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1577                                                  is_inside=0)
1578         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1579
1580         # SNAT address
1581         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1582              ARP(op=ARP.who_has, pdst=self.snat_addr,
1583                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1584         self.pg1.add_stream(p)
1585         self.pg_enable_capture(self.pg_interfaces)
1586         self.pg_start()
1587         capture = self.pg1.get_capture(1)
1588         self.assertTrue(capture[0].haslayer(ARP))
1589         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1590
1591         # 1:1 NAT address
1592         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1593              ARP(op=ARP.who_has, pdst=static_addr,
1594                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1595         self.pg1.add_stream(p)
1596         self.pg_enable_capture(self.pg_interfaces)
1597         self.pg_start()
1598         capture = self.pg1.get_capture(1)
1599         self.assertTrue(capture[0].haslayer(ARP))
1600         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1601
1602         # send ARP to non-SNAT interface
1603         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1604              ARP(op=ARP.who_has, pdst=self.snat_addr,
1605                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1606         self.pg2.add_stream(p)
1607         self.pg_enable_capture(self.pg_interfaces)
1608         self.pg_start()
1609         capture = self.pg1.get_capture(0)
1610
1611         # remove addresses and verify
1612         self.snat_add_address(self.snat_addr, is_add=0)
1613         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1614                                      is_add=0)
1615
1616         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1617              ARP(op=ARP.who_has, pdst=self.snat_addr,
1618                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1619         self.pg1.add_stream(p)
1620         self.pg_enable_capture(self.pg_interfaces)
1621         self.pg_start()
1622         capture = self.pg1.get_capture(0)
1623
1624         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1625              ARP(op=ARP.who_has, pdst=static_addr,
1626                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1627         self.pg1.add_stream(p)
1628         self.pg_enable_capture(self.pg_interfaces)
1629         self.pg_start()
1630         capture = self.pg1.get_capture(0)
1631
1632     def test_vrf_mode(self):
1633         """ S-NAT tenant VRF aware address pool mode """
1634
1635         vrf_id1 = 1
1636         vrf_id2 = 2
1637         nat_ip1 = "10.0.0.10"
1638         nat_ip2 = "10.0.0.11"
1639
1640         self.pg0.unconfig_ip4()
1641         self.pg1.unconfig_ip4()
1642         self.pg0.set_table_ip4(vrf_id1)
1643         self.pg1.set_table_ip4(vrf_id2)
1644         self.pg0.config_ip4()
1645         self.pg1.config_ip4()
1646
1647         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1648         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1649         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1650         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1651         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1652                                                  is_inside=0)
1653
1654         # first VRF
1655         pkts = self.create_stream_in(self.pg0, self.pg2)
1656         self.pg0.add_stream(pkts)
1657         self.pg_enable_capture(self.pg_interfaces)
1658         self.pg_start()
1659         capture = self.pg2.get_capture(len(pkts))
1660         self.verify_capture_out(capture, nat_ip1)
1661
1662         # second VRF
1663         pkts = self.create_stream_in(self.pg1, self.pg2)
1664         self.pg1.add_stream(pkts)
1665         self.pg_enable_capture(self.pg_interfaces)
1666         self.pg_start()
1667         capture = self.pg2.get_capture(len(pkts))
1668         self.verify_capture_out(capture, nat_ip2)
1669
1670     def test_vrf_feature_independent(self):
1671         """ S-NAT tenant VRF independent address pool mode """
1672
1673         nat_ip1 = "10.0.0.10"
1674         nat_ip2 = "10.0.0.11"
1675
1676         self.snat_add_address(nat_ip1)
1677         self.snat_add_address(nat_ip2)
1678         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1679         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1680         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1681                                                  is_inside=0)
1682
1683         # first VRF
1684         pkts = self.create_stream_in(self.pg0, self.pg2)
1685         self.pg0.add_stream(pkts)
1686         self.pg_enable_capture(self.pg_interfaces)
1687         self.pg_start()
1688         capture = self.pg2.get_capture(len(pkts))
1689         self.verify_capture_out(capture, nat_ip1)
1690
1691         # second VRF
1692         pkts = self.create_stream_in(self.pg1, self.pg2)
1693         self.pg1.add_stream(pkts)
1694         self.pg_enable_capture(self.pg_interfaces)
1695         self.pg_start()
1696         capture = self.pg2.get_capture(len(pkts))
1697         self.verify_capture_out(capture, nat_ip1)
1698
1699     def test_dynamic_ipless_interfaces(self):
1700         """ SNAT interfaces without configured ip dynamic map """
1701
1702         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1703                                       self.pg7.remote_mac,
1704                                       self.pg7.remote_ip4n,
1705                                       is_static=1)
1706         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1707                                       self.pg8.remote_mac,
1708                                       self.pg8.remote_ip4n,
1709                                       is_static=1)
1710
1711         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1712                                    dst_address_length=32,
1713                                    next_hop_address=self.pg7.remote_ip4n,
1714                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1715         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1716                                    dst_address_length=32,
1717                                    next_hop_address=self.pg8.remote_ip4n,
1718                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1719
1720         self.snat_add_address(self.snat_addr)
1721         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1722         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1723                                                  is_inside=0)
1724
1725         # in2out
1726         pkts = self.create_stream_in(self.pg7, self.pg8)
1727         self.pg7.add_stream(pkts)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg8.get_capture(len(pkts))
1731         self.verify_capture_out(capture)
1732
1733         # out2in
1734         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1735         self.pg8.add_stream(pkts)
1736         self.pg_enable_capture(self.pg_interfaces)
1737         self.pg_start()
1738         capture = self.pg7.get_capture(len(pkts))
1739         self.verify_capture_in(capture, self.pg7)
1740
1741     def test_static_ipless_interfaces(self):
1742         """ SNAT 1:1 NAT interfaces without configured ip """
1743
1744         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1745                                       self.pg7.remote_mac,
1746                                       self.pg7.remote_ip4n,
1747                                       is_static=1)
1748         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1749                                       self.pg8.remote_mac,
1750                                       self.pg8.remote_ip4n,
1751                                       is_static=1)
1752
1753         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1754                                    dst_address_length=32,
1755                                    next_hop_address=self.pg7.remote_ip4n,
1756                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1757         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1758                                    dst_address_length=32,
1759                                    next_hop_address=self.pg8.remote_ip4n,
1760                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1761
1762         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1763         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1764         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1765                                                  is_inside=0)
1766
1767         # out2in
1768         pkts = self.create_stream_out(self.pg8)
1769         self.pg8.add_stream(pkts)
1770         self.pg_enable_capture(self.pg_interfaces)
1771         self.pg_start()
1772         capture = self.pg7.get_capture(len(pkts))
1773         self.verify_capture_in(capture, self.pg7)
1774
1775         # in2out
1776         pkts = self.create_stream_in(self.pg7, self.pg8)
1777         self.pg7.add_stream(pkts)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg8.get_capture(len(pkts))
1781         self.verify_capture_out(capture, self.snat_addr, True)
1782
1783     def test_static_with_port_ipless_interfaces(self):
1784         """ SNAT 1:1 NAT with port interfaces without configured ip """
1785
1786         self.tcp_port_out = 30606
1787         self.udp_port_out = 30607
1788         self.icmp_id_out = 30608
1789
1790         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1791                                       self.pg7.remote_mac,
1792                                       self.pg7.remote_ip4n,
1793                                       is_static=1)
1794         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1795                                       self.pg8.remote_mac,
1796                                       self.pg8.remote_ip4n,
1797                                       is_static=1)
1798
1799         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1800                                    dst_address_length=32,
1801                                    next_hop_address=self.pg7.remote_ip4n,
1802                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1803         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1804                                    dst_address_length=32,
1805                                    next_hop_address=self.pg8.remote_ip4n,
1806                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1807
1808         self.snat_add_address(self.snat_addr)
1809         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1810                                      self.tcp_port_in, self.tcp_port_out,
1811                                      proto=IP_PROTOS.tcp)
1812         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1813                                      self.udp_port_in, self.udp_port_out,
1814                                      proto=IP_PROTOS.udp)
1815         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1816                                      self.icmp_id_in, self.icmp_id_out,
1817                                      proto=IP_PROTOS.icmp)
1818         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1820                                                  is_inside=0)
1821
1822         # out2in
1823         pkts = self.create_stream_out(self.pg8)
1824         self.pg8.add_stream(pkts)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg7.get_capture(len(pkts))
1828         self.verify_capture_in(capture, self.pg7)
1829
1830         # in2out
1831         pkts = self.create_stream_in(self.pg7, self.pg8)
1832         self.pg7.add_stream(pkts)
1833         self.pg_enable_capture(self.pg_interfaces)
1834         self.pg_start()
1835         capture = self.pg8.get_capture(len(pkts))
1836         self.verify_capture_out(capture)
1837
1838     def test_static_unknown_proto(self):
1839         """ 1:1 NAT translate packet with unknown protocol """
1840         nat_ip = "10.0.0.10"
1841         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1842         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1843         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1844                                                  is_inside=0)
1845
1846         # in2out
1847         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1848              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1849              GRE() /
1850              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1851              TCP(sport=1234, dport=1234))
1852         self.pg0.add_stream(p)
1853         self.pg_enable_capture(self.pg_interfaces)
1854         self.pg_start()
1855         p = self.pg1.get_capture(1)
1856         packet = p[0]
1857         try:
1858             self.assertEqual(packet[IP].src, nat_ip)
1859             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1860             self.assertTrue(packet.haslayer(GRE))
1861             self.check_ip_checksum(packet)
1862         except:
1863             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1864             raise
1865
1866         # out2in
1867         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1868              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1869              GRE() /
1870              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1871              TCP(sport=1234, dport=1234))
1872         self.pg1.add_stream(p)
1873         self.pg_enable_capture(self.pg_interfaces)
1874         self.pg_start()
1875         p = self.pg0.get_capture(1)
1876         packet = p[0]
1877         try:
1878             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1879             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1880             self.assertTrue(packet.haslayer(GRE))
1881             self.check_ip_checksum(packet)
1882         except:
1883             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1884             raise
1885
1886     def test_hairpinning_unknown_proto(self):
1887         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1888
1889         host = self.pg0.remote_hosts[0]
1890         server = self.pg0.remote_hosts[1]
1891
1892         host_nat_ip = "10.0.0.10"
1893         server_nat_ip = "10.0.0.11"
1894
1895         self.snat_add_static_mapping(host.ip4, host_nat_ip)
1896         self.snat_add_static_mapping(server.ip4, server_nat_ip)
1897         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1898         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1899                                                  is_inside=0)
1900
1901         # host to server
1902         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1903              IP(src=host.ip4, dst=server_nat_ip) /
1904              GRE() /
1905              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1906              TCP(sport=1234, dport=1234))
1907         self.pg0.add_stream(p)
1908         self.pg_enable_capture(self.pg_interfaces)
1909         self.pg_start()
1910         p = self.pg0.get_capture(1)
1911         packet = p[0]
1912         try:
1913             self.assertEqual(packet[IP].src, host_nat_ip)
1914             self.assertEqual(packet[IP].dst, server.ip4)
1915             self.assertTrue(packet.haslayer(GRE))
1916             self.check_ip_checksum(packet)
1917         except:
1918             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1919             raise
1920
1921         # server to host
1922         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1923              IP(src=server.ip4, dst=host_nat_ip) /
1924              GRE() /
1925              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1926              TCP(sport=1234, dport=1234))
1927         self.pg0.add_stream(p)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         p = self.pg0.get_capture(1)
1931         packet = p[0]
1932         try:
1933             self.assertEqual(packet[IP].src, server_nat_ip)
1934             self.assertEqual(packet[IP].dst, host.ip4)
1935             self.assertTrue(packet.haslayer(GRE))
1936             self.check_ip_checksum(packet)
1937         except:
1938             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1939             raise
1940
1941     def tearDown(self):
1942         super(TestSNAT, self).tearDown()
1943         if not self.vpp_dead:
1944             self.logger.info(self.vapi.cli("show snat verbose"))
1945             self.clear_snat()
1946
1947
1948 class TestDeterministicNAT(MethodHolder):
1949     """ Deterministic NAT Test Cases """
1950
1951     @classmethod
1952     def setUpConstants(cls):
1953         super(TestDeterministicNAT, cls).setUpConstants()
1954         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1955
1956     @classmethod
1957     def setUpClass(cls):
1958         super(TestDeterministicNAT, cls).setUpClass()
1959
1960         try:
1961             cls.tcp_port_in = 6303
1962             cls.tcp_external_port = 6303
1963             cls.udp_port_in = 6304
1964             cls.udp_external_port = 6304
1965             cls.icmp_id_in = 6305
1966             cls.snat_addr = '10.0.0.3'
1967
1968             cls.create_pg_interfaces(range(3))
1969             cls.interfaces = list(cls.pg_interfaces)
1970
1971             for i in cls.interfaces:
1972                 i.admin_up()
1973                 i.config_ip4()
1974                 i.resolve_arp()
1975
1976             cls.pg0.generate_remote_hosts(2)
1977             cls.pg0.configure_ipv4_neighbors()
1978
1979         except Exception:
1980             super(TestDeterministicNAT, cls).tearDownClass()
1981             raise
1982
1983     def create_stream_in(self, in_if, out_if, ttl=64):
1984         """
1985         Create packet stream for inside network
1986
1987         :param in_if: Inside interface
1988         :param out_if: Outside interface
1989         :param ttl: TTL of generated packets
1990         """
1991         pkts = []
1992         # TCP
1993         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1994              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1995              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1996         pkts.append(p)
1997
1998         # UDP
1999         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2000              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2001              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2002         pkts.append(p)
2003
2004         # ICMP
2005         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2006              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2007              ICMP(id=self.icmp_id_in, type='echo-request'))
2008         pkts.append(p)
2009
2010         return pkts
2011
2012     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2013         """
2014         Create packet stream for outside network
2015
2016         :param out_if: Outside interface
2017         :param dst_ip: Destination IP address (Default use global SNAT address)
2018         :param ttl: TTL of generated packets
2019         """
2020         if dst_ip is None:
2021             dst_ip = self.snat_addr
2022         pkts = []
2023         # TCP
2024         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2025              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2026              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2027         pkts.append(p)
2028
2029         # UDP
2030         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2031              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2032              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2033         pkts.append(p)
2034
2035         # ICMP
2036         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2037              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2038              ICMP(id=self.icmp_external_id, type='echo-reply'))
2039         pkts.append(p)
2040
2041         return pkts
2042
2043     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2044         """
2045         Verify captured packets on outside network
2046
2047         :param capture: Captured packets
2048         :param nat_ip: Translated IP address (Default use global SNAT address)
2049         :param same_port: Sorce port number is not translated (Default False)
2050         :param packet_num: Expected number of packets (Default 3)
2051         """
2052         if nat_ip is None:
2053             nat_ip = self.snat_addr
2054         self.assertEqual(packet_num, len(capture))
2055         for packet in capture:
2056             try:
2057                 self.assertEqual(packet[IP].src, nat_ip)
2058                 if packet.haslayer(TCP):
2059                     self.tcp_port_out = packet[TCP].sport
2060                 elif packet.haslayer(UDP):
2061                     self.udp_port_out = packet[UDP].sport
2062                 else:
2063                     self.icmp_external_id = packet[ICMP].id
2064             except:
2065                 self.logger.error(ppp("Unexpected or invalid packet "
2066                                       "(outside network):", packet))
2067                 raise
2068
2069     def initiate_tcp_session(self, in_if, out_if):
2070         """
2071         Initiates TCP session
2072
2073         :param in_if: Inside interface
2074         :param out_if: Outside interface
2075         """
2076         try:
2077             # SYN packet in->out
2078             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2079                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2080                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2081                      flags="S"))
2082             in_if.add_stream(p)
2083             self.pg_enable_capture(self.pg_interfaces)
2084             self.pg_start()
2085             capture = out_if.get_capture(1)
2086             p = capture[0]
2087             self.tcp_port_out = p[TCP].sport
2088
2089             # SYN + ACK packet out->in
2090             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2091                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2092                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2093                      flags="SA"))
2094             out_if.add_stream(p)
2095             self.pg_enable_capture(self.pg_interfaces)
2096             self.pg_start()
2097             in_if.get_capture(1)
2098
2099             # ACK packet in->out
2100             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2101                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2102                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2103                      flags="A"))
2104             in_if.add_stream(p)
2105             self.pg_enable_capture(self.pg_interfaces)
2106             self.pg_start()
2107             out_if.get_capture(1)
2108
2109         except:
2110             self.logger.error("TCP 3 way handshake failed")
2111             raise
2112
2113     def verify_ipfix_max_entries_per_user(self, data):
2114         """
2115         Verify IPFIX maximum entries per user exceeded event
2116
2117         :param data: Decoded IPFIX data records
2118         """
2119         self.assertEqual(1, len(data))
2120         record = data[0]
2121         # natEvent
2122         self.assertEqual(ord(record[230]), 13)
2123         # natQuotaExceededEvent
2124         self.assertEqual('\x03\x00\x00\x00', record[466])
2125         # sourceIPv4Address
2126         self.assertEqual(self.pg0.remote_ip4n, record[8])
2127
2128     def test_deterministic_mode(self):
2129         """ S-NAT run deterministic mode """
2130         in_addr = '172.16.255.0'
2131         out_addr = '172.17.255.50'
2132         in_addr_t = '172.16.255.20'
2133         in_addr_n = socket.inet_aton(in_addr)
2134         out_addr_n = socket.inet_aton(out_addr)
2135         in_addr_t_n = socket.inet_aton(in_addr_t)
2136         in_plen = 24
2137         out_plen = 32
2138
2139         snat_config = self.vapi.snat_show_config()
2140         self.assertEqual(1, snat_config.deterministic)
2141
2142         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2143
2144         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2145         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2146         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2147         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2148
2149         deterministic_mappings = self.vapi.snat_det_map_dump()
2150         self.assertEqual(len(deterministic_mappings), 1)
2151         dsm = deterministic_mappings[0]
2152         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2153         self.assertEqual(in_plen, dsm.in_plen)
2154         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2155         self.assertEqual(out_plen, dsm.out_plen)
2156
2157         self.clear_snat()
2158         deterministic_mappings = self.vapi.snat_det_map_dump()
2159         self.assertEqual(len(deterministic_mappings), 0)
2160
2161     def test_set_timeouts(self):
2162         """ Set deterministic NAT timeouts """
2163         timeouts_before = self.vapi.snat_det_get_timeouts()
2164
2165         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2166                                         timeouts_before.tcp_established + 10,
2167                                         timeouts_before.tcp_transitory + 10,
2168                                         timeouts_before.icmp + 10)
2169
2170         timeouts_after = self.vapi.snat_det_get_timeouts()
2171
2172         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2173         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2174         self.assertNotEqual(timeouts_before.tcp_established,
2175                             timeouts_after.tcp_established)
2176         self.assertNotEqual(timeouts_before.tcp_transitory,
2177                             timeouts_after.tcp_transitory)
2178
2179     def test_det_in(self):
2180         """ CGNAT translation test (TCP, UDP, ICMP) """
2181
2182         nat_ip = "10.0.0.10"
2183
2184         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2185                                    32,
2186                                    socket.inet_aton(nat_ip),
2187                                    32)
2188         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2189         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2190                                                  is_inside=0)
2191
2192         # in2out
2193         pkts = self.create_stream_in(self.pg0, self.pg1)
2194         self.pg0.add_stream(pkts)
2195         self.pg_enable_capture(self.pg_interfaces)
2196         self.pg_start()
2197         capture = self.pg1.get_capture(len(pkts))
2198         self.verify_capture_out(capture, nat_ip)
2199
2200         # out2in
2201         pkts = self.create_stream_out(self.pg1, nat_ip)
2202         self.pg1.add_stream(pkts)
2203         self.pg_enable_capture(self.pg_interfaces)
2204         self.pg_start()
2205         capture = self.pg0.get_capture(len(pkts))
2206         self.verify_capture_in(capture, self.pg0)
2207
2208         # session dump test
2209         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2210         self.assertEqual(len(sessions), 3)
2211
2212         # TCP session
2213         s = sessions[0]
2214         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2215         self.assertEqual(s.in_port, self.tcp_port_in)
2216         self.assertEqual(s.out_port, self.tcp_port_out)
2217         self.assertEqual(s.ext_port, self.tcp_external_port)
2218
2219         # UDP session
2220         s = sessions[1]
2221         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2222         self.assertEqual(s.in_port, self.udp_port_in)
2223         self.assertEqual(s.out_port, self.udp_port_out)
2224         self.assertEqual(s.ext_port, self.udp_external_port)
2225
2226         # ICMP session
2227         s = sessions[2]
2228         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2229         self.assertEqual(s.in_port, self.icmp_id_in)
2230         self.assertEqual(s.out_port, self.icmp_external_id)
2231
2232     def test_multiple_users(self):
2233         """ CGNAT multiple users """
2234
2235         nat_ip = "10.0.0.10"
2236         port_in = 80
2237         external_port = 6303
2238
2239         host0 = self.pg0.remote_hosts[0]
2240         host1 = self.pg0.remote_hosts[1]
2241
2242         self.vapi.snat_add_det_map(host0.ip4n,
2243                                    24,
2244                                    socket.inet_aton(nat_ip),
2245                                    32)
2246         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2247         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2248                                                  is_inside=0)
2249
2250         # host0 to out
2251         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2252              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2253              TCP(sport=port_in, dport=external_port))
2254         self.pg0.add_stream(p)
2255         self.pg_enable_capture(self.pg_interfaces)
2256         self.pg_start()
2257         capture = self.pg1.get_capture(1)
2258         p = capture[0]
2259         try:
2260             ip = p[IP]
2261             tcp = p[TCP]
2262             self.assertEqual(ip.src, nat_ip)
2263             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2264             self.assertEqual(tcp.dport, external_port)
2265             port_out0 = tcp.sport
2266         except:
2267             self.logger.error(ppp("Unexpected or invalid packet:", p))
2268             raise
2269
2270         # host1 to out
2271         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2272              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2273              TCP(sport=port_in, dport=external_port))
2274         self.pg0.add_stream(p)
2275         self.pg_enable_capture(self.pg_interfaces)
2276         self.pg_start()
2277         capture = self.pg1.get_capture(1)
2278         p = capture[0]
2279         try:
2280             ip = p[IP]
2281             tcp = p[TCP]
2282             self.assertEqual(ip.src, nat_ip)
2283             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2284             self.assertEqual(tcp.dport, external_port)
2285             port_out1 = tcp.sport
2286         except:
2287             self.logger.error(ppp("Unexpected or invalid packet:", p))
2288             raise
2289
2290         dms = self.vapi.snat_det_map_dump()
2291         self.assertEqual(1, len(dms))
2292         self.assertEqual(2, dms[0].ses_num)
2293
2294         # out to host0
2295         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2296              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2297              TCP(sport=external_port, dport=port_out0))
2298         self.pg1.add_stream(p)
2299         self.pg_enable_capture(self.pg_interfaces)
2300         self.pg_start()
2301         capture = self.pg0.get_capture(1)
2302         p = capture[0]
2303         try:
2304             ip = p[IP]
2305             tcp = p[TCP]
2306             self.assertEqual(ip.src, self.pg1.remote_ip4)
2307             self.assertEqual(ip.dst, host0.ip4)
2308             self.assertEqual(tcp.dport, port_in)
2309             self.assertEqual(tcp.sport, external_port)
2310         except:
2311             self.logger.error(ppp("Unexpected or invalid packet:", p))
2312             raise
2313
2314         # out to host1
2315         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2316              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2317              TCP(sport=external_port, dport=port_out1))
2318         self.pg1.add_stream(p)
2319         self.pg_enable_capture(self.pg_interfaces)
2320         self.pg_start()
2321         capture = self.pg0.get_capture(1)
2322         p = capture[0]
2323         try:
2324             ip = p[IP]
2325             tcp = p[TCP]
2326             self.assertEqual(ip.src, self.pg1.remote_ip4)
2327             self.assertEqual(ip.dst, host1.ip4)
2328             self.assertEqual(tcp.dport, port_in)
2329             self.assertEqual(tcp.sport, external_port)
2330         except:
2331             self.logger.error(ppp("Unexpected or invalid packet", p))
2332             raise
2333
2334         # session close api test
2335         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2336                                              port_out1,
2337                                              self.pg1.remote_ip4n,
2338                                              external_port)
2339         dms = self.vapi.snat_det_map_dump()
2340         self.assertEqual(dms[0].ses_num, 1)
2341
2342         self.vapi.snat_det_close_session_in(host0.ip4n,
2343                                             port_in,
2344                                             self.pg1.remote_ip4n,
2345                                             external_port)
2346         dms = self.vapi.snat_det_map_dump()
2347         self.assertEqual(dms[0].ses_num, 0)
2348
2349     def test_tcp_session_close_detection_in(self):
2350         """ CGNAT TCP session close initiated from inside network """
2351         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2352                                    32,
2353                                    socket.inet_aton(self.snat_addr),
2354                                    32)
2355         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2356         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2357                                                  is_inside=0)
2358
2359         self.initiate_tcp_session(self.pg0, self.pg1)
2360
2361         # close the session from inside
2362         try:
2363             # FIN packet in -> out
2364             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2365                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2366                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2367                      flags="F"))
2368             self.pg0.add_stream(p)
2369             self.pg_enable_capture(self.pg_interfaces)
2370             self.pg_start()
2371             self.pg1.get_capture(1)
2372
2373             pkts = []
2374
2375             # ACK packet out -> in
2376             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2377                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2378                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2379                      flags="A"))
2380             pkts.append(p)
2381
2382             # FIN packet out -> in
2383             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2384                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2385                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2386                      flags="F"))
2387             pkts.append(p)
2388
2389             self.pg1.add_stream(pkts)
2390             self.pg_enable_capture(self.pg_interfaces)
2391             self.pg_start()
2392             self.pg0.get_capture(2)
2393
2394             # ACK packet in -> out
2395             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2396                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2397                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2398                      flags="A"))
2399             self.pg0.add_stream(p)
2400             self.pg_enable_capture(self.pg_interfaces)
2401             self.pg_start()
2402             self.pg1.get_capture(1)
2403
2404             # Check if snat closed the session
2405             dms = self.vapi.snat_det_map_dump()
2406             self.assertEqual(0, dms[0].ses_num)
2407         except:
2408             self.logger.error("TCP session termination failed")
2409             raise
2410
2411     def test_tcp_session_close_detection_out(self):
2412         """ CGNAT TCP session close initiated from outside network """
2413         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2414                                    32,
2415                                    socket.inet_aton(self.snat_addr),
2416                                    32)
2417         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2418         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2419                                                  is_inside=0)
2420
2421         self.initiate_tcp_session(self.pg0, self.pg1)
2422
2423         # close the session from outside
2424         try:
2425             # FIN packet out -> in
2426             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2427                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2428                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2429                      flags="F"))
2430             self.pg1.add_stream(p)
2431             self.pg_enable_capture(self.pg_interfaces)
2432             self.pg_start()
2433             self.pg0.get_capture(1)
2434
2435             pkts = []
2436
2437             # ACK packet in -> out
2438             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2439                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2440                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2441                      flags="A"))
2442             pkts.append(p)
2443
2444             # ACK packet in -> out
2445             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2446                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2447                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2448                      flags="F"))
2449             pkts.append(p)
2450
2451             self.pg0.add_stream(pkts)
2452             self.pg_enable_capture(self.pg_interfaces)
2453             self.pg_start()
2454             self.pg1.get_capture(2)
2455
2456             # ACK packet out -> in
2457             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2458                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2459                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2460                      flags="A"))
2461             self.pg1.add_stream(p)
2462             self.pg_enable_capture(self.pg_interfaces)
2463             self.pg_start()
2464             self.pg0.get_capture(1)
2465
2466             # Check if snat closed the session
2467             dms = self.vapi.snat_det_map_dump()
2468             self.assertEqual(0, dms[0].ses_num)
2469         except:
2470             self.logger.error("TCP session termination failed")
2471             raise
2472
2473     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2474     def test_session_timeout(self):
2475         """ CGNAT session timeouts """
2476         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2477                                    32,
2478                                    socket.inet_aton(self.snat_addr),
2479                                    32)
2480         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2481         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2482                                                  is_inside=0)
2483
2484         self.initiate_tcp_session(self.pg0, self.pg1)
2485         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2486         pkts = self.create_stream_in(self.pg0, self.pg1)
2487         self.pg0.add_stream(pkts)
2488         self.pg_enable_capture(self.pg_interfaces)
2489         self.pg_start()
2490         capture = self.pg1.get_capture(len(pkts))
2491         sleep(15)
2492
2493         dms = self.vapi.snat_det_map_dump()
2494         self.assertEqual(0, dms[0].ses_num)
2495
2496     def test_session_limit_per_user(self):
2497         """ CGNAT maximum 1000 sessions per user should be created """
2498         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2499                                    32,
2500                                    socket.inet_aton(self.snat_addr),
2501                                    32)
2502         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2503         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2504                                                  is_inside=0)
2505         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2506                                      src_address=self.pg2.local_ip4n,
2507                                      path_mtu=512,
2508                                      template_interval=10)
2509         self.vapi.snat_ipfix()
2510
2511         pkts = []
2512         for port in range(1025, 2025):
2513             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2514                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2515                  UDP(sport=port, dport=port))
2516             pkts.append(p)
2517
2518         self.pg0.add_stream(pkts)
2519         self.pg_enable_capture(self.pg_interfaces)
2520         self.pg_start()
2521         capture = self.pg1.get_capture(len(pkts))
2522
2523         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2524              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2525              UDP(sport=3001, dport=3002))
2526         self.pg0.add_stream(p)
2527         self.pg_enable_capture(self.pg_interfaces)
2528         self.pg_start()
2529         capture = self.pg1.assert_nothing_captured()
2530
2531         # verify ICMP error packet
2532         capture = self.pg0.get_capture(1)
2533         p = capture[0]
2534         self.assertTrue(p.haslayer(ICMP))
2535         icmp = p[ICMP]
2536         self.assertEqual(icmp.type, 3)
2537         self.assertEqual(icmp.code, 1)
2538         self.assertTrue(icmp.haslayer(IPerror))
2539         inner_ip = icmp[IPerror]
2540         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2541         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2542
2543         dms = self.vapi.snat_det_map_dump()
2544
2545         self.assertEqual(1000, dms[0].ses_num)
2546
2547         # verify IPFIX logging
2548         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2549         capture = self.pg2.get_capture(2)
2550         ipfix = IPFIXDecoder()
2551         # first load template
2552         for p in capture:
2553             self.assertTrue(p.haslayer(IPFIX))
2554             if p.haslayer(Template):
2555                 ipfix.add_template(p.getlayer(Template))
2556         # verify events in data set
2557         for p in capture:
2558             if p.haslayer(Data):
2559                 data = ipfix.decode_data_set(p.getlayer(Set))
2560                 self.verify_ipfix_max_entries_per_user(data)
2561
2562     def clear_snat(self):
2563         """
2564         Clear SNAT configuration.
2565         """
2566         self.vapi.snat_ipfix(enable=0)
2567         self.vapi.snat_det_set_timeouts()
2568         deterministic_mappings = self.vapi.snat_det_map_dump()
2569         for dsm in deterministic_mappings:
2570             self.vapi.snat_add_det_map(dsm.in_addr,
2571                                        dsm.in_plen,
2572                                        dsm.out_addr,
2573                                        dsm.out_plen,
2574                                        is_add=0)
2575
2576         interfaces = self.vapi.snat_interface_dump()
2577         for intf in interfaces:
2578             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2579                                                      intf.is_inside,
2580                                                      is_add=0)
2581
2582     def tearDown(self):
2583         super(TestDeterministicNAT, self).tearDown()
2584         if not self.vpp_dead:
2585             self.logger.info(self.vapi.cli("show snat detail"))
2586             self.clear_snat()
2587
2588
2589 class TestNAT64(MethodHolder):
2590     """ NAT64 Test Cases """
2591
2592     @classmethod
2593     def setUpClass(cls):
2594         super(TestNAT64, cls).setUpClass()
2595
2596         try:
2597             cls.tcp_port_in = 6303
2598             cls.tcp_port_out = 6303
2599             cls.udp_port_in = 6304
2600             cls.udp_port_out = 6304
2601             cls.icmp_id_in = 6305
2602             cls.icmp_id_out = 6305
2603             cls.nat_addr = '10.0.0.3'
2604             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2605             cls.vrf1_id = 10
2606             cls.vrf1_nat_addr = '10.0.10.3'
2607             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2608                                                    cls.vrf1_nat_addr)
2609
2610             cls.create_pg_interfaces(range(3))
2611             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2612             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2613             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2614
2615             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2616
2617             cls.pg0.generate_remote_hosts(2)
2618
2619             for i in cls.ip6_interfaces:
2620                 i.admin_up()
2621                 i.config_ip6()
2622                 i.configure_ipv6_neighbors()
2623
2624             for i in cls.ip4_interfaces:
2625                 i.admin_up()
2626                 i.config_ip4()
2627                 i.resolve_arp()
2628
2629         except Exception:
2630             super(TestNAT64, cls).tearDownClass()
2631             raise
2632
2633     def test_pool(self):
2634         """ Add/delete address to NAT64 pool """
2635         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2636
2637         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2638
2639         addresses = self.vapi.nat64_pool_addr_dump()
2640         self.assertEqual(len(addresses), 1)
2641         self.assertEqual(addresses[0].address, nat_addr)
2642
2643         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2644
2645         addresses = self.vapi.nat64_pool_addr_dump()
2646         self.assertEqual(len(addresses), 0)
2647
2648     def test_interface(self):
2649         """ Enable/disable NAT64 feature on the interface """
2650         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2651         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2652
2653         interfaces = self.vapi.nat64_interface_dump()
2654         self.assertEqual(len(interfaces), 2)
2655         pg0_found = False
2656         pg1_found = False
2657         for intf in interfaces:
2658             if intf.sw_if_index == self.pg0.sw_if_index:
2659                 self.assertEqual(intf.is_inside, 1)
2660                 pg0_found = True
2661             elif intf.sw_if_index == self.pg1.sw_if_index:
2662                 self.assertEqual(intf.is_inside, 0)
2663                 pg1_found = True
2664         self.assertTrue(pg0_found)
2665         self.assertTrue(pg1_found)
2666
2667         features = self.vapi.cli("show interface features pg0")
2668         self.assertNotEqual(features.find('nat64-in2out'), -1)
2669         features = self.vapi.cli("show interface features pg1")
2670         self.assertNotEqual(features.find('nat64-out2in'), -1)
2671
2672         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2673         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2674
2675         interfaces = self.vapi.nat64_interface_dump()
2676         self.assertEqual(len(interfaces), 0)
2677
2678     def test_static_bib(self):
2679         """ Add/delete static BIB entry """
2680         in_addr = socket.inet_pton(socket.AF_INET6,
2681                                    '2001:db8:85a3::8a2e:370:7334')
2682         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2683         in_port = 1234
2684         out_port = 5678
2685         proto = IP_PROTOS.tcp
2686
2687         self.vapi.nat64_add_del_static_bib(in_addr,
2688                                            out_addr,
2689                                            in_port,
2690                                            out_port,
2691                                            proto)
2692         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2693         static_bib_num = 0
2694         for bibe in bib:
2695             if bibe.is_static:
2696                 static_bib_num += 1
2697                 self.assertEqual(bibe.i_addr, in_addr)
2698                 self.assertEqual(bibe.o_addr, out_addr)
2699                 self.assertEqual(bibe.i_port, in_port)
2700                 self.assertEqual(bibe.o_port, out_port)
2701         self.assertEqual(static_bib_num, 1)
2702
2703         self.vapi.nat64_add_del_static_bib(in_addr,
2704                                            out_addr,
2705                                            in_port,
2706                                            out_port,
2707                                            proto,
2708                                            is_add=0)
2709         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2710         static_bib_num = 0
2711         for bibe in bib:
2712             if bibe.is_static:
2713                 static_bib_num += 1
2714         self.assertEqual(static_bib_num, 0)
2715
2716     def test_set_timeouts(self):
2717         """ Set NAT64 timeouts """
2718         # verify default values
2719         timeouts = self.vapi.nat64_get_timeouts()
2720         self.assertEqual(timeouts.udp, 300)
2721         self.assertEqual(timeouts.icmp, 60)
2722         self.assertEqual(timeouts.tcp_trans, 240)
2723         self.assertEqual(timeouts.tcp_est, 7440)
2724         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2725
2726         # set and verify custom values
2727         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2728                                      tcp_est=7450, tcp_incoming_syn=10)
2729         timeouts = self.vapi.nat64_get_timeouts()
2730         self.assertEqual(timeouts.udp, 200)
2731         self.assertEqual(timeouts.icmp, 30)
2732         self.assertEqual(timeouts.tcp_trans, 250)
2733         self.assertEqual(timeouts.tcp_est, 7450)
2734         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2735
2736     def test_dynamic(self):
2737         """ NAT64 dynamic translation test """
2738         self.tcp_port_in = 6303
2739         self.udp_port_in = 6304
2740         self.icmp_id_in = 6305
2741
2742         ses_num_start = self.nat64_get_ses_num()
2743
2744         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2745                                                 self.nat_addr_n)
2746         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2747         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2748
2749         # in2out
2750         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2751         self.pg0.add_stream(pkts)
2752         self.pg_enable_capture(self.pg_interfaces)
2753         self.pg_start()
2754         capture = self.pg1.get_capture(len(pkts))
2755         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2756                                 dst_ip=self.pg1.remote_ip4)
2757
2758         # out2in
2759         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2760         self.pg1.add_stream(pkts)
2761         self.pg_enable_capture(self.pg_interfaces)
2762         self.pg_start()
2763         capture = self.pg0.get_capture(len(pkts))
2764         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2765         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2766
2767         # in2out
2768         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2769         self.pg0.add_stream(pkts)
2770         self.pg_enable_capture(self.pg_interfaces)
2771         self.pg_start()
2772         capture = self.pg1.get_capture(len(pkts))
2773         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2774                                 dst_ip=self.pg1.remote_ip4)
2775
2776         # out2in
2777         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2778         self.pg1.add_stream(pkts)
2779         self.pg_enable_capture(self.pg_interfaces)
2780         self.pg_start()
2781         capture = self.pg0.get_capture(len(pkts))
2782         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2783
2784         ses_num_end = self.nat64_get_ses_num()
2785
2786         self.assertEqual(ses_num_end - ses_num_start, 3)
2787
2788         # tenant with specific VRF
2789         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2790                                                 self.vrf1_nat_addr_n,
2791                                                 vrf_id=self.vrf1_id)
2792         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2793
2794         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2795         self.pg2.add_stream(pkts)
2796         self.pg_enable_capture(self.pg_interfaces)
2797         self.pg_start()
2798         capture = self.pg1.get_capture(len(pkts))
2799         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2800                                 dst_ip=self.pg1.remote_ip4)
2801
2802         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2803         self.pg1.add_stream(pkts)
2804         self.pg_enable_capture(self.pg_interfaces)
2805         self.pg_start()
2806         capture = self.pg2.get_capture(len(pkts))
2807         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2808
2809     def test_static(self):
2810         """ NAT64 static translation test """
2811         self.tcp_port_in = 60303
2812         self.udp_port_in = 60304
2813         self.icmp_id_in = 60305
2814         self.tcp_port_out = 60303
2815         self.udp_port_out = 60304
2816         self.icmp_id_out = 60305
2817
2818         ses_num_start = self.nat64_get_ses_num()
2819
2820         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2821                                                 self.nat_addr_n)
2822         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2823         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2824
2825         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2826                                            self.nat_addr_n,
2827                                            self.tcp_port_in,
2828                                            self.tcp_port_out,
2829                                            IP_PROTOS.tcp)
2830         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2831                                            self.nat_addr_n,
2832                                            self.udp_port_in,
2833                                            self.udp_port_out,
2834                                            IP_PROTOS.udp)
2835         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2836                                            self.nat_addr_n,
2837                                            self.icmp_id_in,
2838                                            self.icmp_id_out,
2839                                            IP_PROTOS.icmp)
2840
2841         # in2out
2842         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2843         self.pg0.add_stream(pkts)
2844         self.pg_enable_capture(self.pg_interfaces)
2845         self.pg_start()
2846         capture = self.pg1.get_capture(len(pkts))
2847         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2848                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2849
2850         # out2in
2851         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2852         self.pg1.add_stream(pkts)
2853         self.pg_enable_capture(self.pg_interfaces)
2854         self.pg_start()
2855         capture = self.pg0.get_capture(len(pkts))
2856         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2857         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2858
2859         ses_num_end = self.nat64_get_ses_num()
2860
2861         self.assertEqual(ses_num_end - ses_num_start, 3)
2862
2863     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2864     def test_session_timeout(self):
2865         """ NAT64 session timeout """
2866         self.icmp_id_in = 1234
2867         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2868                                                 self.nat_addr_n)
2869         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2870         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2871         self.vapi.nat64_set_timeouts(icmp=5)
2872
2873         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2874         self.pg0.add_stream(pkts)
2875         self.pg_enable_capture(self.pg_interfaces)
2876         self.pg_start()
2877         capture = self.pg1.get_capture(len(pkts))
2878
2879         ses_num_before_timeout = self.nat64_get_ses_num()
2880
2881         sleep(15)
2882
2883         # ICMP session after timeout
2884         ses_num_after_timeout = self.nat64_get_ses_num()
2885         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2886
2887     def test_icmp_error(self):
2888         """ NAT64 ICMP Error message translation """
2889         self.tcp_port_in = 6303
2890         self.udp_port_in = 6304
2891         self.icmp_id_in = 6305
2892
2893         ses_num_start = self.nat64_get_ses_num()
2894
2895         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2896                                                 self.nat_addr_n)
2897         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2898         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2899
2900         # send some packets to create sessions
2901         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2902         self.pg0.add_stream(pkts)
2903         self.pg_enable_capture(self.pg_interfaces)
2904         self.pg_start()
2905         capture_ip4 = self.pg1.get_capture(len(pkts))
2906         self.verify_capture_out(capture_ip4,
2907                                 nat_ip=self.nat_addr,
2908                                 dst_ip=self.pg1.remote_ip4)
2909
2910         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2911         self.pg1.add_stream(pkts)
2912         self.pg_enable_capture(self.pg_interfaces)
2913         self.pg_start()
2914         capture_ip6 = self.pg0.get_capture(len(pkts))
2915         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2916         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2917                                    self.pg0.remote_ip6)
2918
2919         # in2out
2920         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2921                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2922                 ICMPv6DestUnreach(code=1) /
2923                 packet[IPv6] for packet in capture_ip6]
2924         self.pg0.add_stream(pkts)
2925         self.pg_enable_capture(self.pg_interfaces)
2926         self.pg_start()
2927         capture = self.pg1.get_capture(len(pkts))
2928         for packet in capture:
2929             try:
2930                 self.assertEqual(packet[IP].src, self.nat_addr)
2931                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2932                 self.assertEqual(packet[ICMP].type, 3)
2933                 self.assertEqual(packet[ICMP].code, 13)
2934                 inner = packet[IPerror]
2935                 self.assertEqual(inner.src, self.pg1.remote_ip4)
2936                 self.assertEqual(inner.dst, self.nat_addr)
2937                 self.check_icmp_checksum(packet)
2938                 if inner.haslayer(TCPerror):
2939                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2940                 elif inner.haslayer(UDPerror):
2941                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2942                 else:
2943                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2944             except:
2945                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2946                 raise
2947
2948         # out2in
2949         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2950                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2951                 ICMP(type=3, code=13) /
2952                 packet[IP] for packet in capture_ip4]
2953         self.pg1.add_stream(pkts)
2954         self.pg_enable_capture(self.pg_interfaces)
2955         self.pg_start()
2956         capture = self.pg0.get_capture(len(pkts))
2957         for packet in capture:
2958             try:
2959                 self.assertEqual(packet[IPv6].src, ip.src)
2960                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
2961                 icmp = packet[ICMPv6DestUnreach]
2962                 self.assertEqual(icmp.code, 1)
2963                 inner = icmp[IPerror6]
2964                 self.assertEqual(inner.src, self.pg0.remote_ip6)
2965                 self.assertEqual(inner.dst, ip.src)
2966                 self.check_icmpv6_checksum(packet)
2967                 if inner.haslayer(TCPerror):
2968                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
2969                 elif inner.haslayer(UDPerror):
2970                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
2971                 else:
2972                     self.assertEqual(inner[ICMPv6EchoRequest].id,
2973                                      self.icmp_id_in)
2974             except:
2975                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2976                 raise
2977
2978     def test_hairpinning(self):
2979         """ NAT64 hairpinning """
2980
2981         client = self.pg0.remote_hosts[0]
2982         server = self.pg0.remote_hosts[1]
2983         server_tcp_in_port = 22
2984         server_tcp_out_port = 4022
2985         server_udp_in_port = 23
2986         server_udp_out_port = 4023
2987         client_tcp_in_port = 1234
2988         client_udp_in_port = 1235
2989         client_tcp_out_port = 0
2990         client_udp_out_port = 0
2991         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
2992         nat_addr_ip6 = ip.src
2993
2994         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2995                                                 self.nat_addr_n)
2996         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2997         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2998
2999         self.vapi.nat64_add_del_static_bib(server.ip6n,
3000                                            self.nat_addr_n,
3001                                            server_tcp_in_port,
3002                                            server_tcp_out_port,
3003                                            IP_PROTOS.tcp)
3004         self.vapi.nat64_add_del_static_bib(server.ip6n,
3005                                            self.nat_addr_n,
3006                                            server_udp_in_port,
3007                                            server_udp_out_port,
3008                                            IP_PROTOS.udp)
3009
3010         # client to server
3011         pkts = []
3012         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3013              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3014              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3015         pkts.append(p)
3016         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3017              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3018              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3019         pkts.append(p)
3020         self.pg0.add_stream(pkts)
3021         self.pg_enable_capture(self.pg_interfaces)
3022         self.pg_start()
3023         capture = self.pg0.get_capture(len(pkts))
3024         for packet in capture:
3025             try:
3026                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3027                 self.assertEqual(packet[IPv6].dst, server.ip6)
3028                 if packet.haslayer(TCP):
3029                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3030                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3031                     self.check_tcp_checksum(packet)
3032                     client_tcp_out_port = packet[TCP].sport
3033                 else:
3034                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3035                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3036                     self.check_udp_checksum(packet)
3037                     client_udp_out_port = packet[UDP].sport
3038             except:
3039                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3040                 raise
3041
3042         # server to client
3043         pkts = []
3044         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3045              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3046              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3047         pkts.append(p)
3048         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3049              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3050              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3051         pkts.append(p)
3052         self.pg0.add_stream(pkts)
3053         self.pg_enable_capture(self.pg_interfaces)
3054         self.pg_start()
3055         capture = self.pg0.get_capture(len(pkts))
3056         for packet in capture:
3057             try:
3058                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3059                 self.assertEqual(packet[IPv6].dst, client.ip6)
3060                 if packet.haslayer(TCP):
3061                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3062                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3063                     self.check_tcp_checksum(packet)
3064                 else:
3065                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3066                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3067                     self.check_udp_checksum(packet)
3068             except:
3069                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3070                 raise
3071
3072         # ICMP error
3073         pkts = []
3074         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3075                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3076                 ICMPv6DestUnreach(code=1) /
3077                 packet[IPv6] for packet in capture]
3078         self.pg0.add_stream(pkts)
3079         self.pg_enable_capture(self.pg_interfaces)
3080         self.pg_start()
3081         capture = self.pg0.get_capture(len(pkts))
3082         for packet in capture:
3083             try:
3084                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3085                 self.assertEqual(packet[IPv6].dst, server.ip6)
3086                 icmp = packet[ICMPv6DestUnreach]
3087                 self.assertEqual(icmp.code, 1)
3088                 inner = icmp[IPerror6]
3089                 self.assertEqual(inner.src, server.ip6)
3090                 self.assertEqual(inner.dst, nat_addr_ip6)
3091                 self.check_icmpv6_checksum(packet)
3092                 if inner.haslayer(TCPerror):
3093                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3094                     self.assertEqual(inner[TCPerror].dport,
3095                                      client_tcp_out_port)
3096                 else:
3097                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3098                     self.assertEqual(inner[UDPerror].dport,
3099                                      client_udp_out_port)
3100             except:
3101                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3102                 raise
3103
3104     def nat64_get_ses_num(self):
3105         """
3106         Return number of active NAT64 sessions.
3107         """
3108         ses_num = 0
3109         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3110         ses_num += len(st)
3111         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3112         ses_num += len(st)
3113         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3114         ses_num += len(st)
3115         return ses_num
3116
3117     def clear_nat64(self):
3118         """
3119         Clear NAT64 configuration.
3120         """
3121         self.vapi.nat64_set_timeouts()
3122
3123         interfaces = self.vapi.nat64_interface_dump()
3124         for intf in interfaces:
3125             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3126                                               intf.is_inside,
3127                                               is_add=0)
3128
3129         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3130         for bibe in bib:
3131             if bibe.is_static:
3132                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3133                                                    bibe.o_addr,
3134                                                    bibe.i_port,
3135                                                    bibe.o_port,
3136                                                    bibe.proto,
3137                                                    bibe.vrf_id,
3138                                                    is_add=0)
3139
3140         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3141         for bibe in bib:
3142             if bibe.is_static:
3143                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3144                                                    bibe.o_addr,
3145                                                    bibe.i_port,
3146                                                    bibe.o_port,
3147                                                    bibe.proto,
3148                                                    bibe.vrf_id,
3149                                                    is_add=0)
3150
3151         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3152         for bibe in bib:
3153             if bibe.is_static:
3154                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3155                                                    bibe.o_addr,
3156                                                    bibe.i_port,
3157                                                    bibe.o_port,
3158                                                    bibe.proto,
3159                                                    bibe.vrf_id,
3160                                                    is_add=0)
3161
3162         adresses = self.vapi.nat64_pool_addr_dump()
3163         for addr in adresses:
3164             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3165                                                     addr.address,
3166                                                     vrf_id=addr.vrf_id,
3167                                                     is_add=0)
3168
3169     def tearDown(self):
3170         super(TestNAT64, self).tearDown()
3171         if not self.vpp_dead:
3172             self.logger.info(self.vapi.cli("show nat64 pool"))
3173             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3174             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3175             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3176             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3177             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3178             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3179             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3180             self.clear_nat64()
3181
3182 if __name__ == '__main__':
3183     unittest.main(testRunner=VppTestRunner)