NAT: fixed hairpinning for in2out translation as an output feature (VPP-976)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19
20
21 class MethodHolder(VppTestCase):
22     """ NAT create capture and verify method holder """
23
24     @classmethod
25     def setUpClass(cls):
26         super(MethodHolder, cls).setUpClass()
27
28     def tearDown(self):
29         super(MethodHolder, self).tearDown()
30
31     def check_ip_checksum(self, pkt):
32         """
33         Check IP checksum of the packet
34
35         :param pkt: Packet to check IP checksum
36         """
37         new = pkt.__class__(str(pkt))
38         del new['IP'].chksum
39         new = new.__class__(str(new))
40         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41
42     def check_tcp_checksum(self, pkt):
43         """
44         Check TCP checksum in IP packet
45
46         :param pkt: Packet to check TCP checksum
47         """
48         new = pkt.__class__(str(pkt))
49         del new['TCP'].chksum
50         new = new.__class__(str(new))
51         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52
53     def check_udp_checksum(self, pkt):
54         """
55         Check UDP checksum in IP packet
56
57         :param pkt: Packet to check UDP checksum
58         """
59         new = pkt.__class__(str(pkt))
60         del new['UDP'].chksum
61         new = new.__class__(str(new))
62         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63
64     def check_icmp_errror_embedded(self, pkt):
65         """
66         Check ICMP error embeded packet checksum
67
68         :param pkt: Packet to check ICMP error embeded packet checksum
69         """
70         if pkt.haslayer(IPerror):
71             new = pkt.__class__(str(pkt))
72             del new['IPerror'].chksum
73             new = new.__class__(str(new))
74             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75
76         if pkt.haslayer(TCPerror):
77             new = pkt.__class__(str(pkt))
78             del new['TCPerror'].chksum
79             new = new.__class__(str(new))
80             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81
82         if pkt.haslayer(UDPerror):
83             if pkt['UDPerror'].chksum != 0:
84                 new = pkt.__class__(str(pkt))
85                 del new['UDPerror'].chksum
86                 new = new.__class__(str(new))
87                 self.assertEqual(new['UDPerror'].chksum,
88                                  pkt['UDPerror'].chksum)
89
90         if pkt.haslayer(ICMPerror):
91             del new['ICMPerror'].chksum
92             new = new.__class__(str(new))
93             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94
95     def check_icmp_checksum(self, pkt):
96         """
97         Check ICMP checksum in IPv4 packet
98
99         :param pkt: Packet to check ICMP checksum
100         """
101         new = pkt.__class__(str(pkt))
102         del new['ICMP'].chksum
103         new = new.__class__(str(new))
104         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105         if pkt.haslayer(IPerror):
106             self.check_icmp_errror_embedded(pkt)
107
108     def check_icmpv6_checksum(self, pkt):
109         """
110         Check ICMPv6 checksum in IPv4 packet
111
112         :param pkt: Packet to check ICMPv6 checksum
113         """
114         new = pkt.__class__(str(pkt))
115         if pkt.haslayer(ICMPv6DestUnreach):
116             del new['ICMPv6DestUnreach'].cksum
117             new = new.__class__(str(new))
118             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119                              pkt['ICMPv6DestUnreach'].cksum)
120             self.check_icmp_errror_embedded(pkt)
121         if pkt.haslayer(ICMPv6EchoRequest):
122             del new['ICMPv6EchoRequest'].cksum
123             new = new.__class__(str(new))
124             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125                              pkt['ICMPv6EchoRequest'].cksum)
126         if pkt.haslayer(ICMPv6EchoReply):
127             del new['ICMPv6EchoReply'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoReply'].cksum,
130                              pkt['ICMPv6EchoReply'].cksum)
131
132     def create_stream_in(self, in_if, out_if, ttl=64):
133         """
134         Create packet stream for inside network
135
136         :param in_if: Inside interface
137         :param out_if: Outside interface
138         :param ttl: TTL of generated packets
139         """
140         pkts = []
141         # TCP
142         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144              TCP(sport=self.tcp_port_in, dport=20))
145         pkts.append(p)
146
147         # UDP
148         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150              UDP(sport=self.udp_port_in, dport=20))
151         pkts.append(p)
152
153         # ICMP
154         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156              ICMP(id=self.icmp_id_in, type='echo-request'))
157         pkts.append(p)
158
159         return pkts
160
161     def compose_ip6(self, ip4, pref, plen):
162         """
163         Compose IPv4-embedded IPv6 addresses
164
165         :param ip4: IPv4 address
166         :param pref: IPv6 prefix
167         :param plen: IPv6 prefix length
168         :returns: IPv4-embedded IPv6 addresses
169         """
170         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
172         if plen == 32:
173             pref_n[4] = ip4_n[0]
174             pref_n[5] = ip4_n[1]
175             pref_n[6] = ip4_n[2]
176             pref_n[7] = ip4_n[3]
177         elif plen == 40:
178             pref_n[5] = ip4_n[0]
179             pref_n[6] = ip4_n[1]
180             pref_n[7] = ip4_n[2]
181             pref_n[9] = ip4_n[3]
182         elif plen == 48:
183             pref_n[6] = ip4_n[0]
184             pref_n[7] = ip4_n[1]
185             pref_n[9] = ip4_n[2]
186             pref_n[10] = ip4_n[3]
187         elif plen == 56:
188             pref_n[7] = ip4_n[0]
189             pref_n[9] = ip4_n[1]
190             pref_n[10] = ip4_n[2]
191             pref_n[11] = ip4_n[3]
192         elif plen == 64:
193             pref_n[9] = ip4_n[0]
194             pref_n[10] = ip4_n[1]
195             pref_n[11] = ip4_n[2]
196             pref_n[12] = ip4_n[3]
197         elif plen == 96:
198             pref_n[12] = ip4_n[0]
199             pref_n[13] = ip4_n[1]
200             pref_n[14] = ip4_n[2]
201             pref_n[15] = ip4_n[3]
202         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203
204     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205         """
206         Create IPv6 packet stream for inside network
207
208         :param in_if: Inside interface
209         :param out_if: Outside interface
210         :param ttl: Hop Limit of generated packets
211         :param pref: NAT64 prefix
212         :param plen: NAT64 prefix length
213         """
214         pkts = []
215         if pref is None:
216             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217         else:
218             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
219
220         # TCP
221         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223              TCP(sport=self.tcp_port_in, dport=20))
224         pkts.append(p)
225
226         # UDP
227         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229              UDP(sport=self.udp_port_in, dport=20))
230         pkts.append(p)
231
232         # ICMP
233         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235              ICMPv6EchoRequest(id=self.icmp_id_in))
236         pkts.append(p)
237
238         return pkts
239
240     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241         """
242         Create packet stream for outside network
243
244         :param out_if: Outside interface
245         :param dst_ip: Destination IP address (Default use global NAT address)
246         :param ttl: TTL of generated packets
247         """
248         if dst_ip is None:
249             dst_ip = self.nat_addr
250         pkts = []
251         # TCP
252         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254              TCP(dport=self.tcp_port_out, sport=20))
255         pkts.append(p)
256
257         # UDP
258         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260              UDP(dport=self.udp_port_out, sport=20))
261         pkts.append(p)
262
263         # ICMP
264         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266              ICMP(id=self.icmp_id_out, type='echo-reply'))
267         pkts.append(p)
268
269         return pkts
270
271     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272                            packet_num=3, dst_ip=None):
273         """
274         Verify captured packets on outside network
275
276         :param capture: Captured packets
277         :param nat_ip: Translated IP address (Default use global NAT address)
278         :param same_port: Sorce port number is not translated (Default False)
279         :param packet_num: Expected number of packets (Default 3)
280         :param dst_ip: Destination IP address (Default do not verify)
281         """
282         if nat_ip is None:
283             nat_ip = self.nat_addr
284         self.assertEqual(packet_num, len(capture))
285         for packet in capture:
286             try:
287                 self.check_ip_checksum(packet)
288                 self.assertEqual(packet[IP].src, nat_ip)
289                 if dst_ip is not None:
290                     self.assertEqual(packet[IP].dst, dst_ip)
291                 if packet.haslayer(TCP):
292                     if same_port:
293                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
294                     else:
295                         self.assertNotEqual(
296                             packet[TCP].sport, self.tcp_port_in)
297                     self.tcp_port_out = packet[TCP].sport
298                     self.check_tcp_checksum(packet)
299                 elif packet.haslayer(UDP):
300                     if same_port:
301                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
302                     else:
303                         self.assertNotEqual(
304                             packet[UDP].sport, self.udp_port_in)
305                     self.udp_port_out = packet[UDP].sport
306                 else:
307                     if same_port:
308                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309                     else:
310                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311                     self.icmp_id_out = packet[ICMP].id
312                     self.check_icmp_checksum(packet)
313             except:
314                 self.logger.error(ppp("Unexpected or invalid packet "
315                                       "(outside network):", packet))
316                 raise
317
318     def verify_capture_in(self, capture, in_if, packet_num=3):
319         """
320         Verify captured packets on inside network
321
322         :param capture: Captured packets
323         :param in_if: Inside interface
324         :param packet_num: Expected number of packets (Default 3)
325         """
326         self.assertEqual(packet_num, len(capture))
327         for packet in capture:
328             try:
329                 self.check_ip_checksum(packet)
330                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331                 if packet.haslayer(TCP):
332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333                     self.check_tcp_checksum(packet)
334                 elif packet.haslayer(UDP):
335                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
336                 else:
337                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338                     self.check_icmp_checksum(packet)
339             except:
340                 self.logger.error(ppp("Unexpected or invalid packet "
341                                       "(inside network):", packet))
342                 raise
343
344     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345         """
346         Verify captured IPv6 packets on inside network
347
348         :param capture: Captured packets
349         :param src_ip: Source IP
350         :param dst_ip: Destination IP address
351         :param packet_num: Expected number of packets (Default 3)
352         """
353         self.assertEqual(packet_num, len(capture))
354         for packet in capture:
355             try:
356                 self.assertEqual(packet[IPv6].src, src_ip)
357                 self.assertEqual(packet[IPv6].dst, dst_ip)
358                 if packet.haslayer(TCP):
359                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360                     self.check_tcp_checksum(packet)
361                 elif packet.haslayer(UDP):
362                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
363                     self.check_udp_checksum(packet)
364                 else:
365                     self.assertEqual(packet[ICMPv6EchoReply].id,
366                                      self.icmp_id_in)
367                     self.check_icmpv6_checksum(packet)
368             except:
369                 self.logger.error(ppp("Unexpected or invalid packet "
370                                       "(inside network):", packet))
371                 raise
372
373     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374         """
375         Verify captured packet that don't have to be translated
376
377         :param capture: Captured packets
378         :param ingress_if: Ingress interface
379         :param egress_if: Egress interface
380         """
381         for packet in capture:
382             try:
383                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397                                             packet_num=3, icmp_type=11):
398         """
399         Verify captured packets with ICMP errors on outside network
400
401         :param capture: Captured packets
402         :param src_ip: Translated IP address or IP address of VPP
403                        (Default use global NAT address)
404         :param packet_num: Expected number of packets (Default 3)
405         :param icmp_type: Type of error ICMP packet
406                           we are expecting (Default 11)
407         """
408         if src_ip is None:
409             src_ip = self.nat_addr
410         self.assertEqual(packet_num, len(capture))
411         for packet in capture:
412             try:
413                 self.assertEqual(packet[IP].src, src_ip)
414                 self.assertTrue(packet.haslayer(ICMP))
415                 icmp = packet[ICMP]
416                 self.assertEqual(icmp.type, icmp_type)
417                 self.assertTrue(icmp.haslayer(IPerror))
418                 inner_ip = icmp[IPerror]
419                 if inner_ip.haslayer(TCPerror):
420                     self.assertEqual(inner_ip[TCPerror].dport,
421                                      self.tcp_port_out)
422                 elif inner_ip.haslayer(UDPerror):
423                     self.assertEqual(inner_ip[UDPerror].dport,
424                                      self.udp_port_out)
425                 else:
426                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427             except:
428                 self.logger.error(ppp("Unexpected or invalid packet "
429                                       "(outside network):", packet))
430                 raise
431
432     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
433                                            icmp_type=11):
434         """
435         Verify captured packets with ICMP errors on inside network
436
437         :param capture: Captured packets
438         :param in_if: Inside interface
439         :param packet_num: Expected number of packets (Default 3)
440         :param icmp_type: Type of error ICMP packet
441                           we are expecting (Default 11)
442         """
443         self.assertEqual(packet_num, len(capture))
444         for packet in capture:
445             try:
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 self.assertTrue(packet.haslayer(ICMP))
448                 icmp = packet[ICMP]
449                 self.assertEqual(icmp.type, icmp_type)
450                 self.assertTrue(icmp.haslayer(IPerror))
451                 inner_ip = icmp[IPerror]
452                 if inner_ip.haslayer(TCPerror):
453                     self.assertEqual(inner_ip[TCPerror].sport,
454                                      self.tcp_port_in)
455                 elif inner_ip.haslayer(UDPerror):
456                     self.assertEqual(inner_ip[UDPerror].sport,
457                                      self.udp_port_in)
458                 else:
459                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460             except:
461                 self.logger.error(ppp("Unexpected or invalid packet "
462                                       "(inside network):", packet))
463                 raise
464
465     def verify_ipfix_nat44_ses(self, data):
466         """
467         Verify IPFIX NAT44 session create/delete event
468
469         :param data: Decoded IPFIX data records
470         """
471         nat44_ses_create_num = 0
472         nat44_ses_delete_num = 0
473         self.assertEqual(6, len(data))
474         for record in data:
475             # natEvent
476             self.assertIn(ord(record[230]), [4, 5])
477             if ord(record[230]) == 4:
478                 nat44_ses_create_num += 1
479             else:
480                 nat44_ses_delete_num += 1
481             # sourceIPv4Address
482             self.assertEqual(self.pg0.remote_ip4n, record[8])
483             # postNATSourceIPv4Address
484             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
485                              record[225])
486             # ingressVRFID
487             self.assertEqual(struct.pack("!I", 0), record[234])
488             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489             if IP_PROTOS.icmp == ord(record[4]):
490                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492                                  record[227])
493             elif IP_PROTOS.tcp == ord(record[4]):
494                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495                                  record[7])
496                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497                                  record[227])
498             elif IP_PROTOS.udp == ord(record[4]):
499                 self.assertEqual(struct.pack("!H", self.udp_port_in),
500                                  record[7])
501                 self.assertEqual(struct.pack("!H", self.udp_port_out),
502                                  record[227])
503             else:
504                 self.fail("Invalid protocol")
505         self.assertEqual(3, nat44_ses_create_num)
506         self.assertEqual(3, nat44_ses_delete_num)
507
508     def verify_ipfix_addr_exhausted(self, data):
509         """
510         Verify IPFIX NAT addresses event
511
512         :param data: Decoded IPFIX data records
513         """
514         self.assertEqual(1, len(data))
515         record = data[0]
516         # natEvent
517         self.assertEqual(ord(record[230]), 3)
518         # natPoolID
519         self.assertEqual(struct.pack("!I", 0), record[283])
520
521
522 class TestNAT44(MethodHolder):
523     """ NAT44 Test Cases """
524
525     @classmethod
526     def setUpClass(cls):
527         super(TestNAT44, cls).setUpClass()
528
529         try:
530             cls.tcp_port_in = 6303
531             cls.tcp_port_out = 6303
532             cls.udp_port_in = 6304
533             cls.udp_port_out = 6304
534             cls.icmp_id_in = 6305
535             cls.icmp_id_out = 6305
536             cls.nat_addr = '10.0.0.3'
537             cls.ipfix_src_port = 4739
538             cls.ipfix_domain_id = 1
539
540             cls.create_pg_interfaces(range(9))
541             cls.interfaces = list(cls.pg_interfaces[0:4])
542
543             for i in cls.interfaces:
544                 i.admin_up()
545                 i.config_ip4()
546                 i.resolve_arp()
547
548             cls.pg0.generate_remote_hosts(3)
549             cls.pg0.configure_ipv4_neighbors()
550
551             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552
553             cls.pg4._local_ip4 = "172.16.255.1"
554             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
555             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
556             cls.pg4.set_table_ip4(10)
557             cls.pg5._local_ip4 = "172.17.255.3"
558             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
560             cls.pg5.set_table_ip4(10)
561             cls.pg6._local_ip4 = "172.16.255.1"
562             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
564             cls.pg6.set_table_ip4(20)
565             for i in cls.overlapping_interfaces:
566                 i.config_ip4()
567                 i.admin_up()
568                 i.resolve_arp()
569
570             cls.pg7.admin_up()
571             cls.pg8.admin_up()
572
573         except Exception:
574             super(TestNAT44, cls).tearDownClass()
575             raise
576
577     def clear_nat44(self):
578         """
579         Clear NAT44 configuration.
580         """
581         # I found no elegant way to do this
582         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
583                                    dst_address_length=32,
584                                    next_hop_address=self.pg7.remote_ip4n,
585                                    next_hop_sw_if_index=self.pg7.sw_if_index,
586                                    is_add=0)
587         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
588                                    dst_address_length=32,
589                                    next_hop_address=self.pg8.remote_ip4n,
590                                    next_hop_sw_if_index=self.pg8.sw_if_index,
591                                    is_add=0)
592
593         for intf in [self.pg7, self.pg8]:
594             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595             for n in neighbors:
596                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
597                                               n.mac_address,
598                                               n.ip_address,
599                                               is_add=0)
600
601         if self.pg7.has_ip4_config:
602             self.pg7.unconfig_ip4()
603
604         interfaces = self.vapi.nat44_interface_addr_dump()
605         for intf in interfaces:
606             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
607
608         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
609                             domain_id=self.ipfix_domain_id)
610         self.ipfix_src_port = 4739
611         self.ipfix_domain_id = 1
612
613         interfaces = self.vapi.nat44_interface_dump()
614         for intf in interfaces:
615             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
616                                                       intf.is_inside,
617                                                       is_add=0)
618
619         interfaces = self.vapi.nat44_interface_output_feature_dump()
620         for intf in interfaces:
621             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
622                                                              intf.is_inside,
623                                                              is_add=0)
624
625         static_mappings = self.vapi.nat44_static_mapping_dump()
626         for sm in static_mappings:
627             self.vapi.nat44_add_del_static_mapping(
628                 sm.local_ip_address,
629                 sm.external_ip_address,
630                 local_port=sm.local_port,
631                 external_port=sm.external_port,
632                 addr_only=sm.addr_only,
633                 vrf_id=sm.vrf_id,
634                 protocol=sm.protocol,
635                 is_add=0)
636
637         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
638         for lb_sm in lb_static_mappings:
639             self.vapi.nat44_add_del_lb_static_mapping(
640                 lb_sm.external_addr,
641                 lb_sm.external_port,
642                 lb_sm.protocol,
643                 lb_sm.vrf_id,
644                 is_add=0)
645
646         adresses = self.vapi.nat44_address_dump()
647         for addr in adresses:
648             self.vapi.nat44_add_del_address_range(addr.ip_address,
649                                                   addr.ip_address,
650                                                   is_add=0)
651
652     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
653                                  local_port=0, external_port=0, vrf_id=0,
654                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
655                                  proto=0):
656         """
657         Add/delete NAT44 static mapping
658
659         :param local_ip: Local IP address
660         :param external_ip: External IP address
661         :param local_port: Local port number (Optional)
662         :param external_port: External port number (Optional)
663         :param vrf_id: VRF ID (Default 0)
664         :param is_add: 1 if add, 0 if delete (Default add)
665         :param external_sw_if_index: External interface instead of IP address
666         :param proto: IP protocol (Mandatory if port specified)
667         """
668         addr_only = 1
669         if local_port and external_port:
670             addr_only = 0
671         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
672         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
673         self.vapi.nat44_add_del_static_mapping(
674             l_ip,
675             e_ip,
676             external_sw_if_index,
677             local_port,
678             external_port,
679             addr_only,
680             vrf_id,
681             proto,
682             is_add)
683
684     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
685         """
686         Add/delete NAT44 address
687
688         :param ip: IP address
689         :param is_add: 1 if add, 0 if delete (Default add)
690         """
691         nat_addr = socket.inet_pton(socket.AF_INET, ip)
692         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
693                                               vrf_id=vrf_id)
694
695     def test_dynamic(self):
696         """ NAT44 dynamic translation test """
697
698         self.nat44_add_address(self.nat_addr)
699         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
700         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
701                                                   is_inside=0)
702
703         # in2out
704         pkts = self.create_stream_in(self.pg0, self.pg1)
705         self.pg0.add_stream(pkts)
706         self.pg_enable_capture(self.pg_interfaces)
707         self.pg_start()
708         capture = self.pg1.get_capture(len(pkts))
709         self.verify_capture_out(capture)
710
711         # out2in
712         pkts = self.create_stream_out(self.pg1)
713         self.pg1.add_stream(pkts)
714         self.pg_enable_capture(self.pg_interfaces)
715         self.pg_start()
716         capture = self.pg0.get_capture(len(pkts))
717         self.verify_capture_in(capture, self.pg0)
718
719     def test_dynamic_icmp_errors_in2out_ttl_1(self):
720         """ NAT44 handling of client packets with TTL=1 """
721
722         self.nat44_add_address(self.nat_addr)
723         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
724         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
725                                                   is_inside=0)
726
727         # Client side - generate traffic
728         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
729         self.pg0.add_stream(pkts)
730         self.pg_enable_capture(self.pg_interfaces)
731         self.pg_start()
732
733         # Client side - verify ICMP type 11 packets
734         capture = self.pg0.get_capture(len(pkts))
735         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
736
737     def test_dynamic_icmp_errors_out2in_ttl_1(self):
738         """ NAT44 handling of server packets with TTL=1 """
739
740         self.nat44_add_address(self.nat_addr)
741         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
742         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
743                                                   is_inside=0)
744
745         # Client side - create sessions
746         pkts = self.create_stream_in(self.pg0, self.pg1)
747         self.pg0.add_stream(pkts)
748         self.pg_enable_capture(self.pg_interfaces)
749         self.pg_start()
750
751         # Server side - generate traffic
752         capture = self.pg1.get_capture(len(pkts))
753         self.verify_capture_out(capture)
754         pkts = self.create_stream_out(self.pg1, ttl=1)
755         self.pg1.add_stream(pkts)
756         self.pg_enable_capture(self.pg_interfaces)
757         self.pg_start()
758
759         # Server side - verify ICMP type 11 packets
760         capture = self.pg1.get_capture(len(pkts))
761         self.verify_capture_out_with_icmp_errors(capture,
762                                                  src_ip=self.pg1.local_ip4)
763
764     def test_dynamic_icmp_errors_in2out_ttl_2(self):
765         """ NAT44 handling of error responses to client packets with TTL=2 """
766
767         self.nat44_add_address(self.nat_addr)
768         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
769         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
770                                                   is_inside=0)
771
772         # Client side - generate traffic
773         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
774         self.pg0.add_stream(pkts)
775         self.pg_enable_capture(self.pg_interfaces)
776         self.pg_start()
777
778         # Server side - simulate ICMP type 11 response
779         capture = self.pg1.get_capture(len(pkts))
780         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
781                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
782                 ICMP(type=11) / packet[IP] for packet in capture]
783         self.pg1.add_stream(pkts)
784         self.pg_enable_capture(self.pg_interfaces)
785         self.pg_start()
786
787         # Client side - verify ICMP type 11 packets
788         capture = self.pg0.get_capture(len(pkts))
789         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
790
791     def test_dynamic_icmp_errors_out2in_ttl_2(self):
792         """ NAT44 handling of error responses to server packets with TTL=2 """
793
794         self.nat44_add_address(self.nat_addr)
795         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
796         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
797                                                   is_inside=0)
798
799         # Client side - create sessions
800         pkts = self.create_stream_in(self.pg0, self.pg1)
801         self.pg0.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Server side - generate traffic
806         capture = self.pg1.get_capture(len(pkts))
807         self.verify_capture_out(capture)
808         pkts = self.create_stream_out(self.pg1, ttl=2)
809         self.pg1.add_stream(pkts)
810         self.pg_enable_capture(self.pg_interfaces)
811         self.pg_start()
812
813         # Client side - simulate ICMP type 11 response
814         capture = self.pg0.get_capture(len(pkts))
815         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
816                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
817                 ICMP(type=11) / packet[IP] for packet in capture]
818         self.pg0.add_stream(pkts)
819         self.pg_enable_capture(self.pg_interfaces)
820         self.pg_start()
821
822         # Server side - verify ICMP type 11 packets
823         capture = self.pg1.get_capture(len(pkts))
824         self.verify_capture_out_with_icmp_errors(capture)
825
826     def test_ping_out_interface_from_outside(self):
827         """ Ping NAT44 out interface from outside network """
828
829         self.nat44_add_address(self.nat_addr)
830         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
831         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
832                                                   is_inside=0)
833
834         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
835              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
836              ICMP(id=self.icmp_id_out, type='echo-request'))
837         pkts = [p]
838         self.pg1.add_stream(pkts)
839         self.pg_enable_capture(self.pg_interfaces)
840         self.pg_start()
841         capture = self.pg1.get_capture(len(pkts))
842         self.assertEqual(1, len(capture))
843         packet = capture[0]
844         try:
845             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
846             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
847             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
848             self.assertEqual(packet[ICMP].type, 0)  # echo reply
849         except:
850             self.logger.error(ppp("Unexpected or invalid packet "
851                                   "(outside network):", packet))
852             raise
853
854     def test_ping_internal_host_from_outside(self):
855         """ Ping internal host from outside network """
856
857         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
858         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
859         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
860                                                   is_inside=0)
861
862         # out2in
863         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
864                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
865                ICMP(id=self.icmp_id_out, type='echo-request'))
866         self.pg1.add_stream(pkt)
867         self.pg_enable_capture(self.pg_interfaces)
868         self.pg_start()
869         capture = self.pg0.get_capture(1)
870         self.verify_capture_in(capture, self.pg0, packet_num=1)
871         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
872
873         # in2out
874         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
875                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
876                ICMP(id=self.icmp_id_in, type='echo-reply'))
877         self.pg0.add_stream(pkt)
878         self.pg_enable_capture(self.pg_interfaces)
879         self.pg_start()
880         capture = self.pg1.get_capture(1)
881         self.verify_capture_out(capture, same_port=True, packet_num=1)
882         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
883
884     def test_static_in(self):
885         """ 1:1 NAT initialized from inside network """
886
887         nat_ip = "10.0.0.10"
888         self.tcp_port_out = 6303
889         self.udp_port_out = 6304
890         self.icmp_id_out = 6305
891
892         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
893         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
894         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
895                                                   is_inside=0)
896
897         # in2out
898         pkts = self.create_stream_in(self.pg0, self.pg1)
899         self.pg0.add_stream(pkts)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(len(pkts))
903         self.verify_capture_out(capture, nat_ip, True)
904
905         # out2in
906         pkts = self.create_stream_out(self.pg1, nat_ip)
907         self.pg1.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg0.get_capture(len(pkts))
911         self.verify_capture_in(capture, self.pg0)
912
913     def test_static_out(self):
914         """ 1:1 NAT initialized from outside network """
915
916         nat_ip = "10.0.0.20"
917         self.tcp_port_out = 6303
918         self.udp_port_out = 6304
919         self.icmp_id_out = 6305
920
921         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
922         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
923         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
924                                                   is_inside=0)
925
926         # out2in
927         pkts = self.create_stream_out(self.pg1, nat_ip)
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931         capture = self.pg0.get_capture(len(pkts))
932         self.verify_capture_in(capture, self.pg0)
933
934         # in2out
935         pkts = self.create_stream_in(self.pg0, self.pg1)
936         self.pg0.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg1.get_capture(len(pkts))
940         self.verify_capture_out(capture, nat_ip, True)
941
942     def test_static_with_port_in(self):
943         """ 1:1 NAPT initialized from inside network """
944
945         self.tcp_port_out = 3606
946         self.udp_port_out = 3607
947         self.icmp_id_out = 3608
948
949         self.nat44_add_address(self.nat_addr)
950         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
951                                       self.tcp_port_in, self.tcp_port_out,
952                                       proto=IP_PROTOS.tcp)
953         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
954                                       self.udp_port_in, self.udp_port_out,
955                                       proto=IP_PROTOS.udp)
956         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
957                                       self.icmp_id_in, self.icmp_id_out,
958                                       proto=IP_PROTOS.icmp)
959         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
960         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
961                                                   is_inside=0)
962
963         # in2out
964         pkts = self.create_stream_in(self.pg0, self.pg1)
965         self.pg0.add_stream(pkts)
966         self.pg_enable_capture(self.pg_interfaces)
967         self.pg_start()
968         capture = self.pg1.get_capture(len(pkts))
969         self.verify_capture_out(capture)
970
971         # out2in
972         pkts = self.create_stream_out(self.pg1)
973         self.pg1.add_stream(pkts)
974         self.pg_enable_capture(self.pg_interfaces)
975         self.pg_start()
976         capture = self.pg0.get_capture(len(pkts))
977         self.verify_capture_in(capture, self.pg0)
978
979     def test_static_with_port_out(self):
980         """ 1:1 NAPT initialized from outside network """
981
982         self.tcp_port_out = 30606
983         self.udp_port_out = 30607
984         self.icmp_id_out = 30608
985
986         self.nat44_add_address(self.nat_addr)
987         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
988                                       self.tcp_port_in, self.tcp_port_out,
989                                       proto=IP_PROTOS.tcp)
990         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
991                                       self.udp_port_in, self.udp_port_out,
992                                       proto=IP_PROTOS.udp)
993         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
994                                       self.icmp_id_in, self.icmp_id_out,
995                                       proto=IP_PROTOS.icmp)
996         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
997         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
998                                                   is_inside=0)
999
1000         # out2in
1001         pkts = self.create_stream_out(self.pg1)
1002         self.pg1.add_stream(pkts)
1003         self.pg_enable_capture(self.pg_interfaces)
1004         self.pg_start()
1005         capture = self.pg0.get_capture(len(pkts))
1006         self.verify_capture_in(capture, self.pg0)
1007
1008         # in2out
1009         pkts = self.create_stream_in(self.pg0, self.pg1)
1010         self.pg0.add_stream(pkts)
1011         self.pg_enable_capture(self.pg_interfaces)
1012         self.pg_start()
1013         capture = self.pg1.get_capture(len(pkts))
1014         self.verify_capture_out(capture)
1015
1016     def test_static_vrf_aware(self):
1017         """ 1:1 NAT VRF awareness """
1018
1019         nat_ip1 = "10.0.0.30"
1020         nat_ip2 = "10.0.0.40"
1021         self.tcp_port_out = 6303
1022         self.udp_port_out = 6304
1023         self.icmp_id_out = 6305
1024
1025         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1026                                       vrf_id=10)
1027         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1028                                       vrf_id=10)
1029         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1030                                                   is_inside=0)
1031         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1032         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1033
1034         # inside interface VRF match NAT44 static mapping VRF
1035         pkts = self.create_stream_in(self.pg4, self.pg3)
1036         self.pg4.add_stream(pkts)
1037         self.pg_enable_capture(self.pg_interfaces)
1038         self.pg_start()
1039         capture = self.pg3.get_capture(len(pkts))
1040         self.verify_capture_out(capture, nat_ip1, True)
1041
1042         # inside interface VRF don't match NAT44 static mapping VRF (packets
1043         # are dropped)
1044         pkts = self.create_stream_in(self.pg0, self.pg3)
1045         self.pg0.add_stream(pkts)
1046         self.pg_enable_capture(self.pg_interfaces)
1047         self.pg_start()
1048         self.pg3.assert_nothing_captured()
1049
1050     def test_static_lb(self):
1051         """ NAT44 local service load balancing """
1052         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1053         external_port = 80
1054         local_port = 8080
1055         server1 = self.pg0.remote_hosts[0]
1056         server2 = self.pg0.remote_hosts[1]
1057
1058         locals = [{'addr': server1.ip4n,
1059                    'port': local_port,
1060                    'probability': 70},
1061                   {'addr': server2.ip4n,
1062                    'port': local_port,
1063                    'probability': 30}]
1064
1065         self.nat44_add_address(self.nat_addr)
1066         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1067                                                   external_port,
1068                                                   IP_PROTOS.tcp,
1069                                                   local_num=len(locals),
1070                                                   locals=locals)
1071         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1073                                                   is_inside=0)
1074
1075         # from client to service
1076         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1077              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1078              TCP(sport=12345, dport=external_port))
1079         self.pg1.add_stream(p)
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pg_start()
1082         capture = self.pg0.get_capture(1)
1083         p = capture[0]
1084         server = None
1085         try:
1086             ip = p[IP]
1087             tcp = p[TCP]
1088             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1089             if ip.dst == server1.ip4:
1090                 server = server1
1091             else:
1092                 server = server2
1093             self.assertEqual(tcp.dport, local_port)
1094             self.check_tcp_checksum(p)
1095             self.check_ip_checksum(p)
1096         except:
1097             self.logger.error(ppp("Unexpected or invalid packet:", p))
1098             raise
1099
1100         # from service back to client
1101         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1102              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1103              TCP(sport=local_port, dport=12345))
1104         self.pg0.add_stream(p)
1105         self.pg_enable_capture(self.pg_interfaces)
1106         self.pg_start()
1107         capture = self.pg1.get_capture(1)
1108         p = capture[0]
1109         try:
1110             ip = p[IP]
1111             tcp = p[TCP]
1112             self.assertEqual(ip.src, self.nat_addr)
1113             self.assertEqual(tcp.sport, external_port)
1114             self.check_tcp_checksum(p)
1115             self.check_ip_checksum(p)
1116         except:
1117             self.logger.error(ppp("Unexpected or invalid packet:", p))
1118             raise
1119
1120         # multiple clients
1121         server1_n = 0
1122         server2_n = 0
1123         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1124         pkts = []
1125         for client in clients:
1126             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1127                  IP(src=client, dst=self.nat_addr) /
1128                  TCP(sport=12345, dport=external_port))
1129             pkts.append(p)
1130         self.pg1.add_stream(pkts)
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133         capture = self.pg0.get_capture(len(pkts))
1134         for p in capture:
1135             if p[IP].dst == server1.ip4:
1136                 server1_n += 1
1137             else:
1138                 server2_n += 1
1139         self.assertTrue(server1_n > server2_n)
1140
1141     def test_multiple_inside_interfaces(self):
1142         """ NAT44 multiple non-overlapping address space inside interfaces """
1143
1144         self.nat44_add_address(self.nat_addr)
1145         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1146         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1147         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1148                                                   is_inside=0)
1149
1150         # between two NAT44 inside interfaces (no translation)
1151         pkts = self.create_stream_in(self.pg0, self.pg1)
1152         self.pg0.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155         capture = self.pg1.get_capture(len(pkts))
1156         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1157
1158         # from NAT44 inside to interface without NAT44 feature (no translation)
1159         pkts = self.create_stream_in(self.pg0, self.pg2)
1160         self.pg0.add_stream(pkts)
1161         self.pg_enable_capture(self.pg_interfaces)
1162         self.pg_start()
1163         capture = self.pg2.get_capture(len(pkts))
1164         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1165
1166         # in2out 1st interface
1167         pkts = self.create_stream_in(self.pg0, self.pg3)
1168         self.pg0.add_stream(pkts)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171         capture = self.pg3.get_capture(len(pkts))
1172         self.verify_capture_out(capture)
1173
1174         # out2in 1st interface
1175         pkts = self.create_stream_out(self.pg3)
1176         self.pg3.add_stream(pkts)
1177         self.pg_enable_capture(self.pg_interfaces)
1178         self.pg_start()
1179         capture = self.pg0.get_capture(len(pkts))
1180         self.verify_capture_in(capture, self.pg0)
1181
1182         # in2out 2nd interface
1183         pkts = self.create_stream_in(self.pg1, self.pg3)
1184         self.pg1.add_stream(pkts)
1185         self.pg_enable_capture(self.pg_interfaces)
1186         self.pg_start()
1187         capture = self.pg3.get_capture(len(pkts))
1188         self.verify_capture_out(capture)
1189
1190         # out2in 2nd interface
1191         pkts = self.create_stream_out(self.pg3)
1192         self.pg3.add_stream(pkts)
1193         self.pg_enable_capture(self.pg_interfaces)
1194         self.pg_start()
1195         capture = self.pg1.get_capture(len(pkts))
1196         self.verify_capture_in(capture, self.pg1)
1197
1198     def test_inside_overlapping_interfaces(self):
1199         """ NAT44 multiple inside interfaces with overlapping address space """
1200
1201         static_nat_ip = "10.0.0.10"
1202         self.nat44_add_address(self.nat_addr)
1203         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1204                                                   is_inside=0)
1205         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1206         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1207         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1208         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1209                                       vrf_id=20)
1210
1211         # between NAT44 inside interfaces with same VRF (no translation)
1212         pkts = self.create_stream_in(self.pg4, self.pg5)
1213         self.pg4.add_stream(pkts)
1214         self.pg_enable_capture(self.pg_interfaces)
1215         self.pg_start()
1216         capture = self.pg5.get_capture(len(pkts))
1217         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1218
1219         # between NAT44 inside interfaces with different VRF (hairpinning)
1220         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1221              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1222              TCP(sport=1234, dport=5678))
1223         self.pg4.add_stream(p)
1224         self.pg_enable_capture(self.pg_interfaces)
1225         self.pg_start()
1226         capture = self.pg6.get_capture(1)
1227         p = capture[0]
1228         try:
1229             ip = p[IP]
1230             tcp = p[TCP]
1231             self.assertEqual(ip.src, self.nat_addr)
1232             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1233             self.assertNotEqual(tcp.sport, 1234)
1234             self.assertEqual(tcp.dport, 5678)
1235         except:
1236             self.logger.error(ppp("Unexpected or invalid packet:", p))
1237             raise
1238
1239         # in2out 1st interface
1240         pkts = self.create_stream_in(self.pg4, self.pg3)
1241         self.pg4.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244         capture = self.pg3.get_capture(len(pkts))
1245         self.verify_capture_out(capture)
1246
1247         # out2in 1st interface
1248         pkts = self.create_stream_out(self.pg3)
1249         self.pg3.add_stream(pkts)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg4.get_capture(len(pkts))
1253         self.verify_capture_in(capture, self.pg4)
1254
1255         # in2out 2nd interface
1256         pkts = self.create_stream_in(self.pg5, self.pg3)
1257         self.pg5.add_stream(pkts)
1258         self.pg_enable_capture(self.pg_interfaces)
1259         self.pg_start()
1260         capture = self.pg3.get_capture(len(pkts))
1261         self.verify_capture_out(capture)
1262
1263         # out2in 2nd interface
1264         pkts = self.create_stream_out(self.pg3)
1265         self.pg3.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg5.get_capture(len(pkts))
1269         self.verify_capture_in(capture, self.pg5)
1270
1271         # pg5 session dump
1272         addresses = self.vapi.nat44_address_dump()
1273         self.assertEqual(len(addresses), 1)
1274         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1275         self.assertEqual(len(sessions), 3)
1276         for session in sessions:
1277             self.assertFalse(session.is_static)
1278             self.assertEqual(session.inside_ip_address[0:4],
1279                              self.pg5.remote_ip4n)
1280             self.assertEqual(session.outside_ip_address,
1281                              addresses[0].ip_address)
1282         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1283         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1284         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1285         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1286         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1287         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1288         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1289         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1290         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1291
1292         # in2out 3rd interface
1293         pkts = self.create_stream_in(self.pg6, self.pg3)
1294         self.pg6.add_stream(pkts)
1295         self.pg_enable_capture(self.pg_interfaces)
1296         self.pg_start()
1297         capture = self.pg3.get_capture(len(pkts))
1298         self.verify_capture_out(capture, static_nat_ip, True)
1299
1300         # out2in 3rd interface
1301         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1302         self.pg3.add_stream(pkts)
1303         self.pg_enable_capture(self.pg_interfaces)
1304         self.pg_start()
1305         capture = self.pg6.get_capture(len(pkts))
1306         self.verify_capture_in(capture, self.pg6)
1307
1308         # general user and session dump verifications
1309         users = self.vapi.nat44_user_dump()
1310         self.assertTrue(len(users) >= 3)
1311         addresses = self.vapi.nat44_address_dump()
1312         self.assertEqual(len(addresses), 1)
1313         for user in users:
1314             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1315                                                          user.vrf_id)
1316             for session in sessions:
1317                 self.assertEqual(user.ip_address, session.inside_ip_address)
1318                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1319                 self.assertTrue(session.protocol in
1320                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1321                                  IP_PROTOS.icmp])
1322
1323         # pg4 session dump
1324         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1325         self.assertTrue(len(sessions) >= 4)
1326         for session in sessions:
1327             self.assertFalse(session.is_static)
1328             self.assertEqual(session.inside_ip_address[0:4],
1329                              self.pg4.remote_ip4n)
1330             self.assertEqual(session.outside_ip_address,
1331                              addresses[0].ip_address)
1332
1333         # pg6 session dump
1334         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1335         self.assertTrue(len(sessions) >= 3)
1336         for session in sessions:
1337             self.assertTrue(session.is_static)
1338             self.assertEqual(session.inside_ip_address[0:4],
1339                              self.pg6.remote_ip4n)
1340             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1341                              map(int, static_nat_ip.split('.')))
1342             self.assertTrue(session.inside_port in
1343                             [self.tcp_port_in, self.udp_port_in,
1344                              self.icmp_id_in])
1345
1346     def test_hairpinning(self):
1347         """ NAT44 hairpinning - 1:1 NAPT """
1348
1349         host = self.pg0.remote_hosts[0]
1350         server = self.pg0.remote_hosts[1]
1351         host_in_port = 1234
1352         host_out_port = 0
1353         server_in_port = 5678
1354         server_out_port = 8765
1355
1356         self.nat44_add_address(self.nat_addr)
1357         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1358         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1359                                                   is_inside=0)
1360         # add static mapping for server
1361         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1362                                       server_in_port, server_out_port,
1363                                       proto=IP_PROTOS.tcp)
1364
1365         # send packet from host to server
1366         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1367              IP(src=host.ip4, dst=self.nat_addr) /
1368              TCP(sport=host_in_port, dport=server_out_port))
1369         self.pg0.add_stream(p)
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pg_start()
1372         capture = self.pg0.get_capture(1)
1373         p = capture[0]
1374         try:
1375             ip = p[IP]
1376             tcp = p[TCP]
1377             self.assertEqual(ip.src, self.nat_addr)
1378             self.assertEqual(ip.dst, server.ip4)
1379             self.assertNotEqual(tcp.sport, host_in_port)
1380             self.assertEqual(tcp.dport, server_in_port)
1381             self.check_tcp_checksum(p)
1382             host_out_port = tcp.sport
1383         except:
1384             self.logger.error(ppp("Unexpected or invalid packet:", p))
1385             raise
1386
1387         # send reply from server to host
1388         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1389              IP(src=server.ip4, dst=self.nat_addr) /
1390              TCP(sport=server_in_port, dport=host_out_port))
1391         self.pg0.add_stream(p)
1392         self.pg_enable_capture(self.pg_interfaces)
1393         self.pg_start()
1394         capture = self.pg0.get_capture(1)
1395         p = capture[0]
1396         try:
1397             ip = p[IP]
1398             tcp = p[TCP]
1399             self.assertEqual(ip.src, self.nat_addr)
1400             self.assertEqual(ip.dst, host.ip4)
1401             self.assertEqual(tcp.sport, server_out_port)
1402             self.assertEqual(tcp.dport, host_in_port)
1403             self.check_tcp_checksum(p)
1404         except:
1405             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1406             raise
1407
1408     def test_hairpinning2(self):
1409         """ NAT44 hairpinning - 1:1 NAT"""
1410
1411         server1_nat_ip = "10.0.0.10"
1412         server2_nat_ip = "10.0.0.11"
1413         host = self.pg0.remote_hosts[0]
1414         server1 = self.pg0.remote_hosts[1]
1415         server2 = self.pg0.remote_hosts[2]
1416         server_tcp_port = 22
1417         server_udp_port = 20
1418
1419         self.nat44_add_address(self.nat_addr)
1420         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1421         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1422                                                   is_inside=0)
1423
1424         # add static mapping for servers
1425         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1426         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1427
1428         # host to server1
1429         pkts = []
1430         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1431              IP(src=host.ip4, dst=server1_nat_ip) /
1432              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1433         pkts.append(p)
1434         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1435              IP(src=host.ip4, dst=server1_nat_ip) /
1436              UDP(sport=self.udp_port_in, dport=server_udp_port))
1437         pkts.append(p)
1438         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1439              IP(src=host.ip4, dst=server1_nat_ip) /
1440              ICMP(id=self.icmp_id_in, type='echo-request'))
1441         pkts.append(p)
1442         self.pg0.add_stream(pkts)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445         capture = self.pg0.get_capture(len(pkts))
1446         for packet in capture:
1447             try:
1448                 self.assertEqual(packet[IP].src, self.nat_addr)
1449                 self.assertEqual(packet[IP].dst, server1.ip4)
1450                 if packet.haslayer(TCP):
1451                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1452                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1453                     self.tcp_port_out = packet[TCP].sport
1454                     self.check_tcp_checksum(packet)
1455                 elif packet.haslayer(UDP):
1456                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1457                     self.assertEqual(packet[UDP].dport, server_udp_port)
1458                     self.udp_port_out = packet[UDP].sport
1459                 else:
1460                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1461                     self.icmp_id_out = packet[ICMP].id
1462             except:
1463                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1464                 raise
1465
1466         # server1 to host
1467         pkts = []
1468         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1469              IP(src=server1.ip4, dst=self.nat_addr) /
1470              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1471         pkts.append(p)
1472         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1473              IP(src=server1.ip4, dst=self.nat_addr) /
1474              UDP(sport=server_udp_port, dport=self.udp_port_out))
1475         pkts.append(p)
1476         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1477              IP(src=server1.ip4, dst=self.nat_addr) /
1478              ICMP(id=self.icmp_id_out, type='echo-reply'))
1479         pkts.append(p)
1480         self.pg0.add_stream(pkts)
1481         self.pg_enable_capture(self.pg_interfaces)
1482         self.pg_start()
1483         capture = self.pg0.get_capture(len(pkts))
1484         for packet in capture:
1485             try:
1486                 self.assertEqual(packet[IP].src, server1_nat_ip)
1487                 self.assertEqual(packet[IP].dst, host.ip4)
1488                 if packet.haslayer(TCP):
1489                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1490                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1491                     self.check_tcp_checksum(packet)
1492                 elif packet.haslayer(UDP):
1493                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1494                     self.assertEqual(packet[UDP].sport, server_udp_port)
1495                 else:
1496                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1497             except:
1498                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1499                 raise
1500
1501         # server2 to server1
1502         pkts = []
1503         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1504              IP(src=server2.ip4, dst=server1_nat_ip) /
1505              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1506         pkts.append(p)
1507         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1508              IP(src=server2.ip4, dst=server1_nat_ip) /
1509              UDP(sport=self.udp_port_in, dport=server_udp_port))
1510         pkts.append(p)
1511         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1512              IP(src=server2.ip4, dst=server1_nat_ip) /
1513              ICMP(id=self.icmp_id_in, type='echo-request'))
1514         pkts.append(p)
1515         self.pg0.add_stream(pkts)
1516         self.pg_enable_capture(self.pg_interfaces)
1517         self.pg_start()
1518         capture = self.pg0.get_capture(len(pkts))
1519         for packet in capture:
1520             try:
1521                 self.assertEqual(packet[IP].src, server2_nat_ip)
1522                 self.assertEqual(packet[IP].dst, server1.ip4)
1523                 if packet.haslayer(TCP):
1524                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1525                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1526                     self.tcp_port_out = packet[TCP].sport
1527                     self.check_tcp_checksum(packet)
1528                 elif packet.haslayer(UDP):
1529                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1530                     self.assertEqual(packet[UDP].dport, server_udp_port)
1531                     self.udp_port_out = packet[UDP].sport
1532                 else:
1533                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1534                     self.icmp_id_out = packet[ICMP].id
1535             except:
1536                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1537                 raise
1538
1539         # server1 to server2
1540         pkts = []
1541         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1542              IP(src=server1.ip4, dst=server2_nat_ip) /
1543              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1544         pkts.append(p)
1545         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1546              IP(src=server1.ip4, dst=server2_nat_ip) /
1547              UDP(sport=server_udp_port, dport=self.udp_port_out))
1548         pkts.append(p)
1549         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550              IP(src=server1.ip4, dst=server2_nat_ip) /
1551              ICMP(id=self.icmp_id_out, type='echo-reply'))
1552         pkts.append(p)
1553         self.pg0.add_stream(pkts)
1554         self.pg_enable_capture(self.pg_interfaces)
1555         self.pg_start()
1556         capture = self.pg0.get_capture(len(pkts))
1557         for packet in capture:
1558             try:
1559                 self.assertEqual(packet[IP].src, server1_nat_ip)
1560                 self.assertEqual(packet[IP].dst, server2.ip4)
1561                 if packet.haslayer(TCP):
1562                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1563                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1564                     self.check_tcp_checksum(packet)
1565                 elif packet.haslayer(UDP):
1566                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1567                     self.assertEqual(packet[UDP].sport, server_udp_port)
1568                 else:
1569                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1570             except:
1571                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1572                 raise
1573
1574     def test_max_translations_per_user(self):
1575         """ MAX translations per user - recycle the least recently used """
1576
1577         self.nat44_add_address(self.nat_addr)
1578         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1579         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1580                                                   is_inside=0)
1581
1582         # get maximum number of translations per user
1583         nat44_config = self.vapi.nat_show_config()
1584
1585         # send more than maximum number of translations per user packets
1586         pkts_num = nat44_config.max_translations_per_user + 5
1587         pkts = []
1588         for port in range(0, pkts_num):
1589             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1590                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1591                  TCP(sport=1025 + port))
1592             pkts.append(p)
1593         self.pg0.add_stream(pkts)
1594         self.pg_enable_capture(self.pg_interfaces)
1595         self.pg_start()
1596
1597         # verify number of translated packet
1598         self.pg1.get_capture(pkts_num)
1599
1600     def test_interface_addr(self):
1601         """ Acquire NAT44 addresses from interface """
1602         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1603
1604         # no address in NAT pool
1605         adresses = self.vapi.nat44_address_dump()
1606         self.assertEqual(0, len(adresses))
1607
1608         # configure interface address and check NAT address pool
1609         self.pg7.config_ip4()
1610         adresses = self.vapi.nat44_address_dump()
1611         self.assertEqual(1, len(adresses))
1612         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1613
1614         # remove interface address and check NAT address pool
1615         self.pg7.unconfig_ip4()
1616         adresses = self.vapi.nat44_address_dump()
1617         self.assertEqual(0, len(adresses))
1618
1619     def test_interface_addr_static_mapping(self):
1620         """ Static mapping with addresses from interface """
1621         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1622         self.nat44_add_static_mapping(
1623             '1.2.3.4',
1624             external_sw_if_index=self.pg7.sw_if_index)
1625
1626         # static mappings with external interface
1627         static_mappings = self.vapi.nat44_static_mapping_dump()
1628         self.assertEqual(1, len(static_mappings))
1629         self.assertEqual(self.pg7.sw_if_index,
1630                          static_mappings[0].external_sw_if_index)
1631
1632         # configure interface address and check static mappings
1633         self.pg7.config_ip4()
1634         static_mappings = self.vapi.nat44_static_mapping_dump()
1635         self.assertEqual(1, len(static_mappings))
1636         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1637                          self.pg7.local_ip4n)
1638         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1639
1640         # remove interface address and check static mappings
1641         self.pg7.unconfig_ip4()
1642         static_mappings = self.vapi.nat44_static_mapping_dump()
1643         self.assertEqual(0, len(static_mappings))
1644
1645     def test_ipfix_nat44_sess(self):
1646         """ IPFIX logging NAT44 session created/delted """
1647         self.ipfix_domain_id = 10
1648         self.ipfix_src_port = 20202
1649         colector_port = 30303
1650         bind_layers(UDP, IPFIX, dport=30303)
1651         self.nat44_add_address(self.nat_addr)
1652         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1653         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1654                                                   is_inside=0)
1655         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1656                                      src_address=self.pg3.local_ip4n,
1657                                      path_mtu=512,
1658                                      template_interval=10,
1659                                      collector_port=colector_port)
1660         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1661                             src_port=self.ipfix_src_port)
1662
1663         pkts = self.create_stream_in(self.pg0, self.pg1)
1664         self.pg0.add_stream(pkts)
1665         self.pg_enable_capture(self.pg_interfaces)
1666         self.pg_start()
1667         capture = self.pg1.get_capture(len(pkts))
1668         self.verify_capture_out(capture)
1669         self.nat44_add_address(self.nat_addr, is_add=0)
1670         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1671         capture = self.pg3.get_capture(3)
1672         ipfix = IPFIXDecoder()
1673         # first load template
1674         for p in capture:
1675             self.assertTrue(p.haslayer(IPFIX))
1676             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1677             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1678             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1679             self.assertEqual(p[UDP].dport, colector_port)
1680             self.assertEqual(p[IPFIX].observationDomainID,
1681                              self.ipfix_domain_id)
1682             if p.haslayer(Template):
1683                 ipfix.add_template(p.getlayer(Template))
1684         # verify events in data set
1685         for p in capture:
1686             if p.haslayer(Data):
1687                 data = ipfix.decode_data_set(p.getlayer(Set))
1688                 self.verify_ipfix_nat44_ses(data)
1689
1690     def test_ipfix_addr_exhausted(self):
1691         """ IPFIX logging NAT addresses exhausted """
1692         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1693         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1694                                                   is_inside=0)
1695         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1696                                      src_address=self.pg3.local_ip4n,
1697                                      path_mtu=512,
1698                                      template_interval=10)
1699         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1700                             src_port=self.ipfix_src_port)
1701
1702         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1703              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1704              TCP(sport=3025))
1705         self.pg0.add_stream(p)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg1.get_capture(0)
1709         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1710         capture = self.pg3.get_capture(3)
1711         ipfix = IPFIXDecoder()
1712         # first load template
1713         for p in capture:
1714             self.assertTrue(p.haslayer(IPFIX))
1715             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1716             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1717             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1718             self.assertEqual(p[UDP].dport, 4739)
1719             self.assertEqual(p[IPFIX].observationDomainID,
1720                              self.ipfix_domain_id)
1721             if p.haslayer(Template):
1722                 ipfix.add_template(p.getlayer(Template))
1723         # verify events in data set
1724         for p in capture:
1725             if p.haslayer(Data):
1726                 data = ipfix.decode_data_set(p.getlayer(Set))
1727                 self.verify_ipfix_addr_exhausted(data)
1728
1729     def test_pool_addr_fib(self):
1730         """ NAT44 add pool addresses to FIB """
1731         static_addr = '10.0.0.10'
1732         self.nat44_add_address(self.nat_addr)
1733         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1734         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1735                                                   is_inside=0)
1736         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1737
1738         # NAT44 address
1739         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1740              ARP(op=ARP.who_has, pdst=self.nat_addr,
1741                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1742         self.pg1.add_stream(p)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg1.get_capture(1)
1746         self.assertTrue(capture[0].haslayer(ARP))
1747         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1748
1749         # 1:1 NAT address
1750         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1751              ARP(op=ARP.who_has, pdst=static_addr,
1752                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1753         self.pg1.add_stream(p)
1754         self.pg_enable_capture(self.pg_interfaces)
1755         self.pg_start()
1756         capture = self.pg1.get_capture(1)
1757         self.assertTrue(capture[0].haslayer(ARP))
1758         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1759
1760         # send ARP to non-NAT44 interface
1761         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1762              ARP(op=ARP.who_has, pdst=self.nat_addr,
1763                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1764         self.pg2.add_stream(p)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg1.get_capture(0)
1768
1769         # remove addresses and verify
1770         self.nat44_add_address(self.nat_addr, is_add=0)
1771         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1772                                       is_add=0)
1773
1774         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1775              ARP(op=ARP.who_has, pdst=self.nat_addr,
1776                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1777         self.pg1.add_stream(p)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg1.get_capture(0)
1781
1782         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783              ARP(op=ARP.who_has, pdst=static_addr,
1784                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1785         self.pg1.add_stream(p)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg1.get_capture(0)
1789
1790     def test_vrf_mode(self):
1791         """ NAT44 tenant VRF aware address pool mode """
1792
1793         vrf_id1 = 1
1794         vrf_id2 = 2
1795         nat_ip1 = "10.0.0.10"
1796         nat_ip2 = "10.0.0.11"
1797
1798         self.pg0.unconfig_ip4()
1799         self.pg1.unconfig_ip4()
1800         self.pg0.set_table_ip4(vrf_id1)
1801         self.pg1.set_table_ip4(vrf_id2)
1802         self.pg0.config_ip4()
1803         self.pg1.config_ip4()
1804
1805         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1806         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1807         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1808         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1809         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1810                                                   is_inside=0)
1811
1812         # first VRF
1813         pkts = self.create_stream_in(self.pg0, self.pg2)
1814         self.pg0.add_stream(pkts)
1815         self.pg_enable_capture(self.pg_interfaces)
1816         self.pg_start()
1817         capture = self.pg2.get_capture(len(pkts))
1818         self.verify_capture_out(capture, nat_ip1)
1819
1820         # second VRF
1821         pkts = self.create_stream_in(self.pg1, self.pg2)
1822         self.pg1.add_stream(pkts)
1823         self.pg_enable_capture(self.pg_interfaces)
1824         self.pg_start()
1825         capture = self.pg2.get_capture(len(pkts))
1826         self.verify_capture_out(capture, nat_ip2)
1827
1828     def test_vrf_feature_independent(self):
1829         """ NAT44 tenant VRF independent address pool mode """
1830
1831         nat_ip1 = "10.0.0.10"
1832         nat_ip2 = "10.0.0.11"
1833
1834         self.nat44_add_address(nat_ip1)
1835         self.nat44_add_address(nat_ip2)
1836         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1837         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1838         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1839                                                   is_inside=0)
1840
1841         # first VRF
1842         pkts = self.create_stream_in(self.pg0, self.pg2)
1843         self.pg0.add_stream(pkts)
1844         self.pg_enable_capture(self.pg_interfaces)
1845         self.pg_start()
1846         capture = self.pg2.get_capture(len(pkts))
1847         self.verify_capture_out(capture, nat_ip1)
1848
1849         # second VRF
1850         pkts = self.create_stream_in(self.pg1, self.pg2)
1851         self.pg1.add_stream(pkts)
1852         self.pg_enable_capture(self.pg_interfaces)
1853         self.pg_start()
1854         capture = self.pg2.get_capture(len(pkts))
1855         self.verify_capture_out(capture, nat_ip1)
1856
1857     def test_dynamic_ipless_interfaces(self):
1858         """ NAT44 interfaces without configured IP address """
1859
1860         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1861                                       self.pg7.remote_mac,
1862                                       self.pg7.remote_ip4n,
1863                                       is_static=1)
1864         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1865                                       self.pg8.remote_mac,
1866                                       self.pg8.remote_ip4n,
1867                                       is_static=1)
1868
1869         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1870                                    dst_address_length=32,
1871                                    next_hop_address=self.pg7.remote_ip4n,
1872                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1873         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1874                                    dst_address_length=32,
1875                                    next_hop_address=self.pg8.remote_ip4n,
1876                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1877
1878         self.nat44_add_address(self.nat_addr)
1879         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1880         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1881                                                   is_inside=0)
1882
1883         # in2out
1884         pkts = self.create_stream_in(self.pg7, self.pg8)
1885         self.pg7.add_stream(pkts)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg8.get_capture(len(pkts))
1889         self.verify_capture_out(capture)
1890
1891         # out2in
1892         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1893         self.pg8.add_stream(pkts)
1894         self.pg_enable_capture(self.pg_interfaces)
1895         self.pg_start()
1896         capture = self.pg7.get_capture(len(pkts))
1897         self.verify_capture_in(capture, self.pg7)
1898
1899     def test_static_ipless_interfaces(self):
1900         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1901
1902         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1903                                       self.pg7.remote_mac,
1904                                       self.pg7.remote_ip4n,
1905                                       is_static=1)
1906         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1907                                       self.pg8.remote_mac,
1908                                       self.pg8.remote_ip4n,
1909                                       is_static=1)
1910
1911         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1912                                    dst_address_length=32,
1913                                    next_hop_address=self.pg7.remote_ip4n,
1914                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1915         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1916                                    dst_address_length=32,
1917                                    next_hop_address=self.pg8.remote_ip4n,
1918                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1919
1920         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1921         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1922         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1923                                                   is_inside=0)
1924
1925         # out2in
1926         pkts = self.create_stream_out(self.pg8)
1927         self.pg8.add_stream(pkts)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         capture = self.pg7.get_capture(len(pkts))
1931         self.verify_capture_in(capture, self.pg7)
1932
1933         # in2out
1934         pkts = self.create_stream_in(self.pg7, self.pg8)
1935         self.pg7.add_stream(pkts)
1936         self.pg_enable_capture(self.pg_interfaces)
1937         self.pg_start()
1938         capture = self.pg8.get_capture(len(pkts))
1939         self.verify_capture_out(capture, self.nat_addr, True)
1940
1941     def test_static_with_port_ipless_interfaces(self):
1942         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1943
1944         self.tcp_port_out = 30606
1945         self.udp_port_out = 30607
1946         self.icmp_id_out = 30608
1947
1948         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1949                                       self.pg7.remote_mac,
1950                                       self.pg7.remote_ip4n,
1951                                       is_static=1)
1952         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1953                                       self.pg8.remote_mac,
1954                                       self.pg8.remote_ip4n,
1955                                       is_static=1)
1956
1957         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1958                                    dst_address_length=32,
1959                                    next_hop_address=self.pg7.remote_ip4n,
1960                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1961         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1962                                    dst_address_length=32,
1963                                    next_hop_address=self.pg8.remote_ip4n,
1964                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1965
1966         self.nat44_add_address(self.nat_addr)
1967         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1968                                       self.tcp_port_in, self.tcp_port_out,
1969                                       proto=IP_PROTOS.tcp)
1970         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1971                                       self.udp_port_in, self.udp_port_out,
1972                                       proto=IP_PROTOS.udp)
1973         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1974                                       self.icmp_id_in, self.icmp_id_out,
1975                                       proto=IP_PROTOS.icmp)
1976         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1977         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1978                                                   is_inside=0)
1979
1980         # out2in
1981         pkts = self.create_stream_out(self.pg8)
1982         self.pg8.add_stream(pkts)
1983         self.pg_enable_capture(self.pg_interfaces)
1984         self.pg_start()
1985         capture = self.pg7.get_capture(len(pkts))
1986         self.verify_capture_in(capture, self.pg7)
1987
1988         # in2out
1989         pkts = self.create_stream_in(self.pg7, self.pg8)
1990         self.pg7.add_stream(pkts)
1991         self.pg_enable_capture(self.pg_interfaces)
1992         self.pg_start()
1993         capture = self.pg8.get_capture(len(pkts))
1994         self.verify_capture_out(capture)
1995
1996     def test_static_unknown_proto(self):
1997         """ 1:1 NAT translate packet with unknown protocol """
1998         nat_ip = "10.0.0.10"
1999         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2000         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2001         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2002                                                   is_inside=0)
2003
2004         # in2out
2005         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2006              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2007              GRE() /
2008              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2009              TCP(sport=1234, dport=1234))
2010         self.pg0.add_stream(p)
2011         self.pg_enable_capture(self.pg_interfaces)
2012         self.pg_start()
2013         p = self.pg1.get_capture(1)
2014         packet = p[0]
2015         try:
2016             self.assertEqual(packet[IP].src, nat_ip)
2017             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2018             self.assertTrue(packet.haslayer(GRE))
2019             self.check_ip_checksum(packet)
2020         except:
2021             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2022             raise
2023
2024         # out2in
2025         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2026              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2027              GRE() /
2028              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2029              TCP(sport=1234, dport=1234))
2030         self.pg1.add_stream(p)
2031         self.pg_enable_capture(self.pg_interfaces)
2032         self.pg_start()
2033         p = self.pg0.get_capture(1)
2034         packet = p[0]
2035         try:
2036             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2037             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2038             self.assertTrue(packet.haslayer(GRE))
2039             self.check_ip_checksum(packet)
2040         except:
2041             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2042             raise
2043
2044     def test_hairpinning_static_unknown_proto(self):
2045         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2046
2047         host = self.pg0.remote_hosts[0]
2048         server = self.pg0.remote_hosts[1]
2049
2050         host_nat_ip = "10.0.0.10"
2051         server_nat_ip = "10.0.0.11"
2052
2053         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2054         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2055         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2056         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2057                                                   is_inside=0)
2058
2059         # host to server
2060         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2061              IP(src=host.ip4, dst=server_nat_ip) /
2062              GRE() /
2063              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2064              TCP(sport=1234, dport=1234))
2065         self.pg0.add_stream(p)
2066         self.pg_enable_capture(self.pg_interfaces)
2067         self.pg_start()
2068         p = self.pg0.get_capture(1)
2069         packet = p[0]
2070         try:
2071             self.assertEqual(packet[IP].src, host_nat_ip)
2072             self.assertEqual(packet[IP].dst, server.ip4)
2073             self.assertTrue(packet.haslayer(GRE))
2074             self.check_ip_checksum(packet)
2075         except:
2076             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2077             raise
2078
2079         # server to host
2080         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2081              IP(src=server.ip4, dst=host_nat_ip) /
2082              GRE() /
2083              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2084              TCP(sport=1234, dport=1234))
2085         self.pg0.add_stream(p)
2086         self.pg_enable_capture(self.pg_interfaces)
2087         self.pg_start()
2088         p = self.pg0.get_capture(1)
2089         packet = p[0]
2090         try:
2091             self.assertEqual(packet[IP].src, server_nat_ip)
2092             self.assertEqual(packet[IP].dst, host.ip4)
2093             self.assertTrue(packet.haslayer(GRE))
2094             self.check_ip_checksum(packet)
2095         except:
2096             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2097             raise
2098
2099     def test_unknown_proto(self):
2100         """ NAT44 translate packet with unknown protocol """
2101         self.nat44_add_address(self.nat_addr)
2102         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2103         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2104                                                   is_inside=0)
2105
2106         # in2out
2107         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2108              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2109              TCP(sport=self.tcp_port_in, dport=20))
2110         self.pg0.add_stream(p)
2111         self.pg_enable_capture(self.pg_interfaces)
2112         self.pg_start()
2113         p = self.pg1.get_capture(1)
2114
2115         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2116              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2117              GRE() /
2118              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2119              TCP(sport=1234, dport=1234))
2120         self.pg0.add_stream(p)
2121         self.pg_enable_capture(self.pg_interfaces)
2122         self.pg_start()
2123         p = self.pg1.get_capture(1)
2124         packet = p[0]
2125         try:
2126             self.assertEqual(packet[IP].src, self.nat_addr)
2127             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2128             self.assertTrue(packet.haslayer(GRE))
2129             self.check_ip_checksum(packet)
2130         except:
2131             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2132             raise
2133
2134         # out2in
2135         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2136              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2137              GRE() /
2138              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2139              TCP(sport=1234, dport=1234))
2140         self.pg1.add_stream(p)
2141         self.pg_enable_capture(self.pg_interfaces)
2142         self.pg_start()
2143         p = self.pg0.get_capture(1)
2144         packet = p[0]
2145         try:
2146             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2147             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2148             self.assertTrue(packet.haslayer(GRE))
2149             self.check_ip_checksum(packet)
2150         except:
2151             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2152             raise
2153
2154     def test_hairpinning_unknown_proto(self):
2155         """ NAT44 translate packet with unknown protocol - hairpinning """
2156         host = self.pg0.remote_hosts[0]
2157         server = self.pg0.remote_hosts[1]
2158         host_in_port = 1234
2159         host_out_port = 0
2160         server_in_port = 5678
2161         server_out_port = 8765
2162         server_nat_ip = "10.0.0.11"
2163
2164         self.nat44_add_address(self.nat_addr)
2165         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2166         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2167                                                   is_inside=0)
2168
2169         # add static mapping for server
2170         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2171
2172         # host to server
2173         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2174              IP(src=host.ip4, dst=server_nat_ip) /
2175              TCP(sport=host_in_port, dport=server_out_port))
2176         self.pg0.add_stream(p)
2177         self.pg_enable_capture(self.pg_interfaces)
2178         self.pg_start()
2179         capture = self.pg0.get_capture(1)
2180
2181         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2182              IP(src=host.ip4, dst=server_nat_ip) /
2183              GRE() /
2184              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2185              TCP(sport=1234, dport=1234))
2186         self.pg0.add_stream(p)
2187         self.pg_enable_capture(self.pg_interfaces)
2188         self.pg_start()
2189         p = self.pg0.get_capture(1)
2190         packet = p[0]
2191         try:
2192             self.assertEqual(packet[IP].src, self.nat_addr)
2193             self.assertEqual(packet[IP].dst, server.ip4)
2194             self.assertTrue(packet.haslayer(GRE))
2195             self.check_ip_checksum(packet)
2196         except:
2197             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2198             raise
2199
2200         # server to host
2201         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2202              IP(src=server.ip4, dst=self.nat_addr) /
2203              GRE() /
2204              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2205              TCP(sport=1234, dport=1234))
2206         self.pg0.add_stream(p)
2207         self.pg_enable_capture(self.pg_interfaces)
2208         self.pg_start()
2209         p = self.pg0.get_capture(1)
2210         packet = p[0]
2211         try:
2212             self.assertEqual(packet[IP].src, server_nat_ip)
2213             self.assertEqual(packet[IP].dst, host.ip4)
2214             self.assertTrue(packet.haslayer(GRE))
2215             self.check_ip_checksum(packet)
2216         except:
2217             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2218             raise
2219
2220     def test_output_feature(self):
2221         """ NAT44 interface output feature (in2out postrouting) """
2222         self.nat44_add_address(self.nat_addr)
2223         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2224         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2225         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2226                                                          is_inside=0)
2227
2228         # in2out
2229         pkts = self.create_stream_in(self.pg0, self.pg3)
2230         self.pg0.add_stream(pkts)
2231         self.pg_enable_capture(self.pg_interfaces)
2232         self.pg_start()
2233         capture = self.pg3.get_capture(len(pkts))
2234         self.verify_capture_out(capture)
2235
2236         # out2in
2237         pkts = self.create_stream_out(self.pg3)
2238         self.pg3.add_stream(pkts)
2239         self.pg_enable_capture(self.pg_interfaces)
2240         self.pg_start()
2241         capture = self.pg0.get_capture(len(pkts))
2242         self.verify_capture_in(capture, self.pg0)
2243
2244         # from non-NAT interface to NAT inside interface
2245         pkts = self.create_stream_in(self.pg2, self.pg0)
2246         self.pg2.add_stream(pkts)
2247         self.pg_enable_capture(self.pg_interfaces)
2248         self.pg_start()
2249         capture = self.pg0.get_capture(len(pkts))
2250         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2251
2252     def test_output_feature_vrf_aware(self):
2253         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2254         nat_ip_vrf10 = "10.0.0.10"
2255         nat_ip_vrf20 = "10.0.0.20"
2256
2257         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2258                                    dst_address_length=32,
2259                                    next_hop_address=self.pg3.remote_ip4n,
2260                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2261                                    table_id=10)
2262         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2263                                    dst_address_length=32,
2264                                    next_hop_address=self.pg3.remote_ip4n,
2265                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2266                                    table_id=20)
2267
2268         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2269         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2270         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2271         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2272         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2273                                                          is_inside=0)
2274
2275         # in2out VRF 10
2276         pkts = self.create_stream_in(self.pg4, self.pg3)
2277         self.pg4.add_stream(pkts)
2278         self.pg_enable_capture(self.pg_interfaces)
2279         self.pg_start()
2280         capture = self.pg3.get_capture(len(pkts))
2281         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2282
2283         # out2in VRF 10
2284         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2285         self.pg3.add_stream(pkts)
2286         self.pg_enable_capture(self.pg_interfaces)
2287         self.pg_start()
2288         capture = self.pg4.get_capture(len(pkts))
2289         self.verify_capture_in(capture, self.pg4)
2290
2291         # in2out VRF 20
2292         pkts = self.create_stream_in(self.pg6, self.pg3)
2293         self.pg6.add_stream(pkts)
2294         self.pg_enable_capture(self.pg_interfaces)
2295         self.pg_start()
2296         capture = self.pg3.get_capture(len(pkts))
2297         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2298
2299         # out2in VRF 20
2300         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2301         self.pg3.add_stream(pkts)
2302         self.pg_enable_capture(self.pg_interfaces)
2303         self.pg_start()
2304         capture = self.pg6.get_capture(len(pkts))
2305         self.verify_capture_in(capture, self.pg6)
2306
2307     def test_output_feature_hairpinning(self):
2308         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2309         host = self.pg0.remote_hosts[0]
2310         server = self.pg0.remote_hosts[1]
2311         host_in_port = 1234
2312         host_out_port = 0
2313         server_in_port = 5678
2314         server_out_port = 8765
2315
2316         self.nat44_add_address(self.nat_addr)
2317         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2318         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2319                                                          is_inside=0)
2320
2321         # add static mapping for server
2322         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2323                                       server_in_port, server_out_port,
2324                                       proto=IP_PROTOS.tcp)
2325
2326         # send packet from host to server
2327         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328              IP(src=host.ip4, dst=self.nat_addr) /
2329              TCP(sport=host_in_port, dport=server_out_port))
2330         self.pg0.add_stream(p)
2331         self.pg_enable_capture(self.pg_interfaces)
2332         self.pg_start()
2333         capture = self.pg0.get_capture(1)
2334         p = capture[0]
2335         try:
2336             ip = p[IP]
2337             tcp = p[TCP]
2338             self.assertEqual(ip.src, self.nat_addr)
2339             self.assertEqual(ip.dst, server.ip4)
2340             self.assertNotEqual(tcp.sport, host_in_port)
2341             self.assertEqual(tcp.dport, server_in_port)
2342             self.check_tcp_checksum(p)
2343             host_out_port = tcp.sport
2344         except:
2345             self.logger.error(ppp("Unexpected or invalid packet:", p))
2346             raise
2347
2348         # send reply from server to host
2349         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2350              IP(src=server.ip4, dst=self.nat_addr) /
2351              TCP(sport=server_in_port, dport=host_out_port))
2352         self.pg0.add_stream(p)
2353         self.pg_enable_capture(self.pg_interfaces)
2354         self.pg_start()
2355         capture = self.pg0.get_capture(1)
2356         p = capture[0]
2357         try:
2358             ip = p[IP]
2359             tcp = p[TCP]
2360             self.assertEqual(ip.src, self.nat_addr)
2361             self.assertEqual(ip.dst, host.ip4)
2362             self.assertEqual(tcp.sport, server_out_port)
2363             self.assertEqual(tcp.dport, host_in_port)
2364             self.check_tcp_checksum(p)
2365         except:
2366             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2367             raise
2368
2369     def tearDown(self):
2370         super(TestNAT44, self).tearDown()
2371         if not self.vpp_dead:
2372             self.logger.info(self.vapi.cli("show nat44 verbose"))
2373             self.clear_nat44()
2374
2375
2376 class TestDeterministicNAT(MethodHolder):
2377     """ Deterministic NAT Test Cases """
2378
2379     @classmethod
2380     def setUpConstants(cls):
2381         super(TestDeterministicNAT, cls).setUpConstants()
2382         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2383
2384     @classmethod
2385     def setUpClass(cls):
2386         super(TestDeterministicNAT, cls).setUpClass()
2387
2388         try:
2389             cls.tcp_port_in = 6303
2390             cls.tcp_external_port = 6303
2391             cls.udp_port_in = 6304
2392             cls.udp_external_port = 6304
2393             cls.icmp_id_in = 6305
2394             cls.nat_addr = '10.0.0.3'
2395
2396             cls.create_pg_interfaces(range(3))
2397             cls.interfaces = list(cls.pg_interfaces)
2398
2399             for i in cls.interfaces:
2400                 i.admin_up()
2401                 i.config_ip4()
2402                 i.resolve_arp()
2403
2404             cls.pg0.generate_remote_hosts(2)
2405             cls.pg0.configure_ipv4_neighbors()
2406
2407         except Exception:
2408             super(TestDeterministicNAT, cls).tearDownClass()
2409             raise
2410
2411     def create_stream_in(self, in_if, out_if, ttl=64):
2412         """
2413         Create packet stream for inside network
2414
2415         :param in_if: Inside interface
2416         :param out_if: Outside interface
2417         :param ttl: TTL of generated packets
2418         """
2419         pkts = []
2420         # TCP
2421         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2422              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2423              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2424         pkts.append(p)
2425
2426         # UDP
2427         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2428              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2429              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2430         pkts.append(p)
2431
2432         # ICMP
2433         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2434              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2435              ICMP(id=self.icmp_id_in, type='echo-request'))
2436         pkts.append(p)
2437
2438         return pkts
2439
2440     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2441         """
2442         Create packet stream for outside network
2443
2444         :param out_if: Outside interface
2445         :param dst_ip: Destination IP address (Default use global NAT address)
2446         :param ttl: TTL of generated packets
2447         """
2448         if dst_ip is None:
2449             dst_ip = self.nat_addr
2450         pkts = []
2451         # TCP
2452         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2453              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2454              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2455         pkts.append(p)
2456
2457         # UDP
2458         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2459              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2460              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2461         pkts.append(p)
2462
2463         # ICMP
2464         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2465              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2466              ICMP(id=self.icmp_external_id, type='echo-reply'))
2467         pkts.append(p)
2468
2469         return pkts
2470
2471     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2472         """
2473         Verify captured packets on outside network
2474
2475         :param capture: Captured packets
2476         :param nat_ip: Translated IP address (Default use global NAT address)
2477         :param same_port: Sorce port number is not translated (Default False)
2478         :param packet_num: Expected number of packets (Default 3)
2479         """
2480         if nat_ip is None:
2481             nat_ip = self.nat_addr
2482         self.assertEqual(packet_num, len(capture))
2483         for packet in capture:
2484             try:
2485                 self.assertEqual(packet[IP].src, nat_ip)
2486                 if packet.haslayer(TCP):
2487                     self.tcp_port_out = packet[TCP].sport
2488                 elif packet.haslayer(UDP):
2489                     self.udp_port_out = packet[UDP].sport
2490                 else:
2491                     self.icmp_external_id = packet[ICMP].id
2492             except:
2493                 self.logger.error(ppp("Unexpected or invalid packet "
2494                                       "(outside network):", packet))
2495                 raise
2496
2497     def initiate_tcp_session(self, in_if, out_if):
2498         """
2499         Initiates TCP session
2500
2501         :param in_if: Inside interface
2502         :param out_if: Outside interface
2503         """
2504         try:
2505             # SYN packet in->out
2506             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2507                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2508                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2509                      flags="S"))
2510             in_if.add_stream(p)
2511             self.pg_enable_capture(self.pg_interfaces)
2512             self.pg_start()
2513             capture = out_if.get_capture(1)
2514             p = capture[0]
2515             self.tcp_port_out = p[TCP].sport
2516
2517             # SYN + ACK packet out->in
2518             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2519                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2520                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2521                      flags="SA"))
2522             out_if.add_stream(p)
2523             self.pg_enable_capture(self.pg_interfaces)
2524             self.pg_start()
2525             in_if.get_capture(1)
2526
2527             # ACK packet in->out
2528             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2529                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2530                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2531                      flags="A"))
2532             in_if.add_stream(p)
2533             self.pg_enable_capture(self.pg_interfaces)
2534             self.pg_start()
2535             out_if.get_capture(1)
2536
2537         except:
2538             self.logger.error("TCP 3 way handshake failed")
2539             raise
2540
2541     def verify_ipfix_max_entries_per_user(self, data):
2542         """
2543         Verify IPFIX maximum entries per user exceeded event
2544
2545         :param data: Decoded IPFIX data records
2546         """
2547         self.assertEqual(1, len(data))
2548         record = data[0]
2549         # natEvent
2550         self.assertEqual(ord(record[230]), 13)
2551         # natQuotaExceededEvent
2552         self.assertEqual('\x03\x00\x00\x00', record[466])
2553         # sourceIPv4Address
2554         self.assertEqual(self.pg0.remote_ip4n, record[8])
2555
2556     def test_deterministic_mode(self):
2557         """ NAT plugin run deterministic mode """
2558         in_addr = '172.16.255.0'
2559         out_addr = '172.17.255.50'
2560         in_addr_t = '172.16.255.20'
2561         in_addr_n = socket.inet_aton(in_addr)
2562         out_addr_n = socket.inet_aton(out_addr)
2563         in_addr_t_n = socket.inet_aton(in_addr_t)
2564         in_plen = 24
2565         out_plen = 32
2566
2567         nat_config = self.vapi.nat_show_config()
2568         self.assertEqual(1, nat_config.deterministic)
2569
2570         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2571
2572         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2573         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2574         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2575         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2576
2577         deterministic_mappings = self.vapi.nat_det_map_dump()
2578         self.assertEqual(len(deterministic_mappings), 1)
2579         dsm = deterministic_mappings[0]
2580         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2581         self.assertEqual(in_plen, dsm.in_plen)
2582         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2583         self.assertEqual(out_plen, dsm.out_plen)
2584
2585         self.clear_nat_det()
2586         deterministic_mappings = self.vapi.nat_det_map_dump()
2587         self.assertEqual(len(deterministic_mappings), 0)
2588
2589     def test_set_timeouts(self):
2590         """ Set deterministic NAT timeouts """
2591         timeouts_before = self.vapi.nat_det_get_timeouts()
2592
2593         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2594                                        timeouts_before.tcp_established + 10,
2595                                        timeouts_before.tcp_transitory + 10,
2596                                        timeouts_before.icmp + 10)
2597
2598         timeouts_after = self.vapi.nat_det_get_timeouts()
2599
2600         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2601         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2602         self.assertNotEqual(timeouts_before.tcp_established,
2603                             timeouts_after.tcp_established)
2604         self.assertNotEqual(timeouts_before.tcp_transitory,
2605                             timeouts_after.tcp_transitory)
2606
2607     def test_det_in(self):
2608         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2609
2610         nat_ip = "10.0.0.10"
2611
2612         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2613                                       32,
2614                                       socket.inet_aton(nat_ip),
2615                                       32)
2616         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2617         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2618                                                   is_inside=0)
2619
2620         # in2out
2621         pkts = self.create_stream_in(self.pg0, self.pg1)
2622         self.pg0.add_stream(pkts)
2623         self.pg_enable_capture(self.pg_interfaces)
2624         self.pg_start()
2625         capture = self.pg1.get_capture(len(pkts))
2626         self.verify_capture_out(capture, nat_ip)
2627
2628         # out2in
2629         pkts = self.create_stream_out(self.pg1, nat_ip)
2630         self.pg1.add_stream(pkts)
2631         self.pg_enable_capture(self.pg_interfaces)
2632         self.pg_start()
2633         capture = self.pg0.get_capture(len(pkts))
2634         self.verify_capture_in(capture, self.pg0)
2635
2636         # session dump test
2637         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2638         self.assertEqual(len(sessions), 3)
2639
2640         # TCP session
2641         s = sessions[0]
2642         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2643         self.assertEqual(s.in_port, self.tcp_port_in)
2644         self.assertEqual(s.out_port, self.tcp_port_out)
2645         self.assertEqual(s.ext_port, self.tcp_external_port)
2646
2647         # UDP session
2648         s = sessions[1]
2649         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2650         self.assertEqual(s.in_port, self.udp_port_in)
2651         self.assertEqual(s.out_port, self.udp_port_out)
2652         self.assertEqual(s.ext_port, self.udp_external_port)
2653
2654         # ICMP session
2655         s = sessions[2]
2656         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2657         self.assertEqual(s.in_port, self.icmp_id_in)
2658         self.assertEqual(s.out_port, self.icmp_external_id)
2659
2660     def test_multiple_users(self):
2661         """ Deterministic NAT multiple users """
2662
2663         nat_ip = "10.0.0.10"
2664         port_in = 80
2665         external_port = 6303
2666
2667         host0 = self.pg0.remote_hosts[0]
2668         host1 = self.pg0.remote_hosts[1]
2669
2670         self.vapi.nat_det_add_del_map(host0.ip4n,
2671                                       24,
2672                                       socket.inet_aton(nat_ip),
2673                                       32)
2674         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2675         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2676                                                   is_inside=0)
2677
2678         # host0 to out
2679         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2680              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2681              TCP(sport=port_in, dport=external_port))
2682         self.pg0.add_stream(p)
2683         self.pg_enable_capture(self.pg_interfaces)
2684         self.pg_start()
2685         capture = self.pg1.get_capture(1)
2686         p = capture[0]
2687         try:
2688             ip = p[IP]
2689             tcp = p[TCP]
2690             self.assertEqual(ip.src, nat_ip)
2691             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2692             self.assertEqual(tcp.dport, external_port)
2693             port_out0 = tcp.sport
2694         except:
2695             self.logger.error(ppp("Unexpected or invalid packet:", p))
2696             raise
2697
2698         # host1 to out
2699         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2700              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2701              TCP(sport=port_in, dport=external_port))
2702         self.pg0.add_stream(p)
2703         self.pg_enable_capture(self.pg_interfaces)
2704         self.pg_start()
2705         capture = self.pg1.get_capture(1)
2706         p = capture[0]
2707         try:
2708             ip = p[IP]
2709             tcp = p[TCP]
2710             self.assertEqual(ip.src, nat_ip)
2711             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2712             self.assertEqual(tcp.dport, external_port)
2713             port_out1 = tcp.sport
2714         except:
2715             self.logger.error(ppp("Unexpected or invalid packet:", p))
2716             raise
2717
2718         dms = self.vapi.nat_det_map_dump()
2719         self.assertEqual(1, len(dms))
2720         self.assertEqual(2, dms[0].ses_num)
2721
2722         # out to host0
2723         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2724              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2725              TCP(sport=external_port, dport=port_out0))
2726         self.pg1.add_stream(p)
2727         self.pg_enable_capture(self.pg_interfaces)
2728         self.pg_start()
2729         capture = self.pg0.get_capture(1)
2730         p = capture[0]
2731         try:
2732             ip = p[IP]
2733             tcp = p[TCP]
2734             self.assertEqual(ip.src, self.pg1.remote_ip4)
2735             self.assertEqual(ip.dst, host0.ip4)
2736             self.assertEqual(tcp.dport, port_in)
2737             self.assertEqual(tcp.sport, external_port)
2738         except:
2739             self.logger.error(ppp("Unexpected or invalid packet:", p))
2740             raise
2741
2742         # out to host1
2743         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2744              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2745              TCP(sport=external_port, dport=port_out1))
2746         self.pg1.add_stream(p)
2747         self.pg_enable_capture(self.pg_interfaces)
2748         self.pg_start()
2749         capture = self.pg0.get_capture(1)
2750         p = capture[0]
2751         try:
2752             ip = p[IP]
2753             tcp = p[TCP]
2754             self.assertEqual(ip.src, self.pg1.remote_ip4)
2755             self.assertEqual(ip.dst, host1.ip4)
2756             self.assertEqual(tcp.dport, port_in)
2757             self.assertEqual(tcp.sport, external_port)
2758         except:
2759             self.logger.error(ppp("Unexpected or invalid packet", p))
2760             raise
2761
2762         # session close api test
2763         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2764                                             port_out1,
2765                                             self.pg1.remote_ip4n,
2766                                             external_port)
2767         dms = self.vapi.nat_det_map_dump()
2768         self.assertEqual(dms[0].ses_num, 1)
2769
2770         self.vapi.nat_det_close_session_in(host0.ip4n,
2771                                            port_in,
2772                                            self.pg1.remote_ip4n,
2773                                            external_port)
2774         dms = self.vapi.nat_det_map_dump()
2775         self.assertEqual(dms[0].ses_num, 0)
2776
2777     def test_tcp_session_close_detection_in(self):
2778         """ Deterministic NAT TCP session close from inside network """
2779         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2780                                       32,
2781                                       socket.inet_aton(self.nat_addr),
2782                                       32)
2783         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2784         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2785                                                   is_inside=0)
2786
2787         self.initiate_tcp_session(self.pg0, self.pg1)
2788
2789         # close the session from inside
2790         try:
2791             # FIN packet in -> out
2792             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2793                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2794                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2795                      flags="F"))
2796             self.pg0.add_stream(p)
2797             self.pg_enable_capture(self.pg_interfaces)
2798             self.pg_start()
2799             self.pg1.get_capture(1)
2800
2801             pkts = []
2802
2803             # ACK packet out -> in
2804             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2805                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2806                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2807                      flags="A"))
2808             pkts.append(p)
2809
2810             # FIN packet out -> in
2811             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2812                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2813                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2814                      flags="F"))
2815             pkts.append(p)
2816
2817             self.pg1.add_stream(pkts)
2818             self.pg_enable_capture(self.pg_interfaces)
2819             self.pg_start()
2820             self.pg0.get_capture(2)
2821
2822             # ACK packet in -> out
2823             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2824                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2825                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2826                      flags="A"))
2827             self.pg0.add_stream(p)
2828             self.pg_enable_capture(self.pg_interfaces)
2829             self.pg_start()
2830             self.pg1.get_capture(1)
2831
2832             # Check if deterministic NAT44 closed the session
2833             dms = self.vapi.nat_det_map_dump()
2834             self.assertEqual(0, dms[0].ses_num)
2835         except:
2836             self.logger.error("TCP session termination failed")
2837             raise
2838
2839     def test_tcp_session_close_detection_out(self):
2840         """ Deterministic NAT TCP session close from outside network """
2841         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2842                                       32,
2843                                       socket.inet_aton(self.nat_addr),
2844                                       32)
2845         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2846         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2847                                                   is_inside=0)
2848
2849         self.initiate_tcp_session(self.pg0, self.pg1)
2850
2851         # close the session from outside
2852         try:
2853             # FIN packet out -> in
2854             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2855                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2856                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2857                      flags="F"))
2858             self.pg1.add_stream(p)
2859             self.pg_enable_capture(self.pg_interfaces)
2860             self.pg_start()
2861             self.pg0.get_capture(1)
2862
2863             pkts = []
2864
2865             # ACK packet in -> out
2866             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2867                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2868                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2869                      flags="A"))
2870             pkts.append(p)
2871
2872             # ACK packet in -> out
2873             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2874                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2875                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2876                      flags="F"))
2877             pkts.append(p)
2878
2879             self.pg0.add_stream(pkts)
2880             self.pg_enable_capture(self.pg_interfaces)
2881             self.pg_start()
2882             self.pg1.get_capture(2)
2883
2884             # ACK packet out -> in
2885             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2886                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2887                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2888                      flags="A"))
2889             self.pg1.add_stream(p)
2890             self.pg_enable_capture(self.pg_interfaces)
2891             self.pg_start()
2892             self.pg0.get_capture(1)
2893
2894             # Check if deterministic NAT44 closed the session
2895             dms = self.vapi.nat_det_map_dump()
2896             self.assertEqual(0, dms[0].ses_num)
2897         except:
2898             self.logger.error("TCP session termination failed")
2899             raise
2900
2901     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2902     def test_session_timeout(self):
2903         """ Deterministic NAT session timeouts """
2904         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2905                                       32,
2906                                       socket.inet_aton(self.nat_addr),
2907                                       32)
2908         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2909         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2910                                                   is_inside=0)
2911
2912         self.initiate_tcp_session(self.pg0, self.pg1)
2913         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2914         pkts = self.create_stream_in(self.pg0, self.pg1)
2915         self.pg0.add_stream(pkts)
2916         self.pg_enable_capture(self.pg_interfaces)
2917         self.pg_start()
2918         capture = self.pg1.get_capture(len(pkts))
2919         sleep(15)
2920
2921         dms = self.vapi.nat_det_map_dump()
2922         self.assertEqual(0, dms[0].ses_num)
2923
2924     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2925     def test_session_limit_per_user(self):
2926         """ Deterministic NAT maximum sessions per user limit """
2927         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2928                                       32,
2929                                       socket.inet_aton(self.nat_addr),
2930                                       32)
2931         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2932         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2933                                                   is_inside=0)
2934         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2935                                      src_address=self.pg2.local_ip4n,
2936                                      path_mtu=512,
2937                                      template_interval=10)
2938         self.vapi.nat_ipfix()
2939
2940         pkts = []
2941         for port in range(1025, 2025):
2942             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2943                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2944                  UDP(sport=port, dport=port))
2945             pkts.append(p)
2946
2947         self.pg0.add_stream(pkts)
2948         self.pg_enable_capture(self.pg_interfaces)
2949         self.pg_start()
2950         capture = self.pg1.get_capture(len(pkts))
2951
2952         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2953              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2954              UDP(sport=3001, dport=3002))
2955         self.pg0.add_stream(p)
2956         self.pg_enable_capture(self.pg_interfaces)
2957         self.pg_start()
2958         capture = self.pg1.assert_nothing_captured()
2959
2960         # verify ICMP error packet
2961         capture = self.pg0.get_capture(1)
2962         p = capture[0]
2963         self.assertTrue(p.haslayer(ICMP))
2964         icmp = p[ICMP]
2965         self.assertEqual(icmp.type, 3)
2966         self.assertEqual(icmp.code, 1)
2967         self.assertTrue(icmp.haslayer(IPerror))
2968         inner_ip = icmp[IPerror]
2969         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2970         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2971
2972         dms = self.vapi.nat_det_map_dump()
2973
2974         self.assertEqual(1000, dms[0].ses_num)
2975
2976         # verify IPFIX logging
2977         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2978         sleep(1)
2979         capture = self.pg2.get_capture(2)
2980         ipfix = IPFIXDecoder()
2981         # first load template
2982         for p in capture:
2983             self.assertTrue(p.haslayer(IPFIX))
2984             if p.haslayer(Template):
2985                 ipfix.add_template(p.getlayer(Template))
2986         # verify events in data set
2987         for p in capture:
2988             if p.haslayer(Data):
2989                 data = ipfix.decode_data_set(p.getlayer(Set))
2990                 self.verify_ipfix_max_entries_per_user(data)
2991
2992     def clear_nat_det(self):
2993         """
2994         Clear deterministic NAT configuration.
2995         """
2996         self.vapi.nat_ipfix(enable=0)
2997         self.vapi.nat_det_set_timeouts()
2998         deterministic_mappings = self.vapi.nat_det_map_dump()
2999         for dsm in deterministic_mappings:
3000             self.vapi.nat_det_add_del_map(dsm.in_addr,
3001                                           dsm.in_plen,
3002                                           dsm.out_addr,
3003                                           dsm.out_plen,
3004                                           is_add=0)
3005
3006         interfaces = self.vapi.nat44_interface_dump()
3007         for intf in interfaces:
3008             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3009                                                       intf.is_inside,
3010                                                       is_add=0)
3011
3012     def tearDown(self):
3013         super(TestDeterministicNAT, self).tearDown()
3014         if not self.vpp_dead:
3015             self.logger.info(self.vapi.cli("show nat44 detail"))
3016             self.clear_nat_det()
3017
3018
3019 class TestNAT64(MethodHolder):
3020     """ NAT64 Test Cases """
3021
3022     @classmethod
3023     def setUpClass(cls):
3024         super(TestNAT64, cls).setUpClass()
3025
3026         try:
3027             cls.tcp_port_in = 6303
3028             cls.tcp_port_out = 6303
3029             cls.udp_port_in = 6304
3030             cls.udp_port_out = 6304
3031             cls.icmp_id_in = 6305
3032             cls.icmp_id_out = 6305
3033             cls.nat_addr = '10.0.0.3'
3034             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3035             cls.vrf1_id = 10
3036             cls.vrf1_nat_addr = '10.0.10.3'
3037             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3038                                                    cls.vrf1_nat_addr)
3039
3040             cls.create_pg_interfaces(range(3))
3041             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3042             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3043             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3044
3045             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3046
3047             cls.pg0.generate_remote_hosts(2)
3048
3049             for i in cls.ip6_interfaces:
3050                 i.admin_up()
3051                 i.config_ip6()
3052                 i.configure_ipv6_neighbors()
3053
3054             for i in cls.ip4_interfaces:
3055                 i.admin_up()
3056                 i.config_ip4()
3057                 i.resolve_arp()
3058
3059         except Exception:
3060             super(TestNAT64, cls).tearDownClass()
3061             raise
3062
3063     def test_pool(self):
3064         """ Add/delete address to NAT64 pool """
3065         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3066
3067         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3068
3069         addresses = self.vapi.nat64_pool_addr_dump()
3070         self.assertEqual(len(addresses), 1)
3071         self.assertEqual(addresses[0].address, nat_addr)
3072
3073         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3074
3075         addresses = self.vapi.nat64_pool_addr_dump()
3076         self.assertEqual(len(addresses), 0)
3077
3078     def test_interface(self):
3079         """ Enable/disable NAT64 feature on the interface """
3080         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3081         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3082
3083         interfaces = self.vapi.nat64_interface_dump()
3084         self.assertEqual(len(interfaces), 2)
3085         pg0_found = False
3086         pg1_found = False
3087         for intf in interfaces:
3088             if intf.sw_if_index == self.pg0.sw_if_index:
3089                 self.assertEqual(intf.is_inside, 1)
3090                 pg0_found = True
3091             elif intf.sw_if_index == self.pg1.sw_if_index:
3092                 self.assertEqual(intf.is_inside, 0)
3093                 pg1_found = True
3094         self.assertTrue(pg0_found)
3095         self.assertTrue(pg1_found)
3096
3097         features = self.vapi.cli("show interface features pg0")
3098         self.assertNotEqual(features.find('nat64-in2out'), -1)
3099         features = self.vapi.cli("show interface features pg1")
3100         self.assertNotEqual(features.find('nat64-out2in'), -1)
3101
3102         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3103         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3104
3105         interfaces = self.vapi.nat64_interface_dump()
3106         self.assertEqual(len(interfaces), 0)
3107
3108     def test_static_bib(self):
3109         """ Add/delete static BIB entry """
3110         in_addr = socket.inet_pton(socket.AF_INET6,
3111                                    '2001:db8:85a3::8a2e:370:7334')
3112         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3113         in_port = 1234
3114         out_port = 5678
3115         proto = IP_PROTOS.tcp
3116
3117         self.vapi.nat64_add_del_static_bib(in_addr,
3118                                            out_addr,
3119                                            in_port,
3120                                            out_port,
3121                                            proto)
3122         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3123         static_bib_num = 0
3124         for bibe in bib:
3125             if bibe.is_static:
3126                 static_bib_num += 1
3127                 self.assertEqual(bibe.i_addr, in_addr)
3128                 self.assertEqual(bibe.o_addr, out_addr)
3129                 self.assertEqual(bibe.i_port, in_port)
3130                 self.assertEqual(bibe.o_port, out_port)
3131         self.assertEqual(static_bib_num, 1)
3132
3133         self.vapi.nat64_add_del_static_bib(in_addr,
3134                                            out_addr,
3135                                            in_port,
3136                                            out_port,
3137                                            proto,
3138                                            is_add=0)
3139         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3140         static_bib_num = 0
3141         for bibe in bib:
3142             if bibe.is_static:
3143                 static_bib_num += 1
3144         self.assertEqual(static_bib_num, 0)
3145
3146     def test_set_timeouts(self):
3147         """ Set NAT64 timeouts """
3148         # verify default values
3149         timeouts = self.vapi.nat64_get_timeouts()
3150         self.assertEqual(timeouts.udp, 300)
3151         self.assertEqual(timeouts.icmp, 60)
3152         self.assertEqual(timeouts.tcp_trans, 240)
3153         self.assertEqual(timeouts.tcp_est, 7440)
3154         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3155
3156         # set and verify custom values
3157         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3158                                      tcp_est=7450, tcp_incoming_syn=10)
3159         timeouts = self.vapi.nat64_get_timeouts()
3160         self.assertEqual(timeouts.udp, 200)
3161         self.assertEqual(timeouts.icmp, 30)
3162         self.assertEqual(timeouts.tcp_trans, 250)
3163         self.assertEqual(timeouts.tcp_est, 7450)
3164         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3165
3166     def test_dynamic(self):
3167         """ NAT64 dynamic translation test """
3168         self.tcp_port_in = 6303
3169         self.udp_port_in = 6304
3170         self.icmp_id_in = 6305
3171
3172         ses_num_start = self.nat64_get_ses_num()
3173
3174         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3175                                                 self.nat_addr_n)
3176         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3177         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3178
3179         # in2out
3180         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3181         self.pg0.add_stream(pkts)
3182         self.pg_enable_capture(self.pg_interfaces)
3183         self.pg_start()
3184         capture = self.pg1.get_capture(len(pkts))
3185         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3186                                 dst_ip=self.pg1.remote_ip4)
3187
3188         # out2in
3189         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3190         self.pg1.add_stream(pkts)
3191         self.pg_enable_capture(self.pg_interfaces)
3192         self.pg_start()
3193         capture = self.pg0.get_capture(len(pkts))
3194         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3195         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3196
3197         # in2out
3198         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3199         self.pg0.add_stream(pkts)
3200         self.pg_enable_capture(self.pg_interfaces)
3201         self.pg_start()
3202         capture = self.pg1.get_capture(len(pkts))
3203         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3204                                 dst_ip=self.pg1.remote_ip4)
3205
3206         # out2in
3207         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3208         self.pg1.add_stream(pkts)
3209         self.pg_enable_capture(self.pg_interfaces)
3210         self.pg_start()
3211         capture = self.pg0.get_capture(len(pkts))
3212         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3213
3214         ses_num_end = self.nat64_get_ses_num()
3215
3216         self.assertEqual(ses_num_end - ses_num_start, 3)
3217
3218         # tenant with specific VRF
3219         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3220                                                 self.vrf1_nat_addr_n,
3221                                                 vrf_id=self.vrf1_id)
3222         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3223
3224         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3225         self.pg2.add_stream(pkts)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         capture = self.pg1.get_capture(len(pkts))
3229         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3230                                 dst_ip=self.pg1.remote_ip4)
3231
3232         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3233         self.pg1.add_stream(pkts)
3234         self.pg_enable_capture(self.pg_interfaces)
3235         self.pg_start()
3236         capture = self.pg2.get_capture(len(pkts))
3237         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3238
3239     def test_static(self):
3240         """ NAT64 static translation test """
3241         self.tcp_port_in = 60303
3242         self.udp_port_in = 60304
3243         self.icmp_id_in = 60305
3244         self.tcp_port_out = 60303
3245         self.udp_port_out = 60304
3246         self.icmp_id_out = 60305
3247
3248         ses_num_start = self.nat64_get_ses_num()
3249
3250         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3251                                                 self.nat_addr_n)
3252         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3253         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3254
3255         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3256                                            self.nat_addr_n,
3257                                            self.tcp_port_in,
3258                                            self.tcp_port_out,
3259                                            IP_PROTOS.tcp)
3260         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3261                                            self.nat_addr_n,
3262                                            self.udp_port_in,
3263                                            self.udp_port_out,
3264                                            IP_PROTOS.udp)
3265         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3266                                            self.nat_addr_n,
3267                                            self.icmp_id_in,
3268                                            self.icmp_id_out,
3269                                            IP_PROTOS.icmp)
3270
3271         # in2out
3272         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3273         self.pg0.add_stream(pkts)
3274         self.pg_enable_capture(self.pg_interfaces)
3275         self.pg_start()
3276         capture = self.pg1.get_capture(len(pkts))
3277         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3278                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3279
3280         # out2in
3281         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3282         self.pg1.add_stream(pkts)
3283         self.pg_enable_capture(self.pg_interfaces)
3284         self.pg_start()
3285         capture = self.pg0.get_capture(len(pkts))
3286         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3287         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3288
3289         ses_num_end = self.nat64_get_ses_num()
3290
3291         self.assertEqual(ses_num_end - ses_num_start, 3)
3292
3293     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3294     def test_session_timeout(self):
3295         """ NAT64 session timeout """
3296         self.icmp_id_in = 1234
3297         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3298                                                 self.nat_addr_n)
3299         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3300         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3301         self.vapi.nat64_set_timeouts(icmp=5)
3302
3303         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3304         self.pg0.add_stream(pkts)
3305         self.pg_enable_capture(self.pg_interfaces)
3306         self.pg_start()
3307         capture = self.pg1.get_capture(len(pkts))
3308
3309         ses_num_before_timeout = self.nat64_get_ses_num()
3310
3311         sleep(15)
3312
3313         # ICMP session after timeout
3314         ses_num_after_timeout = self.nat64_get_ses_num()
3315         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3316
3317     def test_icmp_error(self):
3318         """ NAT64 ICMP Error message translation """
3319         self.tcp_port_in = 6303
3320         self.udp_port_in = 6304
3321         self.icmp_id_in = 6305
3322
3323         ses_num_start = self.nat64_get_ses_num()
3324
3325         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3326                                                 self.nat_addr_n)
3327         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3328         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3329
3330         # send some packets to create sessions
3331         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3332         self.pg0.add_stream(pkts)
3333         self.pg_enable_capture(self.pg_interfaces)
3334         self.pg_start()
3335         capture_ip4 = self.pg1.get_capture(len(pkts))
3336         self.verify_capture_out(capture_ip4,
3337                                 nat_ip=self.nat_addr,
3338                                 dst_ip=self.pg1.remote_ip4)
3339
3340         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3341         self.pg1.add_stream(pkts)
3342         self.pg_enable_capture(self.pg_interfaces)
3343         self.pg_start()
3344         capture_ip6 = self.pg0.get_capture(len(pkts))
3345         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3346         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3347                                    self.pg0.remote_ip6)
3348
3349         # in2out
3350         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3351                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3352                 ICMPv6DestUnreach(code=1) /
3353                 packet[IPv6] for packet in capture_ip6]
3354         self.pg0.add_stream(pkts)
3355         self.pg_enable_capture(self.pg_interfaces)
3356         self.pg_start()
3357         capture = self.pg1.get_capture(len(pkts))
3358         for packet in capture:
3359             try:
3360                 self.assertEqual(packet[IP].src, self.nat_addr)
3361                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3362                 self.assertEqual(packet[ICMP].type, 3)
3363                 self.assertEqual(packet[ICMP].code, 13)
3364                 inner = packet[IPerror]
3365                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3366                 self.assertEqual(inner.dst, self.nat_addr)
3367                 self.check_icmp_checksum(packet)
3368                 if inner.haslayer(TCPerror):
3369                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3370                 elif inner.haslayer(UDPerror):
3371                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3372                 else:
3373                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3374             except:
3375                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3376                 raise
3377
3378         # out2in
3379         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3380                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3381                 ICMP(type=3, code=13) /
3382                 packet[IP] for packet in capture_ip4]
3383         self.pg1.add_stream(pkts)
3384         self.pg_enable_capture(self.pg_interfaces)
3385         self.pg_start()
3386         capture = self.pg0.get_capture(len(pkts))
3387         for packet in capture:
3388             try:
3389                 self.assertEqual(packet[IPv6].src, ip.src)
3390                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3391                 icmp = packet[ICMPv6DestUnreach]
3392                 self.assertEqual(icmp.code, 1)
3393                 inner = icmp[IPerror6]
3394                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3395                 self.assertEqual(inner.dst, ip.src)
3396                 self.check_icmpv6_checksum(packet)
3397                 if inner.haslayer(TCPerror):
3398                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3399                 elif inner.haslayer(UDPerror):
3400                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3401                 else:
3402                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3403                                      self.icmp_id_in)
3404             except:
3405                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3406                 raise
3407
3408     def test_hairpinning(self):
3409         """ NAT64 hairpinning """
3410
3411         client = self.pg0.remote_hosts[0]
3412         server = self.pg0.remote_hosts[1]
3413         server_tcp_in_port = 22
3414         server_tcp_out_port = 4022
3415         server_udp_in_port = 23
3416         server_udp_out_port = 4023
3417         client_tcp_in_port = 1234
3418         client_udp_in_port = 1235
3419         client_tcp_out_port = 0
3420         client_udp_out_port = 0
3421         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3422         nat_addr_ip6 = ip.src
3423
3424         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3425                                                 self.nat_addr_n)
3426         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3427         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3428
3429         self.vapi.nat64_add_del_static_bib(server.ip6n,
3430                                            self.nat_addr_n,
3431                                            server_tcp_in_port,
3432                                            server_tcp_out_port,
3433                                            IP_PROTOS.tcp)
3434         self.vapi.nat64_add_del_static_bib(server.ip6n,
3435                                            self.nat_addr_n,
3436                                            server_udp_in_port,
3437                                            server_udp_out_port,
3438                                            IP_PROTOS.udp)
3439
3440         # client to server
3441         pkts = []
3442         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3443              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3444              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3445         pkts.append(p)
3446         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3447              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3448              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3449         pkts.append(p)
3450         self.pg0.add_stream(pkts)
3451         self.pg_enable_capture(self.pg_interfaces)
3452         self.pg_start()
3453         capture = self.pg0.get_capture(len(pkts))
3454         for packet in capture:
3455             try:
3456                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3457                 self.assertEqual(packet[IPv6].dst, server.ip6)
3458                 if packet.haslayer(TCP):
3459                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3460                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3461                     self.check_tcp_checksum(packet)
3462                     client_tcp_out_port = packet[TCP].sport
3463                 else:
3464                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3465                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3466                     self.check_udp_checksum(packet)
3467                     client_udp_out_port = packet[UDP].sport
3468             except:
3469                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3470                 raise
3471
3472         # server to client
3473         pkts = []
3474         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3475              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3476              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3477         pkts.append(p)
3478         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3479              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3480              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3481         pkts.append(p)
3482         self.pg0.add_stream(pkts)
3483         self.pg_enable_capture(self.pg_interfaces)
3484         self.pg_start()
3485         capture = self.pg0.get_capture(len(pkts))
3486         for packet in capture:
3487             try:
3488                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3489                 self.assertEqual(packet[IPv6].dst, client.ip6)
3490                 if packet.haslayer(TCP):
3491                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3492                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3493                     self.check_tcp_checksum(packet)
3494                 else:
3495                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3496                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3497                     self.check_udp_checksum(packet)
3498             except:
3499                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3500                 raise
3501
3502         # ICMP error
3503         pkts = []
3504         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3505                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3506                 ICMPv6DestUnreach(code=1) /
3507                 packet[IPv6] for packet in capture]
3508         self.pg0.add_stream(pkts)
3509         self.pg_enable_capture(self.pg_interfaces)
3510         self.pg_start()
3511         capture = self.pg0.get_capture(len(pkts))
3512         for packet in capture:
3513             try:
3514                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3515                 self.assertEqual(packet[IPv6].dst, server.ip6)
3516                 icmp = packet[ICMPv6DestUnreach]
3517                 self.assertEqual(icmp.code, 1)
3518                 inner = icmp[IPerror6]
3519                 self.assertEqual(inner.src, server.ip6)
3520                 self.assertEqual(inner.dst, nat_addr_ip6)
3521                 self.check_icmpv6_checksum(packet)
3522                 if inner.haslayer(TCPerror):
3523                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3524                     self.assertEqual(inner[TCPerror].dport,
3525                                      client_tcp_out_port)
3526                 else:
3527                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3528                     self.assertEqual(inner[UDPerror].dport,
3529                                      client_udp_out_port)
3530             except:
3531                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3532                 raise
3533
3534     def test_prefix(self):
3535         """ NAT64 Network-Specific Prefix """
3536
3537         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3538                                                 self.nat_addr_n)
3539         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3540         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3541         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3542                                                 self.vrf1_nat_addr_n,
3543                                                 vrf_id=self.vrf1_id)
3544         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3545
3546         # Add global prefix
3547         global_pref64 = "2001:db8::"
3548         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3549         global_pref64_len = 32
3550         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3551
3552         prefix = self.vapi.nat64_prefix_dump()
3553         self.assertEqual(len(prefix), 1)
3554         self.assertEqual(prefix[0].prefix, global_pref64_n)
3555         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3556         self.assertEqual(prefix[0].vrf_id, 0)
3557
3558         # Add tenant specific prefix
3559         vrf1_pref64 = "2001:db8:122:300::"
3560         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3561         vrf1_pref64_len = 56
3562         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3563                                        vrf1_pref64_len,
3564                                        vrf_id=self.vrf1_id)
3565         prefix = self.vapi.nat64_prefix_dump()
3566         self.assertEqual(len(prefix), 2)
3567
3568         # Global prefix
3569         pkts = self.create_stream_in_ip6(self.pg0,
3570                                          self.pg1,
3571                                          pref=global_pref64,
3572                                          plen=global_pref64_len)
3573         self.pg0.add_stream(pkts)
3574         self.pg_enable_capture(self.pg_interfaces)
3575         self.pg_start()
3576         capture = self.pg1.get_capture(len(pkts))
3577         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3578                                 dst_ip=self.pg1.remote_ip4)
3579
3580         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3581         self.pg1.add_stream(pkts)
3582         self.pg_enable_capture(self.pg_interfaces)
3583         self.pg_start()
3584         capture = self.pg0.get_capture(len(pkts))
3585         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3586                                   global_pref64,
3587                                   global_pref64_len)
3588         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3589
3590         # Tenant specific prefix
3591         pkts = self.create_stream_in_ip6(self.pg2,
3592                                          self.pg1,
3593                                          pref=vrf1_pref64,
3594                                          plen=vrf1_pref64_len)
3595         self.pg2.add_stream(pkts)
3596         self.pg_enable_capture(self.pg_interfaces)
3597         self.pg_start()
3598         capture = self.pg1.get_capture(len(pkts))
3599         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3600                                 dst_ip=self.pg1.remote_ip4)
3601
3602         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3603         self.pg1.add_stream(pkts)
3604         self.pg_enable_capture(self.pg_interfaces)
3605         self.pg_start()
3606         capture = self.pg2.get_capture(len(pkts))
3607         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3608                                   vrf1_pref64,
3609                                   vrf1_pref64_len)
3610         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3611
3612     def test_unknown_proto(self):
3613         """ NAT64 translate packet with unknown protocol """
3614
3615         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3616                                                 self.nat_addr_n)
3617         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3618         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3619         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3620
3621         # in2out
3622         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3623              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3624              TCP(sport=self.tcp_port_in, dport=20))
3625         self.pg0.add_stream(p)
3626         self.pg_enable_capture(self.pg_interfaces)
3627         self.pg_start()
3628         p = self.pg1.get_capture(1)
3629
3630         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3631              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3632              GRE() /
3633              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3634              TCP(sport=1234, dport=1234))
3635         self.pg0.add_stream(p)
3636         self.pg_enable_capture(self.pg_interfaces)
3637         self.pg_start()
3638         p = self.pg1.get_capture(1)
3639         packet = p[0]
3640         try:
3641             self.assertEqual(packet[IP].src, self.nat_addr)
3642             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3643             self.assertTrue(packet.haslayer(GRE))
3644             self.check_ip_checksum(packet)
3645         except:
3646             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3647             raise
3648
3649         # out2in
3650         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3651              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3652              GRE() /
3653              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3654              TCP(sport=1234, dport=1234))
3655         self.pg1.add_stream(p)
3656         self.pg_enable_capture(self.pg_interfaces)
3657         self.pg_start()
3658         p = self.pg0.get_capture(1)
3659         packet = p[0]
3660         try:
3661             self.assertEqual(packet[IPv6].src, remote_ip6)
3662             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3663             self.assertEqual(packet[IPv6].nh, 47)
3664         except:
3665             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3666             raise
3667
3668     def test_hairpinning_unknown_proto(self):
3669         """ NAT64 translate packet with unknown protocol - hairpinning """
3670
3671         client = self.pg0.remote_hosts[0]
3672         server = self.pg0.remote_hosts[1]
3673         server_tcp_in_port = 22
3674         server_tcp_out_port = 4022
3675         client_tcp_in_port = 1234
3676         client_tcp_out_port = 1235
3677         server_nat_ip = "10.0.0.100"
3678         client_nat_ip = "10.0.0.110"
3679         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3680         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3681         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3682         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3683
3684         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3685                                                 client_nat_ip_n)
3686         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3687         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3688
3689         self.vapi.nat64_add_del_static_bib(server.ip6n,
3690                                            server_nat_ip_n,
3691                                            server_tcp_in_port,
3692                                            server_tcp_out_port,
3693                                            IP_PROTOS.tcp)
3694
3695         self.vapi.nat64_add_del_static_bib(server.ip6n,
3696                                            server_nat_ip_n,
3697                                            0,
3698                                            0,
3699                                            IP_PROTOS.gre)
3700
3701         self.vapi.nat64_add_del_static_bib(client.ip6n,
3702                                            client_nat_ip_n,
3703                                            client_tcp_in_port,
3704                                            client_tcp_out_port,
3705                                            IP_PROTOS.tcp)
3706
3707         # client to server
3708         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3709              IPv6(src=client.ip6, dst=server_nat_ip6) /
3710              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3711         self.pg0.add_stream(p)
3712         self.pg_enable_capture(self.pg_interfaces)
3713         self.pg_start()
3714         p = self.pg0.get_capture(1)
3715
3716         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3717              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3718              GRE() /
3719              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3720              TCP(sport=1234, dport=1234))
3721         self.pg0.add_stream(p)
3722         self.pg_enable_capture(self.pg_interfaces)
3723         self.pg_start()
3724         p = self.pg0.get_capture(1)
3725         packet = p[0]
3726         try:
3727             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3728             self.assertEqual(packet[IPv6].dst, server.ip6)
3729             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3730         except:
3731             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3732             raise
3733
3734         # server to client
3735         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3736              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3737              GRE() /
3738              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3739              TCP(sport=1234, dport=1234))
3740         self.pg0.add_stream(p)
3741         self.pg_enable_capture(self.pg_interfaces)
3742         self.pg_start()
3743         p = self.pg0.get_capture(1)
3744         packet = p[0]
3745         try:
3746             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3747             self.assertEqual(packet[IPv6].dst, client.ip6)
3748             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3749         except:
3750             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3751             raise
3752
3753     def nat64_get_ses_num(self):
3754         """
3755         Return number of active NAT64 sessions.
3756         """
3757         st = self.vapi.nat64_st_dump()
3758         return len(st)
3759
3760     def clear_nat64(self):
3761         """
3762         Clear NAT64 configuration.
3763         """
3764         self.vapi.nat64_set_timeouts()
3765
3766         interfaces = self.vapi.nat64_interface_dump()
3767         for intf in interfaces:
3768             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3769                                               intf.is_inside,
3770                                               is_add=0)
3771
3772         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3773         for bibe in bib:
3774             if bibe.is_static:
3775                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3776                                                    bibe.o_addr,
3777                                                    bibe.i_port,
3778                                                    bibe.o_port,
3779                                                    bibe.proto,
3780                                                    bibe.vrf_id,
3781                                                    is_add=0)
3782
3783         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3784         for bibe in bib:
3785             if bibe.is_static:
3786                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3787                                                    bibe.o_addr,
3788                                                    bibe.i_port,
3789                                                    bibe.o_port,
3790                                                    bibe.proto,
3791                                                    bibe.vrf_id,
3792                                                    is_add=0)
3793
3794         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3795         for bibe in bib:
3796             if bibe.is_static:
3797                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3798                                                    bibe.o_addr,
3799                                                    bibe.i_port,
3800                                                    bibe.o_port,
3801                                                    bibe.proto,
3802                                                    bibe.vrf_id,
3803                                                    is_add=0)
3804
3805         adresses = self.vapi.nat64_pool_addr_dump()
3806         for addr in adresses:
3807             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3808                                                     addr.address,
3809                                                     vrf_id=addr.vrf_id,
3810                                                     is_add=0)
3811
3812         prefixes = self.vapi.nat64_prefix_dump()
3813         for prefix in prefixes:
3814             self.vapi.nat64_add_del_prefix(prefix.prefix,
3815                                            prefix.prefix_len,
3816                                            vrf_id=prefix.vrf_id,
3817                                            is_add=0)
3818
3819     def tearDown(self):
3820         super(TestNAT64, self).tearDown()
3821         if not self.vpp_dead:
3822             self.logger.info(self.vapi.cli("show nat64 pool"))
3823             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3824             self.logger.info(self.vapi.cli("show nat64 prefix"))
3825             self.logger.info(self.vapi.cli("show nat64 bib all"))
3826             self.logger.info(self.vapi.cli("show nat64 session table all"))
3827             self.clear_nat64()
3828
3829 if __name__ == '__main__':
3830     unittest.main(testRunner=VppTestRunner)