FLOWPROBE: Add flowstartns, flowendns and tcpcontrolbits
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def compose_ip6(self, ip4, pref, plen):
161         """
162         Compose IPv4-embedded IPv6 addresses
163
164         :param ip4: IPv4 address
165         :param pref: IPv6 prefix
166         :param plen: IPv6 prefix length
167         :returns: IPv4-embedded IPv6 addresses
168         """
169         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
171         if plen == 32:
172             pref_n[4] = ip4_n[0]
173             pref_n[5] = ip4_n[1]
174             pref_n[6] = ip4_n[2]
175             pref_n[7] = ip4_n[3]
176         elif plen == 40:
177             pref_n[5] = ip4_n[0]
178             pref_n[6] = ip4_n[1]
179             pref_n[7] = ip4_n[2]
180             pref_n[9] = ip4_n[3]
181         elif plen == 48:
182             pref_n[6] = ip4_n[0]
183             pref_n[7] = ip4_n[1]
184             pref_n[9] = ip4_n[2]
185             pref_n[10] = ip4_n[3]
186         elif plen == 56:
187             pref_n[7] = ip4_n[0]
188             pref_n[9] = ip4_n[1]
189             pref_n[10] = ip4_n[2]
190             pref_n[11] = ip4_n[3]
191         elif plen == 64:
192             pref_n[9] = ip4_n[0]
193             pref_n[10] = ip4_n[1]
194             pref_n[11] = ip4_n[2]
195             pref_n[12] = ip4_n[3]
196         elif plen == 96:
197             pref_n[12] = ip4_n[0]
198             pref_n[13] = ip4_n[1]
199             pref_n[14] = ip4_n[2]
200             pref_n[15] = ip4_n[3]
201         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
202
203     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
204         """
205         Create IPv6 packet stream for inside network
206
207         :param in_if: Inside interface
208         :param out_if: Outside interface
209         :param ttl: Hop Limit of generated packets
210         :param pref: NAT64 prefix
211         :param plen: NAT64 prefix length
212         """
213         pkts = []
214         if pref is None:
215             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
216         else:
217             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
218
219         # TCP
220         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222              TCP(sport=self.tcp_port_in, dport=20))
223         pkts.append(p)
224
225         # UDP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              UDP(sport=self.udp_port_in, dport=20))
229         pkts.append(p)
230
231         # ICMP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              ICMPv6EchoRequest(id=self.icmp_id_in))
235         pkts.append(p)
236
237         return pkts
238
239     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
240         """
241         Create packet stream for outside network
242
243         :param out_if: Outside interface
244         :param dst_ip: Destination IP address (Default use global SNAT address)
245         :param ttl: TTL of generated packets
246         """
247         if dst_ip is None:
248             dst_ip = self.snat_addr
249         pkts = []
250         # TCP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              TCP(dport=self.tcp_port_out, sport=20))
254         pkts.append(p)
255
256         # UDP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              UDP(dport=self.udp_port_out, sport=20))
260         pkts.append(p)
261
262         # ICMP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              ICMP(id=self.icmp_id_out, type='echo-reply'))
266         pkts.append(p)
267
268         return pkts
269
270     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271                            packet_num=3, dst_ip=None):
272         """
273         Verify captured packets on outside network
274
275         :param capture: Captured packets
276         :param nat_ip: Translated IP address (Default use global SNAT address)
277         :param same_port: Sorce port number is not translated (Default False)
278         :param packet_num: Expected number of packets (Default 3)
279         :param dst_ip: Destination IP address (Default do not verify)
280         """
281         if nat_ip is None:
282             nat_ip = self.snat_addr
283         self.assertEqual(packet_num, len(capture))
284         for packet in capture:
285             try:
286                 self.check_ip_checksum(packet)
287                 self.assertEqual(packet[IP].src, nat_ip)
288                 if dst_ip is not None:
289                     self.assertEqual(packet[IP].dst, dst_ip)
290                 if packet.haslayer(TCP):
291                     if same_port:
292                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
293                     else:
294                         self.assertNotEqual(
295                             packet[TCP].sport, self.tcp_port_in)
296                     self.tcp_port_out = packet[TCP].sport
297                     self.check_tcp_checksum(packet)
298                 elif packet.haslayer(UDP):
299                     if same_port:
300                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
301                     else:
302                         self.assertNotEqual(
303                             packet[UDP].sport, self.udp_port_in)
304                     self.udp_port_out = packet[UDP].sport
305                 else:
306                     if same_port:
307                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
308                     else:
309                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310                     self.icmp_id_out = packet[ICMP].id
311                     self.check_icmp_checksum(packet)
312             except:
313                 self.logger.error(ppp("Unexpected or invalid packet "
314                                       "(outside network):", packet))
315                 raise
316
317     def verify_capture_in(self, capture, in_if, packet_num=3):
318         """
319         Verify captured packets on inside network
320
321         :param capture: Captured packets
322         :param in_if: Inside interface
323         :param packet_num: Expected number of packets (Default 3)
324         """
325         self.assertEqual(packet_num, len(capture))
326         for packet in capture:
327             try:
328                 self.check_ip_checksum(packet)
329                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330                 if packet.haslayer(TCP):
331                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332                     self.check_tcp_checksum(packet)
333                 elif packet.haslayer(UDP):
334                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
335                 else:
336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337                     self.check_icmp_checksum(packet)
338             except:
339                 self.logger.error(ppp("Unexpected or invalid packet "
340                                       "(inside network):", packet))
341                 raise
342
343     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
344         """
345         Verify captured IPv6 packets on inside network
346
347         :param capture: Captured packets
348         :param src_ip: Source IP
349         :param dst_ip: Destination IP address
350         :param packet_num: Expected number of packets (Default 3)
351         """
352         self.assertEqual(packet_num, len(capture))
353         for packet in capture:
354             try:
355                 self.assertEqual(packet[IPv6].src, src_ip)
356                 self.assertEqual(packet[IPv6].dst, dst_ip)
357                 if packet.haslayer(TCP):
358                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359                     self.check_tcp_checksum(packet)
360                 elif packet.haslayer(UDP):
361                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
362                     self.check_udp_checksum(packet)
363                 else:
364                     self.assertEqual(packet[ICMPv6EchoReply].id,
365                                      self.icmp_id_in)
366                     self.check_icmpv6_checksum(packet)
367             except:
368                 self.logger.error(ppp("Unexpected or invalid packet "
369                                       "(inside network):", packet))
370                 raise
371
372     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
373         """
374         Verify captured packet that don't have to be translated
375
376         :param capture: Captured packets
377         :param ingress_if: Ingress interface
378         :param egress_if: Egress interface
379         """
380         for packet in capture:
381             try:
382                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384                 if packet.haslayer(TCP):
385                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386                 elif packet.haslayer(UDP):
387                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
388                 else:
389                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
390             except:
391                 self.logger.error(ppp("Unexpected or invalid packet "
392                                       "(inside network):", packet))
393                 raise
394
395     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396                                             packet_num=3, icmp_type=11):
397         """
398         Verify captured packets with ICMP errors on outside network
399
400         :param capture: Captured packets
401         :param src_ip: Translated IP address or IP address of VPP
402                        (Default use global SNAT address)
403         :param packet_num: Expected number of packets (Default 3)
404         :param icmp_type: Type of error ICMP packet
405                           we are expecting (Default 11)
406         """
407         if src_ip is None:
408             src_ip = self.snat_addr
409         self.assertEqual(packet_num, len(capture))
410         for packet in capture:
411             try:
412                 self.assertEqual(packet[IP].src, src_ip)
413                 self.assertTrue(packet.haslayer(ICMP))
414                 icmp = packet[ICMP]
415                 self.assertEqual(icmp.type, icmp_type)
416                 self.assertTrue(icmp.haslayer(IPerror))
417                 inner_ip = icmp[IPerror]
418                 if inner_ip.haslayer(TCPerror):
419                     self.assertEqual(inner_ip[TCPerror].dport,
420                                      self.tcp_port_out)
421                 elif inner_ip.haslayer(UDPerror):
422                     self.assertEqual(inner_ip[UDPerror].dport,
423                                      self.udp_port_out)
424                 else:
425                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
426             except:
427                 self.logger.error(ppp("Unexpected or invalid packet "
428                                       "(outside network):", packet))
429                 raise
430
431     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
432                                            icmp_type=11):
433         """
434         Verify captured packets with ICMP errors on inside network
435
436         :param capture: Captured packets
437         :param in_if: Inside interface
438         :param packet_num: Expected number of packets (Default 3)
439         :param icmp_type: Type of error ICMP packet
440                           we are expecting (Default 11)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446                 self.assertTrue(packet.haslayer(ICMP))
447                 icmp = packet[ICMP]
448                 self.assertEqual(icmp.type, icmp_type)
449                 self.assertTrue(icmp.haslayer(IPerror))
450                 inner_ip = icmp[IPerror]
451                 if inner_ip.haslayer(TCPerror):
452                     self.assertEqual(inner_ip[TCPerror].sport,
453                                      self.tcp_port_in)
454                 elif inner_ip.haslayer(UDPerror):
455                     self.assertEqual(inner_ip[UDPerror].sport,
456                                      self.udp_port_in)
457                 else:
458                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
459             except:
460                 self.logger.error(ppp("Unexpected or invalid packet "
461                                       "(inside network):", packet))
462                 raise
463
464     def verify_ipfix_nat44_ses(self, data):
465         """
466         Verify IPFIX NAT44 session create/delete event
467
468         :param data: Decoded IPFIX data records
469         """
470         nat44_ses_create_num = 0
471         nat44_ses_delete_num = 0
472         self.assertEqual(6, len(data))
473         for record in data:
474             # natEvent
475             self.assertIn(ord(record[230]), [4, 5])
476             if ord(record[230]) == 4:
477                 nat44_ses_create_num += 1
478             else:
479                 nat44_ses_delete_num += 1
480             # sourceIPv4Address
481             self.assertEqual(self.pg0.remote_ip4n, record[8])
482             # postNATSourceIPv4Address
483             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
484                              record[225])
485             # ingressVRFID
486             self.assertEqual(struct.pack("!I", 0), record[234])
487             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488             if IP_PROTOS.icmp == ord(record[4]):
489                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
491                                  record[227])
492             elif IP_PROTOS.tcp == ord(record[4]):
493                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
494                                  record[7])
495                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
496                                  record[227])
497             elif IP_PROTOS.udp == ord(record[4]):
498                 self.assertEqual(struct.pack("!H", self.udp_port_in),
499                                  record[7])
500                 self.assertEqual(struct.pack("!H", self.udp_port_out),
501                                  record[227])
502             else:
503                 self.fail("Invalid protocol")
504         self.assertEqual(3, nat44_ses_create_num)
505         self.assertEqual(3, nat44_ses_delete_num)
506
507     def verify_ipfix_addr_exhausted(self, data):
508         """
509         Verify IPFIX NAT addresses event
510
511         :param data: Decoded IPFIX data records
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         self.assertEqual(ord(record[230]), 3)
517         # natPoolID
518         self.assertEqual(struct.pack("!I", 0), record[283])
519
520
521 class TestSNAT(MethodHolder):
522     """ SNAT Test Cases """
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestSNAT, cls).setUpClass()
527
528         try:
529             cls.tcp_port_in = 6303
530             cls.tcp_port_out = 6303
531             cls.udp_port_in = 6304
532             cls.udp_port_out = 6304
533             cls.icmp_id_in = 6305
534             cls.icmp_id_out = 6305
535             cls.snat_addr = '10.0.0.3'
536             cls.ipfix_src_port = 4739
537             cls.ipfix_domain_id = 1
538
539             cls.create_pg_interfaces(range(9))
540             cls.interfaces = list(cls.pg_interfaces[0:4])
541
542             for i in cls.interfaces:
543                 i.admin_up()
544                 i.config_ip4()
545                 i.resolve_arp()
546
547             cls.pg0.generate_remote_hosts(3)
548             cls.pg0.configure_ipv4_neighbors()
549
550             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
551
552             cls.pg4._local_ip4 = "172.16.255.1"
553             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555             cls.pg4.set_table_ip4(10)
556             cls.pg5._local_ip4 = "172.16.255.3"
557             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
559             cls.pg5.set_table_ip4(10)
560             cls.pg6._local_ip4 = "172.16.255.1"
561             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563             cls.pg6.set_table_ip4(20)
564             for i in cls.overlapping_interfaces:
565                 i.config_ip4()
566                 i.admin_up()
567                 i.resolve_arp()
568
569             cls.pg7.admin_up()
570             cls.pg8.admin_up()
571
572         except Exception:
573             super(TestSNAT, cls).tearDownClass()
574             raise
575
576     def clear_snat(self):
577         """
578         Clear SNAT configuration.
579         """
580         # I found no elegant way to do this
581         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582                                    dst_address_length=32,
583                                    next_hop_address=self.pg7.remote_ip4n,
584                                    next_hop_sw_if_index=self.pg7.sw_if_index,
585                                    is_add=0)
586         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587                                    dst_address_length=32,
588                                    next_hop_address=self.pg8.remote_ip4n,
589                                    next_hop_sw_if_index=self.pg8.sw_if_index,
590                                    is_add=0)
591
592         for intf in [self.pg7, self.pg8]:
593             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
594             for n in neighbors:
595                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
596                                               n.mac_address,
597                                               n.ip_address,
598                                               is_add=0)
599
600         if self.pg7.has_ip4_config:
601             self.pg7.unconfig_ip4()
602
603         interfaces = self.vapi.snat_interface_addr_dump()
604         for intf in interfaces:
605             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
606
607         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608                              domain_id=self.ipfix_domain_id)
609         self.ipfix_src_port = 4739
610         self.ipfix_domain_id = 1
611
612         interfaces = self.vapi.snat_interface_dump()
613         for intf in interfaces:
614             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
615                                                      intf.is_inside,
616                                                      is_add=0)
617
618         static_mappings = self.vapi.snat_static_mapping_dump()
619         for sm in static_mappings:
620             self.vapi.snat_add_static_mapping(sm.local_ip_address,
621                                               sm.external_ip_address,
622                                               local_port=sm.local_port,
623                                               external_port=sm.external_port,
624                                               addr_only=sm.addr_only,
625                                               vrf_id=sm.vrf_id,
626                                               protocol=sm.protocol,
627                                               is_add=0)
628
629         adresses = self.vapi.snat_address_dump()
630         for addr in adresses:
631             self.vapi.snat_add_address_range(addr.ip_address,
632                                              addr.ip_address,
633                                              is_add=0)
634
635     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
636                                 local_port=0, external_port=0, vrf_id=0,
637                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
638                                 proto=0):
639         """
640         Add/delete S-NAT static mapping
641
642         :param local_ip: Local IP address
643         :param external_ip: External IP address
644         :param local_port: Local port number (Optional)
645         :param external_port: External port number (Optional)
646         :param vrf_id: VRF ID (Default 0)
647         :param is_add: 1 if add, 0 if delete (Default add)
648         :param external_sw_if_index: External interface instead of IP address
649         :param proto: IP protocol (Mandatory if port specified)
650         """
651         addr_only = 1
652         if local_port and external_port:
653             addr_only = 0
654         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
655         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
656         self.vapi.snat_add_static_mapping(
657             l_ip,
658             e_ip,
659             external_sw_if_index,
660             local_port,
661             external_port,
662             addr_only,
663             vrf_id,
664             proto,
665             is_add)
666
667     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
668         """
669         Add/delete S-NAT address
670
671         :param ip: IP address
672         :param is_add: 1 if add, 0 if delete (Default add)
673         """
674         snat_addr = socket.inet_pton(socket.AF_INET, ip)
675         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
676                                          vrf_id=vrf_id)
677
678     def test_dynamic(self):
679         """ SNAT dynamic translation test """
680
681         self.snat_add_address(self.snat_addr)
682         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
684                                                  is_inside=0)
685
686         # in2out
687         pkts = self.create_stream_in(self.pg0, self.pg1)
688         self.pg0.add_stream(pkts)
689         self.pg_enable_capture(self.pg_interfaces)
690         self.pg_start()
691         capture = self.pg1.get_capture(len(pkts))
692         self.verify_capture_out(capture)
693
694         # out2in
695         pkts = self.create_stream_out(self.pg1)
696         self.pg1.add_stream(pkts)
697         self.pg_enable_capture(self.pg_interfaces)
698         self.pg_start()
699         capture = self.pg0.get_capture(len(pkts))
700         self.verify_capture_in(capture, self.pg0)
701
702     def test_dynamic_icmp_errors_in2out_ttl_1(self):
703         """ SNAT handling of client packets with TTL=1 """
704
705         self.snat_add_address(self.snat_addr)
706         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
707         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
708                                                  is_inside=0)
709
710         # Client side - generate traffic
711         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
712         self.pg0.add_stream(pkts)
713         self.pg_enable_capture(self.pg_interfaces)
714         self.pg_start()
715
716         # Client side - verify ICMP type 11 packets
717         capture = self.pg0.get_capture(len(pkts))
718         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
719
720     def test_dynamic_icmp_errors_out2in_ttl_1(self):
721         """ SNAT handling of server packets with TTL=1 """
722
723         self.snat_add_address(self.snat_addr)
724         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
725         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
726                                                  is_inside=0)
727
728         # Client side - create sessions
729         pkts = self.create_stream_in(self.pg0, self.pg1)
730         self.pg0.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733
734         # Server side - generate traffic
735         capture = self.pg1.get_capture(len(pkts))
736         self.verify_capture_out(capture)
737         pkts = self.create_stream_out(self.pg1, ttl=1)
738         self.pg1.add_stream(pkts)
739         self.pg_enable_capture(self.pg_interfaces)
740         self.pg_start()
741
742         # Server side - verify ICMP type 11 packets
743         capture = self.pg1.get_capture(len(pkts))
744         self.verify_capture_out_with_icmp_errors(capture,
745                                                  src_ip=self.pg1.local_ip4)
746
747     def test_dynamic_icmp_errors_in2out_ttl_2(self):
748         """ SNAT handling of error responses to client packets with TTL=2 """
749
750         self.snat_add_address(self.snat_addr)
751         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
752         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
753                                                  is_inside=0)
754
755         # Client side - generate traffic
756         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
757         self.pg0.add_stream(pkts)
758         self.pg_enable_capture(self.pg_interfaces)
759         self.pg_start()
760
761         # Server side - simulate ICMP type 11 response
762         capture = self.pg1.get_capture(len(pkts))
763         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
764                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
765                 ICMP(type=11) / packet[IP] for packet in capture]
766         self.pg1.add_stream(pkts)
767         self.pg_enable_capture(self.pg_interfaces)
768         self.pg_start()
769
770         # Client side - verify ICMP type 11 packets
771         capture = self.pg0.get_capture(len(pkts))
772         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
773
774     def test_dynamic_icmp_errors_out2in_ttl_2(self):
775         """ SNAT handling of error responses to server packets with TTL=2 """
776
777         self.snat_add_address(self.snat_addr)
778         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
779         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
780                                                  is_inside=0)
781
782         # Client side - create sessions
783         pkts = self.create_stream_in(self.pg0, self.pg1)
784         self.pg0.add_stream(pkts)
785         self.pg_enable_capture(self.pg_interfaces)
786         self.pg_start()
787
788         # Server side - generate traffic
789         capture = self.pg1.get_capture(len(pkts))
790         self.verify_capture_out(capture)
791         pkts = self.create_stream_out(self.pg1, ttl=2)
792         self.pg1.add_stream(pkts)
793         self.pg_enable_capture(self.pg_interfaces)
794         self.pg_start()
795
796         # Client side - simulate ICMP type 11 response
797         capture = self.pg0.get_capture(len(pkts))
798         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
799                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
800                 ICMP(type=11) / packet[IP] for packet in capture]
801         self.pg0.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Server side - verify ICMP type 11 packets
806         capture = self.pg1.get_capture(len(pkts))
807         self.verify_capture_out_with_icmp_errors(capture)
808
809     def test_ping_out_interface_from_outside(self):
810         """ Ping SNAT out interface from outside network """
811
812         self.snat_add_address(self.snat_addr)
813         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
814         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
815                                                  is_inside=0)
816
817         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
818              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
819              ICMP(id=self.icmp_id_out, type='echo-request'))
820         pkts = [p]
821         self.pg1.add_stream(pkts)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         capture = self.pg1.get_capture(len(pkts))
825         self.assertEqual(1, len(capture))
826         packet = capture[0]
827         try:
828             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
829             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
830             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
831             self.assertEqual(packet[ICMP].type, 0)  # echo reply
832         except:
833             self.logger.error(ppp("Unexpected or invalid packet "
834                                   "(outside network):", packet))
835             raise
836
837     def test_ping_internal_host_from_outside(self):
838         """ Ping internal host from outside network """
839
840         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
841         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
842         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
843                                                  is_inside=0)
844
845         # out2in
846         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
847                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
848                ICMP(id=self.icmp_id_out, type='echo-request'))
849         self.pg1.add_stream(pkt)
850         self.pg_enable_capture(self.pg_interfaces)
851         self.pg_start()
852         capture = self.pg0.get_capture(1)
853         self.verify_capture_in(capture, self.pg0, packet_num=1)
854         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
855
856         # in2out
857         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
858                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
859                ICMP(id=self.icmp_id_in, type='echo-reply'))
860         self.pg0.add_stream(pkt)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg1.get_capture(1)
864         self.verify_capture_out(capture, same_port=True, packet_num=1)
865         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
866
867     def test_static_in(self):
868         """ SNAT 1:1 NAT initialized from inside network """
869
870         nat_ip = "10.0.0.10"
871         self.tcp_port_out = 6303
872         self.udp_port_out = 6304
873         self.icmp_id_out = 6305
874
875         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
876         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
877         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
878                                                  is_inside=0)
879
880         # in2out
881         pkts = self.create_stream_in(self.pg0, self.pg1)
882         self.pg0.add_stream(pkts)
883         self.pg_enable_capture(self.pg_interfaces)
884         self.pg_start()
885         capture = self.pg1.get_capture(len(pkts))
886         self.verify_capture_out(capture, nat_ip, True)
887
888         # out2in
889         pkts = self.create_stream_out(self.pg1, nat_ip)
890         self.pg1.add_stream(pkts)
891         self.pg_enable_capture(self.pg_interfaces)
892         self.pg_start()
893         capture = self.pg0.get_capture(len(pkts))
894         self.verify_capture_in(capture, self.pg0)
895
896     def test_static_out(self):
897         """ SNAT 1:1 NAT initialized from outside network """
898
899         nat_ip = "10.0.0.20"
900         self.tcp_port_out = 6303
901         self.udp_port_out = 6304
902         self.icmp_id_out = 6305
903
904         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
905         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
906         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
907                                                  is_inside=0)
908
909         # out2in
910         pkts = self.create_stream_out(self.pg1, nat_ip)
911         self.pg1.add_stream(pkts)
912         self.pg_enable_capture(self.pg_interfaces)
913         self.pg_start()
914         capture = self.pg0.get_capture(len(pkts))
915         self.verify_capture_in(capture, self.pg0)
916
917         # in2out
918         pkts = self.create_stream_in(self.pg0, self.pg1)
919         self.pg0.add_stream(pkts)
920         self.pg_enable_capture(self.pg_interfaces)
921         self.pg_start()
922         capture = self.pg1.get_capture(len(pkts))
923         self.verify_capture_out(capture, nat_ip, True)
924
925     def test_static_with_port_in(self):
926         """ SNAT 1:1 NAT with port initialized from inside network """
927
928         self.tcp_port_out = 3606
929         self.udp_port_out = 3607
930         self.icmp_id_out = 3608
931
932         self.snat_add_address(self.snat_addr)
933         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
934                                      self.tcp_port_in, self.tcp_port_out,
935                                      proto=IP_PROTOS.tcp)
936         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
937                                      self.udp_port_in, self.udp_port_out,
938                                      proto=IP_PROTOS.udp)
939         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940                                      self.icmp_id_in, self.icmp_id_out,
941                                      proto=IP_PROTOS.icmp)
942         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
943         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
944                                                  is_inside=0)
945
946         # in2out
947         pkts = self.create_stream_in(self.pg0, self.pg1)
948         self.pg0.add_stream(pkts)
949         self.pg_enable_capture(self.pg_interfaces)
950         self.pg_start()
951         capture = self.pg1.get_capture(len(pkts))
952         self.verify_capture_out(capture)
953
954         # out2in
955         pkts = self.create_stream_out(self.pg1)
956         self.pg1.add_stream(pkts)
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pg_start()
959         capture = self.pg0.get_capture(len(pkts))
960         self.verify_capture_in(capture, self.pg0)
961
962     def test_static_with_port_out(self):
963         """ SNAT 1:1 NAT with port initialized from outside network """
964
965         self.tcp_port_out = 30606
966         self.udp_port_out = 30607
967         self.icmp_id_out = 30608
968
969         self.snat_add_address(self.snat_addr)
970         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
971                                      self.tcp_port_in, self.tcp_port_out,
972                                      proto=IP_PROTOS.tcp)
973         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
974                                      self.udp_port_in, self.udp_port_out,
975                                      proto=IP_PROTOS.udp)
976         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977                                      self.icmp_id_in, self.icmp_id_out,
978                                      proto=IP_PROTOS.icmp)
979         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
980         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
981                                                  is_inside=0)
982
983         # out2in
984         pkts = self.create_stream_out(self.pg1)
985         self.pg1.add_stream(pkts)
986         self.pg_enable_capture(self.pg_interfaces)
987         self.pg_start()
988         capture = self.pg0.get_capture(len(pkts))
989         self.verify_capture_in(capture, self.pg0)
990
991         # in2out
992         pkts = self.create_stream_in(self.pg0, self.pg1)
993         self.pg0.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture = self.pg1.get_capture(len(pkts))
997         self.verify_capture_out(capture)
998
999     def test_static_vrf_aware(self):
1000         """ SNAT 1:1 NAT VRF awareness """
1001
1002         nat_ip1 = "10.0.0.30"
1003         nat_ip2 = "10.0.0.40"
1004         self.tcp_port_out = 6303
1005         self.udp_port_out = 6304
1006         self.icmp_id_out = 6305
1007
1008         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1009                                      vrf_id=10)
1010         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1011                                      vrf_id=10)
1012         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1013                                                  is_inside=0)
1014         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1015         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1016
1017         # inside interface VRF match SNAT static mapping VRF
1018         pkts = self.create_stream_in(self.pg4, self.pg3)
1019         self.pg4.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg3.get_capture(len(pkts))
1023         self.verify_capture_out(capture, nat_ip1, True)
1024
1025         # inside interface VRF don't match SNAT static mapping VRF (packets
1026         # are dropped)
1027         pkts = self.create_stream_in(self.pg0, self.pg3)
1028         self.pg0.add_stream(pkts)
1029         self.pg_enable_capture(self.pg_interfaces)
1030         self.pg_start()
1031         self.pg3.assert_nothing_captured()
1032
1033     def test_multiple_inside_interfaces(self):
1034         """ SNAT multiple inside interfaces (non-overlapping address space) """
1035
1036         self.snat_add_address(self.snat_addr)
1037         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1038         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1039         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1040                                                  is_inside=0)
1041
1042         # between two S-NAT inside interfaces (no translation)
1043         pkts = self.create_stream_in(self.pg0, self.pg1)
1044         self.pg0.add_stream(pkts)
1045         self.pg_enable_capture(self.pg_interfaces)
1046         self.pg_start()
1047         capture = self.pg1.get_capture(len(pkts))
1048         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1049
1050         # from S-NAT inside to interface without S-NAT feature (no translation)
1051         pkts = self.create_stream_in(self.pg0, self.pg2)
1052         self.pg0.add_stream(pkts)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg2.get_capture(len(pkts))
1056         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1057
1058         # in2out 1st interface
1059         pkts = self.create_stream_in(self.pg0, self.pg3)
1060         self.pg0.add_stream(pkts)
1061         self.pg_enable_capture(self.pg_interfaces)
1062         self.pg_start()
1063         capture = self.pg3.get_capture(len(pkts))
1064         self.verify_capture_out(capture)
1065
1066         # out2in 1st interface
1067         pkts = self.create_stream_out(self.pg3)
1068         self.pg3.add_stream(pkts)
1069         self.pg_enable_capture(self.pg_interfaces)
1070         self.pg_start()
1071         capture = self.pg0.get_capture(len(pkts))
1072         self.verify_capture_in(capture, self.pg0)
1073
1074         # in2out 2nd interface
1075         pkts = self.create_stream_in(self.pg1, self.pg3)
1076         self.pg1.add_stream(pkts)
1077         self.pg_enable_capture(self.pg_interfaces)
1078         self.pg_start()
1079         capture = self.pg3.get_capture(len(pkts))
1080         self.verify_capture_out(capture)
1081
1082         # out2in 2nd interface
1083         pkts = self.create_stream_out(self.pg3)
1084         self.pg3.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg1.get_capture(len(pkts))
1088         self.verify_capture_in(capture, self.pg1)
1089
1090     def test_inside_overlapping_interfaces(self):
1091         """ SNAT multiple inside interfaces with overlapping address space """
1092
1093         static_nat_ip = "10.0.0.10"
1094         self.snat_add_address(self.snat_addr)
1095         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1096                                                  is_inside=0)
1097         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1098         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1099         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1100         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1101                                      vrf_id=20)
1102
1103         # between S-NAT inside interfaces with same VRF (no translation)
1104         pkts = self.create_stream_in(self.pg4, self.pg5)
1105         self.pg4.add_stream(pkts)
1106         self.pg_enable_capture(self.pg_interfaces)
1107         self.pg_start()
1108         capture = self.pg5.get_capture(len(pkts))
1109         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1110
1111         # between S-NAT inside interfaces with different VRF (hairpinning)
1112         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1113              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1114              TCP(sport=1234, dport=5678))
1115         self.pg4.add_stream(p)
1116         self.pg_enable_capture(self.pg_interfaces)
1117         self.pg_start()
1118         capture = self.pg6.get_capture(1)
1119         p = capture[0]
1120         try:
1121             ip = p[IP]
1122             tcp = p[TCP]
1123             self.assertEqual(ip.src, self.snat_addr)
1124             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1125             self.assertNotEqual(tcp.sport, 1234)
1126             self.assertEqual(tcp.dport, 5678)
1127         except:
1128             self.logger.error(ppp("Unexpected or invalid packet:", p))
1129             raise
1130
1131         # in2out 1st interface
1132         pkts = self.create_stream_in(self.pg4, self.pg3)
1133         self.pg4.add_stream(pkts)
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136         capture = self.pg3.get_capture(len(pkts))
1137         self.verify_capture_out(capture)
1138
1139         # out2in 1st interface
1140         pkts = self.create_stream_out(self.pg3)
1141         self.pg3.add_stream(pkts)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144         capture = self.pg4.get_capture(len(pkts))
1145         self.verify_capture_in(capture, self.pg4)
1146
1147         # in2out 2nd interface
1148         pkts = self.create_stream_in(self.pg5, self.pg3)
1149         self.pg5.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152         capture = self.pg3.get_capture(len(pkts))
1153         self.verify_capture_out(capture)
1154
1155         # out2in 2nd interface
1156         pkts = self.create_stream_out(self.pg3)
1157         self.pg3.add_stream(pkts)
1158         self.pg_enable_capture(self.pg_interfaces)
1159         self.pg_start()
1160         capture = self.pg5.get_capture(len(pkts))
1161         self.verify_capture_in(capture, self.pg5)
1162
1163         # pg5 session dump
1164         addresses = self.vapi.snat_address_dump()
1165         self.assertEqual(len(addresses), 1)
1166         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1167         self.assertEqual(len(sessions), 3)
1168         for session in sessions:
1169             self.assertFalse(session.is_static)
1170             self.assertEqual(session.inside_ip_address[0:4],
1171                              self.pg5.remote_ip4n)
1172             self.assertEqual(session.outside_ip_address,
1173                              addresses[0].ip_address)
1174         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1175         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1176         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1177         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1178         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1179         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1180         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1181         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1182         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1183
1184         # in2out 3rd interface
1185         pkts = self.create_stream_in(self.pg6, self.pg3)
1186         self.pg6.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg3.get_capture(len(pkts))
1190         self.verify_capture_out(capture, static_nat_ip, True)
1191
1192         # out2in 3rd interface
1193         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1194         self.pg3.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197         capture = self.pg6.get_capture(len(pkts))
1198         self.verify_capture_in(capture, self.pg6)
1199
1200         # general user and session dump verifications
1201         users = self.vapi.snat_user_dump()
1202         self.assertTrue(len(users) >= 3)
1203         addresses = self.vapi.snat_address_dump()
1204         self.assertEqual(len(addresses), 1)
1205         for user in users:
1206             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1207                                                         user.vrf_id)
1208             for session in sessions:
1209                 self.assertEqual(user.ip_address, session.inside_ip_address)
1210                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1211                 self.assertTrue(session.protocol in
1212                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1213                                  IP_PROTOS.icmp])
1214
1215         # pg4 session dump
1216         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1217         self.assertTrue(len(sessions) >= 4)
1218         for session in sessions:
1219             self.assertFalse(session.is_static)
1220             self.assertEqual(session.inside_ip_address[0:4],
1221                              self.pg4.remote_ip4n)
1222             self.assertEqual(session.outside_ip_address,
1223                              addresses[0].ip_address)
1224
1225         # pg6 session dump
1226         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1227         self.assertTrue(len(sessions) >= 3)
1228         for session in sessions:
1229             self.assertTrue(session.is_static)
1230             self.assertEqual(session.inside_ip_address[0:4],
1231                              self.pg6.remote_ip4n)
1232             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1233                              map(int, static_nat_ip.split('.')))
1234             self.assertTrue(session.inside_port in
1235                             [self.tcp_port_in, self.udp_port_in,
1236                              self.icmp_id_in])
1237
1238     def test_hairpinning(self):
1239         """ SNAT hairpinning - 1:1 NAT with port"""
1240
1241         host = self.pg0.remote_hosts[0]
1242         server = self.pg0.remote_hosts[1]
1243         host_in_port = 1234
1244         host_out_port = 0
1245         server_in_port = 5678
1246         server_out_port = 8765
1247
1248         self.snat_add_address(self.snat_addr)
1249         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1250         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1251                                                  is_inside=0)
1252         # add static mapping for server
1253         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1254                                      server_in_port, server_out_port,
1255                                      proto=IP_PROTOS.tcp)
1256
1257         # send packet from host to server
1258         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1259              IP(src=host.ip4, dst=self.snat_addr) /
1260              TCP(sport=host_in_port, dport=server_out_port))
1261         self.pg0.add_stream(p)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg0.get_capture(1)
1265         p = capture[0]
1266         try:
1267             ip = p[IP]
1268             tcp = p[TCP]
1269             self.assertEqual(ip.src, self.snat_addr)
1270             self.assertEqual(ip.dst, server.ip4)
1271             self.assertNotEqual(tcp.sport, host_in_port)
1272             self.assertEqual(tcp.dport, server_in_port)
1273             self.check_tcp_checksum(p)
1274             host_out_port = tcp.sport
1275         except:
1276             self.logger.error(ppp("Unexpected or invalid packet:", p))
1277             raise
1278
1279         # send reply from server to host
1280         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1281              IP(src=server.ip4, dst=self.snat_addr) /
1282              TCP(sport=server_in_port, dport=host_out_port))
1283         self.pg0.add_stream(p)
1284         self.pg_enable_capture(self.pg_interfaces)
1285         self.pg_start()
1286         capture = self.pg0.get_capture(1)
1287         p = capture[0]
1288         try:
1289             ip = p[IP]
1290             tcp = p[TCP]
1291             self.assertEqual(ip.src, self.snat_addr)
1292             self.assertEqual(ip.dst, host.ip4)
1293             self.assertEqual(tcp.sport, server_out_port)
1294             self.assertEqual(tcp.dport, host_in_port)
1295             self.check_tcp_checksum(p)
1296         except:
1297             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1298             raise
1299
1300     def test_hairpinning2(self):
1301         """ SNAT hairpinning - 1:1 NAT"""
1302
1303         server1_nat_ip = "10.0.0.10"
1304         server2_nat_ip = "10.0.0.11"
1305         host = self.pg0.remote_hosts[0]
1306         server1 = self.pg0.remote_hosts[1]
1307         server2 = self.pg0.remote_hosts[2]
1308         server_tcp_port = 22
1309         server_udp_port = 20
1310
1311         self.snat_add_address(self.snat_addr)
1312         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1313         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1314                                                  is_inside=0)
1315
1316         # add static mapping for servers
1317         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1318         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1319
1320         # host to server1
1321         pkts = []
1322         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1323              IP(src=host.ip4, dst=server1_nat_ip) /
1324              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1325         pkts.append(p)
1326         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1327              IP(src=host.ip4, dst=server1_nat_ip) /
1328              UDP(sport=self.udp_port_in, dport=server_udp_port))
1329         pkts.append(p)
1330         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1331              IP(src=host.ip4, dst=server1_nat_ip) /
1332              ICMP(id=self.icmp_id_in, type='echo-request'))
1333         pkts.append(p)
1334         self.pg0.add_stream(pkts)
1335         self.pg_enable_capture(self.pg_interfaces)
1336         self.pg_start()
1337         capture = self.pg0.get_capture(len(pkts))
1338         for packet in capture:
1339             try:
1340                 self.assertEqual(packet[IP].src, self.snat_addr)
1341                 self.assertEqual(packet[IP].dst, server1.ip4)
1342                 if packet.haslayer(TCP):
1343                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1344                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1345                     self.tcp_port_out = packet[TCP].sport
1346                     self.check_tcp_checksum(packet)
1347                 elif packet.haslayer(UDP):
1348                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1349                     self.assertEqual(packet[UDP].dport, server_udp_port)
1350                     self.udp_port_out = packet[UDP].sport
1351                 else:
1352                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1353                     self.icmp_id_out = packet[ICMP].id
1354             except:
1355                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1356                 raise
1357
1358         # server1 to host
1359         pkts = []
1360         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1361              IP(src=server1.ip4, dst=self.snat_addr) /
1362              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1363         pkts.append(p)
1364         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1365              IP(src=server1.ip4, dst=self.snat_addr) /
1366              UDP(sport=server_udp_port, dport=self.udp_port_out))
1367         pkts.append(p)
1368         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1369              IP(src=server1.ip4, dst=self.snat_addr) /
1370              ICMP(id=self.icmp_id_out, type='echo-reply'))
1371         pkts.append(p)
1372         self.pg0.add_stream(pkts)
1373         self.pg_enable_capture(self.pg_interfaces)
1374         self.pg_start()
1375         capture = self.pg0.get_capture(len(pkts))
1376         for packet in capture:
1377             try:
1378                 self.assertEqual(packet[IP].src, server1_nat_ip)
1379                 self.assertEqual(packet[IP].dst, host.ip4)
1380                 if packet.haslayer(TCP):
1381                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1382                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1383                     self.check_tcp_checksum(packet)
1384                 elif packet.haslayer(UDP):
1385                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1386                     self.assertEqual(packet[UDP].sport, server_udp_port)
1387                 else:
1388                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1389             except:
1390                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1391                 raise
1392
1393         # server2 to server1
1394         pkts = []
1395         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1396              IP(src=server2.ip4, dst=server1_nat_ip) /
1397              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1398         pkts.append(p)
1399         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1400              IP(src=server2.ip4, dst=server1_nat_ip) /
1401              UDP(sport=self.udp_port_in, dport=server_udp_port))
1402         pkts.append(p)
1403         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1404              IP(src=server2.ip4, dst=server1_nat_ip) /
1405              ICMP(id=self.icmp_id_in, type='echo-request'))
1406         pkts.append(p)
1407         self.pg0.add_stream(pkts)
1408         self.pg_enable_capture(self.pg_interfaces)
1409         self.pg_start()
1410         capture = self.pg0.get_capture(len(pkts))
1411         for packet in capture:
1412             try:
1413                 self.assertEqual(packet[IP].src, server2_nat_ip)
1414                 self.assertEqual(packet[IP].dst, server1.ip4)
1415                 if packet.haslayer(TCP):
1416                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1417                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1418                     self.tcp_port_out = packet[TCP].sport
1419                     self.check_tcp_checksum(packet)
1420                 elif packet.haslayer(UDP):
1421                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1422                     self.assertEqual(packet[UDP].dport, server_udp_port)
1423                     self.udp_port_out = packet[UDP].sport
1424                 else:
1425                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1426                     self.icmp_id_out = packet[ICMP].id
1427             except:
1428                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1429                 raise
1430
1431         # server1 to server2
1432         pkts = []
1433         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1434              IP(src=server1.ip4, dst=server2_nat_ip) /
1435              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1436         pkts.append(p)
1437         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1438              IP(src=server1.ip4, dst=server2_nat_ip) /
1439              UDP(sport=server_udp_port, dport=self.udp_port_out))
1440         pkts.append(p)
1441         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1442              IP(src=server1.ip4, dst=server2_nat_ip) /
1443              ICMP(id=self.icmp_id_out, type='echo-reply'))
1444         pkts.append(p)
1445         self.pg0.add_stream(pkts)
1446         self.pg_enable_capture(self.pg_interfaces)
1447         self.pg_start()
1448         capture = self.pg0.get_capture(len(pkts))
1449         for packet in capture:
1450             try:
1451                 self.assertEqual(packet[IP].src, server1_nat_ip)
1452                 self.assertEqual(packet[IP].dst, server2.ip4)
1453                 if packet.haslayer(TCP):
1454                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1455                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1456                     self.check_tcp_checksum(packet)
1457                 elif packet.haslayer(UDP):
1458                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1459                     self.assertEqual(packet[UDP].sport, server_udp_port)
1460                 else:
1461                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1462             except:
1463                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1464                 raise
1465
1466     def test_max_translations_per_user(self):
1467         """ MAX translations per user - recycle the least recently used """
1468
1469         self.snat_add_address(self.snat_addr)
1470         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1471         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1472                                                  is_inside=0)
1473
1474         # get maximum number of translations per user
1475         snat_config = self.vapi.snat_show_config()
1476
1477         # send more than maximum number of translations per user packets
1478         pkts_num = snat_config.max_translations_per_user + 5
1479         pkts = []
1480         for port in range(0, pkts_num):
1481             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1482                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1483                  TCP(sport=1025 + port))
1484             pkts.append(p)
1485         self.pg0.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488
1489         # verify number of translated packet
1490         self.pg1.get_capture(pkts_num)
1491
1492     def test_interface_addr(self):
1493         """ Acquire SNAT addresses from interface """
1494         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1495
1496         # no address in NAT pool
1497         adresses = self.vapi.snat_address_dump()
1498         self.assertEqual(0, len(adresses))
1499
1500         # configure interface address and check NAT address pool
1501         self.pg7.config_ip4()
1502         adresses = self.vapi.snat_address_dump()
1503         self.assertEqual(1, len(adresses))
1504         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1505
1506         # remove interface address and check NAT address pool
1507         self.pg7.unconfig_ip4()
1508         adresses = self.vapi.snat_address_dump()
1509         self.assertEqual(0, len(adresses))
1510
1511     def test_interface_addr_static_mapping(self):
1512         """ Static mapping with addresses from interface """
1513         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1514         self.snat_add_static_mapping('1.2.3.4',
1515                                      external_sw_if_index=self.pg7.sw_if_index)
1516
1517         # static mappings with external interface
1518         static_mappings = self.vapi.snat_static_mapping_dump()
1519         self.assertEqual(1, len(static_mappings))
1520         self.assertEqual(self.pg7.sw_if_index,
1521                          static_mappings[0].external_sw_if_index)
1522
1523         # configure interface address and check static mappings
1524         self.pg7.config_ip4()
1525         static_mappings = self.vapi.snat_static_mapping_dump()
1526         self.assertEqual(1, len(static_mappings))
1527         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1528                          self.pg7.local_ip4n)
1529         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1530
1531         # remove interface address and check static mappings
1532         self.pg7.unconfig_ip4()
1533         static_mappings = self.vapi.snat_static_mapping_dump()
1534         self.assertEqual(0, len(static_mappings))
1535
1536     def test_ipfix_nat44_sess(self):
1537         """ S-NAT IPFIX logging NAT44 session created/delted """
1538         self.ipfix_domain_id = 10
1539         self.ipfix_src_port = 20202
1540         colector_port = 30303
1541         bind_layers(UDP, IPFIX, dport=30303)
1542         self.snat_add_address(self.snat_addr)
1543         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1544         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1545                                                  is_inside=0)
1546         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1547                                      src_address=self.pg3.local_ip4n,
1548                                      path_mtu=512,
1549                                      template_interval=10,
1550                                      collector_port=colector_port)
1551         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1552                              src_port=self.ipfix_src_port)
1553
1554         pkts = self.create_stream_in(self.pg0, self.pg1)
1555         self.pg0.add_stream(pkts)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         capture = self.pg1.get_capture(len(pkts))
1559         self.verify_capture_out(capture)
1560         self.snat_add_address(self.snat_addr, is_add=0)
1561         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1562         capture = self.pg3.get_capture(3)
1563         ipfix = IPFIXDecoder()
1564         # first load template
1565         for p in capture:
1566             self.assertTrue(p.haslayer(IPFIX))
1567             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1568             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1569             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1570             self.assertEqual(p[UDP].dport, colector_port)
1571             self.assertEqual(p[IPFIX].observationDomainID,
1572                              self.ipfix_domain_id)
1573             if p.haslayer(Template):
1574                 ipfix.add_template(p.getlayer(Template))
1575         # verify events in data set
1576         for p in capture:
1577             if p.haslayer(Data):
1578                 data = ipfix.decode_data_set(p.getlayer(Set))
1579                 self.verify_ipfix_nat44_ses(data)
1580
1581     def test_ipfix_addr_exhausted(self):
1582         """ S-NAT IPFIX logging NAT addresses exhausted """
1583         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1584         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1585                                                  is_inside=0)
1586         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1587                                      src_address=self.pg3.local_ip4n,
1588                                      path_mtu=512,
1589                                      template_interval=10)
1590         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1591                              src_port=self.ipfix_src_port)
1592
1593         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1594              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1595              TCP(sport=3025))
1596         self.pg0.add_stream(p)
1597         self.pg_enable_capture(self.pg_interfaces)
1598         self.pg_start()
1599         capture = self.pg1.get_capture(0)
1600         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1601         capture = self.pg3.get_capture(3)
1602         ipfix = IPFIXDecoder()
1603         # first load template
1604         for p in capture:
1605             self.assertTrue(p.haslayer(IPFIX))
1606             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1607             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1608             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1609             self.assertEqual(p[UDP].dport, 4739)
1610             self.assertEqual(p[IPFIX].observationDomainID,
1611                              self.ipfix_domain_id)
1612             if p.haslayer(Template):
1613                 ipfix.add_template(p.getlayer(Template))
1614         # verify events in data set
1615         for p in capture:
1616             if p.haslayer(Data):
1617                 data = ipfix.decode_data_set(p.getlayer(Set))
1618                 self.verify_ipfix_addr_exhausted(data)
1619
1620     def test_pool_addr_fib(self):
1621         """ S-NAT add pool addresses to FIB """
1622         static_addr = '10.0.0.10'
1623         self.snat_add_address(self.snat_addr)
1624         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1625         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1626                                                  is_inside=0)
1627         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1628
1629         # SNAT address
1630         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1631              ARP(op=ARP.who_has, pdst=self.snat_addr,
1632                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1633         self.pg1.add_stream(p)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         capture = self.pg1.get_capture(1)
1637         self.assertTrue(capture[0].haslayer(ARP))
1638         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1639
1640         # 1:1 NAT address
1641         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1642              ARP(op=ARP.who_has, pdst=static_addr,
1643                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1644         self.pg1.add_stream(p)
1645         self.pg_enable_capture(self.pg_interfaces)
1646         self.pg_start()
1647         capture = self.pg1.get_capture(1)
1648         self.assertTrue(capture[0].haslayer(ARP))
1649         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1650
1651         # send ARP to non-SNAT interface
1652         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1653              ARP(op=ARP.who_has, pdst=self.snat_addr,
1654                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1655         self.pg2.add_stream(p)
1656         self.pg_enable_capture(self.pg_interfaces)
1657         self.pg_start()
1658         capture = self.pg1.get_capture(0)
1659
1660         # remove addresses and verify
1661         self.snat_add_address(self.snat_addr, is_add=0)
1662         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1663                                      is_add=0)
1664
1665         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1666              ARP(op=ARP.who_has, pdst=self.snat_addr,
1667                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1668         self.pg1.add_stream(p)
1669         self.pg_enable_capture(self.pg_interfaces)
1670         self.pg_start()
1671         capture = self.pg1.get_capture(0)
1672
1673         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1674              ARP(op=ARP.who_has, pdst=static_addr,
1675                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1676         self.pg1.add_stream(p)
1677         self.pg_enable_capture(self.pg_interfaces)
1678         self.pg_start()
1679         capture = self.pg1.get_capture(0)
1680
1681     def test_vrf_mode(self):
1682         """ S-NAT tenant VRF aware address pool mode """
1683
1684         vrf_id1 = 1
1685         vrf_id2 = 2
1686         nat_ip1 = "10.0.0.10"
1687         nat_ip2 = "10.0.0.11"
1688
1689         self.pg0.unconfig_ip4()
1690         self.pg1.unconfig_ip4()
1691         self.pg0.set_table_ip4(vrf_id1)
1692         self.pg1.set_table_ip4(vrf_id2)
1693         self.pg0.config_ip4()
1694         self.pg1.config_ip4()
1695
1696         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1697         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1698         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1699         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1700         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1701                                                  is_inside=0)
1702
1703         # first VRF
1704         pkts = self.create_stream_in(self.pg0, self.pg2)
1705         self.pg0.add_stream(pkts)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg2.get_capture(len(pkts))
1709         self.verify_capture_out(capture, nat_ip1)
1710
1711         # second VRF
1712         pkts = self.create_stream_in(self.pg1, self.pg2)
1713         self.pg1.add_stream(pkts)
1714         self.pg_enable_capture(self.pg_interfaces)
1715         self.pg_start()
1716         capture = self.pg2.get_capture(len(pkts))
1717         self.verify_capture_out(capture, nat_ip2)
1718
1719     def test_vrf_feature_independent(self):
1720         """ S-NAT tenant VRF independent address pool mode """
1721
1722         nat_ip1 = "10.0.0.10"
1723         nat_ip2 = "10.0.0.11"
1724
1725         self.snat_add_address(nat_ip1)
1726         self.snat_add_address(nat_ip2)
1727         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1728         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1729         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1730                                                  is_inside=0)
1731
1732         # first VRF
1733         pkts = self.create_stream_in(self.pg0, self.pg2)
1734         self.pg0.add_stream(pkts)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         capture = self.pg2.get_capture(len(pkts))
1738         self.verify_capture_out(capture, nat_ip1)
1739
1740         # second VRF
1741         pkts = self.create_stream_in(self.pg1, self.pg2)
1742         self.pg1.add_stream(pkts)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg2.get_capture(len(pkts))
1746         self.verify_capture_out(capture, nat_ip1)
1747
1748     def test_dynamic_ipless_interfaces(self):
1749         """ SNAT interfaces without configured ip dynamic map """
1750
1751         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1752                                       self.pg7.remote_mac,
1753                                       self.pg7.remote_ip4n,
1754                                       is_static=1)
1755         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1756                                       self.pg8.remote_mac,
1757                                       self.pg8.remote_ip4n,
1758                                       is_static=1)
1759
1760         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1761                                    dst_address_length=32,
1762                                    next_hop_address=self.pg7.remote_ip4n,
1763                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1764         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1765                                    dst_address_length=32,
1766                                    next_hop_address=self.pg8.remote_ip4n,
1767                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1768
1769         self.snat_add_address(self.snat_addr)
1770         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1771         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1772                                                  is_inside=0)
1773
1774         # in2out
1775         pkts = self.create_stream_in(self.pg7, self.pg8)
1776         self.pg7.add_stream(pkts)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         capture = self.pg8.get_capture(len(pkts))
1780         self.verify_capture_out(capture)
1781
1782         # out2in
1783         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1784         self.pg8.add_stream(pkts)
1785         self.pg_enable_capture(self.pg_interfaces)
1786         self.pg_start()
1787         capture = self.pg7.get_capture(len(pkts))
1788         self.verify_capture_in(capture, self.pg7)
1789
1790     def test_static_ipless_interfaces(self):
1791         """ SNAT 1:1 NAT interfaces without configured ip """
1792
1793         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1794                                       self.pg7.remote_mac,
1795                                       self.pg7.remote_ip4n,
1796                                       is_static=1)
1797         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1798                                       self.pg8.remote_mac,
1799                                       self.pg8.remote_ip4n,
1800                                       is_static=1)
1801
1802         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1803                                    dst_address_length=32,
1804                                    next_hop_address=self.pg7.remote_ip4n,
1805                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1806         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1807                                    dst_address_length=32,
1808                                    next_hop_address=self.pg8.remote_ip4n,
1809                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1810
1811         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1812         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1813         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1814                                                  is_inside=0)
1815
1816         # out2in
1817         pkts = self.create_stream_out(self.pg8)
1818         self.pg8.add_stream(pkts)
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.pg_start()
1821         capture = self.pg7.get_capture(len(pkts))
1822         self.verify_capture_in(capture, self.pg7)
1823
1824         # in2out
1825         pkts = self.create_stream_in(self.pg7, self.pg8)
1826         self.pg7.add_stream(pkts)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg8.get_capture(len(pkts))
1830         self.verify_capture_out(capture, self.snat_addr, True)
1831
1832     def test_static_with_port_ipless_interfaces(self):
1833         """ SNAT 1:1 NAT with port interfaces without configured ip """
1834
1835         self.tcp_port_out = 30606
1836         self.udp_port_out = 30607
1837         self.icmp_id_out = 30608
1838
1839         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1840                                       self.pg7.remote_mac,
1841                                       self.pg7.remote_ip4n,
1842                                       is_static=1)
1843         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1844                                       self.pg8.remote_mac,
1845                                       self.pg8.remote_ip4n,
1846                                       is_static=1)
1847
1848         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1849                                    dst_address_length=32,
1850                                    next_hop_address=self.pg7.remote_ip4n,
1851                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1852         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1853                                    dst_address_length=32,
1854                                    next_hop_address=self.pg8.remote_ip4n,
1855                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1856
1857         self.snat_add_address(self.snat_addr)
1858         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1859                                      self.tcp_port_in, self.tcp_port_out,
1860                                      proto=IP_PROTOS.tcp)
1861         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1862                                      self.udp_port_in, self.udp_port_out,
1863                                      proto=IP_PROTOS.udp)
1864         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1865                                      self.icmp_id_in, self.icmp_id_out,
1866                                      proto=IP_PROTOS.icmp)
1867         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1868         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1869                                                  is_inside=0)
1870
1871         # out2in
1872         pkts = self.create_stream_out(self.pg8)
1873         self.pg8.add_stream(pkts)
1874         self.pg_enable_capture(self.pg_interfaces)
1875         self.pg_start()
1876         capture = self.pg7.get_capture(len(pkts))
1877         self.verify_capture_in(capture, self.pg7)
1878
1879         # in2out
1880         pkts = self.create_stream_in(self.pg7, self.pg8)
1881         self.pg7.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg8.get_capture(len(pkts))
1885         self.verify_capture_out(capture)
1886
1887     def test_static_unknown_proto(self):
1888         """ 1:1 NAT translate packet with unknown protocol """
1889         nat_ip = "10.0.0.10"
1890         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1891         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1892         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1893                                                  is_inside=0)
1894
1895         # in2out
1896         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1897              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1898              GRE() /
1899              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1900              TCP(sport=1234, dport=1234))
1901         self.pg0.add_stream(p)
1902         self.pg_enable_capture(self.pg_interfaces)
1903         self.pg_start()
1904         p = self.pg1.get_capture(1)
1905         packet = p[0]
1906         try:
1907             self.assertEqual(packet[IP].src, nat_ip)
1908             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1909             self.assertTrue(packet.haslayer(GRE))
1910             self.check_ip_checksum(packet)
1911         except:
1912             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1913             raise
1914
1915         # out2in
1916         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1917              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1918              GRE() /
1919              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1920              TCP(sport=1234, dport=1234))
1921         self.pg1.add_stream(p)
1922         self.pg_enable_capture(self.pg_interfaces)
1923         self.pg_start()
1924         p = self.pg0.get_capture(1)
1925         packet = p[0]
1926         try:
1927             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1928             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1929             self.assertTrue(packet.haslayer(GRE))
1930             self.check_ip_checksum(packet)
1931         except:
1932             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1933             raise
1934
1935     def test_hairpinning_unknown_proto(self):
1936         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1937
1938         host = self.pg0.remote_hosts[0]
1939         server = self.pg0.remote_hosts[1]
1940
1941         host_nat_ip = "10.0.0.10"
1942         server_nat_ip = "10.0.0.11"
1943
1944         self.snat_add_static_mapping(host.ip4, host_nat_ip)
1945         self.snat_add_static_mapping(server.ip4, server_nat_ip)
1946         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1947         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1948                                                  is_inside=0)
1949
1950         # host to server
1951         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1952              IP(src=host.ip4, dst=server_nat_ip) /
1953              GRE() /
1954              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1955              TCP(sport=1234, dport=1234))
1956         self.pg0.add_stream(p)
1957         self.pg_enable_capture(self.pg_interfaces)
1958         self.pg_start()
1959         p = self.pg0.get_capture(1)
1960         packet = p[0]
1961         try:
1962             self.assertEqual(packet[IP].src, host_nat_ip)
1963             self.assertEqual(packet[IP].dst, server.ip4)
1964             self.assertTrue(packet.haslayer(GRE))
1965             self.check_ip_checksum(packet)
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1968             raise
1969
1970         # server to host
1971         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1972              IP(src=server.ip4, dst=host_nat_ip) /
1973              GRE() /
1974              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1975              TCP(sport=1234, dport=1234))
1976         self.pg0.add_stream(p)
1977         self.pg_enable_capture(self.pg_interfaces)
1978         self.pg_start()
1979         p = self.pg0.get_capture(1)
1980         packet = p[0]
1981         try:
1982             self.assertEqual(packet[IP].src, server_nat_ip)
1983             self.assertEqual(packet[IP].dst, host.ip4)
1984             self.assertTrue(packet.haslayer(GRE))
1985             self.check_ip_checksum(packet)
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1988             raise
1989
1990     def tearDown(self):
1991         super(TestSNAT, self).tearDown()
1992         if not self.vpp_dead:
1993             self.logger.info(self.vapi.cli("show snat verbose"))
1994             self.clear_snat()
1995
1996
1997 class TestDeterministicNAT(MethodHolder):
1998     """ Deterministic NAT Test Cases """
1999
2000     @classmethod
2001     def setUpConstants(cls):
2002         super(TestDeterministicNAT, cls).setUpConstants()
2003         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
2004
2005     @classmethod
2006     def setUpClass(cls):
2007         super(TestDeterministicNAT, cls).setUpClass()
2008
2009         try:
2010             cls.tcp_port_in = 6303
2011             cls.tcp_external_port = 6303
2012             cls.udp_port_in = 6304
2013             cls.udp_external_port = 6304
2014             cls.icmp_id_in = 6305
2015             cls.snat_addr = '10.0.0.3'
2016
2017             cls.create_pg_interfaces(range(3))
2018             cls.interfaces = list(cls.pg_interfaces)
2019
2020             for i in cls.interfaces:
2021                 i.admin_up()
2022                 i.config_ip4()
2023                 i.resolve_arp()
2024
2025             cls.pg0.generate_remote_hosts(2)
2026             cls.pg0.configure_ipv4_neighbors()
2027
2028         except Exception:
2029             super(TestDeterministicNAT, cls).tearDownClass()
2030             raise
2031
2032     def create_stream_in(self, in_if, out_if, ttl=64):
2033         """
2034         Create packet stream for inside network
2035
2036         :param in_if: Inside interface
2037         :param out_if: Outside interface
2038         :param ttl: TTL of generated packets
2039         """
2040         pkts = []
2041         # TCP
2042         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2043              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2044              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2045         pkts.append(p)
2046
2047         # UDP
2048         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2049              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2050              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2051         pkts.append(p)
2052
2053         # ICMP
2054         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2055              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2056              ICMP(id=self.icmp_id_in, type='echo-request'))
2057         pkts.append(p)
2058
2059         return pkts
2060
2061     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2062         """
2063         Create packet stream for outside network
2064
2065         :param out_if: Outside interface
2066         :param dst_ip: Destination IP address (Default use global SNAT address)
2067         :param ttl: TTL of generated packets
2068         """
2069         if dst_ip is None:
2070             dst_ip = self.snat_addr
2071         pkts = []
2072         # TCP
2073         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2074              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2075              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2076         pkts.append(p)
2077
2078         # UDP
2079         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2080              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2081              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2082         pkts.append(p)
2083
2084         # ICMP
2085         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2086              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2087              ICMP(id=self.icmp_external_id, type='echo-reply'))
2088         pkts.append(p)
2089
2090         return pkts
2091
2092     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2093         """
2094         Verify captured packets on outside network
2095
2096         :param capture: Captured packets
2097         :param nat_ip: Translated IP address (Default use global SNAT address)
2098         :param same_port: Sorce port number is not translated (Default False)
2099         :param packet_num: Expected number of packets (Default 3)
2100         """
2101         if nat_ip is None:
2102             nat_ip = self.snat_addr
2103         self.assertEqual(packet_num, len(capture))
2104         for packet in capture:
2105             try:
2106                 self.assertEqual(packet[IP].src, nat_ip)
2107                 if packet.haslayer(TCP):
2108                     self.tcp_port_out = packet[TCP].sport
2109                 elif packet.haslayer(UDP):
2110                     self.udp_port_out = packet[UDP].sport
2111                 else:
2112                     self.icmp_external_id = packet[ICMP].id
2113             except:
2114                 self.logger.error(ppp("Unexpected or invalid packet "
2115                                       "(outside network):", packet))
2116                 raise
2117
2118     def initiate_tcp_session(self, in_if, out_if):
2119         """
2120         Initiates TCP session
2121
2122         :param in_if: Inside interface
2123         :param out_if: Outside interface
2124         """
2125         try:
2126             # SYN packet in->out
2127             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2128                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2129                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2130                      flags="S"))
2131             in_if.add_stream(p)
2132             self.pg_enable_capture(self.pg_interfaces)
2133             self.pg_start()
2134             capture = out_if.get_capture(1)
2135             p = capture[0]
2136             self.tcp_port_out = p[TCP].sport
2137
2138             # SYN + ACK packet out->in
2139             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2140                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2141                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2142                      flags="SA"))
2143             out_if.add_stream(p)
2144             self.pg_enable_capture(self.pg_interfaces)
2145             self.pg_start()
2146             in_if.get_capture(1)
2147
2148             # ACK packet in->out
2149             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2150                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2151                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2152                      flags="A"))
2153             in_if.add_stream(p)
2154             self.pg_enable_capture(self.pg_interfaces)
2155             self.pg_start()
2156             out_if.get_capture(1)
2157
2158         except:
2159             self.logger.error("TCP 3 way handshake failed")
2160             raise
2161
2162     def verify_ipfix_max_entries_per_user(self, data):
2163         """
2164         Verify IPFIX maximum entries per user exceeded event
2165
2166         :param data: Decoded IPFIX data records
2167         """
2168         self.assertEqual(1, len(data))
2169         record = data[0]
2170         # natEvent
2171         self.assertEqual(ord(record[230]), 13)
2172         # natQuotaExceededEvent
2173         self.assertEqual('\x03\x00\x00\x00', record[466])
2174         # sourceIPv4Address
2175         self.assertEqual(self.pg0.remote_ip4n, record[8])
2176
2177     def test_deterministic_mode(self):
2178         """ S-NAT run deterministic mode """
2179         in_addr = '172.16.255.0'
2180         out_addr = '172.17.255.50'
2181         in_addr_t = '172.16.255.20'
2182         in_addr_n = socket.inet_aton(in_addr)
2183         out_addr_n = socket.inet_aton(out_addr)
2184         in_addr_t_n = socket.inet_aton(in_addr_t)
2185         in_plen = 24
2186         out_plen = 32
2187
2188         snat_config = self.vapi.snat_show_config()
2189         self.assertEqual(1, snat_config.deterministic)
2190
2191         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2192
2193         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2194         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2195         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2196         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2197
2198         deterministic_mappings = self.vapi.snat_det_map_dump()
2199         self.assertEqual(len(deterministic_mappings), 1)
2200         dsm = deterministic_mappings[0]
2201         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2202         self.assertEqual(in_plen, dsm.in_plen)
2203         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2204         self.assertEqual(out_plen, dsm.out_plen)
2205
2206         self.clear_snat()
2207         deterministic_mappings = self.vapi.snat_det_map_dump()
2208         self.assertEqual(len(deterministic_mappings), 0)
2209
2210     def test_set_timeouts(self):
2211         """ Set deterministic NAT timeouts """
2212         timeouts_before = self.vapi.snat_det_get_timeouts()
2213
2214         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2215                                         timeouts_before.tcp_established + 10,
2216                                         timeouts_before.tcp_transitory + 10,
2217                                         timeouts_before.icmp + 10)
2218
2219         timeouts_after = self.vapi.snat_det_get_timeouts()
2220
2221         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2222         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2223         self.assertNotEqual(timeouts_before.tcp_established,
2224                             timeouts_after.tcp_established)
2225         self.assertNotEqual(timeouts_before.tcp_transitory,
2226                             timeouts_after.tcp_transitory)
2227
2228     def test_det_in(self):
2229         """ CGNAT translation test (TCP, UDP, ICMP) """
2230
2231         nat_ip = "10.0.0.10"
2232
2233         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2234                                    32,
2235                                    socket.inet_aton(nat_ip),
2236                                    32)
2237         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2238         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2239                                                  is_inside=0)
2240
2241         # in2out
2242         pkts = self.create_stream_in(self.pg0, self.pg1)
2243         self.pg0.add_stream(pkts)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         capture = self.pg1.get_capture(len(pkts))
2247         self.verify_capture_out(capture, nat_ip)
2248
2249         # out2in
2250         pkts = self.create_stream_out(self.pg1, nat_ip)
2251         self.pg1.add_stream(pkts)
2252         self.pg_enable_capture(self.pg_interfaces)
2253         self.pg_start()
2254         capture = self.pg0.get_capture(len(pkts))
2255         self.verify_capture_in(capture, self.pg0)
2256
2257         # session dump test
2258         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2259         self.assertEqual(len(sessions), 3)
2260
2261         # TCP session
2262         s = sessions[0]
2263         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2264         self.assertEqual(s.in_port, self.tcp_port_in)
2265         self.assertEqual(s.out_port, self.tcp_port_out)
2266         self.assertEqual(s.ext_port, self.tcp_external_port)
2267
2268         # UDP session
2269         s = sessions[1]
2270         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2271         self.assertEqual(s.in_port, self.udp_port_in)
2272         self.assertEqual(s.out_port, self.udp_port_out)
2273         self.assertEqual(s.ext_port, self.udp_external_port)
2274
2275         # ICMP session
2276         s = sessions[2]
2277         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2278         self.assertEqual(s.in_port, self.icmp_id_in)
2279         self.assertEqual(s.out_port, self.icmp_external_id)
2280
2281     def test_multiple_users(self):
2282         """ CGNAT multiple users """
2283
2284         nat_ip = "10.0.0.10"
2285         port_in = 80
2286         external_port = 6303
2287
2288         host0 = self.pg0.remote_hosts[0]
2289         host1 = self.pg0.remote_hosts[1]
2290
2291         self.vapi.snat_add_det_map(host0.ip4n,
2292                                    24,
2293                                    socket.inet_aton(nat_ip),
2294                                    32)
2295         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2296         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2297                                                  is_inside=0)
2298
2299         # host0 to out
2300         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2301              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2302              TCP(sport=port_in, dport=external_port))
2303         self.pg0.add_stream(p)
2304         self.pg_enable_capture(self.pg_interfaces)
2305         self.pg_start()
2306         capture = self.pg1.get_capture(1)
2307         p = capture[0]
2308         try:
2309             ip = p[IP]
2310             tcp = p[TCP]
2311             self.assertEqual(ip.src, nat_ip)
2312             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2313             self.assertEqual(tcp.dport, external_port)
2314             port_out0 = tcp.sport
2315         except:
2316             self.logger.error(ppp("Unexpected or invalid packet:", p))
2317             raise
2318
2319         # host1 to out
2320         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2321              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2322              TCP(sport=port_in, dport=external_port))
2323         self.pg0.add_stream(p)
2324         self.pg_enable_capture(self.pg_interfaces)
2325         self.pg_start()
2326         capture = self.pg1.get_capture(1)
2327         p = capture[0]
2328         try:
2329             ip = p[IP]
2330             tcp = p[TCP]
2331             self.assertEqual(ip.src, nat_ip)
2332             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2333             self.assertEqual(tcp.dport, external_port)
2334             port_out1 = tcp.sport
2335         except:
2336             self.logger.error(ppp("Unexpected or invalid packet:", p))
2337             raise
2338
2339         dms = self.vapi.snat_det_map_dump()
2340         self.assertEqual(1, len(dms))
2341         self.assertEqual(2, dms[0].ses_num)
2342
2343         # out to host0
2344         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2345              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2346              TCP(sport=external_port, dport=port_out0))
2347         self.pg1.add_stream(p)
2348         self.pg_enable_capture(self.pg_interfaces)
2349         self.pg_start()
2350         capture = self.pg0.get_capture(1)
2351         p = capture[0]
2352         try:
2353             ip = p[IP]
2354             tcp = p[TCP]
2355             self.assertEqual(ip.src, self.pg1.remote_ip4)
2356             self.assertEqual(ip.dst, host0.ip4)
2357             self.assertEqual(tcp.dport, port_in)
2358             self.assertEqual(tcp.sport, external_port)
2359         except:
2360             self.logger.error(ppp("Unexpected or invalid packet:", p))
2361             raise
2362
2363         # out to host1
2364         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2365              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2366              TCP(sport=external_port, dport=port_out1))
2367         self.pg1.add_stream(p)
2368         self.pg_enable_capture(self.pg_interfaces)
2369         self.pg_start()
2370         capture = self.pg0.get_capture(1)
2371         p = capture[0]
2372         try:
2373             ip = p[IP]
2374             tcp = p[TCP]
2375             self.assertEqual(ip.src, self.pg1.remote_ip4)
2376             self.assertEqual(ip.dst, host1.ip4)
2377             self.assertEqual(tcp.dport, port_in)
2378             self.assertEqual(tcp.sport, external_port)
2379         except:
2380             self.logger.error(ppp("Unexpected or invalid packet", p))
2381             raise
2382
2383         # session close api test
2384         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2385                                              port_out1,
2386                                              self.pg1.remote_ip4n,
2387                                              external_port)
2388         dms = self.vapi.snat_det_map_dump()
2389         self.assertEqual(dms[0].ses_num, 1)
2390
2391         self.vapi.snat_det_close_session_in(host0.ip4n,
2392                                             port_in,
2393                                             self.pg1.remote_ip4n,
2394                                             external_port)
2395         dms = self.vapi.snat_det_map_dump()
2396         self.assertEqual(dms[0].ses_num, 0)
2397
2398     def test_tcp_session_close_detection_in(self):
2399         """ CGNAT TCP session close initiated from inside network """
2400         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2401                                    32,
2402                                    socket.inet_aton(self.snat_addr),
2403                                    32)
2404         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2406                                                  is_inside=0)
2407
2408         self.initiate_tcp_session(self.pg0, self.pg1)
2409
2410         # close the session from inside
2411         try:
2412             # FIN packet in -> out
2413             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2414                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2415                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2416                      flags="F"))
2417             self.pg0.add_stream(p)
2418             self.pg_enable_capture(self.pg_interfaces)
2419             self.pg_start()
2420             self.pg1.get_capture(1)
2421
2422             pkts = []
2423
2424             # ACK packet out -> in
2425             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2426                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2427                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2428                      flags="A"))
2429             pkts.append(p)
2430
2431             # FIN packet out -> in
2432             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2433                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2434                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2435                      flags="F"))
2436             pkts.append(p)
2437
2438             self.pg1.add_stream(pkts)
2439             self.pg_enable_capture(self.pg_interfaces)
2440             self.pg_start()
2441             self.pg0.get_capture(2)
2442
2443             # ACK packet in -> out
2444             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2445                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2446                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2447                      flags="A"))
2448             self.pg0.add_stream(p)
2449             self.pg_enable_capture(self.pg_interfaces)
2450             self.pg_start()
2451             self.pg1.get_capture(1)
2452
2453             # Check if snat closed the session
2454             dms = self.vapi.snat_det_map_dump()
2455             self.assertEqual(0, dms[0].ses_num)
2456         except:
2457             self.logger.error("TCP session termination failed")
2458             raise
2459
2460     def test_tcp_session_close_detection_out(self):
2461         """ CGNAT TCP session close initiated from outside network """
2462         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2463                                    32,
2464                                    socket.inet_aton(self.snat_addr),
2465                                    32)
2466         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2467         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2468                                                  is_inside=0)
2469
2470         self.initiate_tcp_session(self.pg0, self.pg1)
2471
2472         # close the session from outside
2473         try:
2474             # FIN packet out -> in
2475             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2476                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2477                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2478                      flags="F"))
2479             self.pg1.add_stream(p)
2480             self.pg_enable_capture(self.pg_interfaces)
2481             self.pg_start()
2482             self.pg0.get_capture(1)
2483
2484             pkts = []
2485
2486             # ACK packet in -> out
2487             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2488                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2489                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2490                      flags="A"))
2491             pkts.append(p)
2492
2493             # ACK packet in -> out
2494             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2495                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2496                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2497                      flags="F"))
2498             pkts.append(p)
2499
2500             self.pg0.add_stream(pkts)
2501             self.pg_enable_capture(self.pg_interfaces)
2502             self.pg_start()
2503             self.pg1.get_capture(2)
2504
2505             # ACK packet out -> in
2506             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2507                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2508                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2509                      flags="A"))
2510             self.pg1.add_stream(p)
2511             self.pg_enable_capture(self.pg_interfaces)
2512             self.pg_start()
2513             self.pg0.get_capture(1)
2514
2515             # Check if snat closed the session
2516             dms = self.vapi.snat_det_map_dump()
2517             self.assertEqual(0, dms[0].ses_num)
2518         except:
2519             self.logger.error("TCP session termination failed")
2520             raise
2521
2522     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2523     def test_session_timeout(self):
2524         """ CGNAT session timeouts """
2525         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2526                                    32,
2527                                    socket.inet_aton(self.snat_addr),
2528                                    32)
2529         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2530         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2531                                                  is_inside=0)
2532
2533         self.initiate_tcp_session(self.pg0, self.pg1)
2534         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2535         pkts = self.create_stream_in(self.pg0, self.pg1)
2536         self.pg0.add_stream(pkts)
2537         self.pg_enable_capture(self.pg_interfaces)
2538         self.pg_start()
2539         capture = self.pg1.get_capture(len(pkts))
2540         sleep(15)
2541
2542         dms = self.vapi.snat_det_map_dump()
2543         self.assertEqual(0, dms[0].ses_num)
2544
2545     def test_session_limit_per_user(self):
2546         """ CGNAT maximum 1000 sessions per user should be created """
2547         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2548                                    32,
2549                                    socket.inet_aton(self.snat_addr),
2550                                    32)
2551         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2552         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2553                                                  is_inside=0)
2554         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2555                                      src_address=self.pg2.local_ip4n,
2556                                      path_mtu=512,
2557                                      template_interval=10)
2558         self.vapi.snat_ipfix()
2559
2560         pkts = []
2561         for port in range(1025, 2025):
2562             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2563                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2564                  UDP(sport=port, dport=port))
2565             pkts.append(p)
2566
2567         self.pg0.add_stream(pkts)
2568         self.pg_enable_capture(self.pg_interfaces)
2569         self.pg_start()
2570         capture = self.pg1.get_capture(len(pkts))
2571
2572         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2573              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2574              UDP(sport=3001, dport=3002))
2575         self.pg0.add_stream(p)
2576         self.pg_enable_capture(self.pg_interfaces)
2577         self.pg_start()
2578         capture = self.pg1.assert_nothing_captured()
2579
2580         # verify ICMP error packet
2581         capture = self.pg0.get_capture(1)
2582         p = capture[0]
2583         self.assertTrue(p.haslayer(ICMP))
2584         icmp = p[ICMP]
2585         self.assertEqual(icmp.type, 3)
2586         self.assertEqual(icmp.code, 1)
2587         self.assertTrue(icmp.haslayer(IPerror))
2588         inner_ip = icmp[IPerror]
2589         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2590         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2591
2592         dms = self.vapi.snat_det_map_dump()
2593
2594         self.assertEqual(1000, dms[0].ses_num)
2595
2596         # verify IPFIX logging
2597         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2598         capture = self.pg2.get_capture(2)
2599         ipfix = IPFIXDecoder()
2600         # first load template
2601         for p in capture:
2602             self.assertTrue(p.haslayer(IPFIX))
2603             if p.haslayer(Template):
2604                 ipfix.add_template(p.getlayer(Template))
2605         # verify events in data set
2606         for p in capture:
2607             if p.haslayer(Data):
2608                 data = ipfix.decode_data_set(p.getlayer(Set))
2609                 self.verify_ipfix_max_entries_per_user(data)
2610
2611     def clear_snat(self):
2612         """
2613         Clear SNAT configuration.
2614         """
2615         self.vapi.snat_ipfix(enable=0)
2616         self.vapi.snat_det_set_timeouts()
2617         deterministic_mappings = self.vapi.snat_det_map_dump()
2618         for dsm in deterministic_mappings:
2619             self.vapi.snat_add_det_map(dsm.in_addr,
2620                                        dsm.in_plen,
2621                                        dsm.out_addr,
2622                                        dsm.out_plen,
2623                                        is_add=0)
2624
2625         interfaces = self.vapi.snat_interface_dump()
2626         for intf in interfaces:
2627             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2628                                                      intf.is_inside,
2629                                                      is_add=0)
2630
2631     def tearDown(self):
2632         super(TestDeterministicNAT, self).tearDown()
2633         if not self.vpp_dead:
2634             self.logger.info(self.vapi.cli("show snat detail"))
2635             self.clear_snat()
2636
2637
2638 class TestNAT64(MethodHolder):
2639     """ NAT64 Test Cases """
2640
2641     @classmethod
2642     def setUpClass(cls):
2643         super(TestNAT64, cls).setUpClass()
2644
2645         try:
2646             cls.tcp_port_in = 6303
2647             cls.tcp_port_out = 6303
2648             cls.udp_port_in = 6304
2649             cls.udp_port_out = 6304
2650             cls.icmp_id_in = 6305
2651             cls.icmp_id_out = 6305
2652             cls.nat_addr = '10.0.0.3'
2653             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2654             cls.vrf1_id = 10
2655             cls.vrf1_nat_addr = '10.0.10.3'
2656             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2657                                                    cls.vrf1_nat_addr)
2658
2659             cls.create_pg_interfaces(range(3))
2660             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2661             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2662             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2663
2664             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2665
2666             cls.pg0.generate_remote_hosts(2)
2667
2668             for i in cls.ip6_interfaces:
2669                 i.admin_up()
2670                 i.config_ip6()
2671                 i.configure_ipv6_neighbors()
2672
2673             for i in cls.ip4_interfaces:
2674                 i.admin_up()
2675                 i.config_ip4()
2676                 i.resolve_arp()
2677
2678         except Exception:
2679             super(TestNAT64, cls).tearDownClass()
2680             raise
2681
2682     def test_pool(self):
2683         """ Add/delete address to NAT64 pool """
2684         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2685
2686         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2687
2688         addresses = self.vapi.nat64_pool_addr_dump()
2689         self.assertEqual(len(addresses), 1)
2690         self.assertEqual(addresses[0].address, nat_addr)
2691
2692         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2693
2694         addresses = self.vapi.nat64_pool_addr_dump()
2695         self.assertEqual(len(addresses), 0)
2696
2697     def test_interface(self):
2698         """ Enable/disable NAT64 feature on the interface """
2699         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2700         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2701
2702         interfaces = self.vapi.nat64_interface_dump()
2703         self.assertEqual(len(interfaces), 2)
2704         pg0_found = False
2705         pg1_found = False
2706         for intf in interfaces:
2707             if intf.sw_if_index == self.pg0.sw_if_index:
2708                 self.assertEqual(intf.is_inside, 1)
2709                 pg0_found = True
2710             elif intf.sw_if_index == self.pg1.sw_if_index:
2711                 self.assertEqual(intf.is_inside, 0)
2712                 pg1_found = True
2713         self.assertTrue(pg0_found)
2714         self.assertTrue(pg1_found)
2715
2716         features = self.vapi.cli("show interface features pg0")
2717         self.assertNotEqual(features.find('nat64-in2out'), -1)
2718         features = self.vapi.cli("show interface features pg1")
2719         self.assertNotEqual(features.find('nat64-out2in'), -1)
2720
2721         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2722         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2723
2724         interfaces = self.vapi.nat64_interface_dump()
2725         self.assertEqual(len(interfaces), 0)
2726
2727     def test_static_bib(self):
2728         """ Add/delete static BIB entry """
2729         in_addr = socket.inet_pton(socket.AF_INET6,
2730                                    '2001:db8:85a3::8a2e:370:7334')
2731         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2732         in_port = 1234
2733         out_port = 5678
2734         proto = IP_PROTOS.tcp
2735
2736         self.vapi.nat64_add_del_static_bib(in_addr,
2737                                            out_addr,
2738                                            in_port,
2739                                            out_port,
2740                                            proto)
2741         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2742         static_bib_num = 0
2743         for bibe in bib:
2744             if bibe.is_static:
2745                 static_bib_num += 1
2746                 self.assertEqual(bibe.i_addr, in_addr)
2747                 self.assertEqual(bibe.o_addr, out_addr)
2748                 self.assertEqual(bibe.i_port, in_port)
2749                 self.assertEqual(bibe.o_port, out_port)
2750         self.assertEqual(static_bib_num, 1)
2751
2752         self.vapi.nat64_add_del_static_bib(in_addr,
2753                                            out_addr,
2754                                            in_port,
2755                                            out_port,
2756                                            proto,
2757                                            is_add=0)
2758         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2759         static_bib_num = 0
2760         for bibe in bib:
2761             if bibe.is_static:
2762                 static_bib_num += 1
2763         self.assertEqual(static_bib_num, 0)
2764
2765     def test_set_timeouts(self):
2766         """ Set NAT64 timeouts """
2767         # verify default values
2768         timeouts = self.vapi.nat64_get_timeouts()
2769         self.assertEqual(timeouts.udp, 300)
2770         self.assertEqual(timeouts.icmp, 60)
2771         self.assertEqual(timeouts.tcp_trans, 240)
2772         self.assertEqual(timeouts.tcp_est, 7440)
2773         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2774
2775         # set and verify custom values
2776         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2777                                      tcp_est=7450, tcp_incoming_syn=10)
2778         timeouts = self.vapi.nat64_get_timeouts()
2779         self.assertEqual(timeouts.udp, 200)
2780         self.assertEqual(timeouts.icmp, 30)
2781         self.assertEqual(timeouts.tcp_trans, 250)
2782         self.assertEqual(timeouts.tcp_est, 7450)
2783         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2784
2785     def test_dynamic(self):
2786         """ NAT64 dynamic translation test """
2787         self.tcp_port_in = 6303
2788         self.udp_port_in = 6304
2789         self.icmp_id_in = 6305
2790
2791         ses_num_start = self.nat64_get_ses_num()
2792
2793         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2794                                                 self.nat_addr_n)
2795         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2796         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2797
2798         # in2out
2799         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2800         self.pg0.add_stream(pkts)
2801         self.pg_enable_capture(self.pg_interfaces)
2802         self.pg_start()
2803         capture = self.pg1.get_capture(len(pkts))
2804         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2805                                 dst_ip=self.pg1.remote_ip4)
2806
2807         # out2in
2808         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2809         self.pg1.add_stream(pkts)
2810         self.pg_enable_capture(self.pg_interfaces)
2811         self.pg_start()
2812         capture = self.pg0.get_capture(len(pkts))
2813         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2814         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2815
2816         # in2out
2817         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2818         self.pg0.add_stream(pkts)
2819         self.pg_enable_capture(self.pg_interfaces)
2820         self.pg_start()
2821         capture = self.pg1.get_capture(len(pkts))
2822         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2823                                 dst_ip=self.pg1.remote_ip4)
2824
2825         # out2in
2826         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2827         self.pg1.add_stream(pkts)
2828         self.pg_enable_capture(self.pg_interfaces)
2829         self.pg_start()
2830         capture = self.pg0.get_capture(len(pkts))
2831         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2832
2833         ses_num_end = self.nat64_get_ses_num()
2834
2835         self.assertEqual(ses_num_end - ses_num_start, 3)
2836
2837         # tenant with specific VRF
2838         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2839                                                 self.vrf1_nat_addr_n,
2840                                                 vrf_id=self.vrf1_id)
2841         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2842
2843         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2844         self.pg2.add_stream(pkts)
2845         self.pg_enable_capture(self.pg_interfaces)
2846         self.pg_start()
2847         capture = self.pg1.get_capture(len(pkts))
2848         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2849                                 dst_ip=self.pg1.remote_ip4)
2850
2851         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2852         self.pg1.add_stream(pkts)
2853         self.pg_enable_capture(self.pg_interfaces)
2854         self.pg_start()
2855         capture = self.pg2.get_capture(len(pkts))
2856         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2857
2858     def test_static(self):
2859         """ NAT64 static translation test """
2860         self.tcp_port_in = 60303
2861         self.udp_port_in = 60304
2862         self.icmp_id_in = 60305
2863         self.tcp_port_out = 60303
2864         self.udp_port_out = 60304
2865         self.icmp_id_out = 60305
2866
2867         ses_num_start = self.nat64_get_ses_num()
2868
2869         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2870                                                 self.nat_addr_n)
2871         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2872         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2873
2874         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2875                                            self.nat_addr_n,
2876                                            self.tcp_port_in,
2877                                            self.tcp_port_out,
2878                                            IP_PROTOS.tcp)
2879         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2880                                            self.nat_addr_n,
2881                                            self.udp_port_in,
2882                                            self.udp_port_out,
2883                                            IP_PROTOS.udp)
2884         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2885                                            self.nat_addr_n,
2886                                            self.icmp_id_in,
2887                                            self.icmp_id_out,
2888                                            IP_PROTOS.icmp)
2889
2890         # in2out
2891         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2892         self.pg0.add_stream(pkts)
2893         self.pg_enable_capture(self.pg_interfaces)
2894         self.pg_start()
2895         capture = self.pg1.get_capture(len(pkts))
2896         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2897                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2898
2899         # out2in
2900         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2901         self.pg1.add_stream(pkts)
2902         self.pg_enable_capture(self.pg_interfaces)
2903         self.pg_start()
2904         capture = self.pg0.get_capture(len(pkts))
2905         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2906         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2907
2908         ses_num_end = self.nat64_get_ses_num()
2909
2910         self.assertEqual(ses_num_end - ses_num_start, 3)
2911
2912     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2913     def test_session_timeout(self):
2914         """ NAT64 session timeout """
2915         self.icmp_id_in = 1234
2916         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2917                                                 self.nat_addr_n)
2918         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2919         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2920         self.vapi.nat64_set_timeouts(icmp=5)
2921
2922         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2923         self.pg0.add_stream(pkts)
2924         self.pg_enable_capture(self.pg_interfaces)
2925         self.pg_start()
2926         capture = self.pg1.get_capture(len(pkts))
2927
2928         ses_num_before_timeout = self.nat64_get_ses_num()
2929
2930         sleep(15)
2931
2932         # ICMP session after timeout
2933         ses_num_after_timeout = self.nat64_get_ses_num()
2934         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2935
2936     def test_icmp_error(self):
2937         """ NAT64 ICMP Error message translation """
2938         self.tcp_port_in = 6303
2939         self.udp_port_in = 6304
2940         self.icmp_id_in = 6305
2941
2942         ses_num_start = self.nat64_get_ses_num()
2943
2944         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2945                                                 self.nat_addr_n)
2946         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2947         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2948
2949         # send some packets to create sessions
2950         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2951         self.pg0.add_stream(pkts)
2952         self.pg_enable_capture(self.pg_interfaces)
2953         self.pg_start()
2954         capture_ip4 = self.pg1.get_capture(len(pkts))
2955         self.verify_capture_out(capture_ip4,
2956                                 nat_ip=self.nat_addr,
2957                                 dst_ip=self.pg1.remote_ip4)
2958
2959         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2960         self.pg1.add_stream(pkts)
2961         self.pg_enable_capture(self.pg_interfaces)
2962         self.pg_start()
2963         capture_ip6 = self.pg0.get_capture(len(pkts))
2964         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2965         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2966                                    self.pg0.remote_ip6)
2967
2968         # in2out
2969         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2970                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2971                 ICMPv6DestUnreach(code=1) /
2972                 packet[IPv6] for packet in capture_ip6]
2973         self.pg0.add_stream(pkts)
2974         self.pg_enable_capture(self.pg_interfaces)
2975         self.pg_start()
2976         capture = self.pg1.get_capture(len(pkts))
2977         for packet in capture:
2978             try:
2979                 self.assertEqual(packet[IP].src, self.nat_addr)
2980                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2981                 self.assertEqual(packet[ICMP].type, 3)
2982                 self.assertEqual(packet[ICMP].code, 13)
2983                 inner = packet[IPerror]
2984                 self.assertEqual(inner.src, self.pg1.remote_ip4)
2985                 self.assertEqual(inner.dst, self.nat_addr)
2986                 self.check_icmp_checksum(packet)
2987                 if inner.haslayer(TCPerror):
2988                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2989                 elif inner.haslayer(UDPerror):
2990                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2991                 else:
2992                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2993             except:
2994                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2995                 raise
2996
2997         # out2in
2998         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2999                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3000                 ICMP(type=3, code=13) /
3001                 packet[IP] for packet in capture_ip4]
3002         self.pg1.add_stream(pkts)
3003         self.pg_enable_capture(self.pg_interfaces)
3004         self.pg_start()
3005         capture = self.pg0.get_capture(len(pkts))
3006         for packet in capture:
3007             try:
3008                 self.assertEqual(packet[IPv6].src, ip.src)
3009                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3010                 icmp = packet[ICMPv6DestUnreach]
3011                 self.assertEqual(icmp.code, 1)
3012                 inner = icmp[IPerror6]
3013                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3014                 self.assertEqual(inner.dst, ip.src)
3015                 self.check_icmpv6_checksum(packet)
3016                 if inner.haslayer(TCPerror):
3017                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3018                 elif inner.haslayer(UDPerror):
3019                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3020                 else:
3021                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3022                                      self.icmp_id_in)
3023             except:
3024                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3025                 raise
3026
3027     def test_hairpinning(self):
3028         """ NAT64 hairpinning """
3029
3030         client = self.pg0.remote_hosts[0]
3031         server = self.pg0.remote_hosts[1]
3032         server_tcp_in_port = 22
3033         server_tcp_out_port = 4022
3034         server_udp_in_port = 23
3035         server_udp_out_port = 4023
3036         client_tcp_in_port = 1234
3037         client_udp_in_port = 1235
3038         client_tcp_out_port = 0
3039         client_udp_out_port = 0
3040         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3041         nat_addr_ip6 = ip.src
3042
3043         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3044                                                 self.nat_addr_n)
3045         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3046         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3047
3048         self.vapi.nat64_add_del_static_bib(server.ip6n,
3049                                            self.nat_addr_n,
3050                                            server_tcp_in_port,
3051                                            server_tcp_out_port,
3052                                            IP_PROTOS.tcp)
3053         self.vapi.nat64_add_del_static_bib(server.ip6n,
3054                                            self.nat_addr_n,
3055                                            server_udp_in_port,
3056                                            server_udp_out_port,
3057                                            IP_PROTOS.udp)
3058
3059         # client to server
3060         pkts = []
3061         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3062              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3063              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3064         pkts.append(p)
3065         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3066              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3067              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3068         pkts.append(p)
3069         self.pg0.add_stream(pkts)
3070         self.pg_enable_capture(self.pg_interfaces)
3071         self.pg_start()
3072         capture = self.pg0.get_capture(len(pkts))
3073         for packet in capture:
3074             try:
3075                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3076                 self.assertEqual(packet[IPv6].dst, server.ip6)
3077                 if packet.haslayer(TCP):
3078                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3079                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3080                     self.check_tcp_checksum(packet)
3081                     client_tcp_out_port = packet[TCP].sport
3082                 else:
3083                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3084                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3085                     self.check_udp_checksum(packet)
3086                     client_udp_out_port = packet[UDP].sport
3087             except:
3088                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3089                 raise
3090
3091         # server to client
3092         pkts = []
3093         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3094              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3095              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3096         pkts.append(p)
3097         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3098              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3099              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3100         pkts.append(p)
3101         self.pg0.add_stream(pkts)
3102         self.pg_enable_capture(self.pg_interfaces)
3103         self.pg_start()
3104         capture = self.pg0.get_capture(len(pkts))
3105         for packet in capture:
3106             try:
3107                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3108                 self.assertEqual(packet[IPv6].dst, client.ip6)
3109                 if packet.haslayer(TCP):
3110                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3111                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3112                     self.check_tcp_checksum(packet)
3113                 else:
3114                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3115                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3116                     self.check_udp_checksum(packet)
3117             except:
3118                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3119                 raise
3120
3121         # ICMP error
3122         pkts = []
3123         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3124                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3125                 ICMPv6DestUnreach(code=1) /
3126                 packet[IPv6] for packet in capture]
3127         self.pg0.add_stream(pkts)
3128         self.pg_enable_capture(self.pg_interfaces)
3129         self.pg_start()
3130         capture = self.pg0.get_capture(len(pkts))
3131         for packet in capture:
3132             try:
3133                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3134                 self.assertEqual(packet[IPv6].dst, server.ip6)
3135                 icmp = packet[ICMPv6DestUnreach]
3136                 self.assertEqual(icmp.code, 1)
3137                 inner = icmp[IPerror6]
3138                 self.assertEqual(inner.src, server.ip6)
3139                 self.assertEqual(inner.dst, nat_addr_ip6)
3140                 self.check_icmpv6_checksum(packet)
3141                 if inner.haslayer(TCPerror):
3142                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3143                     self.assertEqual(inner[TCPerror].dport,
3144                                      client_tcp_out_port)
3145                 else:
3146                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3147                     self.assertEqual(inner[UDPerror].dport,
3148                                      client_udp_out_port)
3149             except:
3150                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3151                 raise
3152
3153     def test_prefix(self):
3154         """ NAT64 Network-Specific Prefix """
3155
3156         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3157                                                 self.nat_addr_n)
3158         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3159         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3160         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3161                                                 self.vrf1_nat_addr_n,
3162                                                 vrf_id=self.vrf1_id)
3163         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3164
3165         # Add global prefix
3166         global_pref64 = "2001:db8::"
3167         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3168         global_pref64_len = 32
3169         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3170
3171         prefix = self.vapi.nat64_prefix_dump()
3172         self.assertEqual(len(prefix), 1)
3173         self.assertEqual(prefix[0].prefix, global_pref64_n)
3174         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3175         self.assertEqual(prefix[0].vrf_id, 0)
3176
3177         # Add tenant specific prefix
3178         vrf1_pref64 = "2001:db8:122:300::"
3179         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3180         vrf1_pref64_len = 56
3181         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3182                                        vrf1_pref64_len,
3183                                        vrf_id=self.vrf1_id)
3184         prefix = self.vapi.nat64_prefix_dump()
3185         self.assertEqual(len(prefix), 2)
3186
3187         # Global prefix
3188         pkts = self.create_stream_in_ip6(self.pg0,
3189                                          self.pg1,
3190                                          pref=global_pref64,
3191                                          plen=global_pref64_len)
3192         self.pg0.add_stream(pkts)
3193         self.pg_enable_capture(self.pg_interfaces)
3194         self.pg_start()
3195         capture = self.pg1.get_capture(len(pkts))
3196         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3197                                 dst_ip=self.pg1.remote_ip4)
3198
3199         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3200         self.pg1.add_stream(pkts)
3201         self.pg_enable_capture(self.pg_interfaces)
3202         self.pg_start()
3203         capture = self.pg0.get_capture(len(pkts))
3204         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3205                                   global_pref64,
3206                                   global_pref64_len)
3207         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3208
3209         # Tenant specific prefix
3210         pkts = self.create_stream_in_ip6(self.pg2,
3211                                          self.pg1,
3212                                          pref=vrf1_pref64,
3213                                          plen=vrf1_pref64_len)
3214         self.pg2.add_stream(pkts)
3215         self.pg_enable_capture(self.pg_interfaces)
3216         self.pg_start()
3217         capture = self.pg1.get_capture(len(pkts))
3218         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3219                                 dst_ip=self.pg1.remote_ip4)
3220
3221         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3222         self.pg1.add_stream(pkts)
3223         self.pg_enable_capture(self.pg_interfaces)
3224         self.pg_start()
3225         capture = self.pg2.get_capture(len(pkts))
3226         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3227                                   vrf1_pref64,
3228                                   vrf1_pref64_len)
3229         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3230
3231     def nat64_get_ses_num(self):
3232         """
3233         Return number of active NAT64 sessions.
3234         """
3235         ses_num = 0
3236         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3237         ses_num += len(st)
3238         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3239         ses_num += len(st)
3240         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3241         ses_num += len(st)
3242         return ses_num
3243
3244     def clear_nat64(self):
3245         """
3246         Clear NAT64 configuration.
3247         """
3248         self.vapi.nat64_set_timeouts()
3249
3250         interfaces = self.vapi.nat64_interface_dump()
3251         for intf in interfaces:
3252             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3253                                               intf.is_inside,
3254                                               is_add=0)
3255
3256         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3257         for bibe in bib:
3258             if bibe.is_static:
3259                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3260                                                    bibe.o_addr,
3261                                                    bibe.i_port,
3262                                                    bibe.o_port,
3263                                                    bibe.proto,
3264                                                    bibe.vrf_id,
3265                                                    is_add=0)
3266
3267         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3268         for bibe in bib:
3269             if bibe.is_static:
3270                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3271                                                    bibe.o_addr,
3272                                                    bibe.i_port,
3273                                                    bibe.o_port,
3274                                                    bibe.proto,
3275                                                    bibe.vrf_id,
3276                                                    is_add=0)
3277
3278         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3279         for bibe in bib:
3280             if bibe.is_static:
3281                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3282                                                    bibe.o_addr,
3283                                                    bibe.i_port,
3284                                                    bibe.o_port,
3285                                                    bibe.proto,
3286                                                    bibe.vrf_id,
3287                                                    is_add=0)
3288
3289         adresses = self.vapi.nat64_pool_addr_dump()
3290         for addr in adresses:
3291             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3292                                                     addr.address,
3293                                                     vrf_id=addr.vrf_id,
3294                                                     is_add=0)
3295
3296         prefixes = self.vapi.nat64_prefix_dump()
3297         for prefix in prefixes:
3298             self.vapi.nat64_add_del_prefix(prefix.prefix,
3299                                            prefix.prefix_len,
3300                                            vrf_id=prefix.vrf_id,
3301                                            is_add=0)
3302
3303     def tearDown(self):
3304         super(TestNAT64, self).tearDown()
3305         if not self.vpp_dead:
3306             self.logger.info(self.vapi.cli("show nat64 pool"))
3307             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3308             self.logger.info(self.vapi.cli("show nat64 prefix"))
3309             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3310             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3311             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3312             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3313             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3314             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3315             self.clear_nat64()
3316
3317 if __name__ == '__main__':
3318     unittest.main(testRunner=VppTestRunner)