TEST:add L2BD arp term tests
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
23     @classmethod
24     def setUpClass(cls):
25         super(MethodHolder, cls).setUpClass()
26
27     def tearDown(self):
28         super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def compose_ip6(self, ip4, pref, plen):
161         """
162         Compose IPv4-embedded IPv6 addresses
163
164         :param ip4: IPv4 address
165         :param pref: IPv6 prefix
166         :param plen: IPv6 prefix length
167         :returns: IPv4-embedded IPv6 addresses
168         """
169         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
171         if plen == 32:
172             pref_n[4] = ip4_n[0]
173             pref_n[5] = ip4_n[1]
174             pref_n[6] = ip4_n[2]
175             pref_n[7] = ip4_n[3]
176         elif plen == 40:
177             pref_n[5] = ip4_n[0]
178             pref_n[6] = ip4_n[1]
179             pref_n[7] = ip4_n[2]
180             pref_n[9] = ip4_n[3]
181         elif plen == 48:
182             pref_n[6] = ip4_n[0]
183             pref_n[7] = ip4_n[1]
184             pref_n[9] = ip4_n[2]
185             pref_n[10] = ip4_n[3]
186         elif plen == 56:
187             pref_n[7] = ip4_n[0]
188             pref_n[9] = ip4_n[1]
189             pref_n[10] = ip4_n[2]
190             pref_n[11] = ip4_n[3]
191         elif plen == 64:
192             pref_n[9] = ip4_n[0]
193             pref_n[10] = ip4_n[1]
194             pref_n[11] = ip4_n[2]
195             pref_n[12] = ip4_n[3]
196         elif plen == 96:
197             pref_n[12] = ip4_n[0]
198             pref_n[13] = ip4_n[1]
199             pref_n[14] = ip4_n[2]
200             pref_n[15] = ip4_n[3]
201         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
202
203     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
204         """
205         Create IPv6 packet stream for inside network
206
207         :param in_if: Inside interface
208         :param out_if: Outside interface
209         :param ttl: Hop Limit of generated packets
210         :param pref: NAT64 prefix
211         :param plen: NAT64 prefix length
212         """
213         pkts = []
214         if pref is None:
215             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
216         else:
217             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
218
219         # TCP
220         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222              TCP(sport=self.tcp_port_in, dport=20))
223         pkts.append(p)
224
225         # UDP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              UDP(sport=self.udp_port_in, dport=20))
229         pkts.append(p)
230
231         # ICMP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              ICMPv6EchoRequest(id=self.icmp_id_in))
235         pkts.append(p)
236
237         return pkts
238
239     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
240         """
241         Create packet stream for outside network
242
243         :param out_if: Outside interface
244         :param dst_ip: Destination IP address (Default use global SNAT address)
245         :param ttl: TTL of generated packets
246         """
247         if dst_ip is None:
248             dst_ip = self.snat_addr
249         pkts = []
250         # TCP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              TCP(dport=self.tcp_port_out, sport=20))
254         pkts.append(p)
255
256         # UDP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              UDP(dport=self.udp_port_out, sport=20))
260         pkts.append(p)
261
262         # ICMP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              ICMP(id=self.icmp_id_out, type='echo-reply'))
266         pkts.append(p)
267
268         return pkts
269
270     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271                            packet_num=3, dst_ip=None):
272         """
273         Verify captured packets on outside network
274
275         :param capture: Captured packets
276         :param nat_ip: Translated IP address (Default use global SNAT address)
277         :param same_port: Sorce port number is not translated (Default False)
278         :param packet_num: Expected number of packets (Default 3)
279         :param dst_ip: Destination IP address (Default do not verify)
280         """
281         if nat_ip is None:
282             nat_ip = self.snat_addr
283         self.assertEqual(packet_num, len(capture))
284         for packet in capture:
285             try:
286                 self.check_ip_checksum(packet)
287                 self.assertEqual(packet[IP].src, nat_ip)
288                 if dst_ip is not None:
289                     self.assertEqual(packet[IP].dst, dst_ip)
290                 if packet.haslayer(TCP):
291                     if same_port:
292                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
293                     else:
294                         self.assertNotEqual(
295                             packet[TCP].sport, self.tcp_port_in)
296                     self.tcp_port_out = packet[TCP].sport
297                     self.check_tcp_checksum(packet)
298                 elif packet.haslayer(UDP):
299                     if same_port:
300                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
301                     else:
302                         self.assertNotEqual(
303                             packet[UDP].sport, self.udp_port_in)
304                     self.udp_port_out = packet[UDP].sport
305                 else:
306                     if same_port:
307                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
308                     else:
309                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310                     self.icmp_id_out = packet[ICMP].id
311                     self.check_icmp_checksum(packet)
312             except:
313                 self.logger.error(ppp("Unexpected or invalid packet "
314                                       "(outside network):", packet))
315                 raise
316
317     def verify_capture_in(self, capture, in_if, packet_num=3):
318         """
319         Verify captured packets on inside network
320
321         :param capture: Captured packets
322         :param in_if: Inside interface
323         :param packet_num: Expected number of packets (Default 3)
324         """
325         self.assertEqual(packet_num, len(capture))
326         for packet in capture:
327             try:
328                 self.check_ip_checksum(packet)
329                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330                 if packet.haslayer(TCP):
331                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332                     self.check_tcp_checksum(packet)
333                 elif packet.haslayer(UDP):
334                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
335                 else:
336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337                     self.check_icmp_checksum(packet)
338             except:
339                 self.logger.error(ppp("Unexpected or invalid packet "
340                                       "(inside network):", packet))
341                 raise
342
343     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
344         """
345         Verify captured IPv6 packets on inside network
346
347         :param capture: Captured packets
348         :param src_ip: Source IP
349         :param dst_ip: Destination IP address
350         :param packet_num: Expected number of packets (Default 3)
351         """
352         self.assertEqual(packet_num, len(capture))
353         for packet in capture:
354             try:
355                 self.assertEqual(packet[IPv6].src, src_ip)
356                 self.assertEqual(packet[IPv6].dst, dst_ip)
357                 if packet.haslayer(TCP):
358                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359                     self.check_tcp_checksum(packet)
360                 elif packet.haslayer(UDP):
361                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
362                     self.check_udp_checksum(packet)
363                 else:
364                     self.assertEqual(packet[ICMPv6EchoReply].id,
365                                      self.icmp_id_in)
366                     self.check_icmpv6_checksum(packet)
367             except:
368                 self.logger.error(ppp("Unexpected or invalid packet "
369                                       "(inside network):", packet))
370                 raise
371
372     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
373         """
374         Verify captured packet that don't have to be translated
375
376         :param capture: Captured packets
377         :param ingress_if: Ingress interface
378         :param egress_if: Egress interface
379         """
380         for packet in capture:
381             try:
382                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384                 if packet.haslayer(TCP):
385                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386                 elif packet.haslayer(UDP):
387                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
388                 else:
389                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
390             except:
391                 self.logger.error(ppp("Unexpected or invalid packet "
392                                       "(inside network):", packet))
393                 raise
394
395     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396                                             packet_num=3, icmp_type=11):
397         """
398         Verify captured packets with ICMP errors on outside network
399
400         :param capture: Captured packets
401         :param src_ip: Translated IP address or IP address of VPP
402                        (Default use global SNAT address)
403         :param packet_num: Expected number of packets (Default 3)
404         :param icmp_type: Type of error ICMP packet
405                           we are expecting (Default 11)
406         """
407         if src_ip is None:
408             src_ip = self.snat_addr
409         self.assertEqual(packet_num, len(capture))
410         for packet in capture:
411             try:
412                 self.assertEqual(packet[IP].src, src_ip)
413                 self.assertTrue(packet.haslayer(ICMP))
414                 icmp = packet[ICMP]
415                 self.assertEqual(icmp.type, icmp_type)
416                 self.assertTrue(icmp.haslayer(IPerror))
417                 inner_ip = icmp[IPerror]
418                 if inner_ip.haslayer(TCPerror):
419                     self.assertEqual(inner_ip[TCPerror].dport,
420                                      self.tcp_port_out)
421                 elif inner_ip.haslayer(UDPerror):
422                     self.assertEqual(inner_ip[UDPerror].dport,
423                                      self.udp_port_out)
424                 else:
425                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
426             except:
427                 self.logger.error(ppp("Unexpected or invalid packet "
428                                       "(outside network):", packet))
429                 raise
430
431     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
432                                            icmp_type=11):
433         """
434         Verify captured packets with ICMP errors on inside network
435
436         :param capture: Captured packets
437         :param in_if: Inside interface
438         :param packet_num: Expected number of packets (Default 3)
439         :param icmp_type: Type of error ICMP packet
440                           we are expecting (Default 11)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446                 self.assertTrue(packet.haslayer(ICMP))
447                 icmp = packet[ICMP]
448                 self.assertEqual(icmp.type, icmp_type)
449                 self.assertTrue(icmp.haslayer(IPerror))
450                 inner_ip = icmp[IPerror]
451                 if inner_ip.haslayer(TCPerror):
452                     self.assertEqual(inner_ip[TCPerror].sport,
453                                      self.tcp_port_in)
454                 elif inner_ip.haslayer(UDPerror):
455                     self.assertEqual(inner_ip[UDPerror].sport,
456                                      self.udp_port_in)
457                 else:
458                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
459             except:
460                 self.logger.error(ppp("Unexpected or invalid packet "
461                                       "(inside network):", packet))
462                 raise
463
464     def verify_ipfix_nat44_ses(self, data):
465         """
466         Verify IPFIX NAT44 session create/delete event
467
468         :param data: Decoded IPFIX data records
469         """
470         nat44_ses_create_num = 0
471         nat44_ses_delete_num = 0
472         self.assertEqual(6, len(data))
473         for record in data:
474             # natEvent
475             self.assertIn(ord(record[230]), [4, 5])
476             if ord(record[230]) == 4:
477                 nat44_ses_create_num += 1
478             else:
479                 nat44_ses_delete_num += 1
480             # sourceIPv4Address
481             self.assertEqual(self.pg0.remote_ip4n, record[8])
482             # postNATSourceIPv4Address
483             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
484                              record[225])
485             # ingressVRFID
486             self.assertEqual(struct.pack("!I", 0), record[234])
487             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488             if IP_PROTOS.icmp == ord(record[4]):
489                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
491                                  record[227])
492             elif IP_PROTOS.tcp == ord(record[4]):
493                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
494                                  record[7])
495                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
496                                  record[227])
497             elif IP_PROTOS.udp == ord(record[4]):
498                 self.assertEqual(struct.pack("!H", self.udp_port_in),
499                                  record[7])
500                 self.assertEqual(struct.pack("!H", self.udp_port_out),
501                                  record[227])
502             else:
503                 self.fail("Invalid protocol")
504         self.assertEqual(3, nat44_ses_create_num)
505         self.assertEqual(3, nat44_ses_delete_num)
506
507     def verify_ipfix_addr_exhausted(self, data):
508         """
509         Verify IPFIX NAT addresses event
510
511         :param data: Decoded IPFIX data records
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         self.assertEqual(ord(record[230]), 3)
517         # natPoolID
518         self.assertEqual(struct.pack("!I", 0), record[283])
519
520
521 class TestSNAT(MethodHolder):
522     """ SNAT Test Cases """
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestSNAT, cls).setUpClass()
527
528         try:
529             cls.tcp_port_in = 6303
530             cls.tcp_port_out = 6303
531             cls.udp_port_in = 6304
532             cls.udp_port_out = 6304
533             cls.icmp_id_in = 6305
534             cls.icmp_id_out = 6305
535             cls.snat_addr = '10.0.0.3'
536             cls.ipfix_src_port = 4739
537             cls.ipfix_domain_id = 1
538
539             cls.create_pg_interfaces(range(9))
540             cls.interfaces = list(cls.pg_interfaces[0:4])
541
542             for i in cls.interfaces:
543                 i.admin_up()
544                 i.config_ip4()
545                 i.resolve_arp()
546
547             cls.pg0.generate_remote_hosts(3)
548             cls.pg0.configure_ipv4_neighbors()
549
550             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
551
552             cls.pg4._local_ip4 = "172.16.255.1"
553             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555             cls.pg4.set_table_ip4(10)
556             cls.pg5._local_ip4 = "172.16.255.3"
557             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
559             cls.pg5.set_table_ip4(10)
560             cls.pg6._local_ip4 = "172.16.255.1"
561             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563             cls.pg6.set_table_ip4(20)
564             for i in cls.overlapping_interfaces:
565                 i.config_ip4()
566                 i.admin_up()
567                 i.resolve_arp()
568
569             cls.pg7.admin_up()
570             cls.pg8.admin_up()
571
572         except Exception:
573             super(TestSNAT, cls).tearDownClass()
574             raise
575
576     def clear_snat(self):
577         """
578         Clear SNAT configuration.
579         """
580         # I found no elegant way to do this
581         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582                                    dst_address_length=32,
583                                    next_hop_address=self.pg7.remote_ip4n,
584                                    next_hop_sw_if_index=self.pg7.sw_if_index,
585                                    is_add=0)
586         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587                                    dst_address_length=32,
588                                    next_hop_address=self.pg8.remote_ip4n,
589                                    next_hop_sw_if_index=self.pg8.sw_if_index,
590                                    is_add=0)
591
592         for intf in [self.pg7, self.pg8]:
593             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
594             for n in neighbors:
595                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
596                                               n.mac_address,
597                                               n.ip_address,
598                                               is_add=0)
599
600         if self.pg7.has_ip4_config:
601             self.pg7.unconfig_ip4()
602
603         interfaces = self.vapi.snat_interface_addr_dump()
604         for intf in interfaces:
605             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
606
607         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608                              domain_id=self.ipfix_domain_id)
609         self.ipfix_src_port = 4739
610         self.ipfix_domain_id = 1
611
612         interfaces = self.vapi.snat_interface_dump()
613         for intf in interfaces:
614             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
615                                                      intf.is_inside,
616                                                      is_add=0)
617
618         static_mappings = self.vapi.snat_static_mapping_dump()
619         for sm in static_mappings:
620             self.vapi.snat_add_static_mapping(sm.local_ip_address,
621                                               sm.external_ip_address,
622                                               local_port=sm.local_port,
623                                               external_port=sm.external_port,
624                                               addr_only=sm.addr_only,
625                                               vrf_id=sm.vrf_id,
626                                               protocol=sm.protocol,
627                                               is_add=0)
628
629         adresses = self.vapi.snat_address_dump()
630         for addr in adresses:
631             self.vapi.snat_add_address_range(addr.ip_address,
632                                              addr.ip_address,
633                                              is_add=0)
634
635     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
636                                 local_port=0, external_port=0, vrf_id=0,
637                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
638                                 proto=0):
639         """
640         Add/delete S-NAT static mapping
641
642         :param local_ip: Local IP address
643         :param external_ip: External IP address
644         :param local_port: Local port number (Optional)
645         :param external_port: External port number (Optional)
646         :param vrf_id: VRF ID (Default 0)
647         :param is_add: 1 if add, 0 if delete (Default add)
648         :param external_sw_if_index: External interface instead of IP address
649         :param proto: IP protocol (Mandatory if port specified)
650         """
651         addr_only = 1
652         if local_port and external_port:
653             addr_only = 0
654         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
655         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
656         self.vapi.snat_add_static_mapping(
657             l_ip,
658             e_ip,
659             external_sw_if_index,
660             local_port,
661             external_port,
662             addr_only,
663             vrf_id,
664             proto,
665             is_add)
666
667     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
668         """
669         Add/delete S-NAT address
670
671         :param ip: IP address
672         :param is_add: 1 if add, 0 if delete (Default add)
673         """
674         snat_addr = socket.inet_pton(socket.AF_INET, ip)
675         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
676                                          vrf_id=vrf_id)
677
678     def test_dynamic(self):
679         """ SNAT dynamic translation test """
680
681         self.snat_add_address(self.snat_addr)
682         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
684                                                  is_inside=0)
685
686         # in2out
687         pkts = self.create_stream_in(self.pg0, self.pg1)
688         self.pg0.add_stream(pkts)
689         self.pg_enable_capture(self.pg_interfaces)
690         self.pg_start()
691         capture = self.pg1.get_capture(len(pkts))
692         self.verify_capture_out(capture)
693
694         # out2in
695         pkts = self.create_stream_out(self.pg1)
696         self.pg1.add_stream(pkts)
697         self.pg_enable_capture(self.pg_interfaces)
698         self.pg_start()
699         capture = self.pg0.get_capture(len(pkts))
700         self.verify_capture_in(capture, self.pg0)
701
702     def test_dynamic_icmp_errors_in2out_ttl_1(self):
703         """ SNAT handling of client packets with TTL=1 """
704
705         self.snat_add_address(self.snat_addr)
706         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
707         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
708                                                  is_inside=0)
709
710         # Client side - generate traffic
711         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
712         self.pg0.add_stream(pkts)
713         self.pg_enable_capture(self.pg_interfaces)
714         self.pg_start()
715
716         # Client side - verify ICMP type 11 packets
717         capture = self.pg0.get_capture(len(pkts))
718         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
719
720     def test_dynamic_icmp_errors_out2in_ttl_1(self):
721         """ SNAT handling of server packets with TTL=1 """
722
723         self.snat_add_address(self.snat_addr)
724         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
725         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
726                                                  is_inside=0)
727
728         # Client side - create sessions
729         pkts = self.create_stream_in(self.pg0, self.pg1)
730         self.pg0.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733
734         # Server side - generate traffic
735         capture = self.pg1.get_capture(len(pkts))
736         self.verify_capture_out(capture)
737         pkts = self.create_stream_out(self.pg1, ttl=1)
738         self.pg1.add_stream(pkts)
739         self.pg_enable_capture(self.pg_interfaces)
740         self.pg_start()
741
742         # Server side - verify ICMP type 11 packets
743         capture = self.pg1.get_capture(len(pkts))
744         self.verify_capture_out_with_icmp_errors(capture,
745                                                  src_ip=self.pg1.local_ip4)
746
747     def test_dynamic_icmp_errors_in2out_ttl_2(self):
748         """ SNAT handling of error responses to client packets with TTL=2 """
749
750         self.snat_add_address(self.snat_addr)
751         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
752         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
753                                                  is_inside=0)
754
755         # Client side - generate traffic
756         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
757         self.pg0.add_stream(pkts)
758         self.pg_enable_capture(self.pg_interfaces)
759         self.pg_start()
760
761         # Server side - simulate ICMP type 11 response
762         capture = self.pg1.get_capture(len(pkts))
763         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
764                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
765                 ICMP(type=11) / packet[IP] for packet in capture]
766         self.pg1.add_stream(pkts)
767         self.pg_enable_capture(self.pg_interfaces)
768         self.pg_start()
769
770         # Client side - verify ICMP type 11 packets
771         capture = self.pg0.get_capture(len(pkts))
772         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
773
774     def test_dynamic_icmp_errors_out2in_ttl_2(self):
775         """ SNAT handling of error responses to server packets with TTL=2 """
776
777         self.snat_add_address(self.snat_addr)
778         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
779         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
780                                                  is_inside=0)
781
782         # Client side - create sessions
783         pkts = self.create_stream_in(self.pg0, self.pg1)
784         self.pg0.add_stream(pkts)
785         self.pg_enable_capture(self.pg_interfaces)
786         self.pg_start()
787
788         # Server side - generate traffic
789         capture = self.pg1.get_capture(len(pkts))
790         self.verify_capture_out(capture)
791         pkts = self.create_stream_out(self.pg1, ttl=2)
792         self.pg1.add_stream(pkts)
793         self.pg_enable_capture(self.pg_interfaces)
794         self.pg_start()
795
796         # Client side - simulate ICMP type 11 response
797         capture = self.pg0.get_capture(len(pkts))
798         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
799                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
800                 ICMP(type=11) / packet[IP] for packet in capture]
801         self.pg0.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Server side - verify ICMP type 11 packets
806         capture = self.pg1.get_capture(len(pkts))
807         self.verify_capture_out_with_icmp_errors(capture)
808
809     def test_ping_out_interface_from_outside(self):
810         """ Ping SNAT out interface from outside network """
811
812         self.snat_add_address(self.snat_addr)
813         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
814         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
815                                                  is_inside=0)
816
817         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
818              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
819              ICMP(id=self.icmp_id_out, type='echo-request'))
820         pkts = [p]
821         self.pg1.add_stream(pkts)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         capture = self.pg1.get_capture(len(pkts))
825         self.assertEqual(1, len(capture))
826         packet = capture[0]
827         try:
828             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
829             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
830             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
831             self.assertEqual(packet[ICMP].type, 0)  # echo reply
832         except:
833             self.logger.error(ppp("Unexpected or invalid packet "
834                                   "(outside network):", packet))
835             raise
836
837     def test_ping_internal_host_from_outside(self):
838         """ Ping internal host from outside network """
839
840         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
841         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
842         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
843                                                  is_inside=0)
844
845         # out2in
846         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
847                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
848                ICMP(id=self.icmp_id_out, type='echo-request'))
849         self.pg1.add_stream(pkt)
850         self.pg_enable_capture(self.pg_interfaces)
851         self.pg_start()
852         capture = self.pg0.get_capture(1)
853         self.verify_capture_in(capture, self.pg0, packet_num=1)
854         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
855
856         # in2out
857         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
858                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
859                ICMP(id=self.icmp_id_in, type='echo-reply'))
860         self.pg0.add_stream(pkt)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg1.get_capture(1)
864         self.verify_capture_out(capture, same_port=True, packet_num=1)
865         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
866
867     def test_static_in(self):
868         """ SNAT 1:1 NAT initialized from inside network """
869
870         nat_ip = "10.0.0.10"
871         self.tcp_port_out = 6303
872         self.udp_port_out = 6304
873         self.icmp_id_out = 6305
874
875         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
876         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
877         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
878                                                  is_inside=0)
879
880         # in2out
881         pkts = self.create_stream_in(self.pg0, self.pg1)
882         self.pg0.add_stream(pkts)
883         self.pg_enable_capture(self.pg_interfaces)
884         self.pg_start()
885         capture = self.pg1.get_capture(len(pkts))
886         self.verify_capture_out(capture, nat_ip, True)
887
888         # out2in
889         pkts = self.create_stream_out(self.pg1, nat_ip)
890         self.pg1.add_stream(pkts)
891         self.pg_enable_capture(self.pg_interfaces)
892         self.pg_start()
893         capture = self.pg0.get_capture(len(pkts))
894         self.verify_capture_in(capture, self.pg0)
895
896     def test_static_out(self):
897         """ SNAT 1:1 NAT initialized from outside network """
898
899         nat_ip = "10.0.0.20"
900         self.tcp_port_out = 6303
901         self.udp_port_out = 6304
902         self.icmp_id_out = 6305
903
904         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
905         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
906         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
907                                                  is_inside=0)
908
909         # out2in
910         pkts = self.create_stream_out(self.pg1, nat_ip)
911         self.pg1.add_stream(pkts)
912         self.pg_enable_capture(self.pg_interfaces)
913         self.pg_start()
914         capture = self.pg0.get_capture(len(pkts))
915         self.verify_capture_in(capture, self.pg0)
916
917         # in2out
918         pkts = self.create_stream_in(self.pg0, self.pg1)
919         self.pg0.add_stream(pkts)
920         self.pg_enable_capture(self.pg_interfaces)
921         self.pg_start()
922         capture = self.pg1.get_capture(len(pkts))
923         self.verify_capture_out(capture, nat_ip, True)
924
925     def test_static_with_port_in(self):
926         """ SNAT 1:1 NAT with port initialized from inside network """
927
928         self.tcp_port_out = 3606
929         self.udp_port_out = 3607
930         self.icmp_id_out = 3608
931
932         self.snat_add_address(self.snat_addr)
933         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
934                                      self.tcp_port_in, self.tcp_port_out,
935                                      proto=IP_PROTOS.tcp)
936         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
937                                      self.udp_port_in, self.udp_port_out,
938                                      proto=IP_PROTOS.udp)
939         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940                                      self.icmp_id_in, self.icmp_id_out,
941                                      proto=IP_PROTOS.icmp)
942         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
943         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
944                                                  is_inside=0)
945
946         # in2out
947         pkts = self.create_stream_in(self.pg0, self.pg1)
948         self.pg0.add_stream(pkts)
949         self.pg_enable_capture(self.pg_interfaces)
950         self.pg_start()
951         capture = self.pg1.get_capture(len(pkts))
952         self.verify_capture_out(capture)
953
954         # out2in
955         pkts = self.create_stream_out(self.pg1)
956         self.pg1.add_stream(pkts)
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pg_start()
959         capture = self.pg0.get_capture(len(pkts))
960         self.verify_capture_in(capture, self.pg0)
961
962     def test_static_with_port_out(self):
963         """ SNAT 1:1 NAT with port initialized from outside network """
964
965         self.tcp_port_out = 30606
966         self.udp_port_out = 30607
967         self.icmp_id_out = 30608
968
969         self.snat_add_address(self.snat_addr)
970         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
971                                      self.tcp_port_in, self.tcp_port_out,
972                                      proto=IP_PROTOS.tcp)
973         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
974                                      self.udp_port_in, self.udp_port_out,
975                                      proto=IP_PROTOS.udp)
976         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977                                      self.icmp_id_in, self.icmp_id_out,
978                                      proto=IP_PROTOS.icmp)
979         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
980         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
981                                                  is_inside=0)
982
983         # out2in
984         pkts = self.create_stream_out(self.pg1)
985         self.pg1.add_stream(pkts)
986         self.pg_enable_capture(self.pg_interfaces)
987         self.pg_start()
988         capture = self.pg0.get_capture(len(pkts))
989         self.verify_capture_in(capture, self.pg0)
990
991         # in2out
992         pkts = self.create_stream_in(self.pg0, self.pg1)
993         self.pg0.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture = self.pg1.get_capture(len(pkts))
997         self.verify_capture_out(capture)
998
999     def test_static_vrf_aware(self):
1000         """ SNAT 1:1 NAT VRF awareness """
1001
1002         nat_ip1 = "10.0.0.30"
1003         nat_ip2 = "10.0.0.40"
1004         self.tcp_port_out = 6303
1005         self.udp_port_out = 6304
1006         self.icmp_id_out = 6305
1007
1008         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1009                                      vrf_id=10)
1010         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1011                                      vrf_id=10)
1012         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1013                                                  is_inside=0)
1014         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1015         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1016
1017         # inside interface VRF match SNAT static mapping VRF
1018         pkts = self.create_stream_in(self.pg4, self.pg3)
1019         self.pg4.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg3.get_capture(len(pkts))
1023         self.verify_capture_out(capture, nat_ip1, True)
1024
1025         # inside interface VRF don't match SNAT static mapping VRF (packets
1026         # are dropped)
1027         pkts = self.create_stream_in(self.pg0, self.pg3)
1028         self.pg0.add_stream(pkts)
1029         self.pg_enable_capture(self.pg_interfaces)
1030         self.pg_start()
1031         self.pg3.assert_nothing_captured()
1032
1033     def test_multiple_inside_interfaces(self):
1034         """ SNAT multiple inside interfaces (non-overlapping address space) """
1035
1036         self.snat_add_address(self.snat_addr)
1037         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1038         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1039         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1040                                                  is_inside=0)
1041
1042         # between two S-NAT inside interfaces (no translation)
1043         pkts = self.create_stream_in(self.pg0, self.pg1)
1044         self.pg0.add_stream(pkts)
1045         self.pg_enable_capture(self.pg_interfaces)
1046         self.pg_start()
1047         capture = self.pg1.get_capture(len(pkts))
1048         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1049
1050         # from S-NAT inside to interface without S-NAT feature (no translation)
1051         pkts = self.create_stream_in(self.pg0, self.pg2)
1052         self.pg0.add_stream(pkts)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg2.get_capture(len(pkts))
1056         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1057
1058         # in2out 1st interface
1059         pkts = self.create_stream_in(self.pg0, self.pg3)
1060         self.pg0.add_stream(pkts)
1061         self.pg_enable_capture(self.pg_interfaces)
1062         self.pg_start()
1063         capture = self.pg3.get_capture(len(pkts))
1064         self.verify_capture_out(capture)
1065
1066         # out2in 1st interface
1067         pkts = self.create_stream_out(self.pg3)
1068         self.pg3.add_stream(pkts)
1069         self.pg_enable_capture(self.pg_interfaces)
1070         self.pg_start()
1071         capture = self.pg0.get_capture(len(pkts))
1072         self.verify_capture_in(capture, self.pg0)
1073
1074         # in2out 2nd interface
1075         pkts = self.create_stream_in(self.pg1, self.pg3)
1076         self.pg1.add_stream(pkts)
1077         self.pg_enable_capture(self.pg_interfaces)
1078         self.pg_start()
1079         capture = self.pg3.get_capture(len(pkts))
1080         self.verify_capture_out(capture)
1081
1082         # out2in 2nd interface
1083         pkts = self.create_stream_out(self.pg3)
1084         self.pg3.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg1.get_capture(len(pkts))
1088         self.verify_capture_in(capture, self.pg1)
1089
1090     def test_inside_overlapping_interfaces(self):
1091         """ SNAT multiple inside interfaces with overlapping address space """
1092
1093         static_nat_ip = "10.0.0.10"
1094         self.snat_add_address(self.snat_addr)
1095         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1096                                                  is_inside=0)
1097         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1098         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1099         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1100         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1101                                      vrf_id=20)
1102
1103         # between S-NAT inside interfaces with same VRF (no translation)
1104         pkts = self.create_stream_in(self.pg4, self.pg5)
1105         self.pg4.add_stream(pkts)
1106         self.pg_enable_capture(self.pg_interfaces)
1107         self.pg_start()
1108         capture = self.pg5.get_capture(len(pkts))
1109         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1110
1111         # between S-NAT inside interfaces with different VRF (hairpinning)
1112         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1113              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1114              TCP(sport=1234, dport=5678))
1115         self.pg4.add_stream(p)
1116         self.pg_enable_capture(self.pg_interfaces)
1117         self.pg_start()
1118         capture = self.pg6.get_capture(1)
1119         p = capture[0]
1120         try:
1121             ip = p[IP]
1122             tcp = p[TCP]
1123             self.assertEqual(ip.src, self.snat_addr)
1124             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1125             self.assertNotEqual(tcp.sport, 1234)
1126             self.assertEqual(tcp.dport, 5678)
1127         except:
1128             self.logger.error(ppp("Unexpected or invalid packet:", p))
1129             raise
1130
1131         # in2out 1st interface
1132         pkts = self.create_stream_in(self.pg4, self.pg3)
1133         self.pg4.add_stream(pkts)
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136         capture = self.pg3.get_capture(len(pkts))
1137         self.verify_capture_out(capture)
1138
1139         # out2in 1st interface
1140         pkts = self.create_stream_out(self.pg3)
1141         self.pg3.add_stream(pkts)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144         capture = self.pg4.get_capture(len(pkts))
1145         self.verify_capture_in(capture, self.pg4)
1146
1147         # in2out 2nd interface
1148         pkts = self.create_stream_in(self.pg5, self.pg3)
1149         self.pg5.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152         capture = self.pg3.get_capture(len(pkts))
1153         self.verify_capture_out(capture)
1154
1155         # out2in 2nd interface
1156         pkts = self.create_stream_out(self.pg3)
1157         self.pg3.add_stream(pkts)
1158         self.pg_enable_capture(self.pg_interfaces)
1159         self.pg_start()
1160         capture = self.pg5.get_capture(len(pkts))
1161         self.verify_capture_in(capture, self.pg5)
1162
1163         # pg5 session dump
1164         addresses = self.vapi.snat_address_dump()
1165         self.assertEqual(len(addresses), 1)
1166         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1167         self.assertEqual(len(sessions), 3)
1168         for session in sessions:
1169             self.assertFalse(session.is_static)
1170             self.assertEqual(session.inside_ip_address[0:4],
1171                              self.pg5.remote_ip4n)
1172             self.assertEqual(session.outside_ip_address,
1173                              addresses[0].ip_address)
1174         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1175         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1176         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1177         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1178         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1179         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1180         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1181         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1182         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1183
1184         # in2out 3rd interface
1185         pkts = self.create_stream_in(self.pg6, self.pg3)
1186         self.pg6.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg3.get_capture(len(pkts))
1190         self.verify_capture_out(capture, static_nat_ip, True)
1191
1192         # out2in 3rd interface
1193         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1194         self.pg3.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197         capture = self.pg6.get_capture(len(pkts))
1198         self.verify_capture_in(capture, self.pg6)
1199
1200         # general user and session dump verifications
1201         users = self.vapi.snat_user_dump()
1202         self.assertTrue(len(users) >= 3)
1203         addresses = self.vapi.snat_address_dump()
1204         self.assertEqual(len(addresses), 1)
1205         for user in users:
1206             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1207                                                         user.vrf_id)
1208             for session in sessions:
1209                 self.assertEqual(user.ip_address, session.inside_ip_address)
1210                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1211                 self.assertTrue(session.protocol in
1212                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1213                                  IP_PROTOS.icmp])
1214
1215         # pg4 session dump
1216         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1217         self.assertTrue(len(sessions) >= 4)
1218         for session in sessions:
1219             self.assertFalse(session.is_static)
1220             self.assertEqual(session.inside_ip_address[0:4],
1221                              self.pg4.remote_ip4n)
1222             self.assertEqual(session.outside_ip_address,
1223                              addresses[0].ip_address)
1224
1225         # pg6 session dump
1226         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1227         self.assertTrue(len(sessions) >= 3)
1228         for session in sessions:
1229             self.assertTrue(session.is_static)
1230             self.assertEqual(session.inside_ip_address[0:4],
1231                              self.pg6.remote_ip4n)
1232             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1233                              map(int, static_nat_ip.split('.')))
1234             self.assertTrue(session.inside_port in
1235                             [self.tcp_port_in, self.udp_port_in,
1236                              self.icmp_id_in])
1237
1238     def test_hairpinning(self):
1239         """ SNAT hairpinning - 1:1 NAT with port"""
1240
1241         host = self.pg0.remote_hosts[0]
1242         server = self.pg0.remote_hosts[1]
1243         host_in_port = 1234
1244         host_out_port = 0
1245         server_in_port = 5678
1246         server_out_port = 8765
1247
1248         self.snat_add_address(self.snat_addr)
1249         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1250         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1251                                                  is_inside=0)
1252         # add static mapping for server
1253         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1254                                      server_in_port, server_out_port,
1255                                      proto=IP_PROTOS.tcp)
1256
1257         # send packet from host to server
1258         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1259              IP(src=host.ip4, dst=self.snat_addr) /
1260              TCP(sport=host_in_port, dport=server_out_port))
1261         self.pg0.add_stream(p)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg0.get_capture(1)
1265         p = capture[0]
1266         try:
1267             ip = p[IP]
1268             tcp = p[TCP]
1269             self.assertEqual(ip.src, self.snat_addr)
1270             self.assertEqual(ip.dst, server.ip4)
1271             self.assertNotEqual(tcp.sport, host_in_port)
1272             self.assertEqual(tcp.dport, server_in_port)
1273             self.check_tcp_checksum(p)
1274             host_out_port = tcp.sport
1275         except:
1276             self.logger.error(ppp("Unexpected or invalid packet:", p))
1277             raise
1278
1279         # send reply from server to host
1280         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1281              IP(src=server.ip4, dst=self.snat_addr) /
1282              TCP(sport=server_in_port, dport=host_out_port))
1283         self.pg0.add_stream(p)
1284         self.pg_enable_capture(self.pg_interfaces)
1285         self.pg_start()
1286         capture = self.pg0.get_capture(1)
1287         p = capture[0]
1288         try:
1289             ip = p[IP]
1290             tcp = p[TCP]
1291             self.assertEqual(ip.src, self.snat_addr)
1292             self.assertEqual(ip.dst, host.ip4)
1293             self.assertEqual(tcp.sport, server_out_port)
1294             self.assertEqual(tcp.dport, host_in_port)
1295             self.check_tcp_checksum(p)
1296         except:
1297             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1298             raise
1299
1300     def test_hairpinning2(self):
1301         """ SNAT hairpinning - 1:1 NAT"""
1302
1303         server1_nat_ip = "10.0.0.10"
1304         server2_nat_ip = "10.0.0.11"
1305         host = self.pg0.remote_hosts[0]
1306         server1 = self.pg0.remote_hosts[1]
1307         server2 = self.pg0.remote_hosts[2]
1308         server_tcp_port = 22
1309         server_udp_port = 20
1310
1311         self.snat_add_address(self.snat_addr)
1312         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1313         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1314                                                  is_inside=0)
1315
1316         # add static mapping for servers
1317         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1318         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1319
1320         # host to server1
1321         pkts = []
1322         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1323              IP(src=host.ip4, dst=server1_nat_ip) /
1324              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1325         pkts.append(p)
1326         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1327              IP(src=host.ip4, dst=server1_nat_ip) /
1328              UDP(sport=self.udp_port_in, dport=server_udp_port))
1329         pkts.append(p)
1330         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1331              IP(src=host.ip4, dst=server1_nat_ip) /
1332              ICMP(id=self.icmp_id_in, type='echo-request'))
1333         pkts.append(p)
1334         self.pg0.add_stream(pkts)
1335         self.pg_enable_capture(self.pg_interfaces)
1336         self.pg_start()
1337         capture = self.pg0.get_capture(len(pkts))
1338         for packet in capture:
1339             try:
1340                 self.assertEqual(packet[IP].src, self.snat_addr)
1341                 self.assertEqual(packet[IP].dst, server1.ip4)
1342                 if packet.haslayer(TCP):
1343                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1344                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1345                     self.tcp_port_out = packet[TCP].sport
1346                     self.check_tcp_checksum(packet)
1347                 elif packet.haslayer(UDP):
1348                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1349                     self.assertEqual(packet[UDP].dport, server_udp_port)
1350                     self.udp_port_out = packet[UDP].sport
1351                 else:
1352                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1353                     self.icmp_id_out = packet[ICMP].id
1354             except:
1355                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1356                 raise
1357
1358         # server1 to host
1359         pkts = []
1360         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1361              IP(src=server1.ip4, dst=self.snat_addr) /
1362              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1363         pkts.append(p)
1364         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1365              IP(src=server1.ip4, dst=self.snat_addr) /
1366              UDP(sport=server_udp_port, dport=self.udp_port_out))
1367         pkts.append(p)
1368         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1369              IP(src=server1.ip4, dst=self.snat_addr) /
1370              ICMP(id=self.icmp_id_out, type='echo-reply'))
1371         pkts.append(p)
1372         self.pg0.add_stream(pkts)
1373         self.pg_enable_capture(self.pg_interfaces)
1374         self.pg_start()
1375         capture = self.pg0.get_capture(len(pkts))
1376         for packet in capture:
1377             try:
1378                 self.assertEqual(packet[IP].src, server1_nat_ip)
1379                 self.assertEqual(packet[IP].dst, host.ip4)
1380                 if packet.haslayer(TCP):
1381                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1382                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1383                     self.check_tcp_checksum(packet)
1384                 elif packet.haslayer(UDP):
1385                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1386                     self.assertEqual(packet[UDP].sport, server_udp_port)
1387                 else:
1388                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1389             except:
1390                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1391                 raise
1392
1393         # server2 to server1
1394         pkts = []
1395         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1396              IP(src=server2.ip4, dst=server1_nat_ip) /
1397              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1398         pkts.append(p)
1399         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1400              IP(src=server2.ip4, dst=server1_nat_ip) /
1401              UDP(sport=self.udp_port_in, dport=server_udp_port))
1402         pkts.append(p)
1403         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1404              IP(src=server2.ip4, dst=server1_nat_ip) /
1405              ICMP(id=self.icmp_id_in, type='echo-request'))
1406         pkts.append(p)
1407         self.pg0.add_stream(pkts)
1408         self.pg_enable_capture(self.pg_interfaces)
1409         self.pg_start()
1410         capture = self.pg0.get_capture(len(pkts))
1411         for packet in capture:
1412             try:
1413                 self.assertEqual(packet[IP].src, server2_nat_ip)
1414                 self.assertEqual(packet[IP].dst, server1.ip4)
1415                 if packet.haslayer(TCP):
1416                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1417                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1418                     self.tcp_port_out = packet[TCP].sport
1419                     self.check_tcp_checksum(packet)
1420                 elif packet.haslayer(UDP):
1421                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1422                     self.assertEqual(packet[UDP].dport, server_udp_port)
1423                     self.udp_port_out = packet[UDP].sport
1424                 else:
1425                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1426                     self.icmp_id_out = packet[ICMP].id
1427             except:
1428                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1429                 raise
1430
1431         # server1 to server2
1432         pkts = []
1433         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1434              IP(src=server1.ip4, dst=server2_nat_ip) /
1435              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1436         pkts.append(p)
1437         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1438              IP(src=server1.ip4, dst=server2_nat_ip) /
1439              UDP(sport=server_udp_port, dport=self.udp_port_out))
1440         pkts.append(p)
1441         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1442              IP(src=server1.ip4, dst=server2_nat_ip) /
1443              ICMP(id=self.icmp_id_out, type='echo-reply'))
1444         pkts.append(p)
1445         self.pg0.add_stream(pkts)
1446         self.pg_enable_capture(self.pg_interfaces)
1447         self.pg_start()
1448         capture = self.pg0.get_capture(len(pkts))
1449         for packet in capture:
1450             try:
1451                 self.assertEqual(packet[IP].src, server1_nat_ip)
1452                 self.assertEqual(packet[IP].dst, server2.ip4)
1453                 if packet.haslayer(TCP):
1454                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1455                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1456                     self.check_tcp_checksum(packet)
1457                 elif packet.haslayer(UDP):
1458                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1459                     self.assertEqual(packet[UDP].sport, server_udp_port)
1460                 else:
1461                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1462             except:
1463                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1464                 raise
1465
1466     def test_max_translations_per_user(self):
1467         """ MAX translations per user - recycle the least recently used """
1468
1469         self.snat_add_address(self.snat_addr)
1470         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1471         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1472                                                  is_inside=0)
1473
1474         # get maximum number of translations per user
1475         snat_config = self.vapi.snat_show_config()
1476
1477         # send more than maximum number of translations per user packets
1478         pkts_num = snat_config.max_translations_per_user + 5
1479         pkts = []
1480         for port in range(0, pkts_num):
1481             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1482                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1483                  TCP(sport=1025 + port))
1484             pkts.append(p)
1485         self.pg0.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488
1489         # verify number of translated packet
1490         self.pg1.get_capture(pkts_num)
1491
1492     def test_interface_addr(self):
1493         """ Acquire SNAT addresses from interface """
1494         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1495
1496         # no address in NAT pool
1497         adresses = self.vapi.snat_address_dump()
1498         self.assertEqual(0, len(adresses))
1499
1500         # configure interface address and check NAT address pool
1501         self.pg7.config_ip4()
1502         adresses = self.vapi.snat_address_dump()
1503         self.assertEqual(1, len(adresses))
1504         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1505
1506         # remove interface address and check NAT address pool
1507         self.pg7.unconfig_ip4()
1508         adresses = self.vapi.snat_address_dump()
1509         self.assertEqual(0, len(adresses))
1510
1511     def test_interface_addr_static_mapping(self):
1512         """ Static mapping with addresses from interface """
1513         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1514         self.snat_add_static_mapping('1.2.3.4',
1515                                      external_sw_if_index=self.pg7.sw_if_index)
1516
1517         # static mappings with external interface
1518         static_mappings = self.vapi.snat_static_mapping_dump()
1519         self.assertEqual(1, len(static_mappings))
1520         self.assertEqual(self.pg7.sw_if_index,
1521                          static_mappings[0].external_sw_if_index)
1522
1523         # configure interface address and check static mappings
1524         self.pg7.config_ip4()
1525         static_mappings = self.vapi.snat_static_mapping_dump()
1526         self.assertEqual(1, len(static_mappings))
1527         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1528                          self.pg7.local_ip4n)
1529         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1530
1531         # remove interface address and check static mappings
1532         self.pg7.unconfig_ip4()
1533         static_mappings = self.vapi.snat_static_mapping_dump()
1534         self.assertEqual(0, len(static_mappings))
1535
1536     def test_ipfix_nat44_sess(self):
1537         """ S-NAT IPFIX logging NAT44 session created/delted """
1538         self.ipfix_domain_id = 10
1539         self.ipfix_src_port = 20202
1540         colector_port = 30303
1541         bind_layers(UDP, IPFIX, dport=30303)
1542         self.snat_add_address(self.snat_addr)
1543         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1544         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1545                                                  is_inside=0)
1546         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1547                                      src_address=self.pg3.local_ip4n,
1548                                      path_mtu=512,
1549                                      template_interval=10,
1550                                      collector_port=colector_port)
1551         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1552                              src_port=self.ipfix_src_port)
1553
1554         pkts = self.create_stream_in(self.pg0, self.pg1)
1555         self.pg0.add_stream(pkts)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         capture = self.pg1.get_capture(len(pkts))
1559         self.verify_capture_out(capture)
1560         self.snat_add_address(self.snat_addr, is_add=0)
1561         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1562         capture = self.pg3.get_capture(3)
1563         ipfix = IPFIXDecoder()
1564         # first load template
1565         for p in capture:
1566             self.assertTrue(p.haslayer(IPFIX))
1567             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1568             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1569             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1570             self.assertEqual(p[UDP].dport, colector_port)
1571             self.assertEqual(p[IPFIX].observationDomainID,
1572                              self.ipfix_domain_id)
1573             if p.haslayer(Template):
1574                 ipfix.add_template(p.getlayer(Template))
1575         # verify events in data set
1576         for p in capture:
1577             if p.haslayer(Data):
1578                 data = ipfix.decode_data_set(p.getlayer(Set))
1579                 self.verify_ipfix_nat44_ses(data)
1580
1581     def test_ipfix_addr_exhausted(self):
1582         """ S-NAT IPFIX logging NAT addresses exhausted """
1583         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1584         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1585                                                  is_inside=0)
1586         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1587                                      src_address=self.pg3.local_ip4n,
1588                                      path_mtu=512,
1589                                      template_interval=10)
1590         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1591                              src_port=self.ipfix_src_port)
1592
1593         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1594              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1595              TCP(sport=3025))
1596         self.pg0.add_stream(p)
1597         self.pg_enable_capture(self.pg_interfaces)
1598         self.pg_start()
1599         capture = self.pg1.get_capture(0)
1600         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1601         capture = self.pg3.get_capture(3)
1602         ipfix = IPFIXDecoder()
1603         # first load template
1604         for p in capture:
1605             self.assertTrue(p.haslayer(IPFIX))
1606             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1607             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1608             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1609             self.assertEqual(p[UDP].dport, 4739)
1610             self.assertEqual(p[IPFIX].observationDomainID,
1611                              self.ipfix_domain_id)
1612             if p.haslayer(Template):
1613                 ipfix.add_template(p.getlayer(Template))
1614         # verify events in data set
1615         for p in capture:
1616             if p.haslayer(Data):
1617                 data = ipfix.decode_data_set(p.getlayer(Set))
1618                 self.verify_ipfix_addr_exhausted(data)
1619
1620     def test_pool_addr_fib(self):
1621         """ S-NAT add pool addresses to FIB """
1622         static_addr = '10.0.0.10'
1623         self.snat_add_address(self.snat_addr)
1624         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1625         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1626                                                  is_inside=0)
1627         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1628
1629         # SNAT address
1630         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1631              ARP(op=ARP.who_has, pdst=self.snat_addr,
1632                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1633         self.pg1.add_stream(p)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         capture = self.pg1.get_capture(1)
1637         self.assertTrue(capture[0].haslayer(ARP))
1638         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1639
1640         # 1:1 NAT address
1641         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1642              ARP(op=ARP.who_has, pdst=static_addr,
1643                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1644         self.pg1.add_stream(p)
1645         self.pg_enable_capture(self.pg_interfaces)
1646         self.pg_start()
1647         capture = self.pg1.get_capture(1)
1648         self.assertTrue(capture[0].haslayer(ARP))
1649         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1650
1651         # send ARP to non-SNAT interface
1652         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1653              ARP(op=ARP.who_has, pdst=self.snat_addr,
1654                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1655         self.pg2.add_stream(p)
1656         self.pg_enable_capture(self.pg_interfaces)
1657         self.pg_start()
1658         capture = self.pg1.get_capture(0)
1659
1660         # remove addresses and verify
1661         self.snat_add_address(self.snat_addr, is_add=0)
1662         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1663                                      is_add=0)
1664
1665         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1666              ARP(op=ARP.who_has, pdst=self.snat_addr,
1667                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1668         self.pg1.add_stream(p)
1669         self.pg_enable_capture(self.pg_interfaces)
1670         self.pg_start()
1671         capture = self.pg1.get_capture(0)
1672
1673         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1674              ARP(op=ARP.who_has, pdst=static_addr,
1675                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1676         self.pg1.add_stream(p)
1677         self.pg_enable_capture(self.pg_interfaces)
1678         self.pg_start()
1679         capture = self.pg1.get_capture(0)
1680
1681     def test_vrf_mode(self):
1682         """ S-NAT tenant VRF aware address pool mode """
1683
1684         vrf_id1 = 1
1685         vrf_id2 = 2
1686         nat_ip1 = "10.0.0.10"
1687         nat_ip2 = "10.0.0.11"
1688
1689         self.pg0.unconfig_ip4()
1690         self.pg1.unconfig_ip4()
1691         self.pg0.set_table_ip4(vrf_id1)
1692         self.pg1.set_table_ip4(vrf_id2)
1693         self.pg0.config_ip4()
1694         self.pg1.config_ip4()
1695
1696         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1697         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1698         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1699         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1700         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1701                                                  is_inside=0)
1702
1703         # first VRF
1704         pkts = self.create_stream_in(self.pg0, self.pg2)
1705         self.pg0.add_stream(pkts)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg2.get_capture(len(pkts))
1709         self.verify_capture_out(capture, nat_ip1)
1710
1711         # second VRF
1712         pkts = self.create_stream_in(self.pg1, self.pg2)
1713         self.pg1.add_stream(pkts)
1714         self.pg_enable_capture(self.pg_interfaces)
1715         self.pg_start()
1716         capture = self.pg2.get_capture(len(pkts))
1717         self.verify_capture_out(capture, nat_ip2)
1718
1719     def test_vrf_feature_independent(self):
1720         """ S-NAT tenant VRF independent address pool mode """
1721
1722         nat_ip1 = "10.0.0.10"
1723         nat_ip2 = "10.0.0.11"
1724
1725         self.snat_add_address(nat_ip1)
1726         self.snat_add_address(nat_ip2)
1727         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1728         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1729         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1730                                                  is_inside=0)
1731
1732         # first VRF
1733         pkts = self.create_stream_in(self.pg0, self.pg2)
1734         self.pg0.add_stream(pkts)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         capture = self.pg2.get_capture(len(pkts))
1738         self.verify_capture_out(capture, nat_ip1)
1739
1740         # second VRF
1741         pkts = self.create_stream_in(self.pg1, self.pg2)
1742         self.pg1.add_stream(pkts)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg2.get_capture(len(pkts))
1746         self.verify_capture_out(capture, nat_ip1)
1747
1748     def test_dynamic_ipless_interfaces(self):
1749         """ SNAT interfaces without configured ip dynamic map """
1750
1751         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1752                                       self.pg7.remote_mac,
1753                                       self.pg7.remote_ip4n,
1754                                       is_static=1)
1755         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1756                                       self.pg8.remote_mac,
1757                                       self.pg8.remote_ip4n,
1758                                       is_static=1)
1759
1760         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1761                                    dst_address_length=32,
1762                                    next_hop_address=self.pg7.remote_ip4n,
1763                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1764         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1765                                    dst_address_length=32,
1766                                    next_hop_address=self.pg8.remote_ip4n,
1767                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1768
1769         self.snat_add_address(self.snat_addr)
1770         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1771         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1772                                                  is_inside=0)
1773
1774         # in2out
1775         pkts = self.create_stream_in(self.pg7, self.pg8)
1776         self.pg7.add_stream(pkts)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         capture = self.pg8.get_capture(len(pkts))
1780         self.verify_capture_out(capture)
1781
1782         # out2in
1783         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1784         self.pg8.add_stream(pkts)
1785         self.pg_enable_capture(self.pg_interfaces)
1786         self.pg_start()
1787         capture = self.pg7.get_capture(len(pkts))
1788         self.verify_capture_in(capture, self.pg7)
1789
1790     def test_static_ipless_interfaces(self):
1791         """ SNAT 1:1 NAT interfaces without configured ip """
1792
1793         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1794                                       self.pg7.remote_mac,
1795                                       self.pg7.remote_ip4n,
1796                                       is_static=1)
1797         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1798                                       self.pg8.remote_mac,
1799                                       self.pg8.remote_ip4n,
1800                                       is_static=1)
1801
1802         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1803                                    dst_address_length=32,
1804                                    next_hop_address=self.pg7.remote_ip4n,
1805                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1806         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1807                                    dst_address_length=32,
1808                                    next_hop_address=self.pg8.remote_ip4n,
1809                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1810
1811         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1812         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1813         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1814                                                  is_inside=0)
1815
1816         # out2in
1817         pkts = self.create_stream_out(self.pg8)
1818         self.pg8.add_stream(pkts)
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.pg_start()
1821         capture = self.pg7.get_capture(len(pkts))
1822         self.verify_capture_in(capture, self.pg7)
1823
1824         # in2out
1825         pkts = self.create_stream_in(self.pg7, self.pg8)
1826         self.pg7.add_stream(pkts)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg8.get_capture(len(pkts))
1830         self.verify_capture_out(capture, self.snat_addr, True)
1831
1832     def test_static_with_port_ipless_interfaces(self):
1833         """ SNAT 1:1 NAT with port interfaces without configured ip """
1834
1835         self.tcp_port_out = 30606
1836         self.udp_port_out = 30607
1837         self.icmp_id_out = 30608
1838
1839         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1840                                       self.pg7.remote_mac,
1841                                       self.pg7.remote_ip4n,
1842                                       is_static=1)
1843         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1844                                       self.pg8.remote_mac,
1845                                       self.pg8.remote_ip4n,
1846                                       is_static=1)
1847
1848         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1849                                    dst_address_length=32,
1850                                    next_hop_address=self.pg7.remote_ip4n,
1851                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1852         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1853                                    dst_address_length=32,
1854                                    next_hop_address=self.pg8.remote_ip4n,
1855                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1856
1857         self.snat_add_address(self.snat_addr)
1858         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1859                                      self.tcp_port_in, self.tcp_port_out,
1860                                      proto=IP_PROTOS.tcp)
1861         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1862                                      self.udp_port_in, self.udp_port_out,
1863                                      proto=IP_PROTOS.udp)
1864         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1865                                      self.icmp_id_in, self.icmp_id_out,
1866                                      proto=IP_PROTOS.icmp)
1867         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1868         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1869                                                  is_inside=0)
1870
1871         # out2in
1872         pkts = self.create_stream_out(self.pg8)
1873         self.pg8.add_stream(pkts)
1874         self.pg_enable_capture(self.pg_interfaces)
1875         self.pg_start()
1876         capture = self.pg7.get_capture(len(pkts))
1877         self.verify_capture_in(capture, self.pg7)
1878
1879         # in2out
1880         pkts = self.create_stream_in(self.pg7, self.pg8)
1881         self.pg7.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg8.get_capture(len(pkts))
1885         self.verify_capture_out(capture)
1886
1887     def test_static_unknown_proto(self):
1888         """ 1:1 NAT translate packet with unknown protocol """
1889         nat_ip = "10.0.0.10"
1890         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1891         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1892         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1893                                                  is_inside=0)
1894
1895         # in2out
1896         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1897              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1898              GRE() /
1899              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1900              TCP(sport=1234, dport=1234))
1901         self.pg0.add_stream(p)
1902         self.pg_enable_capture(self.pg_interfaces)
1903         self.pg_start()
1904         p = self.pg1.get_capture(1)
1905         packet = p[0]
1906         try:
1907             self.assertEqual(packet[IP].src, nat_ip)
1908             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1909             self.assertTrue(packet.haslayer(GRE))
1910             self.check_ip_checksum(packet)
1911         except:
1912             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1913             raise
1914
1915         # out2in
1916         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1917              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1918              GRE() /
1919              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1920              TCP(sport=1234, dport=1234))
1921         self.pg1.add_stream(p)
1922         self.pg_enable_capture(self.pg_interfaces)
1923         self.pg_start()
1924         p = self.pg0.get_capture(1)
1925         packet = p[0]
1926         try:
1927             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1928             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1929             self.assertTrue(packet.haslayer(GRE))
1930             self.check_ip_checksum(packet)
1931         except:
1932             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1933             raise
1934
1935     def test_hairpinning_unknown_proto(self):
1936         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1937
1938         host = self.pg0.remote_hosts[0]
1939         server = self.pg0.remote_hosts[1]
1940
1941         host_nat_ip = "10.0.0.10"
1942         server_nat_ip = "10.0.0.11"
1943
1944         self.snat_add_static_mapping(host.ip4, host_nat_ip)
1945         self.snat_add_static_mapping(server.ip4, server_nat_ip)
1946         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1947         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1948                                                  is_inside=0)
1949
1950         # host to server
1951         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1952              IP(src=host.ip4, dst=server_nat_ip) /
1953              GRE() /
1954              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1955              TCP(sport=1234, dport=1234))
1956         self.pg0.add_stream(p)
1957         self.pg_enable_capture(self.pg_interfaces)
1958         self.pg_start()
1959         p = self.pg0.get_capture(1)
1960         packet = p[0]
1961         try:
1962             self.assertEqual(packet[IP].src, host_nat_ip)
1963             self.assertEqual(packet[IP].dst, server.ip4)
1964             self.assertTrue(packet.haslayer(GRE))
1965             self.check_ip_checksum(packet)
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1968             raise
1969
1970         # server to host
1971         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1972              IP(src=server.ip4, dst=host_nat_ip) /
1973              GRE() /
1974              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1975              TCP(sport=1234, dport=1234))
1976         self.pg0.add_stream(p)
1977         self.pg_enable_capture(self.pg_interfaces)
1978         self.pg_start()
1979         p = self.pg0.get_capture(1)
1980         packet = p[0]
1981         try:
1982             self.assertEqual(packet[IP].src, server_nat_ip)
1983             self.assertEqual(packet[IP].dst, host.ip4)
1984             self.assertTrue(packet.haslayer(GRE))
1985             self.check_ip_checksum(packet)
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1988             raise
1989
1990     def tearDown(self):
1991         super(TestSNAT, self).tearDown()
1992         if not self.vpp_dead:
1993             self.logger.info(self.vapi.cli("show snat verbose"))
1994             self.clear_snat()
1995
1996
1997 class TestDeterministicNAT(MethodHolder):
1998     """ Deterministic NAT Test Cases """
1999
2000     @classmethod
2001     def setUpConstants(cls):
2002         super(TestDeterministicNAT, cls).setUpConstants()
2003         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
2004
2005     @classmethod
2006     def setUpClass(cls):
2007         super(TestDeterministicNAT, cls).setUpClass()
2008
2009         try:
2010             cls.tcp_port_in = 6303
2011             cls.tcp_external_port = 6303
2012             cls.udp_port_in = 6304
2013             cls.udp_external_port = 6304
2014             cls.icmp_id_in = 6305
2015             cls.snat_addr = '10.0.0.3'
2016
2017             cls.create_pg_interfaces(range(3))
2018             cls.interfaces = list(cls.pg_interfaces)
2019
2020             for i in cls.interfaces:
2021                 i.admin_up()
2022                 i.config_ip4()
2023                 i.resolve_arp()
2024
2025             cls.pg0.generate_remote_hosts(2)
2026             cls.pg0.configure_ipv4_neighbors()
2027
2028         except Exception:
2029             super(TestDeterministicNAT, cls).tearDownClass()
2030             raise
2031
2032     def create_stream_in(self, in_if, out_if, ttl=64):
2033         """
2034         Create packet stream for inside network
2035
2036         :param in_if: Inside interface
2037         :param out_if: Outside interface
2038         :param ttl: TTL of generated packets
2039         """
2040         pkts = []
2041         # TCP
2042         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2043              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2044              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2045         pkts.append(p)
2046
2047         # UDP
2048         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2049              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2050              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2051         pkts.append(p)
2052
2053         # ICMP
2054         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2055              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2056              ICMP(id=self.icmp_id_in, type='echo-request'))
2057         pkts.append(p)
2058
2059         return pkts
2060
2061     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2062         """
2063         Create packet stream for outside network
2064
2065         :param out_if: Outside interface
2066         :param dst_ip: Destination IP address (Default use global SNAT address)
2067         :param ttl: TTL of generated packets
2068         """
2069         if dst_ip is None:
2070             dst_ip = self.snat_addr
2071         pkts = []
2072         # TCP
2073         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2074              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2075              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2076         pkts.append(p)
2077
2078         # UDP
2079         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2080              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2081              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2082         pkts.append(p)
2083
2084         # ICMP
2085         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2086              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2087              ICMP(id=self.icmp_external_id, type='echo-reply'))
2088         pkts.append(p)
2089
2090         return pkts
2091
2092     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2093         """
2094         Verify captured packets on outside network
2095
2096         :param capture: Captured packets
2097         :param nat_ip: Translated IP address (Default use global SNAT address)
2098         :param same_port: Sorce port number is not translated (Default False)
2099         :param packet_num: Expected number of packets (Default 3)
2100         """
2101         if nat_ip is None:
2102             nat_ip = self.snat_addr
2103         self.assertEqual(packet_num, len(capture))
2104         for packet in capture:
2105             try:
2106                 self.assertEqual(packet[IP].src, nat_ip)
2107                 if packet.haslayer(TCP):
2108                     self.tcp_port_out = packet[TCP].sport
2109                 elif packet.haslayer(UDP):
2110                     self.udp_port_out = packet[UDP].sport
2111                 else:
2112                     self.icmp_external_id = packet[ICMP].id
2113             except:
2114                 self.logger.error(ppp("Unexpected or invalid packet "
2115                                       "(outside network):", packet))
2116                 raise
2117
2118     def initiate_tcp_session(self, in_if, out_if):
2119         """
2120         Initiates TCP session
2121
2122         :param in_if: Inside interface
2123         :param out_if: Outside interface
2124         """
2125         try:
2126             # SYN packet in->out
2127             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2128                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2129                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2130                      flags="S"))
2131             in_if.add_stream(p)
2132             self.pg_enable_capture(self.pg_interfaces)
2133             self.pg_start()
2134             capture = out_if.get_capture(1)
2135             p = capture[0]
2136             self.tcp_port_out = p[TCP].sport
2137
2138             # SYN + ACK packet out->in
2139             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2140                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
2141                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2142                      flags="SA"))
2143             out_if.add_stream(p)
2144             self.pg_enable_capture(self.pg_interfaces)
2145             self.pg_start()
2146             in_if.get_capture(1)
2147
2148             # ACK packet in->out
2149             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2150                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2151                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2152                      flags="A"))
2153             in_if.add_stream(p)
2154             self.pg_enable_capture(self.pg_interfaces)
2155             self.pg_start()
2156             out_if.get_capture(1)
2157
2158         except:
2159             self.logger.error("TCP 3 way handshake failed")
2160             raise
2161
2162     def verify_ipfix_max_entries_per_user(self, data):
2163         """
2164         Verify IPFIX maximum entries per user exceeded event
2165
2166         :param data: Decoded IPFIX data records
2167         """
2168         self.assertEqual(1, len(data))
2169         record = data[0]
2170         # natEvent
2171         self.assertEqual(ord(record[230]), 13)
2172         # natQuotaExceededEvent
2173         self.assertEqual('\x03\x00\x00\x00', record[466])
2174         # sourceIPv4Address
2175         self.assertEqual(self.pg0.remote_ip4n, record[8])
2176
2177     def test_deterministic_mode(self):
2178         """ S-NAT run deterministic mode """
2179         in_addr = '172.16.255.0'
2180         out_addr = '172.17.255.50'
2181         in_addr_t = '172.16.255.20'
2182         in_addr_n = socket.inet_aton(in_addr)
2183         out_addr_n = socket.inet_aton(out_addr)
2184         in_addr_t_n = socket.inet_aton(in_addr_t)
2185         in_plen = 24
2186         out_plen = 32
2187
2188         snat_config = self.vapi.snat_show_config()
2189         self.assertEqual(1, snat_config.deterministic)
2190
2191         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
2192
2193         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
2194         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2195         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
2196         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2197
2198         deterministic_mappings = self.vapi.snat_det_map_dump()
2199         self.assertEqual(len(deterministic_mappings), 1)
2200         dsm = deterministic_mappings[0]
2201         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2202         self.assertEqual(in_plen, dsm.in_plen)
2203         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2204         self.assertEqual(out_plen, dsm.out_plen)
2205
2206         self.clear_snat()
2207         deterministic_mappings = self.vapi.snat_det_map_dump()
2208         self.assertEqual(len(deterministic_mappings), 0)
2209
2210     def test_set_timeouts(self):
2211         """ Set deterministic NAT timeouts """
2212         timeouts_before = self.vapi.snat_det_get_timeouts()
2213
2214         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
2215                                         timeouts_before.tcp_established + 10,
2216                                         timeouts_before.tcp_transitory + 10,
2217                                         timeouts_before.icmp + 10)
2218
2219         timeouts_after = self.vapi.snat_det_get_timeouts()
2220
2221         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2222         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2223         self.assertNotEqual(timeouts_before.tcp_established,
2224                             timeouts_after.tcp_established)
2225         self.assertNotEqual(timeouts_before.tcp_transitory,
2226                             timeouts_after.tcp_transitory)
2227
2228     def test_det_in(self):
2229         """ CGNAT translation test (TCP, UDP, ICMP) """
2230
2231         nat_ip = "10.0.0.10"
2232
2233         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2234                                    32,
2235                                    socket.inet_aton(nat_ip),
2236                                    32)
2237         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2238         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2239                                                  is_inside=0)
2240
2241         # in2out
2242         pkts = self.create_stream_in(self.pg0, self.pg1)
2243         self.pg0.add_stream(pkts)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         capture = self.pg1.get_capture(len(pkts))
2247         self.verify_capture_out(capture, nat_ip)
2248
2249         # out2in
2250         pkts = self.create_stream_out(self.pg1, nat_ip)
2251         self.pg1.add_stream(pkts)
2252         self.pg_enable_capture(self.pg_interfaces)
2253         self.pg_start()
2254         capture = self.pg0.get_capture(len(pkts))
2255         self.verify_capture_in(capture, self.pg0)
2256
2257         # session dump test
2258         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2259         self.assertEqual(len(sessions), 3)
2260
2261         # TCP session
2262         s = sessions[0]
2263         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2264         self.assertEqual(s.in_port, self.tcp_port_in)
2265         self.assertEqual(s.out_port, self.tcp_port_out)
2266         self.assertEqual(s.ext_port, self.tcp_external_port)
2267
2268         # UDP session
2269         s = sessions[1]
2270         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2271         self.assertEqual(s.in_port, self.udp_port_in)
2272         self.assertEqual(s.out_port, self.udp_port_out)
2273         self.assertEqual(s.ext_port, self.udp_external_port)
2274
2275         # ICMP session
2276         s = sessions[2]
2277         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2278         self.assertEqual(s.in_port, self.icmp_id_in)
2279         self.assertEqual(s.out_port, self.icmp_external_id)
2280
2281     def test_multiple_users(self):
2282         """ CGNAT multiple users """
2283
2284         nat_ip = "10.0.0.10"
2285         port_in = 80
2286         external_port = 6303
2287
2288         host0 = self.pg0.remote_hosts[0]
2289         host1 = self.pg0.remote_hosts[1]
2290
2291         self.vapi.snat_add_det_map(host0.ip4n,
2292                                    24,
2293                                    socket.inet_aton(nat_ip),
2294                                    32)
2295         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2296         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2297                                                  is_inside=0)
2298
2299         # host0 to out
2300         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2301              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2302              TCP(sport=port_in, dport=external_port))
2303         self.pg0.add_stream(p)
2304         self.pg_enable_capture(self.pg_interfaces)
2305         self.pg_start()
2306         capture = self.pg1.get_capture(1)
2307         p = capture[0]
2308         try:
2309             ip = p[IP]
2310             tcp = p[TCP]
2311             self.assertEqual(ip.src, nat_ip)
2312             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2313             self.assertEqual(tcp.dport, external_port)
2314             port_out0 = tcp.sport
2315         except:
2316             self.logger.error(ppp("Unexpected or invalid packet:", p))
2317             raise
2318
2319         # host1 to out
2320         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2321              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2322              TCP(sport=port_in, dport=external_port))
2323         self.pg0.add_stream(p)
2324         self.pg_enable_capture(self.pg_interfaces)
2325         self.pg_start()
2326         capture = self.pg1.get_capture(1)
2327         p = capture[0]
2328         try:
2329             ip = p[IP]
2330             tcp = p[TCP]
2331             self.assertEqual(ip.src, nat_ip)
2332             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2333             self.assertEqual(tcp.dport, external_port)
2334             port_out1 = tcp.sport
2335         except:
2336             self.logger.error(ppp("Unexpected or invalid packet:", p))
2337             raise
2338
2339         dms = self.vapi.snat_det_map_dump()
2340         self.assertEqual(1, len(dms))
2341         self.assertEqual(2, dms[0].ses_num)
2342
2343         # out to host0
2344         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2345              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2346              TCP(sport=external_port, dport=port_out0))
2347         self.pg1.add_stream(p)
2348         self.pg_enable_capture(self.pg_interfaces)
2349         self.pg_start()
2350         capture = self.pg0.get_capture(1)
2351         p = capture[0]
2352         try:
2353             ip = p[IP]
2354             tcp = p[TCP]
2355             self.assertEqual(ip.src, self.pg1.remote_ip4)
2356             self.assertEqual(ip.dst, host0.ip4)
2357             self.assertEqual(tcp.dport, port_in)
2358             self.assertEqual(tcp.sport, external_port)
2359         except:
2360             self.logger.error(ppp("Unexpected or invalid packet:", p))
2361             raise
2362
2363         # out to host1
2364         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2365              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2366              TCP(sport=external_port, dport=port_out1))
2367         self.pg1.add_stream(p)
2368         self.pg_enable_capture(self.pg_interfaces)
2369         self.pg_start()
2370         capture = self.pg0.get_capture(1)
2371         p = capture[0]
2372         try:
2373             ip = p[IP]
2374             tcp = p[TCP]
2375             self.assertEqual(ip.src, self.pg1.remote_ip4)
2376             self.assertEqual(ip.dst, host1.ip4)
2377             self.assertEqual(tcp.dport, port_in)
2378             self.assertEqual(tcp.sport, external_port)
2379         except:
2380             self.logger.error(ppp("Unexpected or invalid packet", p))
2381             raise
2382
2383         # session close api test
2384         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2385                                              port_out1,
2386                                              self.pg1.remote_ip4n,
2387                                              external_port)
2388         dms = self.vapi.snat_det_map_dump()
2389         self.assertEqual(dms[0].ses_num, 1)
2390
2391         self.vapi.snat_det_close_session_in(host0.ip4n,
2392                                             port_in,
2393                                             self.pg1.remote_ip4n,
2394                                             external_port)
2395         dms = self.vapi.snat_det_map_dump()
2396         self.assertEqual(dms[0].ses_num, 0)
2397
2398     def test_tcp_session_close_detection_in(self):
2399         """ CGNAT TCP session close initiated from inside network """
2400         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2401                                    32,
2402                                    socket.inet_aton(self.snat_addr),
2403                                    32)
2404         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2406                                                  is_inside=0)
2407
2408         self.initiate_tcp_session(self.pg0, self.pg1)
2409
2410         # close the session from inside
2411         try:
2412             # FIN packet in -> out
2413             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2414                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2415                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2416                      flags="F"))
2417             self.pg0.add_stream(p)
2418             self.pg_enable_capture(self.pg_interfaces)
2419             self.pg_start()
2420             self.pg1.get_capture(1)
2421
2422             pkts = []
2423
2424             # ACK packet out -> in
2425             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2426                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2427                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2428                      flags="A"))
2429             pkts.append(p)
2430
2431             # FIN packet out -> in
2432             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2433                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2434                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2435                      flags="F"))
2436             pkts.append(p)
2437
2438             self.pg1.add_stream(pkts)
2439             self.pg_enable_capture(self.pg_interfaces)
2440             self.pg_start()
2441             self.pg0.get_capture(2)
2442
2443             # ACK packet in -> out
2444             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2445                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2446                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2447                      flags="A"))
2448             self.pg0.add_stream(p)
2449             self.pg_enable_capture(self.pg_interfaces)
2450             self.pg_start()
2451             self.pg1.get_capture(1)
2452
2453             # Check if snat closed the session
2454             dms = self.vapi.snat_det_map_dump()
2455             self.assertEqual(0, dms[0].ses_num)
2456         except:
2457             self.logger.error("TCP session termination failed")
2458             raise
2459
2460     def test_tcp_session_close_detection_out(self):
2461         """ CGNAT TCP session close initiated from outside network """
2462         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2463                                    32,
2464                                    socket.inet_aton(self.snat_addr),
2465                                    32)
2466         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2467         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2468                                                  is_inside=0)
2469
2470         self.initiate_tcp_session(self.pg0, self.pg1)
2471
2472         # close the session from outside
2473         try:
2474             # FIN packet out -> in
2475             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2476                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2477                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2478                      flags="F"))
2479             self.pg1.add_stream(p)
2480             self.pg_enable_capture(self.pg_interfaces)
2481             self.pg_start()
2482             self.pg0.get_capture(1)
2483
2484             pkts = []
2485
2486             # ACK packet in -> out
2487             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2488                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2489                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2490                      flags="A"))
2491             pkts.append(p)
2492
2493             # ACK packet in -> out
2494             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2495                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2496                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2497                      flags="F"))
2498             pkts.append(p)
2499
2500             self.pg0.add_stream(pkts)
2501             self.pg_enable_capture(self.pg_interfaces)
2502             self.pg_start()
2503             self.pg1.get_capture(2)
2504
2505             # ACK packet out -> in
2506             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2507                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2508                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2509                      flags="A"))
2510             self.pg1.add_stream(p)
2511             self.pg_enable_capture(self.pg_interfaces)
2512             self.pg_start()
2513             self.pg0.get_capture(1)
2514
2515             # Check if snat closed the session
2516             dms = self.vapi.snat_det_map_dump()
2517             self.assertEqual(0, dms[0].ses_num)
2518         except:
2519             self.logger.error("TCP session termination failed")
2520             raise
2521
2522     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2523     def test_session_timeout(self):
2524         """ CGNAT session timeouts """
2525         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2526                                    32,
2527                                    socket.inet_aton(self.snat_addr),
2528                                    32)
2529         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2530         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2531                                                  is_inside=0)
2532
2533         self.initiate_tcp_session(self.pg0, self.pg1)
2534         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2535         pkts = self.create_stream_in(self.pg0, self.pg1)
2536         self.pg0.add_stream(pkts)
2537         self.pg_enable_capture(self.pg_interfaces)
2538         self.pg_start()
2539         capture = self.pg1.get_capture(len(pkts))
2540         sleep(15)
2541
2542         dms = self.vapi.snat_det_map_dump()
2543         self.assertEqual(0, dms[0].ses_num)
2544
2545     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2546     def test_session_limit_per_user(self):
2547         """ CGNAT maximum 1000 sessions per user should be created """
2548         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2549                                    32,
2550                                    socket.inet_aton(self.snat_addr),
2551                                    32)
2552         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2553         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2554                                                  is_inside=0)
2555         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2556                                      src_address=self.pg2.local_ip4n,
2557                                      path_mtu=512,
2558                                      template_interval=10)
2559         self.vapi.snat_ipfix()
2560
2561         pkts = []
2562         for port in range(1025, 2025):
2563             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2564                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2565                  UDP(sport=port, dport=port))
2566             pkts.append(p)
2567
2568         self.pg0.add_stream(pkts)
2569         self.pg_enable_capture(self.pg_interfaces)
2570         self.pg_start()
2571         capture = self.pg1.get_capture(len(pkts))
2572
2573         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2574              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2575              UDP(sport=3001, dport=3002))
2576         self.pg0.add_stream(p)
2577         self.pg_enable_capture(self.pg_interfaces)
2578         self.pg_start()
2579         capture = self.pg1.assert_nothing_captured()
2580
2581         # verify ICMP error packet
2582         capture = self.pg0.get_capture(1)
2583         p = capture[0]
2584         self.assertTrue(p.haslayer(ICMP))
2585         icmp = p[ICMP]
2586         self.assertEqual(icmp.type, 3)
2587         self.assertEqual(icmp.code, 1)
2588         self.assertTrue(icmp.haslayer(IPerror))
2589         inner_ip = icmp[IPerror]
2590         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2591         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2592
2593         dms = self.vapi.snat_det_map_dump()
2594
2595         self.assertEqual(1000, dms[0].ses_num)
2596
2597         # verify IPFIX logging
2598         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2599         sleep(1)
2600         capture = self.pg2.get_capture(2)
2601         ipfix = IPFIXDecoder()
2602         # first load template
2603         for p in capture:
2604             self.assertTrue(p.haslayer(IPFIX))
2605             if p.haslayer(Template):
2606                 ipfix.add_template(p.getlayer(Template))
2607         # verify events in data set
2608         for p in capture:
2609             if p.haslayer(Data):
2610                 data = ipfix.decode_data_set(p.getlayer(Set))
2611                 self.verify_ipfix_max_entries_per_user(data)
2612
2613     def clear_snat(self):
2614         """
2615         Clear SNAT configuration.
2616         """
2617         self.vapi.snat_ipfix(enable=0)
2618         self.vapi.snat_det_set_timeouts()
2619         deterministic_mappings = self.vapi.snat_det_map_dump()
2620         for dsm in deterministic_mappings:
2621             self.vapi.snat_add_det_map(dsm.in_addr,
2622                                        dsm.in_plen,
2623                                        dsm.out_addr,
2624                                        dsm.out_plen,
2625                                        is_add=0)
2626
2627         interfaces = self.vapi.snat_interface_dump()
2628         for intf in interfaces:
2629             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2630                                                      intf.is_inside,
2631                                                      is_add=0)
2632
2633     def tearDown(self):
2634         super(TestDeterministicNAT, self).tearDown()
2635         if not self.vpp_dead:
2636             self.logger.info(self.vapi.cli("show snat detail"))
2637             self.clear_snat()
2638
2639
2640 class TestNAT64(MethodHolder):
2641     """ NAT64 Test Cases """
2642
2643     @classmethod
2644     def setUpClass(cls):
2645         super(TestNAT64, cls).setUpClass()
2646
2647         try:
2648             cls.tcp_port_in = 6303
2649             cls.tcp_port_out = 6303
2650             cls.udp_port_in = 6304
2651             cls.udp_port_out = 6304
2652             cls.icmp_id_in = 6305
2653             cls.icmp_id_out = 6305
2654             cls.nat_addr = '10.0.0.3'
2655             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2656             cls.vrf1_id = 10
2657             cls.vrf1_nat_addr = '10.0.10.3'
2658             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2659                                                    cls.vrf1_nat_addr)
2660
2661             cls.create_pg_interfaces(range(3))
2662             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2663             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2664             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2665
2666             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2667
2668             cls.pg0.generate_remote_hosts(2)
2669
2670             for i in cls.ip6_interfaces:
2671                 i.admin_up()
2672                 i.config_ip6()
2673                 i.configure_ipv6_neighbors()
2674
2675             for i in cls.ip4_interfaces:
2676                 i.admin_up()
2677                 i.config_ip4()
2678                 i.resolve_arp()
2679
2680         except Exception:
2681             super(TestNAT64, cls).tearDownClass()
2682             raise
2683
2684     def test_pool(self):
2685         """ Add/delete address to NAT64 pool """
2686         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2687
2688         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2689
2690         addresses = self.vapi.nat64_pool_addr_dump()
2691         self.assertEqual(len(addresses), 1)
2692         self.assertEqual(addresses[0].address, nat_addr)
2693
2694         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2695
2696         addresses = self.vapi.nat64_pool_addr_dump()
2697         self.assertEqual(len(addresses), 0)
2698
2699     def test_interface(self):
2700         """ Enable/disable NAT64 feature on the interface """
2701         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2702         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2703
2704         interfaces = self.vapi.nat64_interface_dump()
2705         self.assertEqual(len(interfaces), 2)
2706         pg0_found = False
2707         pg1_found = False
2708         for intf in interfaces:
2709             if intf.sw_if_index == self.pg0.sw_if_index:
2710                 self.assertEqual(intf.is_inside, 1)
2711                 pg0_found = True
2712             elif intf.sw_if_index == self.pg1.sw_if_index:
2713                 self.assertEqual(intf.is_inside, 0)
2714                 pg1_found = True
2715         self.assertTrue(pg0_found)
2716         self.assertTrue(pg1_found)
2717
2718         features = self.vapi.cli("show interface features pg0")
2719         self.assertNotEqual(features.find('nat64-in2out'), -1)
2720         features = self.vapi.cli("show interface features pg1")
2721         self.assertNotEqual(features.find('nat64-out2in'), -1)
2722
2723         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2724         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2725
2726         interfaces = self.vapi.nat64_interface_dump()
2727         self.assertEqual(len(interfaces), 0)
2728
2729     def test_static_bib(self):
2730         """ Add/delete static BIB entry """
2731         in_addr = socket.inet_pton(socket.AF_INET6,
2732                                    '2001:db8:85a3::8a2e:370:7334')
2733         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2734         in_port = 1234
2735         out_port = 5678
2736         proto = IP_PROTOS.tcp
2737
2738         self.vapi.nat64_add_del_static_bib(in_addr,
2739                                            out_addr,
2740                                            in_port,
2741                                            out_port,
2742                                            proto)
2743         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2744         static_bib_num = 0
2745         for bibe in bib:
2746             if bibe.is_static:
2747                 static_bib_num += 1
2748                 self.assertEqual(bibe.i_addr, in_addr)
2749                 self.assertEqual(bibe.o_addr, out_addr)
2750                 self.assertEqual(bibe.i_port, in_port)
2751                 self.assertEqual(bibe.o_port, out_port)
2752         self.assertEqual(static_bib_num, 1)
2753
2754         self.vapi.nat64_add_del_static_bib(in_addr,
2755                                            out_addr,
2756                                            in_port,
2757                                            out_port,
2758                                            proto,
2759                                            is_add=0)
2760         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2761         static_bib_num = 0
2762         for bibe in bib:
2763             if bibe.is_static:
2764                 static_bib_num += 1
2765         self.assertEqual(static_bib_num, 0)
2766
2767     def test_set_timeouts(self):
2768         """ Set NAT64 timeouts """
2769         # verify default values
2770         timeouts = self.vapi.nat64_get_timeouts()
2771         self.assertEqual(timeouts.udp, 300)
2772         self.assertEqual(timeouts.icmp, 60)
2773         self.assertEqual(timeouts.tcp_trans, 240)
2774         self.assertEqual(timeouts.tcp_est, 7440)
2775         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2776
2777         # set and verify custom values
2778         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2779                                      tcp_est=7450, tcp_incoming_syn=10)
2780         timeouts = self.vapi.nat64_get_timeouts()
2781         self.assertEqual(timeouts.udp, 200)
2782         self.assertEqual(timeouts.icmp, 30)
2783         self.assertEqual(timeouts.tcp_trans, 250)
2784         self.assertEqual(timeouts.tcp_est, 7450)
2785         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2786
2787     def test_dynamic(self):
2788         """ NAT64 dynamic translation test """
2789         self.tcp_port_in = 6303
2790         self.udp_port_in = 6304
2791         self.icmp_id_in = 6305
2792
2793         ses_num_start = self.nat64_get_ses_num()
2794
2795         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2796                                                 self.nat_addr_n)
2797         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2798         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2799
2800         # in2out
2801         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2802         self.pg0.add_stream(pkts)
2803         self.pg_enable_capture(self.pg_interfaces)
2804         self.pg_start()
2805         capture = self.pg1.get_capture(len(pkts))
2806         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2807                                 dst_ip=self.pg1.remote_ip4)
2808
2809         # out2in
2810         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2811         self.pg1.add_stream(pkts)
2812         self.pg_enable_capture(self.pg_interfaces)
2813         self.pg_start()
2814         capture = self.pg0.get_capture(len(pkts))
2815         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2816         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2817
2818         # in2out
2819         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2820         self.pg0.add_stream(pkts)
2821         self.pg_enable_capture(self.pg_interfaces)
2822         self.pg_start()
2823         capture = self.pg1.get_capture(len(pkts))
2824         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2825                                 dst_ip=self.pg1.remote_ip4)
2826
2827         # out2in
2828         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2829         self.pg1.add_stream(pkts)
2830         self.pg_enable_capture(self.pg_interfaces)
2831         self.pg_start()
2832         capture = self.pg0.get_capture(len(pkts))
2833         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2834
2835         ses_num_end = self.nat64_get_ses_num()
2836
2837         self.assertEqual(ses_num_end - ses_num_start, 3)
2838
2839         # tenant with specific VRF
2840         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2841                                                 self.vrf1_nat_addr_n,
2842                                                 vrf_id=self.vrf1_id)
2843         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2844
2845         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2846         self.pg2.add_stream(pkts)
2847         self.pg_enable_capture(self.pg_interfaces)
2848         self.pg_start()
2849         capture = self.pg1.get_capture(len(pkts))
2850         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2851                                 dst_ip=self.pg1.remote_ip4)
2852
2853         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2854         self.pg1.add_stream(pkts)
2855         self.pg_enable_capture(self.pg_interfaces)
2856         self.pg_start()
2857         capture = self.pg2.get_capture(len(pkts))
2858         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2859
2860     def test_static(self):
2861         """ NAT64 static translation test """
2862         self.tcp_port_in = 60303
2863         self.udp_port_in = 60304
2864         self.icmp_id_in = 60305
2865         self.tcp_port_out = 60303
2866         self.udp_port_out = 60304
2867         self.icmp_id_out = 60305
2868
2869         ses_num_start = self.nat64_get_ses_num()
2870
2871         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2872                                                 self.nat_addr_n)
2873         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2874         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2875
2876         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2877                                            self.nat_addr_n,
2878                                            self.tcp_port_in,
2879                                            self.tcp_port_out,
2880                                            IP_PROTOS.tcp)
2881         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2882                                            self.nat_addr_n,
2883                                            self.udp_port_in,
2884                                            self.udp_port_out,
2885                                            IP_PROTOS.udp)
2886         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2887                                            self.nat_addr_n,
2888                                            self.icmp_id_in,
2889                                            self.icmp_id_out,
2890                                            IP_PROTOS.icmp)
2891
2892         # in2out
2893         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2894         self.pg0.add_stream(pkts)
2895         self.pg_enable_capture(self.pg_interfaces)
2896         self.pg_start()
2897         capture = self.pg1.get_capture(len(pkts))
2898         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2899                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2900
2901         # out2in
2902         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2903         self.pg1.add_stream(pkts)
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906         capture = self.pg0.get_capture(len(pkts))
2907         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2908         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2909
2910         ses_num_end = self.nat64_get_ses_num()
2911
2912         self.assertEqual(ses_num_end - ses_num_start, 3)
2913
2914     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2915     def test_session_timeout(self):
2916         """ NAT64 session timeout """
2917         self.icmp_id_in = 1234
2918         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2919                                                 self.nat_addr_n)
2920         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2921         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2922         self.vapi.nat64_set_timeouts(icmp=5)
2923
2924         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2925         self.pg0.add_stream(pkts)
2926         self.pg_enable_capture(self.pg_interfaces)
2927         self.pg_start()
2928         capture = self.pg1.get_capture(len(pkts))
2929
2930         ses_num_before_timeout = self.nat64_get_ses_num()
2931
2932         sleep(15)
2933
2934         # ICMP session after timeout
2935         ses_num_after_timeout = self.nat64_get_ses_num()
2936         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2937
2938     def test_icmp_error(self):
2939         """ NAT64 ICMP Error message translation """
2940         self.tcp_port_in = 6303
2941         self.udp_port_in = 6304
2942         self.icmp_id_in = 6305
2943
2944         ses_num_start = self.nat64_get_ses_num()
2945
2946         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2947                                                 self.nat_addr_n)
2948         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2949         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2950
2951         # send some packets to create sessions
2952         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2953         self.pg0.add_stream(pkts)
2954         self.pg_enable_capture(self.pg_interfaces)
2955         self.pg_start()
2956         capture_ip4 = self.pg1.get_capture(len(pkts))
2957         self.verify_capture_out(capture_ip4,
2958                                 nat_ip=self.nat_addr,
2959                                 dst_ip=self.pg1.remote_ip4)
2960
2961         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2962         self.pg1.add_stream(pkts)
2963         self.pg_enable_capture(self.pg_interfaces)
2964         self.pg_start()
2965         capture_ip6 = self.pg0.get_capture(len(pkts))
2966         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2967         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
2968                                    self.pg0.remote_ip6)
2969
2970         # in2out
2971         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2972                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
2973                 ICMPv6DestUnreach(code=1) /
2974                 packet[IPv6] for packet in capture_ip6]
2975         self.pg0.add_stream(pkts)
2976         self.pg_enable_capture(self.pg_interfaces)
2977         self.pg_start()
2978         capture = self.pg1.get_capture(len(pkts))
2979         for packet in capture:
2980             try:
2981                 self.assertEqual(packet[IP].src, self.nat_addr)
2982                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2983                 self.assertEqual(packet[ICMP].type, 3)
2984                 self.assertEqual(packet[ICMP].code, 13)
2985                 inner = packet[IPerror]
2986                 self.assertEqual(inner.src, self.pg1.remote_ip4)
2987                 self.assertEqual(inner.dst, self.nat_addr)
2988                 self.check_icmp_checksum(packet)
2989                 if inner.haslayer(TCPerror):
2990                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
2991                 elif inner.haslayer(UDPerror):
2992                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
2993                 else:
2994                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
2995             except:
2996                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2997                 raise
2998
2999         # out2in
3000         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3001                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3002                 ICMP(type=3, code=13) /
3003                 packet[IP] for packet in capture_ip4]
3004         self.pg1.add_stream(pkts)
3005         self.pg_enable_capture(self.pg_interfaces)
3006         self.pg_start()
3007         capture = self.pg0.get_capture(len(pkts))
3008         for packet in capture:
3009             try:
3010                 self.assertEqual(packet[IPv6].src, ip.src)
3011                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3012                 icmp = packet[ICMPv6DestUnreach]
3013                 self.assertEqual(icmp.code, 1)
3014                 inner = icmp[IPerror6]
3015                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3016                 self.assertEqual(inner.dst, ip.src)
3017                 self.check_icmpv6_checksum(packet)
3018                 if inner.haslayer(TCPerror):
3019                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3020                 elif inner.haslayer(UDPerror):
3021                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3022                 else:
3023                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3024                                      self.icmp_id_in)
3025             except:
3026                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3027                 raise
3028
3029     def test_hairpinning(self):
3030         """ NAT64 hairpinning """
3031
3032         client = self.pg0.remote_hosts[0]
3033         server = self.pg0.remote_hosts[1]
3034         server_tcp_in_port = 22
3035         server_tcp_out_port = 4022
3036         server_udp_in_port = 23
3037         server_udp_out_port = 4023
3038         client_tcp_in_port = 1234
3039         client_udp_in_port = 1235
3040         client_tcp_out_port = 0
3041         client_udp_out_port = 0
3042         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3043         nat_addr_ip6 = ip.src
3044
3045         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3046                                                 self.nat_addr_n)
3047         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3048         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3049
3050         self.vapi.nat64_add_del_static_bib(server.ip6n,
3051                                            self.nat_addr_n,
3052                                            server_tcp_in_port,
3053                                            server_tcp_out_port,
3054                                            IP_PROTOS.tcp)
3055         self.vapi.nat64_add_del_static_bib(server.ip6n,
3056                                            self.nat_addr_n,
3057                                            server_udp_in_port,
3058                                            server_udp_out_port,
3059                                            IP_PROTOS.udp)
3060
3061         # client to server
3062         pkts = []
3063         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3064              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3065              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3066         pkts.append(p)
3067         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3068              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3069              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3070         pkts.append(p)
3071         self.pg0.add_stream(pkts)
3072         self.pg_enable_capture(self.pg_interfaces)
3073         self.pg_start()
3074         capture = self.pg0.get_capture(len(pkts))
3075         for packet in capture:
3076             try:
3077                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3078                 self.assertEqual(packet[IPv6].dst, server.ip6)
3079                 if packet.haslayer(TCP):
3080                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3081                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3082                     self.check_tcp_checksum(packet)
3083                     client_tcp_out_port = packet[TCP].sport
3084                 else:
3085                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3086                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3087                     self.check_udp_checksum(packet)
3088                     client_udp_out_port = packet[UDP].sport
3089             except:
3090                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3091                 raise
3092
3093         # server to client
3094         pkts = []
3095         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3096              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3097              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3098         pkts.append(p)
3099         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3100              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3101              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3102         pkts.append(p)
3103         self.pg0.add_stream(pkts)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg0.get_capture(len(pkts))
3107         for packet in capture:
3108             try:
3109                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3110                 self.assertEqual(packet[IPv6].dst, client.ip6)
3111                 if packet.haslayer(TCP):
3112                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3113                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3114                     self.check_tcp_checksum(packet)
3115                 else:
3116                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3117                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3118                     self.check_udp_checksum(packet)
3119             except:
3120                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3121                 raise
3122
3123         # ICMP error
3124         pkts = []
3125         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3126                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3127                 ICMPv6DestUnreach(code=1) /
3128                 packet[IPv6] for packet in capture]
3129         self.pg0.add_stream(pkts)
3130         self.pg_enable_capture(self.pg_interfaces)
3131         self.pg_start()
3132         capture = self.pg0.get_capture(len(pkts))
3133         for packet in capture:
3134             try:
3135                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3136                 self.assertEqual(packet[IPv6].dst, server.ip6)
3137                 icmp = packet[ICMPv6DestUnreach]
3138                 self.assertEqual(icmp.code, 1)
3139                 inner = icmp[IPerror6]
3140                 self.assertEqual(inner.src, server.ip6)
3141                 self.assertEqual(inner.dst, nat_addr_ip6)
3142                 self.check_icmpv6_checksum(packet)
3143                 if inner.haslayer(TCPerror):
3144                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3145                     self.assertEqual(inner[TCPerror].dport,
3146                                      client_tcp_out_port)
3147                 else:
3148                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3149                     self.assertEqual(inner[UDPerror].dport,
3150                                      client_udp_out_port)
3151             except:
3152                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3153                 raise
3154
3155     def test_prefix(self):
3156         """ NAT64 Network-Specific Prefix """
3157
3158         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3159                                                 self.nat_addr_n)
3160         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3161         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3162         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3163                                                 self.vrf1_nat_addr_n,
3164                                                 vrf_id=self.vrf1_id)
3165         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3166
3167         # Add global prefix
3168         global_pref64 = "2001:db8::"
3169         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3170         global_pref64_len = 32
3171         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3172
3173         prefix = self.vapi.nat64_prefix_dump()
3174         self.assertEqual(len(prefix), 1)
3175         self.assertEqual(prefix[0].prefix, global_pref64_n)
3176         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3177         self.assertEqual(prefix[0].vrf_id, 0)
3178
3179         # Add tenant specific prefix
3180         vrf1_pref64 = "2001:db8:122:300::"
3181         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3182         vrf1_pref64_len = 56
3183         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3184                                        vrf1_pref64_len,
3185                                        vrf_id=self.vrf1_id)
3186         prefix = self.vapi.nat64_prefix_dump()
3187         self.assertEqual(len(prefix), 2)
3188
3189         # Global prefix
3190         pkts = self.create_stream_in_ip6(self.pg0,
3191                                          self.pg1,
3192                                          pref=global_pref64,
3193                                          plen=global_pref64_len)
3194         self.pg0.add_stream(pkts)
3195         self.pg_enable_capture(self.pg_interfaces)
3196         self.pg_start()
3197         capture = self.pg1.get_capture(len(pkts))
3198         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3199                                 dst_ip=self.pg1.remote_ip4)
3200
3201         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3202         self.pg1.add_stream(pkts)
3203         self.pg_enable_capture(self.pg_interfaces)
3204         self.pg_start()
3205         capture = self.pg0.get_capture(len(pkts))
3206         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3207                                   global_pref64,
3208                                   global_pref64_len)
3209         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3210
3211         # Tenant specific prefix
3212         pkts = self.create_stream_in_ip6(self.pg2,
3213                                          self.pg1,
3214                                          pref=vrf1_pref64,
3215                                          plen=vrf1_pref64_len)
3216         self.pg2.add_stream(pkts)
3217         self.pg_enable_capture(self.pg_interfaces)
3218         self.pg_start()
3219         capture = self.pg1.get_capture(len(pkts))
3220         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3221                                 dst_ip=self.pg1.remote_ip4)
3222
3223         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3224         self.pg1.add_stream(pkts)
3225         self.pg_enable_capture(self.pg_interfaces)
3226         self.pg_start()
3227         capture = self.pg2.get_capture(len(pkts))
3228         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3229                                   vrf1_pref64,
3230                                   vrf1_pref64_len)
3231         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3232
3233     def nat64_get_ses_num(self):
3234         """
3235         Return number of active NAT64 sessions.
3236         """
3237         ses_num = 0
3238         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3239         ses_num += len(st)
3240         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3241         ses_num += len(st)
3242         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3243         ses_num += len(st)
3244         return ses_num
3245
3246     def clear_nat64(self):
3247         """
3248         Clear NAT64 configuration.
3249         """
3250         self.vapi.nat64_set_timeouts()
3251
3252         interfaces = self.vapi.nat64_interface_dump()
3253         for intf in interfaces:
3254             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3255                                               intf.is_inside,
3256                                               is_add=0)
3257
3258         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3259         for bibe in bib:
3260             if bibe.is_static:
3261                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3262                                                    bibe.o_addr,
3263                                                    bibe.i_port,
3264                                                    bibe.o_port,
3265                                                    bibe.proto,
3266                                                    bibe.vrf_id,
3267                                                    is_add=0)
3268
3269         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3270         for bibe in bib:
3271             if bibe.is_static:
3272                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3273                                                    bibe.o_addr,
3274                                                    bibe.i_port,
3275                                                    bibe.o_port,
3276                                                    bibe.proto,
3277                                                    bibe.vrf_id,
3278                                                    is_add=0)
3279
3280         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3281         for bibe in bib:
3282             if bibe.is_static:
3283                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3284                                                    bibe.o_addr,
3285                                                    bibe.i_port,
3286                                                    bibe.o_port,
3287                                                    bibe.proto,
3288                                                    bibe.vrf_id,
3289                                                    is_add=0)
3290
3291         adresses = self.vapi.nat64_pool_addr_dump()
3292         for addr in adresses:
3293             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3294                                                     addr.address,
3295                                                     vrf_id=addr.vrf_id,
3296                                                     is_add=0)
3297
3298         prefixes = self.vapi.nat64_prefix_dump()
3299         for prefix in prefixes:
3300             self.vapi.nat64_add_del_prefix(prefix.prefix,
3301                                            prefix.prefix_len,
3302                                            vrf_id=prefix.vrf_id,
3303                                            is_add=0)
3304
3305     def tearDown(self):
3306         super(TestNAT64, self).tearDown()
3307         if not self.vpp_dead:
3308             self.logger.info(self.vapi.cli("show nat64 pool"))
3309             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3310             self.logger.info(self.vapi.cli("show nat64 prefix"))
3311             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3312             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3313             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3314             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3315             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3316             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3317             self.clear_nat64()
3318
3319 if __name__ == '__main__':
3320     unittest.main(testRunner=VppTestRunner)