Fix vl_map_shmem() root_path dangling reference.
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def compose_ip6(self, ip4, pref, plen):
161         """
162         Compose IPv4-embedded IPv6 addresses
163
164         :param ip4: IPv4 address
165         :param pref: IPv6 prefix
166         :param plen: IPv6 prefix length
167         :returns: IPv4-embedded IPv6 addresses
168         """
169         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
171         if plen == 32:
172             pref_n[4] = ip4_n[0]
173             pref_n[5] = ip4_n[1]
174             pref_n[6] = ip4_n[2]
175             pref_n[7] = ip4_n[3]
176         elif plen == 40:
177             pref_n[5] = ip4_n[0]
178             pref_n[6] = ip4_n[1]
179             pref_n[7] = ip4_n[2]
180             pref_n[9] = ip4_n[3]
181         elif plen == 48:
182             pref_n[6] = ip4_n[0]
183             pref_n[7] = ip4_n[1]
184             pref_n[9] = ip4_n[2]
185             pref_n[10] = ip4_n[3]
186         elif plen == 56:
187             pref_n[7] = ip4_n[0]
188             pref_n[9] = ip4_n[1]
189             pref_n[10] = ip4_n[2]
190             pref_n[11] = ip4_n[3]
191         elif plen == 64:
192             pref_n[9] = ip4_n[0]
193             pref_n[10] = ip4_n[1]
194             pref_n[11] = ip4_n[2]
195             pref_n[12] = ip4_n[3]
196         elif plen == 96:
197             pref_n[12] = ip4_n[0]
198             pref_n[13] = ip4_n[1]
199             pref_n[14] = ip4_n[2]
200             pref_n[15] = ip4_n[3]
201         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
202
203     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
204         """
205         Create IPv6 packet stream for inside network
206
207         :param in_if: Inside interface
208         :param out_if: Outside interface
209         :param ttl: Hop Limit of generated packets
210         :param pref: NAT64 prefix
211         :param plen: NAT64 prefix length
212         """
213         pkts = []
214         if pref is None:
215             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
216         else:
217             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
218
219         # TCP
220         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222              TCP(sport=self.tcp_port_in, dport=20))
223         pkts.append(p)
224
225         # UDP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              UDP(sport=self.udp_port_in, dport=20))
229         pkts.append(p)
230
231         # ICMP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              ICMPv6EchoRequest(id=self.icmp_id_in))
235         pkts.append(p)
236
237         return pkts
238
239     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
240         """
241         Create packet stream for outside network
242
243         :param out_if: Outside interface
244         :param dst_ip: Destination IP address (Default use global SNAT address)
245         :param ttl: TTL of generated packets
246         """
247         if dst_ip is None:
248             dst_ip = self.snat_addr
249         pkts = []
250         # TCP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              TCP(dport=self.tcp_port_out, sport=20))
254         pkts.append(p)
255
256         # UDP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              UDP(dport=self.udp_port_out, sport=20))
260         pkts.append(p)
261
262         # ICMP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              ICMP(id=self.icmp_id_out, type='echo-reply'))
266         pkts.append(p)
267
268         return pkts
269
270     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271                            packet_num=3, dst_ip=None):
272         """
273         Verify captured packets on outside network
274
275         :param capture: Captured packets
276         :param nat_ip: Translated IP address (Default use global SNAT address)
277         :param same_port: Sorce port number is not translated (Default False)
278         :param packet_num: Expected number of packets (Default 3)
279         :param dst_ip: Destination IP address (Default do not verify)
280         """
281         if nat_ip is None:
282             nat_ip = self.snat_addr
283         self.assertEqual(packet_num, len(capture))
284         for packet in capture:
285             try:
286                 self.check_ip_checksum(packet)
287                 self.assertEqual(packet[IP].src, nat_ip)
288                 if dst_ip is not None:
289                     self.assertEqual(packet[IP].dst, dst_ip)
290                 if packet.haslayer(TCP):
291                     if same_port:
292                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
293                     else:
294                         self.assertNotEqual(
295                             packet[TCP].sport, self.tcp_port_in)
296                     self.tcp_port_out = packet[TCP].sport
297                     self.check_tcp_checksum(packet)
298                 elif packet.haslayer(UDP):
299                     if same_port:
300                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
301                     else:
302                         self.assertNotEqual(
303                             packet[UDP].sport, self.udp_port_in)
304                     self.udp_port_out = packet[UDP].sport
305                 else:
306                     if same_port:
307                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
308                     else:
309                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310                     self.icmp_id_out = packet[ICMP].id
311                     self.check_icmp_checksum(packet)
312             except:
313                 self.logger.error(ppp("Unexpected or invalid packet "
314                                       "(outside network):", packet))
315                 raise
316
317     def verify_capture_in(self, capture, in_if, packet_num=3):
318         """
319         Verify captured packets on inside network
320
321         :param capture: Captured packets
322         :param in_if: Inside interface
323         :param packet_num: Expected number of packets (Default 3)
324         """
325         self.assertEqual(packet_num, len(capture))
326         for packet in capture:
327             try:
328                 self.check_ip_checksum(packet)
329                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330                 if packet.haslayer(TCP):
331                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332                     self.check_tcp_checksum(packet)
333                 elif packet.haslayer(UDP):
334                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
335                 else:
336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337                     self.check_icmp_checksum(packet)
338             except:
339                 self.logger.error(ppp("Unexpected or invalid packet "
340                                       "(inside network):", packet))
341                 raise
342
343     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
344         """
345         Verify captured IPv6 packets on inside network
346
347         :param capture: Captured packets
348         :param src_ip: Source IP
349         :param dst_ip: Destination IP address
350         :param packet_num: Expected number of packets (Default 3)
351         """
352         self.assertEqual(packet_num, len(capture))
353         for packet in capture:
354             try:
355                 self.assertEqual(packet[IPv6].src, src_ip)
356                 self.assertEqual(packet[IPv6].dst, dst_ip)
357                 if packet.haslayer(TCP):
358                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359                     self.check_tcp_checksum(packet)
360                 elif packet.haslayer(UDP):
361                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
362                     self.check_udp_checksum(packet)
363                 else:
364                     self.assertEqual(packet[ICMPv6EchoReply].id,
365                                      self.icmp_id_in)
366                     self.check_icmpv6_checksum(packet)
367             except:
368                 self.logger.error(ppp("Unexpected or invalid packet "
369                                       "(inside network):", packet))
370                 raise
371
372     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
373         """
374         Verify captured packet that don't have to be translated
375
376         :param capture: Captured packets
377         :param ingress_if: Ingress interface
378         :param egress_if: Egress interface
379         """
380         for packet in capture:
381             try:
382                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384                 if packet.haslayer(TCP):
385                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386                 elif packet.haslayer(UDP):
387                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
388                 else:
389                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
390             except:
391                 self.logger.error(ppp("Unexpected or invalid packet "
392                                       "(inside network):", packet))
393                 raise
394
395     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396                                             packet_num=3, icmp_type=11):
397         """
398         Verify captured packets with ICMP errors on outside network
399
400         :param capture: Captured packets
401         :param src_ip: Translated IP address or IP address of VPP
402                        (Default use global SNAT address)
403         :param packet_num: Expected number of packets (Default 3)
404         :param icmp_type: Type of error ICMP packet
405                           we are expecting (Default 11)
406         """
407         if src_ip is None:
408             src_ip = self.snat_addr
409         self.assertEqual(packet_num, len(capture))
410         for packet in capture:
411             try:
412                 self.assertEqual(packet[IP].src, src_ip)
413                 self.assertTrue(packet.haslayer(ICMP))
414                 icmp = packet[ICMP]
415                 self.assertEqual(icmp.type, icmp_type)
416                 self.assertTrue(icmp.haslayer(IPerror))
417                 inner_ip = icmp[IPerror]
418                 if inner_ip.haslayer(TCPerror):
419                     self.assertEqual(inner_ip[TCPerror].dport,
420                                      self.tcp_port_out)
421                 elif inner_ip.haslayer(UDPerror):
422                     self.assertEqual(inner_ip[UDPerror].dport,
423                                      self.udp_port_out)
424                 else:
425                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
426             except:
427                 self.logger.error(ppp("Unexpected or invalid packet "
428                                       "(outside network):", packet))
429                 raise
430
431     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
432                                            icmp_type=11):
433         """
434         Verify captured packets with ICMP errors on inside network
435
436         :param capture: Captured packets
437         :param in_if: Inside interface
438         :param packet_num: Expected number of packets (Default 3)
439         :param icmp_type: Type of error ICMP packet
440                           we are expecting (Default 11)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446                 self.assertTrue(packet.haslayer(ICMP))
447                 icmp = packet[ICMP]
448                 self.assertEqual(icmp.type, icmp_type)
449                 self.assertTrue(icmp.haslayer(IPerror))
450                 inner_ip = icmp[IPerror]
451                 if inner_ip.haslayer(TCPerror):
452                     self.assertEqual(inner_ip[TCPerror].sport,
453                                      self.tcp_port_in)
454                 elif inner_ip.haslayer(UDPerror):
455                     self.assertEqual(inner_ip[UDPerror].sport,
456                                      self.udp_port_in)
457                 else:
458                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
459             except:
460                 self.logger.error(ppp("Unexpected or invalid packet "
461                                       "(inside network):", packet))
462                 raise
463
464     def verify_ipfix_nat44_ses(self, data):
465         """
466         Verify IPFIX NAT44 session create/delete event
467
468         :param data: Decoded IPFIX data records
469         """
470         nat44_ses_create_num = 0
471         nat44_ses_delete_num = 0
472         self.assertEqual(6, len(data))
473         for record in data:
474             # natEvent
475             self.assertIn(ord(record[230]), [4, 5])
476             if ord(record[230]) == 4:
477                 nat44_ses_create_num += 1
478             else:
479                 nat44_ses_delete_num += 1
480             # sourceIPv4Address
481             self.assertEqual(self.pg0.remote_ip4n, record[8])
482             # postNATSourceIPv4Address
483             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
484                              record[225])
485             # ingressVRFID
486             self.assertEqual(struct.pack("!I", 0), record[234])
487             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488             if IP_PROTOS.icmp == ord(record[4]):
489                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
491                                  record[227])
492             elif IP_PROTOS.tcp == ord(record[4]):
493                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
494                                  record[7])
495                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
496                                  record[227])
497             elif IP_PROTOS.udp == ord(record[4]):
498                 self.assertEqual(struct.pack("!H", self.udp_port_in),
499                                  record[7])
500                 self.assertEqual(struct.pack("!H", self.udp_port_out),
501                                  record[227])
502             else:
503                 self.fail("Invalid protocol")
504         self.assertEqual(3, nat44_ses_create_num)
505         self.assertEqual(3, nat44_ses_delete_num)
506
507     def verify_ipfix_addr_exhausted(self, data):
508         """
509         Verify IPFIX NAT addresses event
510
511         :param data: Decoded IPFIX data records
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         self.assertEqual(ord(record[230]), 3)
517         # natPoolID
518         self.assertEqual(struct.pack("!I", 0), record[283])
519
520
521 class TestSNAT(MethodHolder):
522     """ SNAT Test Cases """
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestSNAT, cls).setUpClass()
527
528         try:
529             cls.tcp_port_in = 6303
530             cls.tcp_port_out = 6303
531             cls.udp_port_in = 6304
532             cls.udp_port_out = 6304
533             cls.icmp_id_in = 6305
534             cls.icmp_id_out = 6305
535             cls.snat_addr = '10.0.0.3'
536             cls.ipfix_src_port = 4739
537             cls.ipfix_domain_id = 1
538
539             cls.create_pg_interfaces(range(9))
540             cls.interfaces = list(cls.pg_interfaces[0:4])
541
542             for i in cls.interfaces:
543                 i.admin_up()
544                 i.config_ip4()
545                 i.resolve_arp()
546
547             cls.pg0.generate_remote_hosts(3)
548             cls.pg0.configure_ipv4_neighbors()
549
550             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
551
552             cls.pg4._local_ip4 = "172.16.255.1"
553             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555             cls.pg4.set_table_ip4(10)
556             cls.pg5._local_ip4 = "172.17.255.3"
557             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
559             cls.pg5.set_table_ip4(10)
560             cls.pg6._local_ip4 = "172.16.255.1"
561             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563             cls.pg6.set_table_ip4(20)
564             for i in cls.overlapping_interfaces:
565                 i.config_ip4()
566                 i.admin_up()
567                 i.resolve_arp()
568
569             cls.pg7.admin_up()
570             cls.pg8.admin_up()
571
572         except Exception:
573             super(TestSNAT, cls).tearDownClass()
574             raise
575
576     def clear_snat(self):
577         """
578         Clear SNAT configuration.
579         """
580         # I found no elegant way to do this
581         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582                                    dst_address_length=32,
583                                    next_hop_address=self.pg7.remote_ip4n,
584                                    next_hop_sw_if_index=self.pg7.sw_if_index,
585                                    is_add=0)
586         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587                                    dst_address_length=32,
588                                    next_hop_address=self.pg8.remote_ip4n,
589                                    next_hop_sw_if_index=self.pg8.sw_if_index,
590                                    is_add=0)
591
592         for intf in [self.pg7, self.pg8]:
593             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
594             for n in neighbors:
595                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
596                                               n.mac_address,
597                                               n.ip_address,
598                                               is_add=0)
599
600         if self.pg7.has_ip4_config:
601             self.pg7.unconfig_ip4()
602
603         interfaces = self.vapi.snat_interface_addr_dump()
604         for intf in interfaces:
605             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
606
607         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608                              domain_id=self.ipfix_domain_id)
609         self.ipfix_src_port = 4739
610         self.ipfix_domain_id = 1
611
612         interfaces = self.vapi.snat_interface_dump()
613         for intf in interfaces:
614             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
615                                                      intf.is_inside,
616                                                      is_add=0)
617
618         interfaces = self.vapi.snat_interface_output_feature_dump()
619         for intf in interfaces:
620             self.vapi.snat_interface_add_del_output_feature(intf.sw_if_index,
621                                                             intf.is_inside,
622                                                             is_add=0)
623
624         static_mappings = self.vapi.snat_static_mapping_dump()
625         for sm in static_mappings:
626             self.vapi.snat_add_static_mapping(sm.local_ip_address,
627                                               sm.external_ip_address,
628                                               local_port=sm.local_port,
629                                               external_port=sm.external_port,
630                                               addr_only=sm.addr_only,
631                                               vrf_id=sm.vrf_id,
632                                               protocol=sm.protocol,
633                                               is_add=0)
634
635         adresses = self.vapi.snat_address_dump()
636         for addr in adresses:
637             self.vapi.snat_add_address_range(addr.ip_address,
638                                              addr.ip_address,
639                                              is_add=0)
640
641     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
642                                 local_port=0, external_port=0, vrf_id=0,
643                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
644                                 proto=0):
645         """
646         Add/delete S-NAT static mapping
647
648         :param local_ip: Local IP address
649         :param external_ip: External IP address
650         :param local_port: Local port number (Optional)
651         :param external_port: External port number (Optional)
652         :param vrf_id: VRF ID (Default 0)
653         :param is_add: 1 if add, 0 if delete (Default add)
654         :param external_sw_if_index: External interface instead of IP address
655         :param proto: IP protocol (Mandatory if port specified)
656         """
657         addr_only = 1
658         if local_port and external_port:
659             addr_only = 0
660         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
661         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
662         self.vapi.snat_add_static_mapping(
663             l_ip,
664             e_ip,
665             external_sw_if_index,
666             local_port,
667             external_port,
668             addr_only,
669             vrf_id,
670             proto,
671             is_add)
672
673     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
674         """
675         Add/delete S-NAT address
676
677         :param ip: IP address
678         :param is_add: 1 if add, 0 if delete (Default add)
679         """
680         snat_addr = socket.inet_pton(socket.AF_INET, ip)
681         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
682                                          vrf_id=vrf_id)
683
684     def test_dynamic(self):
685         """ SNAT dynamic translation test """
686
687         self.snat_add_address(self.snat_addr)
688         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
689         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
690                                                  is_inside=0)
691
692         # in2out
693         pkts = self.create_stream_in(self.pg0, self.pg1)
694         self.pg0.add_stream(pkts)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697         capture = self.pg1.get_capture(len(pkts))
698         self.verify_capture_out(capture)
699
700         # out2in
701         pkts = self.create_stream_out(self.pg1)
702         self.pg1.add_stream(pkts)
703         self.pg_enable_capture(self.pg_interfaces)
704         self.pg_start()
705         capture = self.pg0.get_capture(len(pkts))
706         self.verify_capture_in(capture, self.pg0)
707
708     def test_dynamic_icmp_errors_in2out_ttl_1(self):
709         """ SNAT handling of client packets with TTL=1 """
710
711         self.snat_add_address(self.snat_addr)
712         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
713         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
714                                                  is_inside=0)
715
716         # Client side - generate traffic
717         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
718         self.pg0.add_stream(pkts)
719         self.pg_enable_capture(self.pg_interfaces)
720         self.pg_start()
721
722         # Client side - verify ICMP type 11 packets
723         capture = self.pg0.get_capture(len(pkts))
724         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
725
726     def test_dynamic_icmp_errors_out2in_ttl_1(self):
727         """ SNAT handling of server packets with TTL=1 """
728
729         self.snat_add_address(self.snat_addr)
730         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
731         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
732                                                  is_inside=0)
733
734         # Client side - create sessions
735         pkts = self.create_stream_in(self.pg0, self.pg1)
736         self.pg0.add_stream(pkts)
737         self.pg_enable_capture(self.pg_interfaces)
738         self.pg_start()
739
740         # Server side - generate traffic
741         capture = self.pg1.get_capture(len(pkts))
742         self.verify_capture_out(capture)
743         pkts = self.create_stream_out(self.pg1, ttl=1)
744         self.pg1.add_stream(pkts)
745         self.pg_enable_capture(self.pg_interfaces)
746         self.pg_start()
747
748         # Server side - verify ICMP type 11 packets
749         capture = self.pg1.get_capture(len(pkts))
750         self.verify_capture_out_with_icmp_errors(capture,
751                                                  src_ip=self.pg1.local_ip4)
752
753     def test_dynamic_icmp_errors_in2out_ttl_2(self):
754         """ SNAT handling of error responses to client packets with TTL=2 """
755
756         self.snat_add_address(self.snat_addr)
757         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
758         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
759                                                  is_inside=0)
760
761         # Client side - generate traffic
762         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
763         self.pg0.add_stream(pkts)
764         self.pg_enable_capture(self.pg_interfaces)
765         self.pg_start()
766
767         # Server side - simulate ICMP type 11 response
768         capture = self.pg1.get_capture(len(pkts))
769         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
770                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
771                 ICMP(type=11) / packet[IP] for packet in capture]
772         self.pg1.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775
776         # Client side - verify ICMP type 11 packets
777         capture = self.pg0.get_capture(len(pkts))
778         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
779
780     def test_dynamic_icmp_errors_out2in_ttl_2(self):
781         """ SNAT handling of error responses to server packets with TTL=2 """
782
783         self.snat_add_address(self.snat_addr)
784         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
785         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
786                                                  is_inside=0)
787
788         # Client side - create sessions
789         pkts = self.create_stream_in(self.pg0, self.pg1)
790         self.pg0.add_stream(pkts)
791         self.pg_enable_capture(self.pg_interfaces)
792         self.pg_start()
793
794         # Server side - generate traffic
795         capture = self.pg1.get_capture(len(pkts))
796         self.verify_capture_out(capture)
797         pkts = self.create_stream_out(self.pg1, ttl=2)
798         self.pg1.add_stream(pkts)
799         self.pg_enable_capture(self.pg_interfaces)
800         self.pg_start()
801
802         # Client side - simulate ICMP type 11 response
803         capture = self.pg0.get_capture(len(pkts))
804         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
805                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
806                 ICMP(type=11) / packet[IP] for packet in capture]
807         self.pg0.add_stream(pkts)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810
811         # Server side - verify ICMP type 11 packets
812         capture = self.pg1.get_capture(len(pkts))
813         self.verify_capture_out_with_icmp_errors(capture)
814
815     def test_ping_out_interface_from_outside(self):
816         """ Ping SNAT out interface from outside network """
817
818         self.snat_add_address(self.snat_addr)
819         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
820         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
821                                                  is_inside=0)
822
823         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
824              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
825              ICMP(id=self.icmp_id_out, type='echo-request'))
826         pkts = [p]
827         self.pg1.add_stream(pkts)
828         self.pg_enable_capture(self.pg_interfaces)
829         self.pg_start()
830         capture = self.pg1.get_capture(len(pkts))
831         self.assertEqual(1, len(capture))
832         packet = capture[0]
833         try:
834             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
835             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
836             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
837             self.assertEqual(packet[ICMP].type, 0)  # echo reply
838         except:
839             self.logger.error(ppp("Unexpected or invalid packet "
840                                   "(outside network):", packet))
841             raise
842
843     def test_ping_internal_host_from_outside(self):
844         """ Ping internal host from outside network """
845
846         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
847         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
848         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
849                                                  is_inside=0)
850
851         # out2in
852         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
853                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
854                ICMP(id=self.icmp_id_out, type='echo-request'))
855         self.pg1.add_stream(pkt)
856         self.pg_enable_capture(self.pg_interfaces)
857         self.pg_start()
858         capture = self.pg0.get_capture(1)
859         self.verify_capture_in(capture, self.pg0, packet_num=1)
860         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
861
862         # in2out
863         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
864                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
865                ICMP(id=self.icmp_id_in, type='echo-reply'))
866         self.pg0.add_stream(pkt)
867         self.pg_enable_capture(self.pg_interfaces)
868         self.pg_start()
869         capture = self.pg1.get_capture(1)
870         self.verify_capture_out(capture, same_port=True, packet_num=1)
871         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
872
873     def test_static_in(self):
874         """ SNAT 1:1 NAT initialized from inside network """
875
876         nat_ip = "10.0.0.10"
877         self.tcp_port_out = 6303
878         self.udp_port_out = 6304
879         self.icmp_id_out = 6305
880
881         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
882         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
883         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
884                                                  is_inside=0)
885
886         # in2out
887         pkts = self.create_stream_in(self.pg0, self.pg1)
888         self.pg0.add_stream(pkts)
889         self.pg_enable_capture(self.pg_interfaces)
890         self.pg_start()
891         capture = self.pg1.get_capture(len(pkts))
892         self.verify_capture_out(capture, nat_ip, True)
893
894         # out2in
895         pkts = self.create_stream_out(self.pg1, nat_ip)
896         self.pg1.add_stream(pkts)
897         self.pg_enable_capture(self.pg_interfaces)
898         self.pg_start()
899         capture = self.pg0.get_capture(len(pkts))
900         self.verify_capture_in(capture, self.pg0)
901
902     def test_static_out(self):
903         """ SNAT 1:1 NAT initialized from outside network """
904
905         nat_ip = "10.0.0.20"
906         self.tcp_port_out = 6303
907         self.udp_port_out = 6304
908         self.icmp_id_out = 6305
909
910         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
911         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
912         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
913                                                  is_inside=0)
914
915         # out2in
916         pkts = self.create_stream_out(self.pg1, nat_ip)
917         self.pg1.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg0.get_capture(len(pkts))
921         self.verify_capture_in(capture, self.pg0)
922
923         # in2out
924         pkts = self.create_stream_in(self.pg0, self.pg1)
925         self.pg0.add_stream(pkts)
926         self.pg_enable_capture(self.pg_interfaces)
927         self.pg_start()
928         capture = self.pg1.get_capture(len(pkts))
929         self.verify_capture_out(capture, nat_ip, True)
930
931     def test_static_with_port_in(self):
932         """ SNAT 1:1 NAT with port initialized from inside network """
933
934         self.tcp_port_out = 3606
935         self.udp_port_out = 3607
936         self.icmp_id_out = 3608
937
938         self.snat_add_address(self.snat_addr)
939         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940                                      self.tcp_port_in, self.tcp_port_out,
941                                      proto=IP_PROTOS.tcp)
942         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
943                                      self.udp_port_in, self.udp_port_out,
944                                      proto=IP_PROTOS.udp)
945         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
946                                      self.icmp_id_in, self.icmp_id_out,
947                                      proto=IP_PROTOS.icmp)
948         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
949         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
950                                                  is_inside=0)
951
952         # in2out
953         pkts = self.create_stream_in(self.pg0, self.pg1)
954         self.pg0.add_stream(pkts)
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pg_start()
957         capture = self.pg1.get_capture(len(pkts))
958         self.verify_capture_out(capture)
959
960         # out2in
961         pkts = self.create_stream_out(self.pg1)
962         self.pg1.add_stream(pkts)
963         self.pg_enable_capture(self.pg_interfaces)
964         self.pg_start()
965         capture = self.pg0.get_capture(len(pkts))
966         self.verify_capture_in(capture, self.pg0)
967
968     def test_static_with_port_out(self):
969         """ SNAT 1:1 NAT with port initialized from outside network """
970
971         self.tcp_port_out = 30606
972         self.udp_port_out = 30607
973         self.icmp_id_out = 30608
974
975         self.snat_add_address(self.snat_addr)
976         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977                                      self.tcp_port_in, self.tcp_port_out,
978                                      proto=IP_PROTOS.tcp)
979         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
980                                      self.udp_port_in, self.udp_port_out,
981                                      proto=IP_PROTOS.udp)
982         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
983                                      self.icmp_id_in, self.icmp_id_out,
984                                      proto=IP_PROTOS.icmp)
985         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
986         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
987                                                  is_inside=0)
988
989         # out2in
990         pkts = self.create_stream_out(self.pg1)
991         self.pg1.add_stream(pkts)
992         self.pg_enable_capture(self.pg_interfaces)
993         self.pg_start()
994         capture = self.pg0.get_capture(len(pkts))
995         self.verify_capture_in(capture, self.pg0)
996
997         # in2out
998         pkts = self.create_stream_in(self.pg0, self.pg1)
999         self.pg0.add_stream(pkts)
1000         self.pg_enable_capture(self.pg_interfaces)
1001         self.pg_start()
1002         capture = self.pg1.get_capture(len(pkts))
1003         self.verify_capture_out(capture)
1004
1005     def test_static_vrf_aware(self):
1006         """ SNAT 1:1 NAT VRF awareness """
1007
1008         nat_ip1 = "10.0.0.30"
1009         nat_ip2 = "10.0.0.40"
1010         self.tcp_port_out = 6303
1011         self.udp_port_out = 6304
1012         self.icmp_id_out = 6305
1013
1014         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1015                                      vrf_id=10)
1016         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1017                                      vrf_id=10)
1018         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1019                                                  is_inside=0)
1020         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1021         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1022
1023         # inside interface VRF match SNAT static mapping VRF
1024         pkts = self.create_stream_in(self.pg4, self.pg3)
1025         self.pg4.add_stream(pkts)
1026         self.pg_enable_capture(self.pg_interfaces)
1027         self.pg_start()
1028         capture = self.pg3.get_capture(len(pkts))
1029         self.verify_capture_out(capture, nat_ip1, True)
1030
1031         # inside interface VRF don't match SNAT static mapping VRF (packets
1032         # are dropped)
1033         pkts = self.create_stream_in(self.pg0, self.pg3)
1034         self.pg0.add_stream(pkts)
1035         self.pg_enable_capture(self.pg_interfaces)
1036         self.pg_start()
1037         self.pg3.assert_nothing_captured()
1038
1039     def test_multiple_inside_interfaces(self):
1040         """ SNAT multiple inside interfaces (non-overlapping address space) """
1041
1042         self.snat_add_address(self.snat_addr)
1043         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1044         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1045         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1046                                                  is_inside=0)
1047
1048         # between two S-NAT inside interfaces (no translation)
1049         pkts = self.create_stream_in(self.pg0, self.pg1)
1050         self.pg0.add_stream(pkts)
1051         self.pg_enable_capture(self.pg_interfaces)
1052         self.pg_start()
1053         capture = self.pg1.get_capture(len(pkts))
1054         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1055
1056         # from S-NAT inside to interface without S-NAT feature (no translation)
1057         pkts = self.create_stream_in(self.pg0, self.pg2)
1058         self.pg0.add_stream(pkts)
1059         self.pg_enable_capture(self.pg_interfaces)
1060         self.pg_start()
1061         capture = self.pg2.get_capture(len(pkts))
1062         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1063
1064         # in2out 1st interface
1065         pkts = self.create_stream_in(self.pg0, self.pg3)
1066         self.pg0.add_stream(pkts)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         capture = self.pg3.get_capture(len(pkts))
1070         self.verify_capture_out(capture)
1071
1072         # out2in 1st interface
1073         pkts = self.create_stream_out(self.pg3)
1074         self.pg3.add_stream(pkts)
1075         self.pg_enable_capture(self.pg_interfaces)
1076         self.pg_start()
1077         capture = self.pg0.get_capture(len(pkts))
1078         self.verify_capture_in(capture, self.pg0)
1079
1080         # in2out 2nd interface
1081         pkts = self.create_stream_in(self.pg1, self.pg3)
1082         self.pg1.add_stream(pkts)
1083         self.pg_enable_capture(self.pg_interfaces)
1084         self.pg_start()
1085         capture = self.pg3.get_capture(len(pkts))
1086         self.verify_capture_out(capture)
1087
1088         # out2in 2nd interface
1089         pkts = self.create_stream_out(self.pg3)
1090         self.pg3.add_stream(pkts)
1091         self.pg_enable_capture(self.pg_interfaces)
1092         self.pg_start()
1093         capture = self.pg1.get_capture(len(pkts))
1094         self.verify_capture_in(capture, self.pg1)
1095
1096     def test_inside_overlapping_interfaces(self):
1097         """ SNAT multiple inside interfaces with overlapping address space """
1098
1099         static_nat_ip = "10.0.0.10"
1100         self.snat_add_address(self.snat_addr)
1101         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1102                                                  is_inside=0)
1103         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1104         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1105         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1106         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1107                                      vrf_id=20)
1108
1109         # between S-NAT inside interfaces with same VRF (no translation)
1110         pkts = self.create_stream_in(self.pg4, self.pg5)
1111         self.pg4.add_stream(pkts)
1112         self.pg_enable_capture(self.pg_interfaces)
1113         self.pg_start()
1114         capture = self.pg5.get_capture(len(pkts))
1115         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1116
1117         # between S-NAT inside interfaces with different VRF (hairpinning)
1118         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1119              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1120              TCP(sport=1234, dport=5678))
1121         self.pg4.add_stream(p)
1122         self.pg_enable_capture(self.pg_interfaces)
1123         self.pg_start()
1124         capture = self.pg6.get_capture(1)
1125         p = capture[0]
1126         try:
1127             ip = p[IP]
1128             tcp = p[TCP]
1129             self.assertEqual(ip.src, self.snat_addr)
1130             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1131             self.assertNotEqual(tcp.sport, 1234)
1132             self.assertEqual(tcp.dport, 5678)
1133         except:
1134             self.logger.error(ppp("Unexpected or invalid packet:", p))
1135             raise
1136
1137         # in2out 1st interface
1138         pkts = self.create_stream_in(self.pg4, self.pg3)
1139         self.pg4.add_stream(pkts)
1140         self.pg_enable_capture(self.pg_interfaces)
1141         self.pg_start()
1142         capture = self.pg3.get_capture(len(pkts))
1143         self.verify_capture_out(capture)
1144
1145         # out2in 1st interface
1146         pkts = self.create_stream_out(self.pg3)
1147         self.pg3.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150         capture = self.pg4.get_capture(len(pkts))
1151         self.verify_capture_in(capture, self.pg4)
1152
1153         # in2out 2nd interface
1154         pkts = self.create_stream_in(self.pg5, self.pg3)
1155         self.pg5.add_stream(pkts)
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158         capture = self.pg3.get_capture(len(pkts))
1159         self.verify_capture_out(capture)
1160
1161         # out2in 2nd interface
1162         pkts = self.create_stream_out(self.pg3)
1163         self.pg3.add_stream(pkts)
1164         self.pg_enable_capture(self.pg_interfaces)
1165         self.pg_start()
1166         capture = self.pg5.get_capture(len(pkts))
1167         self.verify_capture_in(capture, self.pg5)
1168
1169         # pg5 session dump
1170         addresses = self.vapi.snat_address_dump()
1171         self.assertEqual(len(addresses), 1)
1172         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1173         self.assertEqual(len(sessions), 3)
1174         for session in sessions:
1175             self.assertFalse(session.is_static)
1176             self.assertEqual(session.inside_ip_address[0:4],
1177                              self.pg5.remote_ip4n)
1178             self.assertEqual(session.outside_ip_address,
1179                              addresses[0].ip_address)
1180         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1181         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1182         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1183         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1184         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1185         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1186         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1187         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1188         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1189
1190         # in2out 3rd interface
1191         pkts = self.create_stream_in(self.pg6, self.pg3)
1192         self.pg6.add_stream(pkts)
1193         self.pg_enable_capture(self.pg_interfaces)
1194         self.pg_start()
1195         capture = self.pg3.get_capture(len(pkts))
1196         self.verify_capture_out(capture, static_nat_ip, True)
1197
1198         # out2in 3rd interface
1199         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1200         self.pg3.add_stream(pkts)
1201         self.pg_enable_capture(self.pg_interfaces)
1202         self.pg_start()
1203         capture = self.pg6.get_capture(len(pkts))
1204         self.verify_capture_in(capture, self.pg6)
1205
1206         # general user and session dump verifications
1207         users = self.vapi.snat_user_dump()
1208         self.assertTrue(len(users) >= 3)
1209         addresses = self.vapi.snat_address_dump()
1210         self.assertEqual(len(addresses), 1)
1211         for user in users:
1212             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1213                                                         user.vrf_id)
1214             for session in sessions:
1215                 self.assertEqual(user.ip_address, session.inside_ip_address)
1216                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1217                 self.assertTrue(session.protocol in
1218                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1219                                  IP_PROTOS.icmp])
1220
1221         # pg4 session dump
1222         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1223         self.assertTrue(len(sessions) >= 4)
1224         for session in sessions:
1225             self.assertFalse(session.is_static)
1226             self.assertEqual(session.inside_ip_address[0:4],
1227                              self.pg4.remote_ip4n)
1228             self.assertEqual(session.outside_ip_address,
1229                              addresses[0].ip_address)
1230
1231         # pg6 session dump
1232         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1233         self.assertTrue(len(sessions) >= 3)
1234         for session in sessions:
1235             self.assertTrue(session.is_static)
1236             self.assertEqual(session.inside_ip_address[0:4],
1237                              self.pg6.remote_ip4n)
1238             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1239                              map(int, static_nat_ip.split('.')))
1240             self.assertTrue(session.inside_port in
1241                             [self.tcp_port_in, self.udp_port_in,
1242                              self.icmp_id_in])
1243
1244     def test_hairpinning(self):
1245         """ SNAT hairpinning - 1:1 NAT with port"""
1246
1247         host = self.pg0.remote_hosts[0]
1248         server = self.pg0.remote_hosts[1]
1249         host_in_port = 1234
1250         host_out_port = 0
1251         server_in_port = 5678
1252         server_out_port = 8765
1253
1254         self.snat_add_address(self.snat_addr)
1255         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1256         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1257                                                  is_inside=0)
1258         # add static mapping for server
1259         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1260                                      server_in_port, server_out_port,
1261                                      proto=IP_PROTOS.tcp)
1262
1263         # send packet from host to server
1264         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1265              IP(src=host.ip4, dst=self.snat_addr) /
1266              TCP(sport=host_in_port, dport=server_out_port))
1267         self.pg0.add_stream(p)
1268         self.pg_enable_capture(self.pg_interfaces)
1269         self.pg_start()
1270         capture = self.pg0.get_capture(1)
1271         p = capture[0]
1272         try:
1273             ip = p[IP]
1274             tcp = p[TCP]
1275             self.assertEqual(ip.src, self.snat_addr)
1276             self.assertEqual(ip.dst, server.ip4)
1277             self.assertNotEqual(tcp.sport, host_in_port)
1278             self.assertEqual(tcp.dport, server_in_port)
1279             self.check_tcp_checksum(p)
1280             host_out_port = tcp.sport
1281         except:
1282             self.logger.error(ppp("Unexpected or invalid packet:", p))
1283             raise
1284
1285         # send reply from server to host
1286         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1287              IP(src=server.ip4, dst=self.snat_addr) /
1288              TCP(sport=server_in_port, dport=host_out_port))
1289         self.pg0.add_stream(p)
1290         self.pg_enable_capture(self.pg_interfaces)
1291         self.pg_start()
1292         capture = self.pg0.get_capture(1)
1293         p = capture[0]
1294         try:
1295             ip = p[IP]
1296             tcp = p[TCP]
1297             self.assertEqual(ip.src, self.snat_addr)
1298             self.assertEqual(ip.dst, host.ip4)
1299             self.assertEqual(tcp.sport, server_out_port)
1300             self.assertEqual(tcp.dport, host_in_port)
1301             self.check_tcp_checksum(p)
1302         except:
1303             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1304             raise
1305
1306     def test_hairpinning2(self):
1307         """ SNAT hairpinning - 1:1 NAT"""
1308
1309         server1_nat_ip = "10.0.0.10"
1310         server2_nat_ip = "10.0.0.11"
1311         host = self.pg0.remote_hosts[0]
1312         server1 = self.pg0.remote_hosts[1]
1313         server2 = self.pg0.remote_hosts[2]
1314         server_tcp_port = 22
1315         server_udp_port = 20
1316
1317         self.snat_add_address(self.snat_addr)
1318         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1319         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1320                                                  is_inside=0)
1321
1322         # add static mapping for servers
1323         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1324         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1325
1326         # host to server1
1327         pkts = []
1328         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1329              IP(src=host.ip4, dst=server1_nat_ip) /
1330              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1331         pkts.append(p)
1332         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1333              IP(src=host.ip4, dst=server1_nat_ip) /
1334              UDP(sport=self.udp_port_in, dport=server_udp_port))
1335         pkts.append(p)
1336         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1337              IP(src=host.ip4, dst=server1_nat_ip) /
1338              ICMP(id=self.icmp_id_in, type='echo-request'))
1339         pkts.append(p)
1340         self.pg0.add_stream(pkts)
1341         self.pg_enable_capture(self.pg_interfaces)
1342         self.pg_start()
1343         capture = self.pg0.get_capture(len(pkts))
1344         for packet in capture:
1345             try:
1346                 self.assertEqual(packet[IP].src, self.snat_addr)
1347                 self.assertEqual(packet[IP].dst, server1.ip4)
1348                 if packet.haslayer(TCP):
1349                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1350                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1351                     self.tcp_port_out = packet[TCP].sport
1352                     self.check_tcp_checksum(packet)
1353                 elif packet.haslayer(UDP):
1354                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1355                     self.assertEqual(packet[UDP].dport, server_udp_port)
1356                     self.udp_port_out = packet[UDP].sport
1357                 else:
1358                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1359                     self.icmp_id_out = packet[ICMP].id
1360             except:
1361                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1362                 raise
1363
1364         # server1 to host
1365         pkts = []
1366         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1367              IP(src=server1.ip4, dst=self.snat_addr) /
1368              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1369         pkts.append(p)
1370         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1371              IP(src=server1.ip4, dst=self.snat_addr) /
1372              UDP(sport=server_udp_port, dport=self.udp_port_out))
1373         pkts.append(p)
1374         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1375              IP(src=server1.ip4, dst=self.snat_addr) /
1376              ICMP(id=self.icmp_id_out, type='echo-reply'))
1377         pkts.append(p)
1378         self.pg0.add_stream(pkts)
1379         self.pg_enable_capture(self.pg_interfaces)
1380         self.pg_start()
1381         capture = self.pg0.get_capture(len(pkts))
1382         for packet in capture:
1383             try:
1384                 self.assertEqual(packet[IP].src, server1_nat_ip)
1385                 self.assertEqual(packet[IP].dst, host.ip4)
1386                 if packet.haslayer(TCP):
1387                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1388                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1389                     self.check_tcp_checksum(packet)
1390                 elif packet.haslayer(UDP):
1391                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1392                     self.assertEqual(packet[UDP].sport, server_udp_port)
1393                 else:
1394                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1395             except:
1396                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1397                 raise
1398
1399         # server2 to server1
1400         pkts = []
1401         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1402              IP(src=server2.ip4, dst=server1_nat_ip) /
1403              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1404         pkts.append(p)
1405         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1406              IP(src=server2.ip4, dst=server1_nat_ip) /
1407              UDP(sport=self.udp_port_in, dport=server_udp_port))
1408         pkts.append(p)
1409         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1410              IP(src=server2.ip4, dst=server1_nat_ip) /
1411              ICMP(id=self.icmp_id_in, type='echo-request'))
1412         pkts.append(p)
1413         self.pg0.add_stream(pkts)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg0.get_capture(len(pkts))
1417         for packet in capture:
1418             try:
1419                 self.assertEqual(packet[IP].src, server2_nat_ip)
1420                 self.assertEqual(packet[IP].dst, server1.ip4)
1421                 if packet.haslayer(TCP):
1422                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1423                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1424                     self.tcp_port_out = packet[TCP].sport
1425                     self.check_tcp_checksum(packet)
1426                 elif packet.haslayer(UDP):
1427                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1428                     self.assertEqual(packet[UDP].dport, server_udp_port)
1429                     self.udp_port_out = packet[UDP].sport
1430                 else:
1431                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1432                     self.icmp_id_out = packet[ICMP].id
1433             except:
1434                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1435                 raise
1436
1437         # server1 to server2
1438         pkts = []
1439         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1440              IP(src=server1.ip4, dst=server2_nat_ip) /
1441              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1442         pkts.append(p)
1443         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1444              IP(src=server1.ip4, dst=server2_nat_ip) /
1445              UDP(sport=server_udp_port, dport=self.udp_port_out))
1446         pkts.append(p)
1447         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1448              IP(src=server1.ip4, dst=server2_nat_ip) /
1449              ICMP(id=self.icmp_id_out, type='echo-reply'))
1450         pkts.append(p)
1451         self.pg0.add_stream(pkts)
1452         self.pg_enable_capture(self.pg_interfaces)
1453         self.pg_start()
1454         capture = self.pg0.get_capture(len(pkts))
1455         for packet in capture:
1456             try:
1457                 self.assertEqual(packet[IP].src, server1_nat_ip)
1458                 self.assertEqual(packet[IP].dst, server2.ip4)
1459                 if packet.haslayer(TCP):
1460                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1461                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1462                     self.check_tcp_checksum(packet)
1463                 elif packet.haslayer(UDP):
1464                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1465                     self.assertEqual(packet[UDP].sport, server_udp_port)
1466                 else:
1467                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1468             except:
1469                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1470                 raise
1471
1472     def test_max_translations_per_user(self):
1473         """ MAX translations per user - recycle the least recently used """
1474
1475         self.snat_add_address(self.snat_addr)
1476         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1477         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1478                                                  is_inside=0)
1479
1480         # get maximum number of translations per user
1481         snat_config = self.vapi.snat_show_config()
1482
1483         # send more than maximum number of translations per user packets
1484         pkts_num = snat_config.max_translations_per_user + 5
1485         pkts = []
1486         for port in range(0, pkts_num):
1487             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1488                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1489                  TCP(sport=1025 + port))
1490             pkts.append(p)
1491         self.pg0.add_stream(pkts)
1492         self.pg_enable_capture(self.pg_interfaces)
1493         self.pg_start()
1494
1495         # verify number of translated packet
1496         self.pg1.get_capture(pkts_num)
1497
1498     def test_interface_addr(self):
1499         """ Acquire SNAT addresses from interface """
1500         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1501
1502         # no address in NAT pool
1503         adresses = self.vapi.snat_address_dump()
1504         self.assertEqual(0, len(adresses))
1505
1506         # configure interface address and check NAT address pool
1507         self.pg7.config_ip4()
1508         adresses = self.vapi.snat_address_dump()
1509         self.assertEqual(1, len(adresses))
1510         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1511
1512         # remove interface address and check NAT address pool
1513         self.pg7.unconfig_ip4()
1514         adresses = self.vapi.snat_address_dump()
1515         self.assertEqual(0, len(adresses))
1516
1517     def test_interface_addr_static_mapping(self):
1518         """ Static mapping with addresses from interface """
1519         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1520         self.snat_add_static_mapping('1.2.3.4',
1521                                      external_sw_if_index=self.pg7.sw_if_index)
1522
1523         # static mappings with external interface
1524         static_mappings = self.vapi.snat_static_mapping_dump()
1525         self.assertEqual(1, len(static_mappings))
1526         self.assertEqual(self.pg7.sw_if_index,
1527                          static_mappings[0].external_sw_if_index)
1528
1529         # configure interface address and check static mappings
1530         self.pg7.config_ip4()
1531         static_mappings = self.vapi.snat_static_mapping_dump()
1532         self.assertEqual(1, len(static_mappings))
1533         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1534                          self.pg7.local_ip4n)
1535         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1536
1537         # remove interface address and check static mappings
1538         self.pg7.unconfig_ip4()
1539         static_mappings = self.vapi.snat_static_mapping_dump()
1540         self.assertEqual(0, len(static_mappings))
1541
1542     def test_ipfix_nat44_sess(self):
1543         """ S-NAT IPFIX logging NAT44 session created/delted """
1544         self.ipfix_domain_id = 10
1545         self.ipfix_src_port = 20202
1546         colector_port = 30303
1547         bind_layers(UDP, IPFIX, dport=30303)
1548         self.snat_add_address(self.snat_addr)
1549         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1550         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1551                                                  is_inside=0)
1552         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1553                                      src_address=self.pg3.local_ip4n,
1554                                      path_mtu=512,
1555                                      template_interval=10,
1556                                      collector_port=colector_port)
1557         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1558                              src_port=self.ipfix_src_port)
1559
1560         pkts = self.create_stream_in(self.pg0, self.pg1)
1561         self.pg0.add_stream(pkts)
1562         self.pg_enable_capture(self.pg_interfaces)
1563         self.pg_start()
1564         capture = self.pg1.get_capture(len(pkts))
1565         self.verify_capture_out(capture)
1566         self.snat_add_address(self.snat_addr, is_add=0)
1567         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1568         capture = self.pg3.get_capture(3)
1569         ipfix = IPFIXDecoder()
1570         # first load template
1571         for p in capture:
1572             self.assertTrue(p.haslayer(IPFIX))
1573             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1574             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1575             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1576             self.assertEqual(p[UDP].dport, colector_port)
1577             self.assertEqual(p[IPFIX].observationDomainID,
1578                              self.ipfix_domain_id)
1579             if p.haslayer(Template):
1580                 ipfix.add_template(p.getlayer(Template))
1581         # verify events in data set
1582         for p in capture:
1583             if p.haslayer(Data):
1584                 data = ipfix.decode_data_set(p.getlayer(Set))
1585                 self.verify_ipfix_nat44_ses(data)
1586
1587     def test_ipfix_addr_exhausted(self):
1588         """ S-NAT IPFIX logging NAT addresses exhausted """
1589         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1590         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1591                                                  is_inside=0)
1592         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1593                                      src_address=self.pg3.local_ip4n,
1594                                      path_mtu=512,
1595                                      template_interval=10)
1596         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1597                              src_port=self.ipfix_src_port)
1598
1599         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1600              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1601              TCP(sport=3025))
1602         self.pg0.add_stream(p)
1603         self.pg_enable_capture(self.pg_interfaces)
1604         self.pg_start()
1605         capture = self.pg1.get_capture(0)
1606         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1607         capture = self.pg3.get_capture(3)
1608         ipfix = IPFIXDecoder()
1609         # first load template
1610         for p in capture:
1611             self.assertTrue(p.haslayer(IPFIX))
1612             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1613             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1614             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1615             self.assertEqual(p[UDP].dport, 4739)
1616             self.assertEqual(p[IPFIX].observationDomainID,
1617                              self.ipfix_domain_id)
1618             if p.haslayer(Template):
1619                 ipfix.add_template(p.getlayer(Template))
1620         # verify events in data set
1621         for p in capture:
1622             if p.haslayer(Data):
1623                 data = ipfix.decode_data_set(p.getlayer(Set))
1624                 self.verify_ipfix_addr_exhausted(data)
1625
1626     def test_pool_addr_fib(self):
1627         """ S-NAT add pool addresses to FIB """
1628         static_addr = '10.0.0.10'
1629         self.snat_add_address(self.snat_addr)
1630         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1631         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1632                                                  is_inside=0)
1633         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1634
1635         # SNAT address
1636         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1637              ARP(op=ARP.who_has, pdst=self.snat_addr,
1638                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1639         self.pg1.add_stream(p)
1640         self.pg_enable_capture(self.pg_interfaces)
1641         self.pg_start()
1642         capture = self.pg1.get_capture(1)
1643         self.assertTrue(capture[0].haslayer(ARP))
1644         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1645
1646         # 1:1 NAT address
1647         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1648              ARP(op=ARP.who_has, pdst=static_addr,
1649                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1650         self.pg1.add_stream(p)
1651         self.pg_enable_capture(self.pg_interfaces)
1652         self.pg_start()
1653         capture = self.pg1.get_capture(1)
1654         self.assertTrue(capture[0].haslayer(ARP))
1655         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1656
1657         # send ARP to non-SNAT interface
1658         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1659              ARP(op=ARP.who_has, pdst=self.snat_addr,
1660                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1661         self.pg2.add_stream(p)
1662         self.pg_enable_capture(self.pg_interfaces)
1663         self.pg_start()
1664         capture = self.pg1.get_capture(0)
1665
1666         # remove addresses and verify
1667         self.snat_add_address(self.snat_addr, is_add=0)
1668         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1669                                      is_add=0)
1670
1671         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1672              ARP(op=ARP.who_has, pdst=self.snat_addr,
1673                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1674         self.pg1.add_stream(p)
1675         self.pg_enable_capture(self.pg_interfaces)
1676         self.pg_start()
1677         capture = self.pg1.get_capture(0)
1678
1679         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1680              ARP(op=ARP.who_has, pdst=static_addr,
1681                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1682         self.pg1.add_stream(p)
1683         self.pg_enable_capture(self.pg_interfaces)
1684         self.pg_start()
1685         capture = self.pg1.get_capture(0)
1686
1687     def test_vrf_mode(self):
1688         """ S-NAT tenant VRF aware address pool mode """
1689
1690         vrf_id1 = 1
1691         vrf_id2 = 2
1692         nat_ip1 = "10.0.0.10"
1693         nat_ip2 = "10.0.0.11"
1694
1695         self.pg0.unconfig_ip4()
1696         self.pg1.unconfig_ip4()
1697         self.pg0.set_table_ip4(vrf_id1)
1698         self.pg1.set_table_ip4(vrf_id2)
1699         self.pg0.config_ip4()
1700         self.pg1.config_ip4()
1701
1702         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1703         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1704         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1705         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1706         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1707                                                  is_inside=0)
1708
1709         # first VRF
1710         pkts = self.create_stream_in(self.pg0, self.pg2)
1711         self.pg0.add_stream(pkts)
1712         self.pg_enable_capture(self.pg_interfaces)
1713         self.pg_start()
1714         capture = self.pg2.get_capture(len(pkts))
1715         self.verify_capture_out(capture, nat_ip1)
1716
1717         # second VRF
1718         pkts = self.create_stream_in(self.pg1, self.pg2)
1719         self.pg1.add_stream(pkts)
1720         self.pg_enable_capture(self.pg_interfaces)
1721         self.pg_start()
1722         capture = self.pg2.get_capture(len(pkts))
1723         self.verify_capture_out(capture, nat_ip2)
1724
1725     def test_vrf_feature_independent(self):
1726         """ S-NAT tenant VRF independent address pool mode """
1727
1728         nat_ip1 = "10.0.0.10"
1729         nat_ip2 = "10.0.0.11"
1730
1731         self.snat_add_address(nat_ip1)
1732         self.snat_add_address(nat_ip2)
1733         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1734         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1735         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1736                                                  is_inside=0)
1737
1738         # first VRF
1739         pkts = self.create_stream_in(self.pg0, self.pg2)
1740         self.pg0.add_stream(pkts)
1741         self.pg_enable_capture(self.pg_interfaces)
1742         self.pg_start()
1743         capture = self.pg2.get_capture(len(pkts))
1744         self.verify_capture_out(capture, nat_ip1)
1745
1746         # second VRF
1747         pkts = self.create_stream_in(self.pg1, self.pg2)
1748         self.pg1.add_stream(pkts)
1749         self.pg_enable_capture(self.pg_interfaces)
1750         self.pg_start()
1751         capture = self.pg2.get_capture(len(pkts))
1752         self.verify_capture_out(capture, nat_ip1)
1753
1754     def test_dynamic_ipless_interfaces(self):
1755         """ SNAT interfaces without configured ip dynamic map """
1756
1757         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1758                                       self.pg7.remote_mac,
1759                                       self.pg7.remote_ip4n,
1760                                       is_static=1)
1761         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1762                                       self.pg8.remote_mac,
1763                                       self.pg8.remote_ip4n,
1764                                       is_static=1)
1765
1766         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1767                                    dst_address_length=32,
1768                                    next_hop_address=self.pg7.remote_ip4n,
1769                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1770         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1771                                    dst_address_length=32,
1772                                    next_hop_address=self.pg8.remote_ip4n,
1773                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1774
1775         self.snat_add_address(self.snat_addr)
1776         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1777         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1778                                                  is_inside=0)
1779
1780         # in2out
1781         pkts = self.create_stream_in(self.pg7, self.pg8)
1782         self.pg7.add_stream(pkts)
1783         self.pg_enable_capture(self.pg_interfaces)
1784         self.pg_start()
1785         capture = self.pg8.get_capture(len(pkts))
1786         self.verify_capture_out(capture)
1787
1788         # out2in
1789         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1790         self.pg8.add_stream(pkts)
1791         self.pg_enable_capture(self.pg_interfaces)
1792         self.pg_start()
1793         capture = self.pg7.get_capture(len(pkts))
1794         self.verify_capture_in(capture, self.pg7)
1795
1796     def test_static_ipless_interfaces(self):
1797         """ SNAT 1:1 NAT interfaces without configured ip """
1798
1799         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1800                                       self.pg7.remote_mac,
1801                                       self.pg7.remote_ip4n,
1802                                       is_static=1)
1803         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1804                                       self.pg8.remote_mac,
1805                                       self.pg8.remote_ip4n,
1806                                       is_static=1)
1807
1808         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1809                                    dst_address_length=32,
1810                                    next_hop_address=self.pg7.remote_ip4n,
1811                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1812         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1813                                    dst_address_length=32,
1814                                    next_hop_address=self.pg8.remote_ip4n,
1815                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1816
1817         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1818         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1819         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1820                                                  is_inside=0)
1821
1822         # out2in
1823         pkts = self.create_stream_out(self.pg8)
1824         self.pg8.add_stream(pkts)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg7.get_capture(len(pkts))
1828         self.verify_capture_in(capture, self.pg7)
1829
1830         # in2out
1831         pkts = self.create_stream_in(self.pg7, self.pg8)
1832         self.pg7.add_stream(pkts)
1833         self.pg_enable_capture(self.pg_interfaces)
1834         self.pg_start()
1835         capture = self.pg8.get_capture(len(pkts))
1836         self.verify_capture_out(capture, self.snat_addr, True)
1837
1838     def test_static_with_port_ipless_interfaces(self):
1839         """ SNAT 1:1 NAT with port interfaces without configured ip """
1840
1841         self.tcp_port_out = 30606
1842         self.udp_port_out = 30607
1843         self.icmp_id_out = 30608
1844
1845         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1846                                       self.pg7.remote_mac,
1847                                       self.pg7.remote_ip4n,
1848                                       is_static=1)
1849         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1850                                       self.pg8.remote_mac,
1851                                       self.pg8.remote_ip4n,
1852                                       is_static=1)
1853
1854         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1855                                    dst_address_length=32,
1856                                    next_hop_address=self.pg7.remote_ip4n,
1857                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1858         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1859                                    dst_address_length=32,
1860                                    next_hop_address=self.pg8.remote_ip4n,
1861                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1862
1863         self.snat_add_address(self.snat_addr)
1864         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1865                                      self.tcp_port_in, self.tcp_port_out,
1866                                      proto=IP_PROTOS.tcp)
1867         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1868                                      self.udp_port_in, self.udp_port_out,
1869                                      proto=IP_PROTOS.udp)
1870         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1871                                      self.icmp_id_in, self.icmp_id_out,
1872                                      proto=IP_PROTOS.icmp)
1873         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1874         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1875                                                  is_inside=0)
1876
1877         # out2in
1878         pkts = self.create_stream_out(self.pg8)
1879         self.pg8.add_stream(pkts)
1880         self.pg_enable_capture(self.pg_interfaces)
1881         self.pg_start()
1882         capture = self.pg7.get_capture(len(pkts))
1883         self.verify_capture_in(capture, self.pg7)
1884
1885         # in2out
1886         pkts = self.create_stream_in(self.pg7, self.pg8)
1887         self.pg7.add_stream(pkts)
1888         self.pg_enable_capture(self.pg_interfaces)
1889         self.pg_start()
1890         capture = self.pg8.get_capture(len(pkts))
1891         self.verify_capture_out(capture)
1892
1893     def test_static_unknown_proto(self):
1894         """ 1:1 NAT translate packet with unknown protocol """
1895         nat_ip = "10.0.0.10"
1896         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1897         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1898         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1899                                                  is_inside=0)
1900
1901         # in2out
1902         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1903              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1904              GRE() /
1905              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1906              TCP(sport=1234, dport=1234))
1907         self.pg0.add_stream(p)
1908         self.pg_enable_capture(self.pg_interfaces)
1909         self.pg_start()
1910         p = self.pg1.get_capture(1)
1911         packet = p[0]
1912         try:
1913             self.assertEqual(packet[IP].src, nat_ip)
1914             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1915             self.assertTrue(packet.haslayer(GRE))
1916             self.check_ip_checksum(packet)
1917         except:
1918             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1919             raise
1920
1921         # out2in
1922         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1923              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1924              GRE() /
1925              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1926              TCP(sport=1234, dport=1234))
1927         self.pg1.add_stream(p)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         p = self.pg0.get_capture(1)
1931         packet = p[0]
1932         try:
1933             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1934             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1935             self.assertTrue(packet.haslayer(GRE))
1936             self.check_ip_checksum(packet)
1937         except:
1938             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1939             raise
1940
1941     def test_hairpinning_static_unknown_proto(self):
1942         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1943
1944         host = self.pg0.remote_hosts[0]
1945         server = self.pg0.remote_hosts[1]
1946
1947         host_nat_ip = "10.0.0.10"
1948         server_nat_ip = "10.0.0.11"
1949
1950         self.snat_add_static_mapping(host.ip4, host_nat_ip)
1951         self.snat_add_static_mapping(server.ip4, server_nat_ip)
1952         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1953         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1954                                                  is_inside=0)
1955
1956         # host to server
1957         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1958              IP(src=host.ip4, dst=server_nat_ip) /
1959              GRE() /
1960              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1961              TCP(sport=1234, dport=1234))
1962         self.pg0.add_stream(p)
1963         self.pg_enable_capture(self.pg_interfaces)
1964         self.pg_start()
1965         p = self.pg0.get_capture(1)
1966         packet = p[0]
1967         try:
1968             self.assertEqual(packet[IP].src, host_nat_ip)
1969             self.assertEqual(packet[IP].dst, server.ip4)
1970             self.assertTrue(packet.haslayer(GRE))
1971             self.check_ip_checksum(packet)
1972         except:
1973             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1974             raise
1975
1976         # server to host
1977         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1978              IP(src=server.ip4, dst=host_nat_ip) /
1979              GRE() /
1980              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1981              TCP(sport=1234, dport=1234))
1982         self.pg0.add_stream(p)
1983         self.pg_enable_capture(self.pg_interfaces)
1984         self.pg_start()
1985         p = self.pg0.get_capture(1)
1986         packet = p[0]
1987         try:
1988             self.assertEqual(packet[IP].src, server_nat_ip)
1989             self.assertEqual(packet[IP].dst, host.ip4)
1990             self.assertTrue(packet.haslayer(GRE))
1991             self.check_ip_checksum(packet)
1992         except:
1993             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1994             raise
1995
1996     def test_unknown_proto(self):
1997         """ SNAT translate packet with unknown protocol """
1998         self.snat_add_address(self.snat_addr)
1999         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2000         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2001                                                  is_inside=0)
2002
2003         # in2out
2004         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2005              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2006              TCP(sport=self.tcp_port_in, dport=20))
2007         self.pg0.add_stream(p)
2008         self.pg_enable_capture(self.pg_interfaces)
2009         self.pg_start()
2010         p = self.pg1.get_capture(1)
2011
2012         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2013              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2014              GRE() /
2015              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2016              TCP(sport=1234, dport=1234))
2017         self.pg0.add_stream(p)
2018         self.pg_enable_capture(self.pg_interfaces)
2019         self.pg_start()
2020         p = self.pg1.get_capture(1)
2021         packet = p[0]
2022         try:
2023             self.assertEqual(packet[IP].src, self.snat_addr)
2024             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2025             self.assertTrue(packet.haslayer(GRE))
2026             self.check_ip_checksum(packet)
2027         except:
2028             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2029             raise
2030
2031         # out2in
2032         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2033              IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2034              GRE() /
2035              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2036              TCP(sport=1234, dport=1234))
2037         self.pg1.add_stream(p)
2038         self.pg_enable_capture(self.pg_interfaces)
2039         self.pg_start()
2040         p = self.pg0.get_capture(1)
2041         packet = p[0]
2042         try:
2043             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2044             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2045             self.assertTrue(packet.haslayer(GRE))
2046             self.check_ip_checksum(packet)
2047         except:
2048             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2049             raise
2050
2051     def test_hairpinning_unknown_proto(self):
2052         """ SNAT translate packet with unknown protocol - hairpinning """
2053         host = self.pg0.remote_hosts[0]
2054         server = self.pg0.remote_hosts[1]
2055         host_in_port = 1234
2056         host_out_port = 0
2057         server_in_port = 5678
2058         server_out_port = 8765
2059         server_nat_ip = "10.0.0.11"
2060
2061         self.snat_add_address(self.snat_addr)
2062         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2063         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2064                                                  is_inside=0)
2065
2066         # add static mapping for server
2067         self.snat_add_static_mapping(server.ip4, server_nat_ip)
2068
2069         # host to server
2070         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2071              IP(src=host.ip4, dst=server_nat_ip) /
2072              TCP(sport=host_in_port, dport=server_out_port))
2073         self.pg0.add_stream(p)
2074         self.pg_enable_capture(self.pg_interfaces)
2075         self.pg_start()
2076         capture = self.pg0.get_capture(1)
2077
2078         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2079              IP(src=host.ip4, dst=server_nat_ip) /
2080              GRE() /
2081              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2082              TCP(sport=1234, dport=1234))
2083         self.pg0.add_stream(p)
2084         self.pg_enable_capture(self.pg_interfaces)
2085         self.pg_start()
2086         p = self.pg0.get_capture(1)
2087         packet = p[0]
2088         try:
2089             self.assertEqual(packet[IP].src, self.snat_addr)
2090             self.assertEqual(packet[IP].dst, server.ip4)
2091             self.assertTrue(packet.haslayer(GRE))
2092             self.check_ip_checksum(packet)
2093         except:
2094             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2095             raise
2096
2097         # server to host
2098         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2099              IP(src=server.ip4, dst=self.snat_addr) /
2100              GRE() /
2101              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2102              TCP(sport=1234, dport=1234))
2103         self.pg0.add_stream(p)
2104         self.pg_enable_capture(self.pg_interfaces)
2105         self.pg_start()
2106         p = self.pg0.get_capture(1)
2107         packet = p[0]
2108         try:
2109             self.assertEqual(packet[IP].src, server_nat_ip)
2110             self.assertEqual(packet[IP].dst, host.ip4)
2111             self.assertTrue(packet.haslayer(GRE))
2112             self.check_ip_checksum(packet)
2113         except:
2114             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2115             raise
2116
2117     def test_output_feature(self):
2118         """ S-NAT interface output feature (in2out postrouting) """
2119         self.snat_add_address(self.snat_addr)
2120         self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index)
2121         self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index,
2122                                                         is_inside=0)
2123
2124         # in2out
2125         pkts = self.create_stream_in(self.pg0, self.pg1)
2126         self.pg0.add_stream(pkts)
2127         self.pg_enable_capture(self.pg_interfaces)
2128         self.pg_start()
2129         capture = self.pg1.get_capture(len(pkts))
2130         self.verify_capture_out(capture)
2131
2132         # out2in
2133         pkts = self.create_stream_out(self.pg1)
2134         self.pg1.add_stream(pkts)
2135         self.pg_enable_capture(self.pg_interfaces)
2136         self.pg_start()
2137         capture = self.pg0.get_capture(len(pkts))
2138         self.verify_capture_in(capture, self.pg0)
2139
2140     def test_output_feature_vrf_aware(self):
2141         """ S-NAT interface output feature VRF aware (in2out postrouting) """
2142         nat_ip_vrf10 = "10.0.0.10"
2143         nat_ip_vrf20 = "10.0.0.20"
2144
2145         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2146                                    dst_address_length=32,
2147                                    next_hop_address=self.pg3.remote_ip4n,
2148                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2149                                    table_id=10)
2150         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2151                                    dst_address_length=32,
2152                                    next_hop_address=self.pg3.remote_ip4n,
2153                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2154                                    table_id=20)
2155
2156         self.snat_add_address(nat_ip_vrf10, vrf_id=10)
2157         self.snat_add_address(nat_ip_vrf20, vrf_id=20)
2158         self.vapi.snat_interface_add_del_output_feature(self.pg4.sw_if_index)
2159         self.vapi.snat_interface_add_del_output_feature(self.pg6.sw_if_index)
2160         self.vapi.snat_interface_add_del_output_feature(self.pg3.sw_if_index,
2161                                                         is_inside=0)
2162
2163         # in2out VRF 10
2164         pkts = self.create_stream_in(self.pg4, self.pg3)
2165         self.pg4.add_stream(pkts)
2166         self.pg_enable_capture(self.pg_interfaces)
2167         self.pg_start()
2168         capture = self.pg3.get_capture(len(pkts))
2169         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2170
2171         # out2in VRF 10
2172         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2173         self.pg3.add_stream(pkts)
2174         self.pg_enable_capture(self.pg_interfaces)
2175         self.pg_start()
2176         capture = self.pg4.get_capture(len(pkts))
2177         self.verify_capture_in(capture, self.pg4)
2178
2179         # in2out VRF 20
2180         pkts = self.create_stream_in(self.pg6, self.pg3)
2181         self.pg6.add_stream(pkts)
2182         self.pg_enable_capture(self.pg_interfaces)
2183         self.pg_start()
2184         capture = self.pg3.get_capture(len(pkts))
2185         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2186
2187         # out2in VRF 20
2188         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2189         self.pg3.add_stream(pkts)
2190         self.pg_enable_capture(self.pg_interfaces)
2191         self.pg_start()
2192         capture = self.pg6.get_capture(len(pkts))
2193         self.verify_capture_in(capture, self.pg6)
2194
2195     def test_output_feature_hairpinning(self):
2196         """ S-NAT interface output feature hairpinning (in2out postrouting) """
2197         host = self.pg0.remote_hosts[0]
2198         server = self.pg0.remote_hosts[1]
2199         host_in_port = 1234
2200         host_out_port = 0
2201         server_in_port = 5678
2202         server_out_port = 8765
2203
2204         self.snat_add_address(self.snat_addr)
2205         self.vapi.snat_interface_add_del_output_feature(self.pg0.sw_if_index)
2206         self.vapi.snat_interface_add_del_output_feature(self.pg1.sw_if_index,
2207                                                         is_inside=0)
2208
2209         # add static mapping for server
2210         self.snat_add_static_mapping(server.ip4, self.snat_addr,
2211                                      server_in_port, server_out_port,
2212                                      proto=IP_PROTOS.tcp)
2213
2214         # send packet from host to server
2215         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2216              IP(src=host.ip4, dst=self.snat_addr) /
2217              TCP(sport=host_in_port, dport=server_out_port))
2218         self.pg0.add_stream(p)
2219         self.pg_enable_capture(self.pg_interfaces)
2220         self.pg_start()
2221         capture = self.pg0.get_capture(1)
2222         p = capture[0]
2223         try:
2224             ip = p[IP]
2225             tcp = p[TCP]
2226             self.assertEqual(ip.src, self.snat_addr)
2227             self.assertEqual(ip.dst, server.ip4)
2228             self.assertNotEqual(tcp.sport, host_in_port)
2229             self.assertEqual(tcp.dport, server_in_port)
2230             self.check_tcp_checksum(p)
2231             host_out_port = tcp.sport
2232         except:
2233             self.logger.error(ppp("Unexpected or invalid packet:", p))
2234             raise
2235
2236         # send reply from server to host
2237         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2238              IP(src=server.ip4, dst=self.snat_addr) /
2239              TCP(sport=server_in_port, dport=host_out_port))
2240         self.pg0.add_stream(p)
2241         self.pg_enable_capture(self.pg_interfaces)
2242         self.pg_start()
2243         capture = self.pg0.get_capture(1)
2244         p = capture[0]
2245         try:
2246             ip = p[IP]
2247             tcp = p[TCP]
2248             self.assertEqual(ip.src, self.snat_addr)
2249             self.assertEqual(ip.dst, host.ip4)
2250             self.assertEqual(tcp.sport, server_out_port)
2251             self.assertEqual(tcp.dport, host_in_port)
2252             self.check_tcp_checksum(p)
2253         except:
2254             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2255             raise
2256
2257     def tearDown(self):
2258         super(TestSNAT, self).tearDown()
2259         if not self.vpp_dead:
2260             self.logger.info(self.vapi.cli("show snat verbose"))
2261             self.clear_snat()
2262
2263
2264 class TestDeterministicNAT(MethodHolder):
2265     """ Deterministic NAT Test Cases """
2266
2267     @classmethod
2268     def setUpConstants(cls):
2269         super(TestDeterministicNAT, cls).setUpConstants()
2270         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
2271
2272     @classmethod
2273     def setUpClass(cls):
2274         super(TestDeterministicNAT, cls).setUpClass()
2275
2276         try:
2277             cls.tcp_port_in = 6303
2278             cls.tcp_external_port = 6303
2279             cls.udp_port_in = 6304
2280             cls.udp_external_port = 6304
2281             cls.icmp_id_in = 6305
2282             cls.snat_addr = '10.0.0.3'
2283
2284             cls.create_pg_interfaces(range(3))
2285             cls.interfaces = list(cls.pg_interfaces)
2286
2287             for i in cls.interfaces:
2288                 i.admin_up()
2289                 i.config_ip4()
2290                 i.resolve_arp()
2291
2292             cls.pg0.generate_remote_hosts(2)
2293             cls.pg0.configure_ipv4_neighbors()
2294
2295         except Exception:
2296             super(TestDeterministicNAT, cls).tearDownClass()
2297             raise
2298
2299     def create_stream_in(self, in_if, out_if, ttl=64):
2300         """
2301         Create packet stream for inside network
2302
2303         :param in_if: Inside interface
2304         :param out_if: Outside interface
2305         :param ttl: TTL of generated packets
2306         """
2307         pkts = []
2308         # TCP
2309         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2310              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2311              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2312         pkts.append(p)
2313
2314         # UDP
2315         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2316              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2317              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2318         pkts.append(p)
2319
2320         # ICMP
2321         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2322              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2323              ICMP(id=self.icmp_id_in, type='echo-request'))
2324         pkts.append(p)
2325
2326         return pkts
2327
2328     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2329         """
2330         Create packet stream for outside network
2331
2332         :param out_if: Outside interface
2333         :param dst_ip: Destination IP address (Default use global SNAT address)
2334         :param ttl: TTL of generated packets
2335         """
2336         if dst_ip is None:
2337             dst_ip = self.snat_addr
2338         pkts = []
2339         # TCP
2340         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2341              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2342              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2343         pkts.append(p)
2344
2345         # UDP
2346         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2347              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2348              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2349         pkts.append(p)
2350
2351         # ICMP
2352         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2353              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2354              ICMP(id=self.icmp_external_id, type='echo-reply'))
2355         pkts.append(p)
2356
2357         return pkts
2358
2359     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2360         """
2361         Verify captured packets on outside network
2362
2363         :param capture: Captured packets
2364         :param nat_ip: Translated IP address (Default use global SNAT address)
2365         :param same_port: Sorce port number is not translated (Default False)
2366         :param packet_num: Expected number of packets (Default 3)
2367         """
2368         if nat_ip is None:
2369             nat_ip = self.snat_addr
2370         self.assertEqual(packet_num, len(capture))
2371         for packet in capture:
2372             try:
2373                 self.assertEqual(packet[IP].src, nat_ip)
2374                 if packet.haslayer(TCP):
2375                     self.tcp_port_out = packet[TCP].sport
2376                 elif packet.haslayer(UDP):
2377                     self.udp_port_out = packet[UDP].sport
2378                 else:
2379                     self.icmp_external_id = packet[ICMP].id
2380             except:
2381                 self.logger.error(ppp("Unexpected or invalid packet "
2382                                       "(outside network):", packet))
2383                 raise
2384
2385     def initiate_tcp_session(self, in_if, out_if):
2386         """
2387         Initiates TCP session
2388
2389         :param in_if: Inside interface
2390         :param out_if: Outside interface
2391         """
2392         try:
2393             # SYN packet in->out
2394             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2395                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2396                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2397                      flags="S"))
2398             in_if.add_stream(p)
2399             self.pg_enable_capture(self.pg_interfaces)
2400             self.pg_start()
2401             capture = out_if.get_capture(1)
2402             p = capture[0]
2403             self.tcp_port_out = p[TCP].sport
2404
2405             # SYN + ACK packet out->in
2406             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2407                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2408                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2409                      flags="SA"))
2410             out_if.add_stream(p)
2411             self.pg_enable_capture(self.pg_interfaces)
2412             self.pg_start()
2413             in_if.get_capture(1)
2414
2415             # ACK packet in->out
2416             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2417                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2418                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2419                      flags="A"))
2420             in_if.add_stream(p)
2421             self.pg_enable_capture(self.pg_interfaces)
2422             self.pg_start()
2423             out_if.get_capture(1)
2424
2425         except:
2426             self.logger.error("TCP 3 way handshake failed")
2427             raise
2428
2429     def verify_ipfix_max_entries_per_user(self, data):
2430         """
2431         Verify IPFIX maximum entries per user exceeded event
2432
2433         :param data: Decoded IPFIX data records
2434         """
2435         self.assertEqual(1, len(data))
2436         record = data[0]
2437         # natEvent
2438         self.assertEqual(ord(record[230]), 13)
2439         # natQuotaExceededEvent
2440         self.assertEqual('\x03\x00\x00\x00', record[466])
2441         # sourceIPv4Address
2442         self.assertEqual(self.pg0.remote_ip4n, record[8])
2443
2444     def test_deterministic_mode(self):
2445         """ S-NAT run deterministic mode """
2446         in_addr = '172.16.255.0'
2447         out_addr = '172.17.255.50'
2448         in_addr_t = '172.16.255.20'
2449         in_addr_n = socket.inet_aton(in_addr)
2450         out_addr_n = socket.inet_aton(out_addr)
2451         in_addr_t_n = socket.inet_aton(in_addr_t)
2452         in_plen = 24
2453         out_plen = 32
2454
2455         snat_config = self.vapi.snat_show_config()
2456         self.assertEqual(1, snat_config.deterministic)
2457
2458         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2459
2460         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2461         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2462         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2463         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2464
2465         deterministic_mappings = self.vapi.snat_det_map_dump()
2466         self.assertEqual(len(deterministic_mappings), 1)
2467         dsm = deterministic_mappings[0]
2468         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2469         self.assertEqual(in_plen, dsm.in_plen)
2470         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2471         self.assertEqual(out_plen, dsm.out_plen)
2472
2473         self.clear_snat()
2474         deterministic_mappings = self.vapi.snat_det_map_dump()
2475         self.assertEqual(len(deterministic_mappings), 0)
2476
2477     def test_set_timeouts(self):
2478         """ Set deterministic NAT timeouts """
2479         timeouts_before = self.vapi.snat_det_get_timeouts()
2480
2481         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2482                                         timeouts_before.tcp_established + 10,
2483                                         timeouts_before.tcp_transitory + 10,
2484                                         timeouts_before.icmp + 10)
2485
2486         timeouts_after = self.vapi.snat_det_get_timeouts()
2487
2488         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2489         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2490         self.assertNotEqual(timeouts_before.tcp_established,
2491                             timeouts_after.tcp_established)
2492         self.assertNotEqual(timeouts_before.tcp_transitory,
2493                             timeouts_after.tcp_transitory)
2494
2495     def test_det_in(self):
2496         """ CGNAT translation test (TCP, UDP, ICMP) """
2497
2498         nat_ip = "10.0.0.10"
2499
2500         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2501                                    32,
2502                                    socket.inet_aton(nat_ip),
2503                                    32)
2504         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2505         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2506                                                  is_inside=0)
2507
2508         # in2out
2509         pkts = self.create_stream_in(self.pg0, self.pg1)
2510         self.pg0.add_stream(pkts)
2511         self.pg_enable_capture(self.pg_interfaces)
2512         self.pg_start()
2513         capture = self.pg1.get_capture(len(pkts))
2514         self.verify_capture_out(capture, nat_ip)
2515
2516         # out2in
2517         pkts = self.create_stream_out(self.pg1, nat_ip)
2518         self.pg1.add_stream(pkts)
2519         self.pg_enable_capture(self.pg_interfaces)
2520         self.pg_start()
2521         capture = self.pg0.get_capture(len(pkts))
2522         self.verify_capture_in(capture, self.pg0)
2523
2524         # session dump test
2525         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2526         self.assertEqual(len(sessions), 3)
2527
2528         # TCP session
2529         s = sessions[0]
2530         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2531         self.assertEqual(s.in_port, self.tcp_port_in)
2532         self.assertEqual(s.out_port, self.tcp_port_out)
2533         self.assertEqual(s.ext_port, self.tcp_external_port)
2534
2535         # UDP session
2536         s = sessions[1]
2537         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2538         self.assertEqual(s.in_port, self.udp_port_in)
2539         self.assertEqual(s.out_port, self.udp_port_out)
2540         self.assertEqual(s.ext_port, self.udp_external_port)
2541
2542         # ICMP session
2543         s = sessions[2]
2544         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2545         self.assertEqual(s.in_port, self.icmp_id_in)
2546         self.assertEqual(s.out_port, self.icmp_external_id)
2547
2548     def test_multiple_users(self):
2549         """ CGNAT multiple users """
2550
2551         nat_ip = "10.0.0.10"
2552         port_in = 80
2553         external_port = 6303
2554
2555         host0 = self.pg0.remote_hosts[0]
2556         host1 = self.pg0.remote_hosts[1]
2557
2558         self.vapi.snat_add_det_map(host0.ip4n,
2559                                    24,
2560                                    socket.inet_aton(nat_ip),
2561                                    32)
2562         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2563         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2564                                                  is_inside=0)
2565
2566         # host0 to out
2567         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2568              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2569              TCP(sport=port_in, dport=external_port))
2570         self.pg0.add_stream(p)
2571         self.pg_enable_capture(self.pg_interfaces)
2572         self.pg_start()
2573         capture = self.pg1.get_capture(1)
2574         p = capture[0]
2575         try:
2576             ip = p[IP]
2577             tcp = p[TCP]
2578             self.assertEqual(ip.src, nat_ip)
2579             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2580             self.assertEqual(tcp.dport, external_port)
2581             port_out0 = tcp.sport
2582         except:
2583             self.logger.error(ppp("Unexpected or invalid packet:", p))
2584             raise
2585
2586         # host1 to out
2587         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2588              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2589              TCP(sport=port_in, dport=external_port))
2590         self.pg0.add_stream(p)
2591         self.pg_enable_capture(self.pg_interfaces)
2592         self.pg_start()
2593         capture = self.pg1.get_capture(1)
2594         p = capture[0]
2595         try:
2596             ip = p[IP]
2597             tcp = p[TCP]
2598             self.assertEqual(ip.src, nat_ip)
2599             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2600             self.assertEqual(tcp.dport, external_port)
2601             port_out1 = tcp.sport
2602         except:
2603             self.logger.error(ppp("Unexpected or invalid packet:", p))
2604             raise
2605
2606         dms = self.vapi.snat_det_map_dump()
2607         self.assertEqual(1, len(dms))
2608         self.assertEqual(2, dms[0].ses_num)
2609
2610         # out to host0
2611         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2612              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2613              TCP(sport=external_port, dport=port_out0))
2614         self.pg1.add_stream(p)
2615         self.pg_enable_capture(self.pg_interfaces)
2616         self.pg_start()
2617         capture = self.pg0.get_capture(1)
2618         p = capture[0]
2619         try:
2620             ip = p[IP]
2621             tcp = p[TCP]
2622             self.assertEqual(ip.src, self.pg1.remote_ip4)
2623             self.assertEqual(ip.dst, host0.ip4)
2624             self.assertEqual(tcp.dport, port_in)
2625             self.assertEqual(tcp.sport, external_port)
2626         except:
2627             self.logger.error(ppp("Unexpected or invalid packet:", p))
2628             raise
2629
2630         # out to host1
2631         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2632              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2633              TCP(sport=external_port, dport=port_out1))
2634         self.pg1.add_stream(p)
2635         self.pg_enable_capture(self.pg_interfaces)
2636         self.pg_start()
2637         capture = self.pg0.get_capture(1)
2638         p = capture[0]
2639         try:
2640             ip = p[IP]
2641             tcp = p[TCP]
2642             self.assertEqual(ip.src, self.pg1.remote_ip4)
2643             self.assertEqual(ip.dst, host1.ip4)
2644             self.assertEqual(tcp.dport, port_in)
2645             self.assertEqual(tcp.sport, external_port)
2646         except:
2647             self.logger.error(ppp("Unexpected or invalid packet", p))
2648             raise
2649
2650         # session close api test
2651         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2652                                              port_out1,
2653                                              self.pg1.remote_ip4n,
2654                                              external_port)
2655         dms = self.vapi.snat_det_map_dump()
2656         self.assertEqual(dms[0].ses_num, 1)
2657
2658         self.vapi.snat_det_close_session_in(host0.ip4n,
2659                                             port_in,
2660                                             self.pg1.remote_ip4n,
2661                                             external_port)
2662         dms = self.vapi.snat_det_map_dump()
2663         self.assertEqual(dms[0].ses_num, 0)
2664
2665     def test_tcp_session_close_detection_in(self):
2666         """ CGNAT TCP session close initiated from inside network """
2667         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2668                                    32,
2669                                    socket.inet_aton(self.snat_addr),
2670                                    32)
2671         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2672         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2673                                                  is_inside=0)
2674
2675         self.initiate_tcp_session(self.pg0, self.pg1)
2676
2677         # close the session from inside
2678         try:
2679             # FIN packet in -> out
2680             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2681                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2682                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2683                      flags="F"))
2684             self.pg0.add_stream(p)
2685             self.pg_enable_capture(self.pg_interfaces)
2686             self.pg_start()
2687             self.pg1.get_capture(1)
2688
2689             pkts = []
2690
2691             # ACK packet out -> in
2692             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2693                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2694                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2695                      flags="A"))
2696             pkts.append(p)
2697
2698             # FIN packet out -> in
2699             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2700                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2701                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2702                      flags="F"))
2703             pkts.append(p)
2704
2705             self.pg1.add_stream(pkts)
2706             self.pg_enable_capture(self.pg_interfaces)
2707             self.pg_start()
2708             self.pg0.get_capture(2)
2709
2710             # ACK packet in -> out
2711             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2712                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2713                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2714                      flags="A"))
2715             self.pg0.add_stream(p)
2716             self.pg_enable_capture(self.pg_interfaces)
2717             self.pg_start()
2718             self.pg1.get_capture(1)
2719
2720             # Check if snat closed the session
2721             dms = self.vapi.snat_det_map_dump()
2722             self.assertEqual(0, dms[0].ses_num)
2723         except:
2724             self.logger.error("TCP session termination failed")
2725             raise
2726
2727     def test_tcp_session_close_detection_out(self):
2728         """ CGNAT TCP session close initiated from outside network """
2729         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2730                                    32,
2731                                    socket.inet_aton(self.snat_addr),
2732                                    32)
2733         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2734         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2735                                                  is_inside=0)
2736
2737         self.initiate_tcp_session(self.pg0, self.pg1)
2738
2739         # close the session from outside
2740         try:
2741             # FIN packet out -> in
2742             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2743                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2744                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2745                      flags="F"))
2746             self.pg1.add_stream(p)
2747             self.pg_enable_capture(self.pg_interfaces)
2748             self.pg_start()
2749             self.pg0.get_capture(1)
2750
2751             pkts = []
2752
2753             # ACK packet in -> out
2754             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2755                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2756                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2757                      flags="A"))
2758             pkts.append(p)
2759
2760             # ACK packet in -> out
2761             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2762                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2763                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2764                      flags="F"))
2765             pkts.append(p)
2766
2767             self.pg0.add_stream(pkts)
2768             self.pg_enable_capture(self.pg_interfaces)
2769             self.pg_start()
2770             self.pg1.get_capture(2)
2771
2772             # ACK packet out -> in
2773             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2774                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2775                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2776                      flags="A"))
2777             self.pg1.add_stream(p)
2778             self.pg_enable_capture(self.pg_interfaces)
2779             self.pg_start()
2780             self.pg0.get_capture(1)
2781
2782             # Check if snat closed the session
2783             dms = self.vapi.snat_det_map_dump()
2784             self.assertEqual(0, dms[0].ses_num)
2785         except:
2786             self.logger.error("TCP session termination failed")
2787             raise
2788
2789     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2790     def test_session_timeout(self):
2791         """ CGNAT session timeouts """
2792         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2793                                    32,
2794                                    socket.inet_aton(self.snat_addr),
2795                                    32)
2796         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2797         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2798                                                  is_inside=0)
2799
2800         self.initiate_tcp_session(self.pg0, self.pg1)
2801         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2802         pkts = self.create_stream_in(self.pg0, self.pg1)
2803         self.pg0.add_stream(pkts)
2804         self.pg_enable_capture(self.pg_interfaces)
2805         self.pg_start()
2806         capture = self.pg1.get_capture(len(pkts))
2807         sleep(15)
2808
2809         dms = self.vapi.snat_det_map_dump()
2810         self.assertEqual(0, dms[0].ses_num)
2811
2812     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2813     def test_session_limit_per_user(self):
2814         """ CGNAT maximum 1000 sessions per user should be created """
2815         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2816                                    32,
2817                                    socket.inet_aton(self.snat_addr),
2818                                    32)
2819         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2820         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2821                                                  is_inside=0)
2822         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2823                                      src_address=self.pg2.local_ip4n,
2824                                      path_mtu=512,
2825                                      template_interval=10)
2826         self.vapi.snat_ipfix()
2827
2828         pkts = []
2829         for port in range(1025, 2025):
2830             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2831                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2832                  UDP(sport=port, dport=port))
2833             pkts.append(p)
2834
2835         self.pg0.add_stream(pkts)
2836         self.pg_enable_capture(self.pg_interfaces)
2837         self.pg_start()
2838         capture = self.pg1.get_capture(len(pkts))
2839
2840         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2841              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2842              UDP(sport=3001, dport=3002))
2843         self.pg0.add_stream(p)
2844         self.pg_enable_capture(self.pg_interfaces)
2845         self.pg_start()
2846         capture = self.pg1.assert_nothing_captured()
2847
2848         # verify ICMP error packet
2849         capture = self.pg0.get_capture(1)
2850         p = capture[0]
2851         self.assertTrue(p.haslayer(ICMP))
2852         icmp = p[ICMP]
2853         self.assertEqual(icmp.type, 3)
2854         self.assertEqual(icmp.code, 1)
2855         self.assertTrue(icmp.haslayer(IPerror))
2856         inner_ip = icmp[IPerror]
2857         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2858         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2859
2860         dms = self.vapi.snat_det_map_dump()
2861
2862         self.assertEqual(1000, dms[0].ses_num)
2863
2864         # verify IPFIX logging
2865         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2866         sleep(1)
2867         capture = self.pg2.get_capture(2)
2868         ipfix = IPFIXDecoder()
2869         # first load template
2870         for p in capture:
2871             self.assertTrue(p.haslayer(IPFIX))
2872             if p.haslayer(Template):
2873                 ipfix.add_template(p.getlayer(Template))
2874         # verify events in data set
2875         for p in capture:
2876             if p.haslayer(Data):
2877                 data = ipfix.decode_data_set(p.getlayer(Set))
2878                 self.verify_ipfix_max_entries_per_user(data)
2879
2880     def clear_snat(self):
2881         """
2882         Clear SNAT configuration.
2883         """
2884         self.vapi.snat_ipfix(enable=0)
2885         self.vapi.snat_det_set_timeouts()
2886         deterministic_mappings = self.vapi.snat_det_map_dump()
2887         for dsm in deterministic_mappings:
2888             self.vapi.snat_add_det_map(dsm.in_addr,
2889                                        dsm.in_plen,
2890                                        dsm.out_addr,
2891                                        dsm.out_plen,
2892                                        is_add=0)
2893
2894         interfaces = self.vapi.snat_interface_dump()
2895         for intf in interfaces:
2896             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2897                                                      intf.is_inside,
2898                                                      is_add=0)
2899
2900     def tearDown(self):
2901         super(TestDeterministicNAT, self).tearDown()
2902         if not self.vpp_dead:
2903             self.logger.info(self.vapi.cli("show snat detail"))
2904             self.clear_snat()
2905
2906
2907 class TestNAT64(MethodHolder):
2908     """ NAT64 Test Cases """
2909
2910     @classmethod
2911     def setUpClass(cls):
2912         super(TestNAT64, cls).setUpClass()
2913
2914         try:
2915             cls.tcp_port_in = 6303
2916             cls.tcp_port_out = 6303
2917             cls.udp_port_in = 6304
2918             cls.udp_port_out = 6304
2919             cls.icmp_id_in = 6305
2920             cls.icmp_id_out = 6305
2921             cls.nat_addr = '10.0.0.3'
2922             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2923             cls.vrf1_id = 10
2924             cls.vrf1_nat_addr = '10.0.10.3'
2925             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2926                                                    cls.vrf1_nat_addr)
2927
2928             cls.create_pg_interfaces(range(3))
2929             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2930             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2931             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2932
2933             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2934
2935             cls.pg0.generate_remote_hosts(2)
2936
2937             for i in cls.ip6_interfaces:
2938                 i.admin_up()
2939                 i.config_ip6()
2940                 i.configure_ipv6_neighbors()
2941
2942             for i in cls.ip4_interfaces:
2943                 i.admin_up()
2944                 i.config_ip4()
2945                 i.resolve_arp()
2946
2947         except Exception:
2948             super(TestNAT64, cls).tearDownClass()
2949             raise
2950
2951     def test_pool(self):
2952         """ Add/delete address to NAT64 pool """
2953         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2954
2955         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2956
2957         addresses = self.vapi.nat64_pool_addr_dump()
2958         self.assertEqual(len(addresses), 1)
2959         self.assertEqual(addresses[0].address, nat_addr)
2960
2961         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2962
2963         addresses = self.vapi.nat64_pool_addr_dump()
2964         self.assertEqual(len(addresses), 0)
2965
2966     def test_interface(self):
2967         """ Enable/disable NAT64 feature on the interface """
2968         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2969         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2970
2971         interfaces = self.vapi.nat64_interface_dump()
2972         self.assertEqual(len(interfaces), 2)
2973         pg0_found = False
2974         pg1_found = False
2975         for intf in interfaces:
2976             if intf.sw_if_index == self.pg0.sw_if_index:
2977                 self.assertEqual(intf.is_inside, 1)
2978                 pg0_found = True
2979             elif intf.sw_if_index == self.pg1.sw_if_index:
2980                 self.assertEqual(intf.is_inside, 0)
2981                 pg1_found = True
2982         self.assertTrue(pg0_found)
2983         self.assertTrue(pg1_found)
2984
2985         features = self.vapi.cli("show interface features pg0")
2986         self.assertNotEqual(features.find('nat64-in2out'), -1)
2987         features = self.vapi.cli("show interface features pg1")
2988         self.assertNotEqual(features.find('nat64-out2in'), -1)
2989
2990         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2991         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2992
2993         interfaces = self.vapi.nat64_interface_dump()
2994         self.assertEqual(len(interfaces), 0)
2995
2996     def test_static_bib(self):
2997         """ Add/delete static BIB entry """
2998         in_addr = socket.inet_pton(socket.AF_INET6,
2999                                    '2001:db8:85a3::8a2e:370:7334')
3000         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3001         in_port = 1234
3002         out_port = 5678
3003         proto = IP_PROTOS.tcp
3004
3005         self.vapi.nat64_add_del_static_bib(in_addr,
3006                                            out_addr,
3007                                            in_port,
3008                                            out_port,
3009                                            proto)
3010         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3011         static_bib_num = 0
3012         for bibe in bib:
3013             if bibe.is_static:
3014                 static_bib_num += 1
3015                 self.assertEqual(bibe.i_addr, in_addr)
3016                 self.assertEqual(bibe.o_addr, out_addr)
3017                 self.assertEqual(bibe.i_port, in_port)
3018                 self.assertEqual(bibe.o_port, out_port)
3019         self.assertEqual(static_bib_num, 1)
3020
3021         self.vapi.nat64_add_del_static_bib(in_addr,
3022                                            out_addr,
3023                                            in_port,
3024                                            out_port,
3025                                            proto,
3026                                            is_add=0)
3027         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3028         static_bib_num = 0
3029         for bibe in bib:
3030             if bibe.is_static:
3031                 static_bib_num += 1
3032         self.assertEqual(static_bib_num, 0)
3033
3034     def test_set_timeouts(self):
3035         """ Set NAT64 timeouts """
3036         # verify default values
3037         timeouts = self.vapi.nat64_get_timeouts()
3038         self.assertEqual(timeouts.udp, 300)
3039         self.assertEqual(timeouts.icmp, 60)
3040         self.assertEqual(timeouts.tcp_trans, 240)
3041         self.assertEqual(timeouts.tcp_est, 7440)
3042         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3043
3044         # set and verify custom values
3045         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3046                                      tcp_est=7450, tcp_incoming_syn=10)
3047         timeouts = self.vapi.nat64_get_timeouts()
3048         self.assertEqual(timeouts.udp, 200)
3049         self.assertEqual(timeouts.icmp, 30)
3050         self.assertEqual(timeouts.tcp_trans, 250)
3051         self.assertEqual(timeouts.tcp_est, 7450)
3052         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3053
3054     def test_dynamic(self):
3055         """ NAT64 dynamic translation test """
3056         self.tcp_port_in = 6303
3057         self.udp_port_in = 6304
3058         self.icmp_id_in = 6305
3059
3060         ses_num_start = self.nat64_get_ses_num()
3061
3062         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3063                                                 self.nat_addr_n)
3064         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3065         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3066
3067         # in2out
3068         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3069         self.pg0.add_stream(pkts)
3070         self.pg_enable_capture(self.pg_interfaces)
3071         self.pg_start()
3072         capture = self.pg1.get_capture(len(pkts))
3073         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3074                                 dst_ip=self.pg1.remote_ip4)
3075
3076         # out2in
3077         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3078         self.pg1.add_stream(pkts)
3079         self.pg_enable_capture(self.pg_interfaces)
3080         self.pg_start()
3081         capture = self.pg0.get_capture(len(pkts))
3082         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3083         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3084
3085         # in2out
3086         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3087         self.pg0.add_stream(pkts)
3088         self.pg_enable_capture(self.pg_interfaces)
3089         self.pg_start()
3090         capture = self.pg1.get_capture(len(pkts))
3091         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3092                                 dst_ip=self.pg1.remote_ip4)
3093
3094         # out2in
3095         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3096         self.pg1.add_stream(pkts)
3097         self.pg_enable_capture(self.pg_interfaces)
3098         self.pg_start()
3099         capture = self.pg0.get_capture(len(pkts))
3100         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3101
3102         ses_num_end = self.nat64_get_ses_num()
3103
3104         self.assertEqual(ses_num_end - ses_num_start, 3)
3105
3106         # tenant with specific VRF
3107         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3108                                                 self.vrf1_nat_addr_n,
3109                                                 vrf_id=self.vrf1_id)
3110         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3111
3112         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3113         self.pg2.add_stream(pkts)
3114         self.pg_enable_capture(self.pg_interfaces)
3115         self.pg_start()
3116         capture = self.pg1.get_capture(len(pkts))
3117         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3118                                 dst_ip=self.pg1.remote_ip4)
3119
3120         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3121         self.pg1.add_stream(pkts)
3122         self.pg_enable_capture(self.pg_interfaces)
3123         self.pg_start()
3124         capture = self.pg2.get_capture(len(pkts))
3125         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3126
3127     def test_static(self):
3128         """ NAT64 static translation test """
3129         self.tcp_port_in = 60303
3130         self.udp_port_in = 60304
3131         self.icmp_id_in = 60305
3132         self.tcp_port_out = 60303
3133         self.udp_port_out = 60304
3134         self.icmp_id_out = 60305
3135
3136         ses_num_start = self.nat64_get_ses_num()
3137
3138         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3139                                                 self.nat_addr_n)
3140         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3141         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3142
3143         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3144                                            self.nat_addr_n,
3145                                            self.tcp_port_in,
3146                                            self.tcp_port_out,
3147                                            IP_PROTOS.tcp)
3148         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3149                                            self.nat_addr_n,
3150                                            self.udp_port_in,
3151                                            self.udp_port_out,
3152                                            IP_PROTOS.udp)
3153         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3154                                            self.nat_addr_n,
3155                                            self.icmp_id_in,
3156                                            self.icmp_id_out,
3157                                            IP_PROTOS.icmp)
3158
3159         # in2out
3160         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3161         self.pg0.add_stream(pkts)
3162         self.pg_enable_capture(self.pg_interfaces)
3163         self.pg_start()
3164         capture = self.pg1.get_capture(len(pkts))
3165         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3166                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3167
3168         # out2in
3169         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3170         self.pg1.add_stream(pkts)
3171         self.pg_enable_capture(self.pg_interfaces)
3172         self.pg_start()
3173         capture = self.pg0.get_capture(len(pkts))
3174         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3175         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3176
3177         ses_num_end = self.nat64_get_ses_num()
3178
3179         self.assertEqual(ses_num_end - ses_num_start, 3)
3180
3181     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3182     def test_session_timeout(self):
3183         """ NAT64 session timeout """
3184         self.icmp_id_in = 1234
3185         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3186                                                 self.nat_addr_n)
3187         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3188         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3189         self.vapi.nat64_set_timeouts(icmp=5)
3190
3191         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3192         self.pg0.add_stream(pkts)
3193         self.pg_enable_capture(self.pg_interfaces)
3194         self.pg_start()
3195         capture = self.pg1.get_capture(len(pkts))
3196
3197         ses_num_before_timeout = self.nat64_get_ses_num()
3198
3199         sleep(15)
3200
3201         # ICMP session after timeout
3202         ses_num_after_timeout = self.nat64_get_ses_num()
3203         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3204
3205     def test_icmp_error(self):
3206         """ NAT64 ICMP Error message translation """
3207         self.tcp_port_in = 6303
3208         self.udp_port_in = 6304
3209         self.icmp_id_in = 6305
3210
3211         ses_num_start = self.nat64_get_ses_num()
3212
3213         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3214                                                 self.nat_addr_n)
3215         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3216         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3217
3218         # send some packets to create sessions
3219         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3220         self.pg0.add_stream(pkts)
3221         self.pg_enable_capture(self.pg_interfaces)
3222         self.pg_start()
3223         capture_ip4 = self.pg1.get_capture(len(pkts))
3224         self.verify_capture_out(capture_ip4,
3225                                 nat_ip=self.nat_addr,
3226                                 dst_ip=self.pg1.remote_ip4)
3227
3228         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3229         self.pg1.add_stream(pkts)
3230         self.pg_enable_capture(self.pg_interfaces)
3231         self.pg_start()
3232         capture_ip6 = self.pg0.get_capture(len(pkts))
3233         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3234         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3235                                    self.pg0.remote_ip6)
3236
3237         # in2out
3238         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3239                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3240                 ICMPv6DestUnreach(code=1) /
3241                 packet[IPv6] for packet in capture_ip6]
3242         self.pg0.add_stream(pkts)
3243         self.pg_enable_capture(self.pg_interfaces)
3244         self.pg_start()
3245         capture = self.pg1.get_capture(len(pkts))
3246         for packet in capture:
3247             try:
3248                 self.assertEqual(packet[IP].src, self.nat_addr)
3249                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3250                 self.assertEqual(packet[ICMP].type, 3)
3251                 self.assertEqual(packet[ICMP].code, 13)
3252                 inner = packet[IPerror]
3253                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3254                 self.assertEqual(inner.dst, self.nat_addr)
3255                 self.check_icmp_checksum(packet)
3256                 if inner.haslayer(TCPerror):
3257                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3258                 elif inner.haslayer(UDPerror):
3259                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3260                 else:
3261                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3262             except:
3263                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3264                 raise
3265
3266         # out2in
3267         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3268                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3269                 ICMP(type=3, code=13) /
3270                 packet[IP] for packet in capture_ip4]
3271         self.pg1.add_stream(pkts)
3272         self.pg_enable_capture(self.pg_interfaces)
3273         self.pg_start()
3274         capture = self.pg0.get_capture(len(pkts))
3275         for packet in capture:
3276             try:
3277                 self.assertEqual(packet[IPv6].src, ip.src)
3278                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3279                 icmp = packet[ICMPv6DestUnreach]
3280                 self.assertEqual(icmp.code, 1)
3281                 inner = icmp[IPerror6]
3282                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3283                 self.assertEqual(inner.dst, ip.src)
3284                 self.check_icmpv6_checksum(packet)
3285                 if inner.haslayer(TCPerror):
3286                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3287                 elif inner.haslayer(UDPerror):
3288                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3289                 else:
3290                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3291                                      self.icmp_id_in)
3292             except:
3293                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3294                 raise
3295
3296     def test_hairpinning(self):
3297         """ NAT64 hairpinning """
3298
3299         client = self.pg0.remote_hosts[0]
3300         server = self.pg0.remote_hosts[1]
3301         server_tcp_in_port = 22
3302         server_tcp_out_port = 4022
3303         server_udp_in_port = 23
3304         server_udp_out_port = 4023
3305         client_tcp_in_port = 1234
3306         client_udp_in_port = 1235
3307         client_tcp_out_port = 0
3308         client_udp_out_port = 0
3309         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3310         nat_addr_ip6 = ip.src
3311
3312         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3313                                                 self.nat_addr_n)
3314         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3315         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3316
3317         self.vapi.nat64_add_del_static_bib(server.ip6n,
3318                                            self.nat_addr_n,
3319                                            server_tcp_in_port,
3320                                            server_tcp_out_port,
3321                                            IP_PROTOS.tcp)
3322         self.vapi.nat64_add_del_static_bib(server.ip6n,
3323                                            self.nat_addr_n,
3324                                            server_udp_in_port,
3325                                            server_udp_out_port,
3326                                            IP_PROTOS.udp)
3327
3328         # client to server
3329         pkts = []
3330         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3331              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3332              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3333         pkts.append(p)
3334         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3335              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3336              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3337         pkts.append(p)
3338         self.pg0.add_stream(pkts)
3339         self.pg_enable_capture(self.pg_interfaces)
3340         self.pg_start()
3341         capture = self.pg0.get_capture(len(pkts))
3342         for packet in capture:
3343             try:
3344                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3345                 self.assertEqual(packet[IPv6].dst, server.ip6)
3346                 if packet.haslayer(TCP):
3347                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3348                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3349                     self.check_tcp_checksum(packet)
3350                     client_tcp_out_port = packet[TCP].sport
3351                 else:
3352                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3353                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3354                     self.check_udp_checksum(packet)
3355                     client_udp_out_port = packet[UDP].sport
3356             except:
3357                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3358                 raise
3359
3360         # server to client
3361         pkts = []
3362         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3363              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3364              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3365         pkts.append(p)
3366         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3367              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3368              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3369         pkts.append(p)
3370         self.pg0.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         capture = self.pg0.get_capture(len(pkts))
3374         for packet in capture:
3375             try:
3376                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3377                 self.assertEqual(packet[IPv6].dst, client.ip6)
3378                 if packet.haslayer(TCP):
3379                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3380                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3381                     self.check_tcp_checksum(packet)
3382                 else:
3383                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3384                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3385                     self.check_udp_checksum(packet)
3386             except:
3387                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3388                 raise
3389
3390         # ICMP error
3391         pkts = []
3392         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3393                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3394                 ICMPv6DestUnreach(code=1) /
3395                 packet[IPv6] for packet in capture]
3396         self.pg0.add_stream(pkts)
3397         self.pg_enable_capture(self.pg_interfaces)
3398         self.pg_start()
3399         capture = self.pg0.get_capture(len(pkts))
3400         for packet in capture:
3401             try:
3402                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3403                 self.assertEqual(packet[IPv6].dst, server.ip6)
3404                 icmp = packet[ICMPv6DestUnreach]
3405                 self.assertEqual(icmp.code, 1)
3406                 inner = icmp[IPerror6]
3407                 self.assertEqual(inner.src, server.ip6)
3408                 self.assertEqual(inner.dst, nat_addr_ip6)
3409                 self.check_icmpv6_checksum(packet)
3410                 if inner.haslayer(TCPerror):
3411                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3412                     self.assertEqual(inner[TCPerror].dport,
3413                                      client_tcp_out_port)
3414                 else:
3415                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3416                     self.assertEqual(inner[UDPerror].dport,
3417                                      client_udp_out_port)
3418             except:
3419                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3420                 raise
3421
3422     def test_prefix(self):
3423         """ NAT64 Network-Specific Prefix """
3424
3425         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3426                                                 self.nat_addr_n)
3427         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3428         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3429         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3430                                                 self.vrf1_nat_addr_n,
3431                                                 vrf_id=self.vrf1_id)
3432         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3433
3434         # Add global prefix
3435         global_pref64 = "2001:db8::"
3436         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3437         global_pref64_len = 32
3438         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3439
3440         prefix = self.vapi.nat64_prefix_dump()
3441         self.assertEqual(len(prefix), 1)
3442         self.assertEqual(prefix[0].prefix, global_pref64_n)
3443         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3444         self.assertEqual(prefix[0].vrf_id, 0)
3445
3446         # Add tenant specific prefix
3447         vrf1_pref64 = "2001:db8:122:300::"
3448         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3449         vrf1_pref64_len = 56
3450         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3451                                        vrf1_pref64_len,
3452                                        vrf_id=self.vrf1_id)
3453         prefix = self.vapi.nat64_prefix_dump()
3454         self.assertEqual(len(prefix), 2)
3455
3456         # Global prefix
3457         pkts = self.create_stream_in_ip6(self.pg0,
3458                                          self.pg1,
3459                                          pref=global_pref64,
3460                                          plen=global_pref64_len)
3461         self.pg0.add_stream(pkts)
3462         self.pg_enable_capture(self.pg_interfaces)
3463         self.pg_start()
3464         capture = self.pg1.get_capture(len(pkts))
3465         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3466                                 dst_ip=self.pg1.remote_ip4)
3467
3468         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3469         self.pg1.add_stream(pkts)
3470         self.pg_enable_capture(self.pg_interfaces)
3471         self.pg_start()
3472         capture = self.pg0.get_capture(len(pkts))
3473         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3474                                   global_pref64,
3475                                   global_pref64_len)
3476         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3477
3478         # Tenant specific prefix
3479         pkts = self.create_stream_in_ip6(self.pg2,
3480                                          self.pg1,
3481                                          pref=vrf1_pref64,
3482                                          plen=vrf1_pref64_len)
3483         self.pg2.add_stream(pkts)
3484         self.pg_enable_capture(self.pg_interfaces)
3485         self.pg_start()
3486         capture = self.pg1.get_capture(len(pkts))
3487         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3488                                 dst_ip=self.pg1.remote_ip4)
3489
3490         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3491         self.pg1.add_stream(pkts)
3492         self.pg_enable_capture(self.pg_interfaces)
3493         self.pg_start()
3494         capture = self.pg2.get_capture(len(pkts))
3495         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3496                                   vrf1_pref64,
3497                                   vrf1_pref64_len)
3498         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3499
3500     def test_unknown_proto(self):
3501         """ NAT64 translate packet with unknown protocol """
3502
3503         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3504                                                 self.nat_addr_n)
3505         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3506         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3507         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3508
3509         # in2out
3510         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3511              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3512              TCP(sport=self.tcp_port_in, dport=20))
3513         self.pg0.add_stream(p)
3514         self.pg_enable_capture(self.pg_interfaces)
3515         self.pg_start()
3516         p = self.pg1.get_capture(1)
3517
3518         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3519              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3520              GRE() /
3521              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3522              TCP(sport=1234, dport=1234))
3523         self.pg0.add_stream(p)
3524         self.pg_enable_capture(self.pg_interfaces)
3525         self.pg_start()
3526         p = self.pg1.get_capture(1)
3527         packet = p[0]
3528         try:
3529             self.assertEqual(packet[IP].src, self.nat_addr)
3530             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3531             self.assertTrue(packet.haslayer(GRE))
3532             self.check_ip_checksum(packet)
3533         except:
3534             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3535             raise
3536
3537         # out2in
3538         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3539              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3540              GRE() /
3541              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3542              TCP(sport=1234, dport=1234))
3543         self.pg1.add_stream(p)
3544         self.pg_enable_capture(self.pg_interfaces)
3545         self.pg_start()
3546         p = self.pg0.get_capture(1)
3547         packet = p[0]
3548         try:
3549             self.assertEqual(packet[IPv6].src, remote_ip6)
3550             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3551             self.assertEqual(packet[IPv6].nh, 47)
3552         except:
3553             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3554             raise
3555
3556     def test_hairpinning_unknown_proto(self):
3557         """ NAT64 translate packet with unknown protocol - hairpinning """
3558
3559         client = self.pg0.remote_hosts[0]
3560         server = self.pg0.remote_hosts[1]
3561         server_tcp_in_port = 22
3562         server_tcp_out_port = 4022
3563         client_tcp_in_port = 1234
3564         client_tcp_out_port = 1235
3565         server_nat_ip = "10.0.0.100"
3566         client_nat_ip = "10.0.0.110"
3567         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3568         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3569         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3570         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3571
3572         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3573                                                 client_nat_ip_n)
3574         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3575         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3576
3577         self.vapi.nat64_add_del_static_bib(server.ip6n,
3578                                            server_nat_ip_n,
3579                                            server_tcp_in_port,
3580                                            server_tcp_out_port,
3581                                            IP_PROTOS.tcp)
3582
3583         self.vapi.nat64_add_del_static_bib(server.ip6n,
3584                                            server_nat_ip_n,
3585                                            0,
3586                                            0,
3587                                            IP_PROTOS.gre)
3588
3589         self.vapi.nat64_add_del_static_bib(client.ip6n,
3590                                            client_nat_ip_n,
3591                                            client_tcp_in_port,
3592                                            client_tcp_out_port,
3593                                            IP_PROTOS.tcp)
3594
3595         # client to server
3596         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3597              IPv6(src=client.ip6, dst=server_nat_ip6) /
3598              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3599         self.pg0.add_stream(p)
3600         self.pg_enable_capture(self.pg_interfaces)
3601         self.pg_start()
3602         p = self.pg0.get_capture(1)
3603
3604         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3605              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3606              GRE() /
3607              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3608              TCP(sport=1234, dport=1234))
3609         self.pg0.add_stream(p)
3610         self.pg_enable_capture(self.pg_interfaces)
3611         self.pg_start()
3612         p = self.pg0.get_capture(1)
3613         packet = p[0]
3614         try:
3615             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3616             self.assertEqual(packet[IPv6].dst, server.ip6)
3617             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3618         except:
3619             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3620             raise
3621
3622         # server to client
3623         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3624              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3625              GRE() /
3626              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3627              TCP(sport=1234, dport=1234))
3628         self.pg0.add_stream(p)
3629         self.pg_enable_capture(self.pg_interfaces)
3630         self.pg_start()
3631         p = self.pg0.get_capture(1)
3632         packet = p[0]
3633         try:
3634             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3635             self.assertEqual(packet[IPv6].dst, client.ip6)
3636             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3637         except:
3638             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3639             raise
3640
3641     def nat64_get_ses_num(self):
3642         """
3643         Return number of active NAT64 sessions.
3644         """
3645         st = self.vapi.nat64_st_dump()
3646         return len(st)
3647
3648     def clear_nat64(self):
3649         """
3650         Clear NAT64 configuration.
3651         """
3652         self.vapi.nat64_set_timeouts()
3653
3654         interfaces = self.vapi.nat64_interface_dump()
3655         for intf in interfaces:
3656             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3657                                               intf.is_inside,
3658                                               is_add=0)
3659
3660         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3661         for bibe in bib:
3662             if bibe.is_static:
3663                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3664                                                    bibe.o_addr,
3665                                                    bibe.i_port,
3666                                                    bibe.o_port,
3667                                                    bibe.proto,
3668                                                    bibe.vrf_id,
3669                                                    is_add=0)
3670
3671         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3672         for bibe in bib:
3673             if bibe.is_static:
3674                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3675                                                    bibe.o_addr,
3676                                                    bibe.i_port,
3677                                                    bibe.o_port,
3678                                                    bibe.proto,
3679                                                    bibe.vrf_id,
3680                                                    is_add=0)
3681
3682         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3683         for bibe in bib:
3684             if bibe.is_static:
3685                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3686                                                    bibe.o_addr,
3687                                                    bibe.i_port,
3688                                                    bibe.o_port,
3689                                                    bibe.proto,
3690                                                    bibe.vrf_id,
3691                                                    is_add=0)
3692
3693         adresses = self.vapi.nat64_pool_addr_dump()
3694         for addr in adresses:
3695             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3696                                                     addr.address,
3697                                                     vrf_id=addr.vrf_id,
3698                                                     is_add=0)
3699
3700         prefixes = self.vapi.nat64_prefix_dump()
3701         for prefix in prefixes:
3702             self.vapi.nat64_add_del_prefix(prefix.prefix,
3703                                            prefix.prefix_len,
3704                                            vrf_id=prefix.vrf_id,
3705                                            is_add=0)
3706
3707     def tearDown(self):
3708         super(TestNAT64, self).tearDown()
3709         if not self.vpp_dead:
3710             self.logger.info(self.vapi.cli("show nat64 pool"))
3711             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3712             self.logger.info(self.vapi.cli("show nat64 prefix"))
3713             self.logger.info(self.vapi.cli("show nat64 bib all"))
3714             self.logger.info(self.vapi.cli("show nat64 session table all"))
3715             self.clear_nat64()
3716
3717 if __name__ == '__main__':
3718     unittest.main(testRunner=VppTestRunner)