NAT: delete session API/CLI (VPP-1041)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19 from util import mactobinary
20
21
22 class MethodHolder(VppTestCase):
23     """ NAT create capture and verify method holder """
24
25     @classmethod
26     def setUpClass(cls):
27         super(MethodHolder, cls).setUpClass()
28
29     def tearDown(self):
30         super(MethodHolder, self).tearDown()
31
32     def check_ip_checksum(self, pkt):
33         """
34         Check IP checksum of the packet
35
36         :param pkt: Packet to check IP checksum
37         """
38         new = pkt.__class__(str(pkt))
39         del new['IP'].chksum
40         new = new.__class__(str(new))
41         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
42
43     def check_tcp_checksum(self, pkt):
44         """
45         Check TCP checksum in IP packet
46
47         :param pkt: Packet to check TCP checksum
48         """
49         new = pkt.__class__(str(pkt))
50         del new['TCP'].chksum
51         new = new.__class__(str(new))
52         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
53
54     def check_udp_checksum(self, pkt):
55         """
56         Check UDP checksum in IP packet
57
58         :param pkt: Packet to check UDP checksum
59         """
60         new = pkt.__class__(str(pkt))
61         del new['UDP'].chksum
62         new = new.__class__(str(new))
63         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
64
65     def check_icmp_errror_embedded(self, pkt):
66         """
67         Check ICMP error embeded packet checksum
68
69         :param pkt: Packet to check ICMP error embeded packet checksum
70         """
71         if pkt.haslayer(IPerror):
72             new = pkt.__class__(str(pkt))
73             del new['IPerror'].chksum
74             new = new.__class__(str(new))
75             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
76
77         if pkt.haslayer(TCPerror):
78             new = pkt.__class__(str(pkt))
79             del new['TCPerror'].chksum
80             new = new.__class__(str(new))
81             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
82
83         if pkt.haslayer(UDPerror):
84             if pkt['UDPerror'].chksum != 0:
85                 new = pkt.__class__(str(pkt))
86                 del new['UDPerror'].chksum
87                 new = new.__class__(str(new))
88                 self.assertEqual(new['UDPerror'].chksum,
89                                  pkt['UDPerror'].chksum)
90
91         if pkt.haslayer(ICMPerror):
92             del new['ICMPerror'].chksum
93             new = new.__class__(str(new))
94             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
95
96     def check_icmp_checksum(self, pkt):
97         """
98         Check ICMP checksum in IPv4 packet
99
100         :param pkt: Packet to check ICMP checksum
101         """
102         new = pkt.__class__(str(pkt))
103         del new['ICMP'].chksum
104         new = new.__class__(str(new))
105         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
106         if pkt.haslayer(IPerror):
107             self.check_icmp_errror_embedded(pkt)
108
109     def check_icmpv6_checksum(self, pkt):
110         """
111         Check ICMPv6 checksum in IPv4 packet
112
113         :param pkt: Packet to check ICMPv6 checksum
114         """
115         new = pkt.__class__(str(pkt))
116         if pkt.haslayer(ICMPv6DestUnreach):
117             del new['ICMPv6DestUnreach'].cksum
118             new = new.__class__(str(new))
119             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
120                              pkt['ICMPv6DestUnreach'].cksum)
121             self.check_icmp_errror_embedded(pkt)
122         if pkt.haslayer(ICMPv6EchoRequest):
123             del new['ICMPv6EchoRequest'].cksum
124             new = new.__class__(str(new))
125             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
126                              pkt['ICMPv6EchoRequest'].cksum)
127         if pkt.haslayer(ICMPv6EchoReply):
128             del new['ICMPv6EchoReply'].cksum
129             new = new.__class__(str(new))
130             self.assertEqual(new['ICMPv6EchoReply'].cksum,
131                              pkt['ICMPv6EchoReply'].cksum)
132
133     def create_stream_in(self, in_if, out_if, ttl=64):
134         """
135         Create packet stream for inside network
136
137         :param in_if: Inside interface
138         :param out_if: Outside interface
139         :param ttl: TTL of generated packets
140         """
141         pkts = []
142         # TCP
143         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
144              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
145              TCP(sport=self.tcp_port_in, dport=20))
146         pkts.append(p)
147
148         # UDP
149         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
150              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
151              UDP(sport=self.udp_port_in, dport=20))
152         pkts.append(p)
153
154         # ICMP
155         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
156              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
157              ICMP(id=self.icmp_id_in, type='echo-request'))
158         pkts.append(p)
159
160         return pkts
161
162     def compose_ip6(self, ip4, pref, plen):
163         """
164         Compose IPv4-embedded IPv6 addresses
165
166         :param ip4: IPv4 address
167         :param pref: IPv6 prefix
168         :param plen: IPv6 prefix length
169         :returns: IPv4-embedded IPv6 addresses
170         """
171         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
172         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
173         if plen == 32:
174             pref_n[4] = ip4_n[0]
175             pref_n[5] = ip4_n[1]
176             pref_n[6] = ip4_n[2]
177             pref_n[7] = ip4_n[3]
178         elif plen == 40:
179             pref_n[5] = ip4_n[0]
180             pref_n[6] = ip4_n[1]
181             pref_n[7] = ip4_n[2]
182             pref_n[9] = ip4_n[3]
183         elif plen == 48:
184             pref_n[6] = ip4_n[0]
185             pref_n[7] = ip4_n[1]
186             pref_n[9] = ip4_n[2]
187             pref_n[10] = ip4_n[3]
188         elif plen == 56:
189             pref_n[7] = ip4_n[0]
190             pref_n[9] = ip4_n[1]
191             pref_n[10] = ip4_n[2]
192             pref_n[11] = ip4_n[3]
193         elif plen == 64:
194             pref_n[9] = ip4_n[0]
195             pref_n[10] = ip4_n[1]
196             pref_n[11] = ip4_n[2]
197             pref_n[12] = ip4_n[3]
198         elif plen == 96:
199             pref_n[12] = ip4_n[0]
200             pref_n[13] = ip4_n[1]
201             pref_n[14] = ip4_n[2]
202             pref_n[15] = ip4_n[3]
203         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
204
205     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
206         """
207         Create IPv6 packet stream for inside network
208
209         :param in_if: Inside interface
210         :param out_if: Outside interface
211         :param ttl: Hop Limit of generated packets
212         :param pref: NAT64 prefix
213         :param plen: NAT64 prefix length
214         """
215         pkts = []
216         if pref is None:
217             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
218         else:
219             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
220
221         # TCP
222         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
224              TCP(sport=self.tcp_port_in, dport=20))
225         pkts.append(p)
226
227         # UDP
228         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
229              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
230              UDP(sport=self.udp_port_in, dport=20))
231         pkts.append(p)
232
233         # ICMP
234         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
235              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
236              ICMPv6EchoRequest(id=self.icmp_id_in))
237         pkts.append(p)
238
239         return pkts
240
241     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
242         """
243         Create packet stream for outside network
244
245         :param out_if: Outside interface
246         :param dst_ip: Destination IP address (Default use global NAT address)
247         :param ttl: TTL of generated packets
248         """
249         if dst_ip is None:
250             dst_ip = self.nat_addr
251         pkts = []
252         # TCP
253         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255              TCP(dport=self.tcp_port_out, sport=20))
256         pkts.append(p)
257
258         # UDP
259         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261              UDP(dport=self.udp_port_out, sport=20))
262         pkts.append(p)
263
264         # ICMP
265         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267              ICMP(id=self.icmp_id_out, type='echo-reply'))
268         pkts.append(p)
269
270         return pkts
271
272     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
273                            packet_num=3, dst_ip=None):
274         """
275         Verify captured packets on outside network
276
277         :param capture: Captured packets
278         :param nat_ip: Translated IP address (Default use global NAT address)
279         :param same_port: Sorce port number is not translated (Default False)
280         :param packet_num: Expected number of packets (Default 3)
281         :param dst_ip: Destination IP address (Default do not verify)
282         """
283         if nat_ip is None:
284             nat_ip = self.nat_addr
285         self.assertEqual(packet_num, len(capture))
286         for packet in capture:
287             try:
288                 self.check_ip_checksum(packet)
289                 self.assertEqual(packet[IP].src, nat_ip)
290                 if dst_ip is not None:
291                     self.assertEqual(packet[IP].dst, dst_ip)
292                 if packet.haslayer(TCP):
293                     if same_port:
294                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
295                     else:
296                         self.assertNotEqual(
297                             packet[TCP].sport, self.tcp_port_in)
298                     self.tcp_port_out = packet[TCP].sport
299                     self.check_tcp_checksum(packet)
300                 elif packet.haslayer(UDP):
301                     if same_port:
302                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
303                     else:
304                         self.assertNotEqual(
305                             packet[UDP].sport, self.udp_port_in)
306                     self.udp_port_out = packet[UDP].sport
307                 else:
308                     if same_port:
309                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
310                     else:
311                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
312                     self.icmp_id_out = packet[ICMP].id
313                     self.check_icmp_checksum(packet)
314             except:
315                 self.logger.error(ppp("Unexpected or invalid packet "
316                                       "(outside network):", packet))
317                 raise
318
319     def verify_capture_in(self, capture, in_if, packet_num=3):
320         """
321         Verify captured packets on inside network
322
323         :param capture: Captured packets
324         :param in_if: Inside interface
325         :param packet_num: Expected number of packets (Default 3)
326         """
327         self.assertEqual(packet_num, len(capture))
328         for packet in capture:
329             try:
330                 self.check_ip_checksum(packet)
331                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
332                 if packet.haslayer(TCP):
333                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
334                     self.check_tcp_checksum(packet)
335                 elif packet.haslayer(UDP):
336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
337                 else:
338                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
339                     self.check_icmp_checksum(packet)
340             except:
341                 self.logger.error(ppp("Unexpected or invalid packet "
342                                       "(inside network):", packet))
343                 raise
344
345     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
346         """
347         Verify captured IPv6 packets on inside network
348
349         :param capture: Captured packets
350         :param src_ip: Source IP
351         :param dst_ip: Destination IP address
352         :param packet_num: Expected number of packets (Default 3)
353         """
354         self.assertEqual(packet_num, len(capture))
355         for packet in capture:
356             try:
357                 self.assertEqual(packet[IPv6].src, src_ip)
358                 self.assertEqual(packet[IPv6].dst, dst_ip)
359                 if packet.haslayer(TCP):
360                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
361                     self.check_tcp_checksum(packet)
362                 elif packet.haslayer(UDP):
363                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
364                     self.check_udp_checksum(packet)
365                 else:
366                     self.assertEqual(packet[ICMPv6EchoReply].id,
367                                      self.icmp_id_in)
368                     self.check_icmpv6_checksum(packet)
369             except:
370                 self.logger.error(ppp("Unexpected or invalid packet "
371                                       "(inside network):", packet))
372                 raise
373
374     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375         """
376         Verify captured packet that don't have to be translated
377
378         :param capture: Captured packets
379         :param ingress_if: Ingress interface
380         :param egress_if: Egress interface
381         """
382         for packet in capture:
383             try:
384                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
385                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
386                 if packet.haslayer(TCP):
387                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
388                 elif packet.haslayer(UDP):
389                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
390                 else:
391                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392             except:
393                 self.logger.error(ppp("Unexpected or invalid packet "
394                                       "(inside network):", packet))
395                 raise
396
397     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
398                                             packet_num=3, icmp_type=11):
399         """
400         Verify captured packets with ICMP errors on outside network
401
402         :param capture: Captured packets
403         :param src_ip: Translated IP address or IP address of VPP
404                        (Default use global NAT address)
405         :param packet_num: Expected number of packets (Default 3)
406         :param icmp_type: Type of error ICMP packet
407                           we are expecting (Default 11)
408         """
409         if src_ip is None:
410             src_ip = self.nat_addr
411         self.assertEqual(packet_num, len(capture))
412         for packet in capture:
413             try:
414                 self.assertEqual(packet[IP].src, src_ip)
415                 self.assertTrue(packet.haslayer(ICMP))
416                 icmp = packet[ICMP]
417                 self.assertEqual(icmp.type, icmp_type)
418                 self.assertTrue(icmp.haslayer(IPerror))
419                 inner_ip = icmp[IPerror]
420                 if inner_ip.haslayer(TCPerror):
421                     self.assertEqual(inner_ip[TCPerror].dport,
422                                      self.tcp_port_out)
423                 elif inner_ip.haslayer(UDPerror):
424                     self.assertEqual(inner_ip[UDPerror].dport,
425                                      self.udp_port_out)
426                 else:
427                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428             except:
429                 self.logger.error(ppp("Unexpected or invalid packet "
430                                       "(outside network):", packet))
431                 raise
432
433     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
434                                            icmp_type=11):
435         """
436         Verify captured packets with ICMP errors on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         :param icmp_type: Type of error ICMP packet
442                           we are expecting (Default 11)
443         """
444         self.assertEqual(packet_num, len(capture))
445         for packet in capture:
446             try:
447                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
448                 self.assertTrue(packet.haslayer(ICMP))
449                 icmp = packet[ICMP]
450                 self.assertEqual(icmp.type, icmp_type)
451                 self.assertTrue(icmp.haslayer(IPerror))
452                 inner_ip = icmp[IPerror]
453                 if inner_ip.haslayer(TCPerror):
454                     self.assertEqual(inner_ip[TCPerror].sport,
455                                      self.tcp_port_in)
456                 elif inner_ip.haslayer(UDPerror):
457                     self.assertEqual(inner_ip[UDPerror].sport,
458                                      self.udp_port_in)
459                 else:
460                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461             except:
462                 self.logger.error(ppp("Unexpected or invalid packet "
463                                       "(inside network):", packet))
464                 raise
465
466     def verify_ipfix_nat44_ses(self, data):
467         """
468         Verify IPFIX NAT44 session create/delete event
469
470         :param data: Decoded IPFIX data records
471         """
472         nat44_ses_create_num = 0
473         nat44_ses_delete_num = 0
474         self.assertEqual(6, len(data))
475         for record in data:
476             # natEvent
477             self.assertIn(ord(record[230]), [4, 5])
478             if ord(record[230]) == 4:
479                 nat44_ses_create_num += 1
480             else:
481                 nat44_ses_delete_num += 1
482             # sourceIPv4Address
483             self.assertEqual(self.pg0.remote_ip4n, record[8])
484             # postNATSourceIPv4Address
485             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
486                              record[225])
487             # ingressVRFID
488             self.assertEqual(struct.pack("!I", 0), record[234])
489             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
490             if IP_PROTOS.icmp == ord(record[4]):
491                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
492                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
493                                  record[227])
494             elif IP_PROTOS.tcp == ord(record[4]):
495                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
496                                  record[7])
497                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
498                                  record[227])
499             elif IP_PROTOS.udp == ord(record[4]):
500                 self.assertEqual(struct.pack("!H", self.udp_port_in),
501                                  record[7])
502                 self.assertEqual(struct.pack("!H", self.udp_port_out),
503                                  record[227])
504             else:
505                 self.fail("Invalid protocol")
506         self.assertEqual(3, nat44_ses_create_num)
507         self.assertEqual(3, nat44_ses_delete_num)
508
509     def verify_ipfix_addr_exhausted(self, data):
510         """
511         Verify IPFIX NAT addresses event
512
513         :param data: Decoded IPFIX data records
514         """
515         self.assertEqual(1, len(data))
516         record = data[0]
517         # natEvent
518         self.assertEqual(ord(record[230]), 3)
519         # natPoolID
520         self.assertEqual(struct.pack("!I", 0), record[283])
521
522
523 class TestNAT44(MethodHolder):
524     """ NAT44 Test Cases """
525
526     @classmethod
527     def setUpClass(cls):
528         super(TestNAT44, cls).setUpClass()
529
530         try:
531             cls.tcp_port_in = 6303
532             cls.tcp_port_out = 6303
533             cls.udp_port_in = 6304
534             cls.udp_port_out = 6304
535             cls.icmp_id_in = 6305
536             cls.icmp_id_out = 6305
537             cls.nat_addr = '10.0.0.3'
538             cls.ipfix_src_port = 4739
539             cls.ipfix_domain_id = 1
540
541             cls.create_pg_interfaces(range(10))
542             cls.interfaces = list(cls.pg_interfaces[0:4])
543
544             for i in cls.interfaces:
545                 i.admin_up()
546                 i.config_ip4()
547                 i.resolve_arp()
548
549             cls.pg0.generate_remote_hosts(3)
550             cls.pg0.configure_ipv4_neighbors()
551
552             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553             cls.vapi.ip_table_add_del(10, is_add=1)
554             cls.vapi.ip_table_add_del(20, is_add=1)
555
556             cls.pg4._local_ip4 = "172.16.255.1"
557             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
559             cls.pg4.set_table_ip4(10)
560             cls.pg5._local_ip4 = "172.17.255.3"
561             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
563             cls.pg5.set_table_ip4(10)
564             cls.pg6._local_ip4 = "172.16.255.1"
565             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
566             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
567             cls.pg6.set_table_ip4(20)
568             for i in cls.overlapping_interfaces:
569                 i.config_ip4()
570                 i.admin_up()
571                 i.resolve_arp()
572
573             cls.pg7.admin_up()
574             cls.pg8.admin_up()
575
576             cls.pg9.generate_remote_hosts(2)
577             cls.pg9.config_ip4()
578             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
579             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
580                                                   ip_addr_n,
581                                                   24)
582             cls.pg9.admin_up()
583             cls.pg9.resolve_arp()
584             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
585             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
586             cls.pg9.resolve_arp()
587
588         except Exception:
589             super(TestNAT44, cls).tearDownClass()
590             raise
591
592     def clear_nat44(self):
593         """
594         Clear NAT44 configuration.
595         """
596         # I found no elegant way to do this
597         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
598                                    dst_address_length=32,
599                                    next_hop_address=self.pg7.remote_ip4n,
600                                    next_hop_sw_if_index=self.pg7.sw_if_index,
601                                    is_add=0)
602         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
603                                    dst_address_length=32,
604                                    next_hop_address=self.pg8.remote_ip4n,
605                                    next_hop_sw_if_index=self.pg8.sw_if_index,
606                                    is_add=0)
607
608         for intf in [self.pg7, self.pg8]:
609             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
610             for n in neighbors:
611                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
612                                               n.mac_address,
613                                               n.ip_address,
614                                               is_add=0)
615
616         if self.pg7.has_ip4_config:
617             self.pg7.unconfig_ip4()
618
619         interfaces = self.vapi.nat44_interface_addr_dump()
620         for intf in interfaces:
621             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
622
623         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
624                             domain_id=self.ipfix_domain_id)
625         self.ipfix_src_port = 4739
626         self.ipfix_domain_id = 1
627
628         interfaces = self.vapi.nat44_interface_dump()
629         for intf in interfaces:
630             if intf.is_inside > 1:
631                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
632                                                           0,
633                                                           is_add=0)
634             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
635                                                       intf.is_inside,
636                                                       is_add=0)
637
638         interfaces = self.vapi.nat44_interface_output_feature_dump()
639         for intf in interfaces:
640             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
641                                                              intf.is_inside,
642                                                              is_add=0)
643
644         static_mappings = self.vapi.nat44_static_mapping_dump()
645         for sm in static_mappings:
646             self.vapi.nat44_add_del_static_mapping(
647                 sm.local_ip_address,
648                 sm.external_ip_address,
649                 local_port=sm.local_port,
650                 external_port=sm.external_port,
651                 addr_only=sm.addr_only,
652                 vrf_id=sm.vrf_id,
653                 protocol=sm.protocol,
654                 is_add=0)
655
656         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
657         for lb_sm in lb_static_mappings:
658             self.vapi.nat44_add_del_lb_static_mapping(
659                 lb_sm.external_addr,
660                 lb_sm.external_port,
661                 lb_sm.protocol,
662                 lb_sm.vrf_id,
663                 is_add=0,
664                 local_num=0,
665                 locals=[])
666
667         adresses = self.vapi.nat44_address_dump()
668         for addr in adresses:
669             self.vapi.nat44_add_del_address_range(addr.ip_address,
670                                                   addr.ip_address,
671                                                   is_add=0)
672
673     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
674                                  local_port=0, external_port=0, vrf_id=0,
675                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
676                                  proto=0):
677         """
678         Add/delete NAT44 static mapping
679
680         :param local_ip: Local IP address
681         :param external_ip: External IP address
682         :param local_port: Local port number (Optional)
683         :param external_port: External port number (Optional)
684         :param vrf_id: VRF ID (Default 0)
685         :param is_add: 1 if add, 0 if delete (Default add)
686         :param external_sw_if_index: External interface instead of IP address
687         :param proto: IP protocol (Mandatory if port specified)
688         """
689         addr_only = 1
690         if local_port and external_port:
691             addr_only = 0
692         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
693         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
694         self.vapi.nat44_add_del_static_mapping(
695             l_ip,
696             e_ip,
697             external_sw_if_index,
698             local_port,
699             external_port,
700             addr_only,
701             vrf_id,
702             proto,
703             is_add)
704
705     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
706         """
707         Add/delete NAT44 address
708
709         :param ip: IP address
710         :param is_add: 1 if add, 0 if delete (Default add)
711         """
712         nat_addr = socket.inet_pton(socket.AF_INET, ip)
713         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
714                                               vrf_id=vrf_id)
715
716     def test_dynamic(self):
717         """ NAT44 dynamic translation test """
718
719         self.nat44_add_address(self.nat_addr)
720         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
721         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
722                                                   is_inside=0)
723
724         # in2out
725         pkts = self.create_stream_in(self.pg0, self.pg1)
726         self.pg0.add_stream(pkts)
727         self.pg_enable_capture(self.pg_interfaces)
728         self.pg_start()
729         capture = self.pg1.get_capture(len(pkts))
730         self.verify_capture_out(capture)
731
732         # out2in
733         pkts = self.create_stream_out(self.pg1)
734         self.pg1.add_stream(pkts)
735         self.pg_enable_capture(self.pg_interfaces)
736         self.pg_start()
737         capture = self.pg0.get_capture(len(pkts))
738         self.verify_capture_in(capture, self.pg0)
739
740     def test_dynamic_icmp_errors_in2out_ttl_1(self):
741         """ NAT44 handling of client packets with TTL=1 """
742
743         self.nat44_add_address(self.nat_addr)
744         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
745         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
746                                                   is_inside=0)
747
748         # Client side - generate traffic
749         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
750         self.pg0.add_stream(pkts)
751         self.pg_enable_capture(self.pg_interfaces)
752         self.pg_start()
753
754         # Client side - verify ICMP type 11 packets
755         capture = self.pg0.get_capture(len(pkts))
756         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
757
758     def test_dynamic_icmp_errors_out2in_ttl_1(self):
759         """ NAT44 handling of server packets with TTL=1 """
760
761         self.nat44_add_address(self.nat_addr)
762         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
763         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
764                                                   is_inside=0)
765
766         # Client side - create sessions
767         pkts = self.create_stream_in(self.pg0, self.pg1)
768         self.pg0.add_stream(pkts)
769         self.pg_enable_capture(self.pg_interfaces)
770         self.pg_start()
771
772         # Server side - generate traffic
773         capture = self.pg1.get_capture(len(pkts))
774         self.verify_capture_out(capture)
775         pkts = self.create_stream_out(self.pg1, ttl=1)
776         self.pg1.add_stream(pkts)
777         self.pg_enable_capture(self.pg_interfaces)
778         self.pg_start()
779
780         # Server side - verify ICMP type 11 packets
781         capture = self.pg1.get_capture(len(pkts))
782         self.verify_capture_out_with_icmp_errors(capture,
783                                                  src_ip=self.pg1.local_ip4)
784
785     def test_dynamic_icmp_errors_in2out_ttl_2(self):
786         """ NAT44 handling of error responses to client packets with TTL=2 """
787
788         self.nat44_add_address(self.nat_addr)
789         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
790         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
791                                                   is_inside=0)
792
793         # Client side - generate traffic
794         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
795         self.pg0.add_stream(pkts)
796         self.pg_enable_capture(self.pg_interfaces)
797         self.pg_start()
798
799         # Server side - simulate ICMP type 11 response
800         capture = self.pg1.get_capture(len(pkts))
801         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
802                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
803                 ICMP(type=11) / packet[IP] for packet in capture]
804         self.pg1.add_stream(pkts)
805         self.pg_enable_capture(self.pg_interfaces)
806         self.pg_start()
807
808         # Client side - verify ICMP type 11 packets
809         capture = self.pg0.get_capture(len(pkts))
810         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
811
812     def test_dynamic_icmp_errors_out2in_ttl_2(self):
813         """ NAT44 handling of error responses to server packets with TTL=2 """
814
815         self.nat44_add_address(self.nat_addr)
816         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
817         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
818                                                   is_inside=0)
819
820         # Client side - create sessions
821         pkts = self.create_stream_in(self.pg0, self.pg1)
822         self.pg0.add_stream(pkts)
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825
826         # Server side - generate traffic
827         capture = self.pg1.get_capture(len(pkts))
828         self.verify_capture_out(capture)
829         pkts = self.create_stream_out(self.pg1, ttl=2)
830         self.pg1.add_stream(pkts)
831         self.pg_enable_capture(self.pg_interfaces)
832         self.pg_start()
833
834         # Client side - simulate ICMP type 11 response
835         capture = self.pg0.get_capture(len(pkts))
836         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
837                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
838                 ICMP(type=11) / packet[IP] for packet in capture]
839         self.pg0.add_stream(pkts)
840         self.pg_enable_capture(self.pg_interfaces)
841         self.pg_start()
842
843         # Server side - verify ICMP type 11 packets
844         capture = self.pg1.get_capture(len(pkts))
845         self.verify_capture_out_with_icmp_errors(capture)
846
847     def test_ping_out_interface_from_outside(self):
848         """ Ping NAT44 out interface from outside network """
849
850         self.nat44_add_address(self.nat_addr)
851         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
852         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
853                                                   is_inside=0)
854
855         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
856              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
857              ICMP(id=self.icmp_id_out, type='echo-request'))
858         pkts = [p]
859         self.pg1.add_stream(pkts)
860         self.pg_enable_capture(self.pg_interfaces)
861         self.pg_start()
862         capture = self.pg1.get_capture(len(pkts))
863         self.assertEqual(1, len(capture))
864         packet = capture[0]
865         try:
866             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
867             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
868             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
869             self.assertEqual(packet[ICMP].type, 0)  # echo reply
870         except:
871             self.logger.error(ppp("Unexpected or invalid packet "
872                                   "(outside network):", packet))
873             raise
874
875     def test_ping_internal_host_from_outside(self):
876         """ Ping internal host from outside network """
877
878         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
879         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
880         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
881                                                   is_inside=0)
882
883         # out2in
884         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
885                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
886                ICMP(id=self.icmp_id_out, type='echo-request'))
887         self.pg1.add_stream(pkt)
888         self.pg_enable_capture(self.pg_interfaces)
889         self.pg_start()
890         capture = self.pg0.get_capture(1)
891         self.verify_capture_in(capture, self.pg0, packet_num=1)
892         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
893
894         # in2out
895         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
896                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
897                ICMP(id=self.icmp_id_in, type='echo-reply'))
898         self.pg0.add_stream(pkt)
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901         capture = self.pg1.get_capture(1)
902         self.verify_capture_out(capture, same_port=True, packet_num=1)
903         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
904
905     def test_static_in(self):
906         """ 1:1 NAT initialized from inside network """
907
908         nat_ip = "10.0.0.10"
909         self.tcp_port_out = 6303
910         self.udp_port_out = 6304
911         self.icmp_id_out = 6305
912
913         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
914         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
915         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
916                                                   is_inside=0)
917
918         # in2out
919         pkts = self.create_stream_in(self.pg0, self.pg1)
920         self.pg0.add_stream(pkts)
921         self.pg_enable_capture(self.pg_interfaces)
922         self.pg_start()
923         capture = self.pg1.get_capture(len(pkts))
924         self.verify_capture_out(capture, nat_ip, True)
925
926         # out2in
927         pkts = self.create_stream_out(self.pg1, nat_ip)
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931         capture = self.pg0.get_capture(len(pkts))
932         self.verify_capture_in(capture, self.pg0)
933
934     def test_static_out(self):
935         """ 1:1 NAT initialized from outside network """
936
937         nat_ip = "10.0.0.20"
938         self.tcp_port_out = 6303
939         self.udp_port_out = 6304
940         self.icmp_id_out = 6305
941
942         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
943         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
945                                                   is_inside=0)
946
947         # out2in
948         pkts = self.create_stream_out(self.pg1, nat_ip)
949         self.pg1.add_stream(pkts)
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952         capture = self.pg0.get_capture(len(pkts))
953         self.verify_capture_in(capture, self.pg0)
954
955         # in2out
956         pkts = self.create_stream_in(self.pg0, self.pg1)
957         self.pg0.add_stream(pkts)
958         self.pg_enable_capture(self.pg_interfaces)
959         self.pg_start()
960         capture = self.pg1.get_capture(len(pkts))
961         self.verify_capture_out(capture, nat_ip, True)
962
963     def test_static_with_port_in(self):
964         """ 1:1 NAPT initialized from inside network """
965
966         self.tcp_port_out = 3606
967         self.udp_port_out = 3607
968         self.icmp_id_out = 3608
969
970         self.nat44_add_address(self.nat_addr)
971         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972                                       self.tcp_port_in, self.tcp_port_out,
973                                       proto=IP_PROTOS.tcp)
974         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975                                       self.udp_port_in, self.udp_port_out,
976                                       proto=IP_PROTOS.udp)
977         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978                                       self.icmp_id_in, self.icmp_id_out,
979                                       proto=IP_PROTOS.icmp)
980         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
981         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
982                                                   is_inside=0)
983
984         # in2out
985         pkts = self.create_stream_in(self.pg0, self.pg1)
986         self.pg0.add_stream(pkts)
987         self.pg_enable_capture(self.pg_interfaces)
988         self.pg_start()
989         capture = self.pg1.get_capture(len(pkts))
990         self.verify_capture_out(capture)
991
992         # out2in
993         pkts = self.create_stream_out(self.pg1)
994         self.pg1.add_stream(pkts)
995         self.pg_enable_capture(self.pg_interfaces)
996         self.pg_start()
997         capture = self.pg0.get_capture(len(pkts))
998         self.verify_capture_in(capture, self.pg0)
999
1000     def test_static_with_port_out(self):
1001         """ 1:1 NAPT initialized from outside network """
1002
1003         self.tcp_port_out = 30606
1004         self.udp_port_out = 30607
1005         self.icmp_id_out = 30608
1006
1007         self.nat44_add_address(self.nat_addr)
1008         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009                                       self.tcp_port_in, self.tcp_port_out,
1010                                       proto=IP_PROTOS.tcp)
1011         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012                                       self.udp_port_in, self.udp_port_out,
1013                                       proto=IP_PROTOS.udp)
1014         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1015                                       self.icmp_id_in, self.icmp_id_out,
1016                                       proto=IP_PROTOS.icmp)
1017         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1018         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1019                                                   is_inside=0)
1020
1021         # out2in
1022         pkts = self.create_stream_out(self.pg1)
1023         self.pg1.add_stream(pkts)
1024         self.pg_enable_capture(self.pg_interfaces)
1025         self.pg_start()
1026         capture = self.pg0.get_capture(len(pkts))
1027         self.verify_capture_in(capture, self.pg0)
1028
1029         # in2out
1030         pkts = self.create_stream_in(self.pg0, self.pg1)
1031         self.pg0.add_stream(pkts)
1032         self.pg_enable_capture(self.pg_interfaces)
1033         self.pg_start()
1034         capture = self.pg1.get_capture(len(pkts))
1035         self.verify_capture_out(capture)
1036
1037     def test_static_vrf_aware(self):
1038         """ 1:1 NAT VRF awareness """
1039
1040         nat_ip1 = "10.0.0.30"
1041         nat_ip2 = "10.0.0.40"
1042         self.tcp_port_out = 6303
1043         self.udp_port_out = 6304
1044         self.icmp_id_out = 6305
1045
1046         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1047                                       vrf_id=10)
1048         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1049                                       vrf_id=10)
1050         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1051                                                   is_inside=0)
1052         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1053         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1054
1055         # inside interface VRF match NAT44 static mapping VRF
1056         pkts = self.create_stream_in(self.pg4, self.pg3)
1057         self.pg4.add_stream(pkts)
1058         self.pg_enable_capture(self.pg_interfaces)
1059         self.pg_start()
1060         capture = self.pg3.get_capture(len(pkts))
1061         self.verify_capture_out(capture, nat_ip1, True)
1062
1063         # inside interface VRF don't match NAT44 static mapping VRF (packets
1064         # are dropped)
1065         pkts = self.create_stream_in(self.pg0, self.pg3)
1066         self.pg0.add_stream(pkts)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         self.pg3.assert_nothing_captured()
1070
1071     def test_static_lb(self):
1072         """ NAT44 local service load balancing """
1073         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1074         external_port = 80
1075         local_port = 8080
1076         server1 = self.pg0.remote_hosts[0]
1077         server2 = self.pg0.remote_hosts[1]
1078
1079         locals = [{'addr': server1.ip4n,
1080                    'port': local_port,
1081                    'probability': 70},
1082                   {'addr': server2.ip4n,
1083                    'port': local_port,
1084                    'probability': 30}]
1085
1086         self.nat44_add_address(self.nat_addr)
1087         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1088                                                   external_port,
1089                                                   IP_PROTOS.tcp,
1090                                                   local_num=len(locals),
1091                                                   locals=locals)
1092         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1093         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1094                                                   is_inside=0)
1095
1096         # from client to service
1097         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1098              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1099              TCP(sport=12345, dport=external_port))
1100         self.pg1.add_stream(p)
1101         self.pg_enable_capture(self.pg_interfaces)
1102         self.pg_start()
1103         capture = self.pg0.get_capture(1)
1104         p = capture[0]
1105         server = None
1106         try:
1107             ip = p[IP]
1108             tcp = p[TCP]
1109             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1110             if ip.dst == server1.ip4:
1111                 server = server1
1112             else:
1113                 server = server2
1114             self.assertEqual(tcp.dport, local_port)
1115             self.check_tcp_checksum(p)
1116             self.check_ip_checksum(p)
1117         except:
1118             self.logger.error(ppp("Unexpected or invalid packet:", p))
1119             raise
1120
1121         # from service back to client
1122         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1123              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1124              TCP(sport=local_port, dport=12345))
1125         self.pg0.add_stream(p)
1126         self.pg_enable_capture(self.pg_interfaces)
1127         self.pg_start()
1128         capture = self.pg1.get_capture(1)
1129         p = capture[0]
1130         try:
1131             ip = p[IP]
1132             tcp = p[TCP]
1133             self.assertEqual(ip.src, self.nat_addr)
1134             self.assertEqual(tcp.sport, external_port)
1135             self.check_tcp_checksum(p)
1136             self.check_ip_checksum(p)
1137         except:
1138             self.logger.error(ppp("Unexpected or invalid packet:", p))
1139             raise
1140
1141         # multiple clients
1142         server1_n = 0
1143         server2_n = 0
1144         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1145         pkts = []
1146         for client in clients:
1147             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1148                  IP(src=client, dst=self.nat_addr) /
1149                  TCP(sport=12345, dport=external_port))
1150             pkts.append(p)
1151         self.pg1.add_stream(pkts)
1152         self.pg_enable_capture(self.pg_interfaces)
1153         self.pg_start()
1154         capture = self.pg0.get_capture(len(pkts))
1155         for p in capture:
1156             if p[IP].dst == server1.ip4:
1157                 server1_n += 1
1158             else:
1159                 server2_n += 1
1160         self.assertTrue(server1_n > server2_n)
1161
1162     def test_multiple_inside_interfaces(self):
1163         """ NAT44 multiple non-overlapping address space inside interfaces """
1164
1165         self.nat44_add_address(self.nat_addr)
1166         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1168         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1169                                                   is_inside=0)
1170
1171         # between two NAT44 inside interfaces (no translation)
1172         pkts = self.create_stream_in(self.pg0, self.pg1)
1173         self.pg0.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176         capture = self.pg1.get_capture(len(pkts))
1177         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1178
1179         # from NAT44 inside to interface without NAT44 feature (no translation)
1180         pkts = self.create_stream_in(self.pg0, self.pg2)
1181         self.pg0.add_stream(pkts)
1182         self.pg_enable_capture(self.pg_interfaces)
1183         self.pg_start()
1184         capture = self.pg2.get_capture(len(pkts))
1185         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1186
1187         # in2out 1st interface
1188         pkts = self.create_stream_in(self.pg0, self.pg3)
1189         self.pg0.add_stream(pkts)
1190         self.pg_enable_capture(self.pg_interfaces)
1191         self.pg_start()
1192         capture = self.pg3.get_capture(len(pkts))
1193         self.verify_capture_out(capture)
1194
1195         # out2in 1st interface
1196         pkts = self.create_stream_out(self.pg3)
1197         self.pg3.add_stream(pkts)
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pg_start()
1200         capture = self.pg0.get_capture(len(pkts))
1201         self.verify_capture_in(capture, self.pg0)
1202
1203         # in2out 2nd interface
1204         pkts = self.create_stream_in(self.pg1, self.pg3)
1205         self.pg1.add_stream(pkts)
1206         self.pg_enable_capture(self.pg_interfaces)
1207         self.pg_start()
1208         capture = self.pg3.get_capture(len(pkts))
1209         self.verify_capture_out(capture)
1210
1211         # out2in 2nd interface
1212         pkts = self.create_stream_out(self.pg3)
1213         self.pg3.add_stream(pkts)
1214         self.pg_enable_capture(self.pg_interfaces)
1215         self.pg_start()
1216         capture = self.pg1.get_capture(len(pkts))
1217         self.verify_capture_in(capture, self.pg1)
1218
1219     def test_inside_overlapping_interfaces(self):
1220         """ NAT44 multiple inside interfaces with overlapping address space """
1221
1222         static_nat_ip = "10.0.0.10"
1223         self.nat44_add_address(self.nat_addr)
1224         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1225                                                   is_inside=0)
1226         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1227         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1228         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1229         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1230                                       vrf_id=20)
1231
1232         # between NAT44 inside interfaces with same VRF (no translation)
1233         pkts = self.create_stream_in(self.pg4, self.pg5)
1234         self.pg4.add_stream(pkts)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg5.get_capture(len(pkts))
1238         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1239
1240         # between NAT44 inside interfaces with different VRF (hairpinning)
1241         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1242              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1243              TCP(sport=1234, dport=5678))
1244         self.pg4.add_stream(p)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg6.get_capture(1)
1248         p = capture[0]
1249         try:
1250             ip = p[IP]
1251             tcp = p[TCP]
1252             self.assertEqual(ip.src, self.nat_addr)
1253             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1254             self.assertNotEqual(tcp.sport, 1234)
1255             self.assertEqual(tcp.dport, 5678)
1256         except:
1257             self.logger.error(ppp("Unexpected or invalid packet:", p))
1258             raise
1259
1260         # in2out 1st interface
1261         pkts = self.create_stream_in(self.pg4, self.pg3)
1262         self.pg4.add_stream(pkts)
1263         self.pg_enable_capture(self.pg_interfaces)
1264         self.pg_start()
1265         capture = self.pg3.get_capture(len(pkts))
1266         self.verify_capture_out(capture)
1267
1268         # out2in 1st interface
1269         pkts = self.create_stream_out(self.pg3)
1270         self.pg3.add_stream(pkts)
1271         self.pg_enable_capture(self.pg_interfaces)
1272         self.pg_start()
1273         capture = self.pg4.get_capture(len(pkts))
1274         self.verify_capture_in(capture, self.pg4)
1275
1276         # in2out 2nd interface
1277         pkts = self.create_stream_in(self.pg5, self.pg3)
1278         self.pg5.add_stream(pkts)
1279         self.pg_enable_capture(self.pg_interfaces)
1280         self.pg_start()
1281         capture = self.pg3.get_capture(len(pkts))
1282         self.verify_capture_out(capture)
1283
1284         # out2in 2nd interface
1285         pkts = self.create_stream_out(self.pg3)
1286         self.pg3.add_stream(pkts)
1287         self.pg_enable_capture(self.pg_interfaces)
1288         self.pg_start()
1289         capture = self.pg5.get_capture(len(pkts))
1290         self.verify_capture_in(capture, self.pg5)
1291
1292         # pg5 session dump
1293         addresses = self.vapi.nat44_address_dump()
1294         self.assertEqual(len(addresses), 1)
1295         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1296         self.assertEqual(len(sessions), 3)
1297         for session in sessions:
1298             self.assertFalse(session.is_static)
1299             self.assertEqual(session.inside_ip_address[0:4],
1300                              self.pg5.remote_ip4n)
1301             self.assertEqual(session.outside_ip_address,
1302                              addresses[0].ip_address)
1303         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1304         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1305         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1306         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1307         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1308         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1309         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1310         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1311         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1312
1313         # in2out 3rd interface
1314         pkts = self.create_stream_in(self.pg6, self.pg3)
1315         self.pg6.add_stream(pkts)
1316         self.pg_enable_capture(self.pg_interfaces)
1317         self.pg_start()
1318         capture = self.pg3.get_capture(len(pkts))
1319         self.verify_capture_out(capture, static_nat_ip, True)
1320
1321         # out2in 3rd interface
1322         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1323         self.pg3.add_stream(pkts)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         capture = self.pg6.get_capture(len(pkts))
1327         self.verify_capture_in(capture, self.pg6)
1328
1329         # general user and session dump verifications
1330         users = self.vapi.nat44_user_dump()
1331         self.assertTrue(len(users) >= 3)
1332         addresses = self.vapi.nat44_address_dump()
1333         self.assertEqual(len(addresses), 1)
1334         for user in users:
1335             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1336                                                          user.vrf_id)
1337             for session in sessions:
1338                 self.assertEqual(user.ip_address, session.inside_ip_address)
1339                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1340                 self.assertTrue(session.protocol in
1341                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1342                                  IP_PROTOS.icmp])
1343
1344         # pg4 session dump
1345         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1346         self.assertTrue(len(sessions) >= 4)
1347         for session in sessions:
1348             self.assertFalse(session.is_static)
1349             self.assertEqual(session.inside_ip_address[0:4],
1350                              self.pg4.remote_ip4n)
1351             self.assertEqual(session.outside_ip_address,
1352                              addresses[0].ip_address)
1353
1354         # pg6 session dump
1355         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1356         self.assertTrue(len(sessions) >= 3)
1357         for session in sessions:
1358             self.assertTrue(session.is_static)
1359             self.assertEqual(session.inside_ip_address[0:4],
1360                              self.pg6.remote_ip4n)
1361             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1362                              map(int, static_nat_ip.split('.')))
1363             self.assertTrue(session.inside_port in
1364                             [self.tcp_port_in, self.udp_port_in,
1365                              self.icmp_id_in])
1366
1367     def test_hairpinning(self):
1368         """ NAT44 hairpinning - 1:1 NAPT """
1369
1370         host = self.pg0.remote_hosts[0]
1371         server = self.pg0.remote_hosts[1]
1372         host_in_port = 1234
1373         host_out_port = 0
1374         server_in_port = 5678
1375         server_out_port = 8765
1376
1377         self.nat44_add_address(self.nat_addr)
1378         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1379         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1380                                                   is_inside=0)
1381         # add static mapping for server
1382         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1383                                       server_in_port, server_out_port,
1384                                       proto=IP_PROTOS.tcp)
1385
1386         # send packet from host to server
1387         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1388              IP(src=host.ip4, dst=self.nat_addr) /
1389              TCP(sport=host_in_port, dport=server_out_port))
1390         self.pg0.add_stream(p)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg0.get_capture(1)
1394         p = capture[0]
1395         try:
1396             ip = p[IP]
1397             tcp = p[TCP]
1398             self.assertEqual(ip.src, self.nat_addr)
1399             self.assertEqual(ip.dst, server.ip4)
1400             self.assertNotEqual(tcp.sport, host_in_port)
1401             self.assertEqual(tcp.dport, server_in_port)
1402             self.check_tcp_checksum(p)
1403             host_out_port = tcp.sport
1404         except:
1405             self.logger.error(ppp("Unexpected or invalid packet:", p))
1406             raise
1407
1408         # send reply from server to host
1409         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1410              IP(src=server.ip4, dst=self.nat_addr) /
1411              TCP(sport=server_in_port, dport=host_out_port))
1412         self.pg0.add_stream(p)
1413         self.pg_enable_capture(self.pg_interfaces)
1414         self.pg_start()
1415         capture = self.pg0.get_capture(1)
1416         p = capture[0]
1417         try:
1418             ip = p[IP]
1419             tcp = p[TCP]
1420             self.assertEqual(ip.src, self.nat_addr)
1421             self.assertEqual(ip.dst, host.ip4)
1422             self.assertEqual(tcp.sport, server_out_port)
1423             self.assertEqual(tcp.dport, host_in_port)
1424             self.check_tcp_checksum(p)
1425         except:
1426             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1427             raise
1428
1429     def test_hairpinning2(self):
1430         """ NAT44 hairpinning - 1:1 NAT"""
1431
1432         server1_nat_ip = "10.0.0.10"
1433         server2_nat_ip = "10.0.0.11"
1434         host = self.pg0.remote_hosts[0]
1435         server1 = self.pg0.remote_hosts[1]
1436         server2 = self.pg0.remote_hosts[2]
1437         server_tcp_port = 22
1438         server_udp_port = 20
1439
1440         self.nat44_add_address(self.nat_addr)
1441         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1442         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1443                                                   is_inside=0)
1444
1445         # add static mapping for servers
1446         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1447         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1448
1449         # host to server1
1450         pkts = []
1451         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1452              IP(src=host.ip4, dst=server1_nat_ip) /
1453              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1454         pkts.append(p)
1455         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1456              IP(src=host.ip4, dst=server1_nat_ip) /
1457              UDP(sport=self.udp_port_in, dport=server_udp_port))
1458         pkts.append(p)
1459         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1460              IP(src=host.ip4, dst=server1_nat_ip) /
1461              ICMP(id=self.icmp_id_in, type='echo-request'))
1462         pkts.append(p)
1463         self.pg0.add_stream(pkts)
1464         self.pg_enable_capture(self.pg_interfaces)
1465         self.pg_start()
1466         capture = self.pg0.get_capture(len(pkts))
1467         for packet in capture:
1468             try:
1469                 self.assertEqual(packet[IP].src, self.nat_addr)
1470                 self.assertEqual(packet[IP].dst, server1.ip4)
1471                 if packet.haslayer(TCP):
1472                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1473                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1474                     self.tcp_port_out = packet[TCP].sport
1475                     self.check_tcp_checksum(packet)
1476                 elif packet.haslayer(UDP):
1477                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1478                     self.assertEqual(packet[UDP].dport, server_udp_port)
1479                     self.udp_port_out = packet[UDP].sport
1480                 else:
1481                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1482                     self.icmp_id_out = packet[ICMP].id
1483             except:
1484                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1485                 raise
1486
1487         # server1 to host
1488         pkts = []
1489         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1490              IP(src=server1.ip4, dst=self.nat_addr) /
1491              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1492         pkts.append(p)
1493         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1494              IP(src=server1.ip4, dst=self.nat_addr) /
1495              UDP(sport=server_udp_port, dport=self.udp_port_out))
1496         pkts.append(p)
1497         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1498              IP(src=server1.ip4, dst=self.nat_addr) /
1499              ICMP(id=self.icmp_id_out, type='echo-reply'))
1500         pkts.append(p)
1501         self.pg0.add_stream(pkts)
1502         self.pg_enable_capture(self.pg_interfaces)
1503         self.pg_start()
1504         capture = self.pg0.get_capture(len(pkts))
1505         for packet in capture:
1506             try:
1507                 self.assertEqual(packet[IP].src, server1_nat_ip)
1508                 self.assertEqual(packet[IP].dst, host.ip4)
1509                 if packet.haslayer(TCP):
1510                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1511                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1512                     self.check_tcp_checksum(packet)
1513                 elif packet.haslayer(UDP):
1514                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1515                     self.assertEqual(packet[UDP].sport, server_udp_port)
1516                 else:
1517                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1518             except:
1519                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1520                 raise
1521
1522         # server2 to server1
1523         pkts = []
1524         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1525              IP(src=server2.ip4, dst=server1_nat_ip) /
1526              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1527         pkts.append(p)
1528         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1529              IP(src=server2.ip4, dst=server1_nat_ip) /
1530              UDP(sport=self.udp_port_in, dport=server_udp_port))
1531         pkts.append(p)
1532         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1533              IP(src=server2.ip4, dst=server1_nat_ip) /
1534              ICMP(id=self.icmp_id_in, type='echo-request'))
1535         pkts.append(p)
1536         self.pg0.add_stream(pkts)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(len(pkts))
1540         for packet in capture:
1541             try:
1542                 self.assertEqual(packet[IP].src, server2_nat_ip)
1543                 self.assertEqual(packet[IP].dst, server1.ip4)
1544                 if packet.haslayer(TCP):
1545                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1546                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1547                     self.tcp_port_out = packet[TCP].sport
1548                     self.check_tcp_checksum(packet)
1549                 elif packet.haslayer(UDP):
1550                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1551                     self.assertEqual(packet[UDP].dport, server_udp_port)
1552                     self.udp_port_out = packet[UDP].sport
1553                 else:
1554                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1555                     self.icmp_id_out = packet[ICMP].id
1556             except:
1557                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1558                 raise
1559
1560         # server1 to server2
1561         pkts = []
1562         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1563              IP(src=server1.ip4, dst=server2_nat_ip) /
1564              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1565         pkts.append(p)
1566         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1567              IP(src=server1.ip4, dst=server2_nat_ip) /
1568              UDP(sport=server_udp_port, dport=self.udp_port_out))
1569         pkts.append(p)
1570         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1571              IP(src=server1.ip4, dst=server2_nat_ip) /
1572              ICMP(id=self.icmp_id_out, type='echo-reply'))
1573         pkts.append(p)
1574         self.pg0.add_stream(pkts)
1575         self.pg_enable_capture(self.pg_interfaces)
1576         self.pg_start()
1577         capture = self.pg0.get_capture(len(pkts))
1578         for packet in capture:
1579             try:
1580                 self.assertEqual(packet[IP].src, server1_nat_ip)
1581                 self.assertEqual(packet[IP].dst, server2.ip4)
1582                 if packet.haslayer(TCP):
1583                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1584                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1585                     self.check_tcp_checksum(packet)
1586                 elif packet.haslayer(UDP):
1587                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1588                     self.assertEqual(packet[UDP].sport, server_udp_port)
1589                 else:
1590                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1591             except:
1592                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1593                 raise
1594
1595     def test_max_translations_per_user(self):
1596         """ MAX translations per user - recycle the least recently used """
1597
1598         self.nat44_add_address(self.nat_addr)
1599         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1600         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1601                                                   is_inside=0)
1602
1603         # get maximum number of translations per user
1604         nat44_config = self.vapi.nat_show_config()
1605
1606         # send more than maximum number of translations per user packets
1607         pkts_num = nat44_config.max_translations_per_user + 5
1608         pkts = []
1609         for port in range(0, pkts_num):
1610             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1611                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1612                  TCP(sport=1025 + port))
1613             pkts.append(p)
1614         self.pg0.add_stream(pkts)
1615         self.pg_enable_capture(self.pg_interfaces)
1616         self.pg_start()
1617
1618         # verify number of translated packet
1619         self.pg1.get_capture(pkts_num)
1620
1621     def test_interface_addr(self):
1622         """ Acquire NAT44 addresses from interface """
1623         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1624
1625         # no address in NAT pool
1626         adresses = self.vapi.nat44_address_dump()
1627         self.assertEqual(0, len(adresses))
1628
1629         # configure interface address and check NAT address pool
1630         self.pg7.config_ip4()
1631         adresses = self.vapi.nat44_address_dump()
1632         self.assertEqual(1, len(adresses))
1633         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1634
1635         # remove interface address and check NAT address pool
1636         self.pg7.unconfig_ip4()
1637         adresses = self.vapi.nat44_address_dump()
1638         self.assertEqual(0, len(adresses))
1639
1640     def test_interface_addr_static_mapping(self):
1641         """ Static mapping with addresses from interface """
1642         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1643         self.nat44_add_static_mapping(
1644             '1.2.3.4',
1645             external_sw_if_index=self.pg7.sw_if_index)
1646
1647         # static mappings with external interface
1648         static_mappings = self.vapi.nat44_static_mapping_dump()
1649         self.assertEqual(1, len(static_mappings))
1650         self.assertEqual(self.pg7.sw_if_index,
1651                          static_mappings[0].external_sw_if_index)
1652
1653         # configure interface address and check static mappings
1654         self.pg7.config_ip4()
1655         static_mappings = self.vapi.nat44_static_mapping_dump()
1656         self.assertEqual(1, len(static_mappings))
1657         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1658                          self.pg7.local_ip4n)
1659         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1660
1661         # remove interface address and check static mappings
1662         self.pg7.unconfig_ip4()
1663         static_mappings = self.vapi.nat44_static_mapping_dump()
1664         self.assertEqual(0, len(static_mappings))
1665
1666     def test_ipfix_nat44_sess(self):
1667         """ IPFIX logging NAT44 session created/delted """
1668         self.ipfix_domain_id = 10
1669         self.ipfix_src_port = 20202
1670         colector_port = 30303
1671         bind_layers(UDP, IPFIX, dport=30303)
1672         self.nat44_add_address(self.nat_addr)
1673         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1674         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1675                                                   is_inside=0)
1676         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1677                                      src_address=self.pg3.local_ip4n,
1678                                      path_mtu=512,
1679                                      template_interval=10,
1680                                      collector_port=colector_port)
1681         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1682                             src_port=self.ipfix_src_port)
1683
1684         pkts = self.create_stream_in(self.pg0, self.pg1)
1685         self.pg0.add_stream(pkts)
1686         self.pg_enable_capture(self.pg_interfaces)
1687         self.pg_start()
1688         capture = self.pg1.get_capture(len(pkts))
1689         self.verify_capture_out(capture)
1690         self.nat44_add_address(self.nat_addr, is_add=0)
1691         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1692         capture = self.pg3.get_capture(3)
1693         ipfix = IPFIXDecoder()
1694         # first load template
1695         for p in capture:
1696             self.assertTrue(p.haslayer(IPFIX))
1697             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1698             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1699             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1700             self.assertEqual(p[UDP].dport, colector_port)
1701             self.assertEqual(p[IPFIX].observationDomainID,
1702                              self.ipfix_domain_id)
1703             if p.haslayer(Template):
1704                 ipfix.add_template(p.getlayer(Template))
1705         # verify events in data set
1706         for p in capture:
1707             if p.haslayer(Data):
1708                 data = ipfix.decode_data_set(p.getlayer(Set))
1709                 self.verify_ipfix_nat44_ses(data)
1710
1711     def test_ipfix_addr_exhausted(self):
1712         """ IPFIX logging NAT addresses exhausted """
1713         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1714         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1715                                                   is_inside=0)
1716         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1717                                      src_address=self.pg3.local_ip4n,
1718                                      path_mtu=512,
1719                                      template_interval=10)
1720         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1721                             src_port=self.ipfix_src_port)
1722
1723         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1724              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1725              TCP(sport=3025))
1726         self.pg0.add_stream(p)
1727         self.pg_enable_capture(self.pg_interfaces)
1728         self.pg_start()
1729         capture = self.pg1.get_capture(0)
1730         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1731         capture = self.pg3.get_capture(3)
1732         ipfix = IPFIXDecoder()
1733         # first load template
1734         for p in capture:
1735             self.assertTrue(p.haslayer(IPFIX))
1736             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1737             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1738             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1739             self.assertEqual(p[UDP].dport, 4739)
1740             self.assertEqual(p[IPFIX].observationDomainID,
1741                              self.ipfix_domain_id)
1742             if p.haslayer(Template):
1743                 ipfix.add_template(p.getlayer(Template))
1744         # verify events in data set
1745         for p in capture:
1746             if p.haslayer(Data):
1747                 data = ipfix.decode_data_set(p.getlayer(Set))
1748                 self.verify_ipfix_addr_exhausted(data)
1749
1750     def test_pool_addr_fib(self):
1751         """ NAT44 add pool addresses to FIB """
1752         static_addr = '10.0.0.10'
1753         self.nat44_add_address(self.nat_addr)
1754         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1755         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1756                                                   is_inside=0)
1757         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1758
1759         # NAT44 address
1760         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1761              ARP(op=ARP.who_has, pdst=self.nat_addr,
1762                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1763         self.pg1.add_stream(p)
1764         self.pg_enable_capture(self.pg_interfaces)
1765         self.pg_start()
1766         capture = self.pg1.get_capture(1)
1767         self.assertTrue(capture[0].haslayer(ARP))
1768         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1769
1770         # 1:1 NAT address
1771         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1772              ARP(op=ARP.who_has, pdst=static_addr,
1773                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1774         self.pg1.add_stream(p)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         capture = self.pg1.get_capture(1)
1778         self.assertTrue(capture[0].haslayer(ARP))
1779         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1780
1781         # send ARP to non-NAT44 interface
1782         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783              ARP(op=ARP.who_has, pdst=self.nat_addr,
1784                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1785         self.pg2.add_stream(p)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg1.get_capture(0)
1789
1790         # remove addresses and verify
1791         self.nat44_add_address(self.nat_addr, is_add=0)
1792         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1793                                       is_add=0)
1794
1795         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1796              ARP(op=ARP.who_has, pdst=self.nat_addr,
1797                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1798         self.pg1.add_stream(p)
1799         self.pg_enable_capture(self.pg_interfaces)
1800         self.pg_start()
1801         capture = self.pg1.get_capture(0)
1802
1803         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1804              ARP(op=ARP.who_has, pdst=static_addr,
1805                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1806         self.pg1.add_stream(p)
1807         self.pg_enable_capture(self.pg_interfaces)
1808         self.pg_start()
1809         capture = self.pg1.get_capture(0)
1810
1811     def test_vrf_mode(self):
1812         """ NAT44 tenant VRF aware address pool mode """
1813
1814         vrf_id1 = 1
1815         vrf_id2 = 2
1816         nat_ip1 = "10.0.0.10"
1817         nat_ip2 = "10.0.0.11"
1818
1819         self.pg0.unconfig_ip4()
1820         self.pg1.unconfig_ip4()
1821         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1822         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1823         self.pg0.set_table_ip4(vrf_id1)
1824         self.pg1.set_table_ip4(vrf_id2)
1825         self.pg0.config_ip4()
1826         self.pg1.config_ip4()
1827
1828         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1829         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1830         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1831         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1832         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1833                                                   is_inside=0)
1834
1835         # first VRF
1836         pkts = self.create_stream_in(self.pg0, self.pg2)
1837         self.pg0.add_stream(pkts)
1838         self.pg_enable_capture(self.pg_interfaces)
1839         self.pg_start()
1840         capture = self.pg2.get_capture(len(pkts))
1841         self.verify_capture_out(capture, nat_ip1)
1842
1843         # second VRF
1844         pkts = self.create_stream_in(self.pg1, self.pg2)
1845         self.pg1.add_stream(pkts)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg2.get_capture(len(pkts))
1849         self.verify_capture_out(capture, nat_ip2)
1850
1851         self.pg0.unconfig_ip4()
1852         self.pg1.unconfig_ip4()
1853         self.pg0.set_table_ip4(0)
1854         self.pg1.set_table_ip4(0)
1855         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1856         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1857
1858     def test_vrf_feature_independent(self):
1859         """ NAT44 tenant VRF independent address pool mode """
1860
1861         nat_ip1 = "10.0.0.10"
1862         nat_ip2 = "10.0.0.11"
1863
1864         self.nat44_add_address(nat_ip1)
1865         self.nat44_add_address(nat_ip2)
1866         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1867         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1868         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1869                                                   is_inside=0)
1870
1871         # first VRF
1872         pkts = self.create_stream_in(self.pg0, self.pg2)
1873         self.pg0.add_stream(pkts)
1874         self.pg_enable_capture(self.pg_interfaces)
1875         self.pg_start()
1876         capture = self.pg2.get_capture(len(pkts))
1877         self.verify_capture_out(capture, nat_ip1)
1878
1879         # second VRF
1880         pkts = self.create_stream_in(self.pg1, self.pg2)
1881         self.pg1.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg2.get_capture(len(pkts))
1885         self.verify_capture_out(capture, nat_ip1)
1886
1887     def test_dynamic_ipless_interfaces(self):
1888         """ NAT44 interfaces without configured IP address """
1889
1890         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1891                                       mactobinary(self.pg7.remote_mac),
1892                                       self.pg7.remote_ip4n,
1893                                       is_static=1)
1894         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1895                                       mactobinary(self.pg8.remote_mac),
1896                                       self.pg8.remote_ip4n,
1897                                       is_static=1)
1898
1899         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1900                                    dst_address_length=32,
1901                                    next_hop_address=self.pg7.remote_ip4n,
1902                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1903         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1904                                    dst_address_length=32,
1905                                    next_hop_address=self.pg8.remote_ip4n,
1906                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1907
1908         self.nat44_add_address(self.nat_addr)
1909         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1910         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1911                                                   is_inside=0)
1912
1913         # in2out
1914         pkts = self.create_stream_in(self.pg7, self.pg8)
1915         self.pg7.add_stream(pkts)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg8.get_capture(len(pkts))
1919         self.verify_capture_out(capture)
1920
1921         # out2in
1922         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1923         self.pg8.add_stream(pkts)
1924         self.pg_enable_capture(self.pg_interfaces)
1925         self.pg_start()
1926         capture = self.pg7.get_capture(len(pkts))
1927         self.verify_capture_in(capture, self.pg7)
1928
1929     def test_static_ipless_interfaces(self):
1930         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1931
1932         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1933                                       mactobinary(self.pg7.remote_mac),
1934                                       self.pg7.remote_ip4n,
1935                                       is_static=1)
1936         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1937                                       mactobinary(self.pg8.remote_mac),
1938                                       self.pg8.remote_ip4n,
1939                                       is_static=1)
1940
1941         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1942                                    dst_address_length=32,
1943                                    next_hop_address=self.pg7.remote_ip4n,
1944                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1945         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1946                                    dst_address_length=32,
1947                                    next_hop_address=self.pg8.remote_ip4n,
1948                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1949
1950         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1951         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1952         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1953                                                   is_inside=0)
1954
1955         # out2in
1956         pkts = self.create_stream_out(self.pg8)
1957         self.pg8.add_stream(pkts)
1958         self.pg_enable_capture(self.pg_interfaces)
1959         self.pg_start()
1960         capture = self.pg7.get_capture(len(pkts))
1961         self.verify_capture_in(capture, self.pg7)
1962
1963         # in2out
1964         pkts = self.create_stream_in(self.pg7, self.pg8)
1965         self.pg7.add_stream(pkts)
1966         self.pg_enable_capture(self.pg_interfaces)
1967         self.pg_start()
1968         capture = self.pg8.get_capture(len(pkts))
1969         self.verify_capture_out(capture, self.nat_addr, True)
1970
1971     def test_static_with_port_ipless_interfaces(self):
1972         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1973
1974         self.tcp_port_out = 30606
1975         self.udp_port_out = 30607
1976         self.icmp_id_out = 30608
1977
1978         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1979                                       mactobinary(self.pg7.remote_mac),
1980                                       self.pg7.remote_ip4n,
1981                                       is_static=1)
1982         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1983                                       mactobinary(self.pg8.remote_mac),
1984                                       self.pg8.remote_ip4n,
1985                                       is_static=1)
1986
1987         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1988                                    dst_address_length=32,
1989                                    next_hop_address=self.pg7.remote_ip4n,
1990                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1991         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1992                                    dst_address_length=32,
1993                                    next_hop_address=self.pg8.remote_ip4n,
1994                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1995
1996         self.nat44_add_address(self.nat_addr)
1997         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998                                       self.tcp_port_in, self.tcp_port_out,
1999                                       proto=IP_PROTOS.tcp)
2000         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001                                       self.udp_port_in, self.udp_port_out,
2002                                       proto=IP_PROTOS.udp)
2003         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2004                                       self.icmp_id_in, self.icmp_id_out,
2005                                       proto=IP_PROTOS.icmp)
2006         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2007         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2008                                                   is_inside=0)
2009
2010         # out2in
2011         pkts = self.create_stream_out(self.pg8)
2012         self.pg8.add_stream(pkts)
2013         self.pg_enable_capture(self.pg_interfaces)
2014         self.pg_start()
2015         capture = self.pg7.get_capture(len(pkts))
2016         self.verify_capture_in(capture, self.pg7)
2017
2018         # in2out
2019         pkts = self.create_stream_in(self.pg7, self.pg8)
2020         self.pg7.add_stream(pkts)
2021         self.pg_enable_capture(self.pg_interfaces)
2022         self.pg_start()
2023         capture = self.pg8.get_capture(len(pkts))
2024         self.verify_capture_out(capture)
2025
2026     def test_static_unknown_proto(self):
2027         """ 1:1 NAT translate packet with unknown protocol """
2028         nat_ip = "10.0.0.10"
2029         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2030         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2031         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2032                                                   is_inside=0)
2033
2034         # in2out
2035         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2037              GRE() /
2038              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2039              TCP(sport=1234, dport=1234))
2040         self.pg0.add_stream(p)
2041         self.pg_enable_capture(self.pg_interfaces)
2042         self.pg_start()
2043         p = self.pg1.get_capture(1)
2044         packet = p[0]
2045         try:
2046             self.assertEqual(packet[IP].src, nat_ip)
2047             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2048             self.assertTrue(packet.haslayer(GRE))
2049             self.check_ip_checksum(packet)
2050         except:
2051             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2052             raise
2053
2054         # out2in
2055         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2056              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2057              GRE() /
2058              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2059              TCP(sport=1234, dport=1234))
2060         self.pg1.add_stream(p)
2061         self.pg_enable_capture(self.pg_interfaces)
2062         self.pg_start()
2063         p = self.pg0.get_capture(1)
2064         packet = p[0]
2065         try:
2066             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2067             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2068             self.assertTrue(packet.haslayer(GRE))
2069             self.check_ip_checksum(packet)
2070         except:
2071             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2072             raise
2073
2074     def test_hairpinning_static_unknown_proto(self):
2075         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2076
2077         host = self.pg0.remote_hosts[0]
2078         server = self.pg0.remote_hosts[1]
2079
2080         host_nat_ip = "10.0.0.10"
2081         server_nat_ip = "10.0.0.11"
2082
2083         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2084         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2085         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2086         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2087                                                   is_inside=0)
2088
2089         # host to server
2090         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2091              IP(src=host.ip4, dst=server_nat_ip) /
2092              GRE() /
2093              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2094              TCP(sport=1234, dport=1234))
2095         self.pg0.add_stream(p)
2096         self.pg_enable_capture(self.pg_interfaces)
2097         self.pg_start()
2098         p = self.pg0.get_capture(1)
2099         packet = p[0]
2100         try:
2101             self.assertEqual(packet[IP].src, host_nat_ip)
2102             self.assertEqual(packet[IP].dst, server.ip4)
2103             self.assertTrue(packet.haslayer(GRE))
2104             self.check_ip_checksum(packet)
2105         except:
2106             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2107             raise
2108
2109         # server to host
2110         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2111              IP(src=server.ip4, dst=host_nat_ip) /
2112              GRE() /
2113              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2114              TCP(sport=1234, dport=1234))
2115         self.pg0.add_stream(p)
2116         self.pg_enable_capture(self.pg_interfaces)
2117         self.pg_start()
2118         p = self.pg0.get_capture(1)
2119         packet = p[0]
2120         try:
2121             self.assertEqual(packet[IP].src, server_nat_ip)
2122             self.assertEqual(packet[IP].dst, host.ip4)
2123             self.assertTrue(packet.haslayer(GRE))
2124             self.check_ip_checksum(packet)
2125         except:
2126             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2127             raise
2128
2129     def test_unknown_proto(self):
2130         """ NAT44 translate packet with unknown protocol """
2131         self.nat44_add_address(self.nat_addr)
2132         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2133         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2134                                                   is_inside=0)
2135
2136         # in2out
2137         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2138              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2139              TCP(sport=self.tcp_port_in, dport=20))
2140         self.pg0.add_stream(p)
2141         self.pg_enable_capture(self.pg_interfaces)
2142         self.pg_start()
2143         p = self.pg1.get_capture(1)
2144
2145         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2146              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2147              GRE() /
2148              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2149              TCP(sport=1234, dport=1234))
2150         self.pg0.add_stream(p)
2151         self.pg_enable_capture(self.pg_interfaces)
2152         self.pg_start()
2153         p = self.pg1.get_capture(1)
2154         packet = p[0]
2155         try:
2156             self.assertEqual(packet[IP].src, self.nat_addr)
2157             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2158             self.assertTrue(packet.haslayer(GRE))
2159             self.check_ip_checksum(packet)
2160         except:
2161             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2162             raise
2163
2164         # out2in
2165         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2166              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2167              GRE() /
2168              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2169              TCP(sport=1234, dport=1234))
2170         self.pg1.add_stream(p)
2171         self.pg_enable_capture(self.pg_interfaces)
2172         self.pg_start()
2173         p = self.pg0.get_capture(1)
2174         packet = p[0]
2175         try:
2176             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2177             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2178             self.assertTrue(packet.haslayer(GRE))
2179             self.check_ip_checksum(packet)
2180         except:
2181             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2182             raise
2183
2184     def test_hairpinning_unknown_proto(self):
2185         """ NAT44 translate packet with unknown protocol - hairpinning """
2186         host = self.pg0.remote_hosts[0]
2187         server = self.pg0.remote_hosts[1]
2188         host_in_port = 1234
2189         host_out_port = 0
2190         server_in_port = 5678
2191         server_out_port = 8765
2192         server_nat_ip = "10.0.0.11"
2193
2194         self.nat44_add_address(self.nat_addr)
2195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2197                                                   is_inside=0)
2198
2199         # add static mapping for server
2200         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2201
2202         # host to server
2203         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2204              IP(src=host.ip4, dst=server_nat_ip) /
2205              TCP(sport=host_in_port, dport=server_out_port))
2206         self.pg0.add_stream(p)
2207         self.pg_enable_capture(self.pg_interfaces)
2208         self.pg_start()
2209         capture = self.pg0.get_capture(1)
2210
2211         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2212              IP(src=host.ip4, dst=server_nat_ip) /
2213              GRE() /
2214              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2215              TCP(sport=1234, dport=1234))
2216         self.pg0.add_stream(p)
2217         self.pg_enable_capture(self.pg_interfaces)
2218         self.pg_start()
2219         p = self.pg0.get_capture(1)
2220         packet = p[0]
2221         try:
2222             self.assertEqual(packet[IP].src, self.nat_addr)
2223             self.assertEqual(packet[IP].dst, server.ip4)
2224             self.assertTrue(packet.haslayer(GRE))
2225             self.check_ip_checksum(packet)
2226         except:
2227             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2228             raise
2229
2230         # server to host
2231         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2232              IP(src=server.ip4, dst=self.nat_addr) /
2233              GRE() /
2234              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2235              TCP(sport=1234, dport=1234))
2236         self.pg0.add_stream(p)
2237         self.pg_enable_capture(self.pg_interfaces)
2238         self.pg_start()
2239         p = self.pg0.get_capture(1)
2240         packet = p[0]
2241         try:
2242             self.assertEqual(packet[IP].src, server_nat_ip)
2243             self.assertEqual(packet[IP].dst, host.ip4)
2244             self.assertTrue(packet.haslayer(GRE))
2245             self.check_ip_checksum(packet)
2246         except:
2247             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2248             raise
2249
2250     def test_output_feature(self):
2251         """ NAT44 interface output feature (in2out postrouting) """
2252         self.nat44_add_address(self.nat_addr)
2253         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2254         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2255         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2256                                                          is_inside=0)
2257
2258         # in2out
2259         pkts = self.create_stream_in(self.pg0, self.pg3)
2260         self.pg0.add_stream(pkts)
2261         self.pg_enable_capture(self.pg_interfaces)
2262         self.pg_start()
2263         capture = self.pg3.get_capture(len(pkts))
2264         self.verify_capture_out(capture)
2265
2266         # out2in
2267         pkts = self.create_stream_out(self.pg3)
2268         self.pg3.add_stream(pkts)
2269         self.pg_enable_capture(self.pg_interfaces)
2270         self.pg_start()
2271         capture = self.pg0.get_capture(len(pkts))
2272         self.verify_capture_in(capture, self.pg0)
2273
2274         # from non-NAT interface to NAT inside interface
2275         pkts = self.create_stream_in(self.pg2, self.pg0)
2276         self.pg2.add_stream(pkts)
2277         self.pg_enable_capture(self.pg_interfaces)
2278         self.pg_start()
2279         capture = self.pg0.get_capture(len(pkts))
2280         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2281
2282     def test_output_feature_vrf_aware(self):
2283         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2284         nat_ip_vrf10 = "10.0.0.10"
2285         nat_ip_vrf20 = "10.0.0.20"
2286
2287         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2288                                    dst_address_length=32,
2289                                    next_hop_address=self.pg3.remote_ip4n,
2290                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2291                                    table_id=10)
2292         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2293                                    dst_address_length=32,
2294                                    next_hop_address=self.pg3.remote_ip4n,
2295                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2296                                    table_id=20)
2297
2298         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2299         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2300         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2301         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2302         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2303                                                          is_inside=0)
2304
2305         # in2out VRF 10
2306         pkts = self.create_stream_in(self.pg4, self.pg3)
2307         self.pg4.add_stream(pkts)
2308         self.pg_enable_capture(self.pg_interfaces)
2309         self.pg_start()
2310         capture = self.pg3.get_capture(len(pkts))
2311         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2312
2313         # out2in VRF 10
2314         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2315         self.pg3.add_stream(pkts)
2316         self.pg_enable_capture(self.pg_interfaces)
2317         self.pg_start()
2318         capture = self.pg4.get_capture(len(pkts))
2319         self.verify_capture_in(capture, self.pg4)
2320
2321         # in2out VRF 20
2322         pkts = self.create_stream_in(self.pg6, self.pg3)
2323         self.pg6.add_stream(pkts)
2324         self.pg_enable_capture(self.pg_interfaces)
2325         self.pg_start()
2326         capture = self.pg3.get_capture(len(pkts))
2327         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2328
2329         # out2in VRF 20
2330         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2331         self.pg3.add_stream(pkts)
2332         self.pg_enable_capture(self.pg_interfaces)
2333         self.pg_start()
2334         capture = self.pg6.get_capture(len(pkts))
2335         self.verify_capture_in(capture, self.pg6)
2336
2337     def test_output_feature_hairpinning(self):
2338         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2339         host = self.pg0.remote_hosts[0]
2340         server = self.pg0.remote_hosts[1]
2341         host_in_port = 1234
2342         host_out_port = 0
2343         server_in_port = 5678
2344         server_out_port = 8765
2345
2346         self.nat44_add_address(self.nat_addr)
2347         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2348         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2349                                                          is_inside=0)
2350
2351         # add static mapping for server
2352         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2353                                       server_in_port, server_out_port,
2354                                       proto=IP_PROTOS.tcp)
2355
2356         # send packet from host to server
2357         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2358              IP(src=host.ip4, dst=self.nat_addr) /
2359              TCP(sport=host_in_port, dport=server_out_port))
2360         self.pg0.add_stream(p)
2361         self.pg_enable_capture(self.pg_interfaces)
2362         self.pg_start()
2363         capture = self.pg0.get_capture(1)
2364         p = capture[0]
2365         try:
2366             ip = p[IP]
2367             tcp = p[TCP]
2368             self.assertEqual(ip.src, self.nat_addr)
2369             self.assertEqual(ip.dst, server.ip4)
2370             self.assertNotEqual(tcp.sport, host_in_port)
2371             self.assertEqual(tcp.dport, server_in_port)
2372             self.check_tcp_checksum(p)
2373             host_out_port = tcp.sport
2374         except:
2375             self.logger.error(ppp("Unexpected or invalid packet:", p))
2376             raise
2377
2378         # send reply from server to host
2379         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2380              IP(src=server.ip4, dst=self.nat_addr) /
2381              TCP(sport=server_in_port, dport=host_out_port))
2382         self.pg0.add_stream(p)
2383         self.pg_enable_capture(self.pg_interfaces)
2384         self.pg_start()
2385         capture = self.pg0.get_capture(1)
2386         p = capture[0]
2387         try:
2388             ip = p[IP]
2389             tcp = p[TCP]
2390             self.assertEqual(ip.src, self.nat_addr)
2391             self.assertEqual(ip.dst, host.ip4)
2392             self.assertEqual(tcp.sport, server_out_port)
2393             self.assertEqual(tcp.dport, host_in_port)
2394             self.check_tcp_checksum(p)
2395         except:
2396             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2397             raise
2398
2399     def test_one_armed_nat44(self):
2400         """ One armed NAT44 """
2401         remote_host = self.pg9.remote_hosts[0]
2402         local_host = self.pg9.remote_hosts[1]
2403         external_port = 0
2404
2405         self.nat44_add_address(self.nat_addr)
2406         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2407         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2408                                                   is_inside=0)
2409
2410         # in2out
2411         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2412              IP(src=local_host.ip4, dst=remote_host.ip4) /
2413              TCP(sport=12345, dport=80))
2414         self.pg9.add_stream(p)
2415         self.pg_enable_capture(self.pg_interfaces)
2416         self.pg_start()
2417         capture = self.pg9.get_capture(1)
2418         p = capture[0]
2419         try:
2420             ip = p[IP]
2421             tcp = p[TCP]
2422             self.assertEqual(ip.src, self.nat_addr)
2423             self.assertEqual(ip.dst, remote_host.ip4)
2424             self.assertNotEqual(tcp.sport, 12345)
2425             external_port = tcp.sport
2426             self.assertEqual(tcp.dport, 80)
2427             self.check_tcp_checksum(p)
2428             self.check_ip_checksum(p)
2429         except:
2430             self.logger.error(ppp("Unexpected or invalid packet:", p))
2431             raise
2432
2433         # out2in
2434         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2435              IP(src=remote_host.ip4, dst=self.nat_addr) /
2436              TCP(sport=80, dport=external_port))
2437         self.pg9.add_stream(p)
2438         self.pg_enable_capture(self.pg_interfaces)
2439         self.pg_start()
2440         capture = self.pg9.get_capture(1)
2441         p = capture[0]
2442         try:
2443             ip = p[IP]
2444             tcp = p[TCP]
2445             self.assertEqual(ip.src, remote_host.ip4)
2446             self.assertEqual(ip.dst, local_host.ip4)
2447             self.assertEqual(tcp.sport, 80)
2448             self.assertEqual(tcp.dport, 12345)
2449             self.check_tcp_checksum(p)
2450             self.check_ip_checksum(p)
2451         except:
2452             self.logger.error(ppp("Unexpected or invalid packet:", p))
2453             raise
2454
2455     def test_del_session(self):
2456         """ Delete NAT44 session """
2457         self.nat44_add_address(self.nat_addr)
2458         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2459         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2460                                                   is_inside=0)
2461
2462         pkts = self.create_stream_in(self.pg0, self.pg1)
2463         self.pg0.add_stream(pkts)
2464         self.pg_enable_capture(self.pg_interfaces)
2465         self.pg_start()
2466         capture = self.pg1.get_capture(len(pkts))
2467
2468         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2469         nsessions = len(sessions)
2470
2471         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2472                                     sessions[0].inside_port,
2473                                     sessions[0].protocol)
2474         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2475                                     sessions[1].outside_port,
2476                                     sessions[1].protocol,
2477                                     is_in=0)
2478
2479         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2480         self.assertEqual(nsessions - len(sessions), 2)
2481
2482     def tearDown(self):
2483         super(TestNAT44, self).tearDown()
2484         if not self.vpp_dead:
2485             self.logger.info(self.vapi.cli("show nat44 verbose"))
2486             self.clear_nat44()
2487
2488
2489 class TestDeterministicNAT(MethodHolder):
2490     """ Deterministic NAT Test Cases """
2491
2492     @classmethod
2493     def setUpConstants(cls):
2494         super(TestDeterministicNAT, cls).setUpConstants()
2495         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2496
2497     @classmethod
2498     def setUpClass(cls):
2499         super(TestDeterministicNAT, cls).setUpClass()
2500
2501         try:
2502             cls.tcp_port_in = 6303
2503             cls.tcp_external_port = 6303
2504             cls.udp_port_in = 6304
2505             cls.udp_external_port = 6304
2506             cls.icmp_id_in = 6305
2507             cls.nat_addr = '10.0.0.3'
2508
2509             cls.create_pg_interfaces(range(3))
2510             cls.interfaces = list(cls.pg_interfaces)
2511
2512             for i in cls.interfaces:
2513                 i.admin_up()
2514                 i.config_ip4()
2515                 i.resolve_arp()
2516
2517             cls.pg0.generate_remote_hosts(2)
2518             cls.pg0.configure_ipv4_neighbors()
2519
2520         except Exception:
2521             super(TestDeterministicNAT, cls).tearDownClass()
2522             raise
2523
2524     def create_stream_in(self, in_if, out_if, ttl=64):
2525         """
2526         Create packet stream for inside network
2527
2528         :param in_if: Inside interface
2529         :param out_if: Outside interface
2530         :param ttl: TTL of generated packets
2531         """
2532         pkts = []
2533         # TCP
2534         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2535              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2536              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2537         pkts.append(p)
2538
2539         # UDP
2540         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2541              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2542              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2543         pkts.append(p)
2544
2545         # ICMP
2546         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2547              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2548              ICMP(id=self.icmp_id_in, type='echo-request'))
2549         pkts.append(p)
2550
2551         return pkts
2552
2553     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2554         """
2555         Create packet stream for outside network
2556
2557         :param out_if: Outside interface
2558         :param dst_ip: Destination IP address (Default use global NAT address)
2559         :param ttl: TTL of generated packets
2560         """
2561         if dst_ip is None:
2562             dst_ip = self.nat_addr
2563         pkts = []
2564         # TCP
2565         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2566              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2567              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2568         pkts.append(p)
2569
2570         # UDP
2571         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2572              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2573              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2574         pkts.append(p)
2575
2576         # ICMP
2577         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2578              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2579              ICMP(id=self.icmp_external_id, type='echo-reply'))
2580         pkts.append(p)
2581
2582         return pkts
2583
2584     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2585         """
2586         Verify captured packets on outside network
2587
2588         :param capture: Captured packets
2589         :param nat_ip: Translated IP address (Default use global NAT address)
2590         :param same_port: Sorce port number is not translated (Default False)
2591         :param packet_num: Expected number of packets (Default 3)
2592         """
2593         if nat_ip is None:
2594             nat_ip = self.nat_addr
2595         self.assertEqual(packet_num, len(capture))
2596         for packet in capture:
2597             try:
2598                 self.assertEqual(packet[IP].src, nat_ip)
2599                 if packet.haslayer(TCP):
2600                     self.tcp_port_out = packet[TCP].sport
2601                 elif packet.haslayer(UDP):
2602                     self.udp_port_out = packet[UDP].sport
2603                 else:
2604                     self.icmp_external_id = packet[ICMP].id
2605             except:
2606                 self.logger.error(ppp("Unexpected or invalid packet "
2607                                       "(outside network):", packet))
2608                 raise
2609
2610     def initiate_tcp_session(self, in_if, out_if):
2611         """
2612         Initiates TCP session
2613
2614         :param in_if: Inside interface
2615         :param out_if: Outside interface
2616         """
2617         try:
2618             # SYN packet in->out
2619             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2620                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2621                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2622                      flags="S"))
2623             in_if.add_stream(p)
2624             self.pg_enable_capture(self.pg_interfaces)
2625             self.pg_start()
2626             capture = out_if.get_capture(1)
2627             p = capture[0]
2628             self.tcp_port_out = p[TCP].sport
2629
2630             # SYN + ACK packet out->in
2631             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2632                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2633                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2634                      flags="SA"))
2635             out_if.add_stream(p)
2636             self.pg_enable_capture(self.pg_interfaces)
2637             self.pg_start()
2638             in_if.get_capture(1)
2639
2640             # ACK packet in->out
2641             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2642                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2643                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2644                      flags="A"))
2645             in_if.add_stream(p)
2646             self.pg_enable_capture(self.pg_interfaces)
2647             self.pg_start()
2648             out_if.get_capture(1)
2649
2650         except:
2651             self.logger.error("TCP 3 way handshake failed")
2652             raise
2653
2654     def verify_ipfix_max_entries_per_user(self, data):
2655         """
2656         Verify IPFIX maximum entries per user exceeded event
2657
2658         :param data: Decoded IPFIX data records
2659         """
2660         self.assertEqual(1, len(data))
2661         record = data[0]
2662         # natEvent
2663         self.assertEqual(ord(record[230]), 13)
2664         # natQuotaExceededEvent
2665         self.assertEqual('\x03\x00\x00\x00', record[466])
2666         # sourceIPv4Address
2667         self.assertEqual(self.pg0.remote_ip4n, record[8])
2668
2669     def test_deterministic_mode(self):
2670         """ NAT plugin run deterministic mode """
2671         in_addr = '172.16.255.0'
2672         out_addr = '172.17.255.50'
2673         in_addr_t = '172.16.255.20'
2674         in_addr_n = socket.inet_aton(in_addr)
2675         out_addr_n = socket.inet_aton(out_addr)
2676         in_addr_t_n = socket.inet_aton(in_addr_t)
2677         in_plen = 24
2678         out_plen = 32
2679
2680         nat_config = self.vapi.nat_show_config()
2681         self.assertEqual(1, nat_config.deterministic)
2682
2683         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2684
2685         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2686         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2687         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2688         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2689
2690         deterministic_mappings = self.vapi.nat_det_map_dump()
2691         self.assertEqual(len(deterministic_mappings), 1)
2692         dsm = deterministic_mappings[0]
2693         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2694         self.assertEqual(in_plen, dsm.in_plen)
2695         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2696         self.assertEqual(out_plen, dsm.out_plen)
2697
2698         self.clear_nat_det()
2699         deterministic_mappings = self.vapi.nat_det_map_dump()
2700         self.assertEqual(len(deterministic_mappings), 0)
2701
2702     def test_set_timeouts(self):
2703         """ Set deterministic NAT timeouts """
2704         timeouts_before = self.vapi.nat_det_get_timeouts()
2705
2706         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2707                                        timeouts_before.tcp_established + 10,
2708                                        timeouts_before.tcp_transitory + 10,
2709                                        timeouts_before.icmp + 10)
2710
2711         timeouts_after = self.vapi.nat_det_get_timeouts()
2712
2713         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2714         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2715         self.assertNotEqual(timeouts_before.tcp_established,
2716                             timeouts_after.tcp_established)
2717         self.assertNotEqual(timeouts_before.tcp_transitory,
2718                             timeouts_after.tcp_transitory)
2719
2720     def test_det_in(self):
2721         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2722
2723         nat_ip = "10.0.0.10"
2724
2725         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2726                                       32,
2727                                       socket.inet_aton(nat_ip),
2728                                       32)
2729         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2730         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2731                                                   is_inside=0)
2732
2733         # in2out
2734         pkts = self.create_stream_in(self.pg0, self.pg1)
2735         self.pg0.add_stream(pkts)
2736         self.pg_enable_capture(self.pg_interfaces)
2737         self.pg_start()
2738         capture = self.pg1.get_capture(len(pkts))
2739         self.verify_capture_out(capture, nat_ip)
2740
2741         # out2in
2742         pkts = self.create_stream_out(self.pg1, nat_ip)
2743         self.pg1.add_stream(pkts)
2744         self.pg_enable_capture(self.pg_interfaces)
2745         self.pg_start()
2746         capture = self.pg0.get_capture(len(pkts))
2747         self.verify_capture_in(capture, self.pg0)
2748
2749         # session dump test
2750         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2751         self.assertEqual(len(sessions), 3)
2752
2753         # TCP session
2754         s = sessions[0]
2755         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2756         self.assertEqual(s.in_port, self.tcp_port_in)
2757         self.assertEqual(s.out_port, self.tcp_port_out)
2758         self.assertEqual(s.ext_port, self.tcp_external_port)
2759
2760         # UDP session
2761         s = sessions[1]
2762         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2763         self.assertEqual(s.in_port, self.udp_port_in)
2764         self.assertEqual(s.out_port, self.udp_port_out)
2765         self.assertEqual(s.ext_port, self.udp_external_port)
2766
2767         # ICMP session
2768         s = sessions[2]
2769         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2770         self.assertEqual(s.in_port, self.icmp_id_in)
2771         self.assertEqual(s.out_port, self.icmp_external_id)
2772
2773     def test_multiple_users(self):
2774         """ Deterministic NAT multiple users """
2775
2776         nat_ip = "10.0.0.10"
2777         port_in = 80
2778         external_port = 6303
2779
2780         host0 = self.pg0.remote_hosts[0]
2781         host1 = self.pg0.remote_hosts[1]
2782
2783         self.vapi.nat_det_add_del_map(host0.ip4n,
2784                                       24,
2785                                       socket.inet_aton(nat_ip),
2786                                       32)
2787         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2788         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2789                                                   is_inside=0)
2790
2791         # host0 to out
2792         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2793              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2794              TCP(sport=port_in, dport=external_port))
2795         self.pg0.add_stream(p)
2796         self.pg_enable_capture(self.pg_interfaces)
2797         self.pg_start()
2798         capture = self.pg1.get_capture(1)
2799         p = capture[0]
2800         try:
2801             ip = p[IP]
2802             tcp = p[TCP]
2803             self.assertEqual(ip.src, nat_ip)
2804             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2805             self.assertEqual(tcp.dport, external_port)
2806             port_out0 = tcp.sport
2807         except:
2808             self.logger.error(ppp("Unexpected or invalid packet:", p))
2809             raise
2810
2811         # host1 to out
2812         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2813              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2814              TCP(sport=port_in, dport=external_port))
2815         self.pg0.add_stream(p)
2816         self.pg_enable_capture(self.pg_interfaces)
2817         self.pg_start()
2818         capture = self.pg1.get_capture(1)
2819         p = capture[0]
2820         try:
2821             ip = p[IP]
2822             tcp = p[TCP]
2823             self.assertEqual(ip.src, nat_ip)
2824             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2825             self.assertEqual(tcp.dport, external_port)
2826             port_out1 = tcp.sport
2827         except:
2828             self.logger.error(ppp("Unexpected or invalid packet:", p))
2829             raise
2830
2831         dms = self.vapi.nat_det_map_dump()
2832         self.assertEqual(1, len(dms))
2833         self.assertEqual(2, dms[0].ses_num)
2834
2835         # out to host0
2836         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2837              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2838              TCP(sport=external_port, dport=port_out0))
2839         self.pg1.add_stream(p)
2840         self.pg_enable_capture(self.pg_interfaces)
2841         self.pg_start()
2842         capture = self.pg0.get_capture(1)
2843         p = capture[0]
2844         try:
2845             ip = p[IP]
2846             tcp = p[TCP]
2847             self.assertEqual(ip.src, self.pg1.remote_ip4)
2848             self.assertEqual(ip.dst, host0.ip4)
2849             self.assertEqual(tcp.dport, port_in)
2850             self.assertEqual(tcp.sport, external_port)
2851         except:
2852             self.logger.error(ppp("Unexpected or invalid packet:", p))
2853             raise
2854
2855         # out to host1
2856         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2857              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2858              TCP(sport=external_port, dport=port_out1))
2859         self.pg1.add_stream(p)
2860         self.pg_enable_capture(self.pg_interfaces)
2861         self.pg_start()
2862         capture = self.pg0.get_capture(1)
2863         p = capture[0]
2864         try:
2865             ip = p[IP]
2866             tcp = p[TCP]
2867             self.assertEqual(ip.src, self.pg1.remote_ip4)
2868             self.assertEqual(ip.dst, host1.ip4)
2869             self.assertEqual(tcp.dport, port_in)
2870             self.assertEqual(tcp.sport, external_port)
2871         except:
2872             self.logger.error(ppp("Unexpected or invalid packet", p))
2873             raise
2874
2875         # session close api test
2876         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2877                                             port_out1,
2878                                             self.pg1.remote_ip4n,
2879                                             external_port)
2880         dms = self.vapi.nat_det_map_dump()
2881         self.assertEqual(dms[0].ses_num, 1)
2882
2883         self.vapi.nat_det_close_session_in(host0.ip4n,
2884                                            port_in,
2885                                            self.pg1.remote_ip4n,
2886                                            external_port)
2887         dms = self.vapi.nat_det_map_dump()
2888         self.assertEqual(dms[0].ses_num, 0)
2889
2890     def test_tcp_session_close_detection_in(self):
2891         """ Deterministic NAT TCP session close from inside network """
2892         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2893                                       32,
2894                                       socket.inet_aton(self.nat_addr),
2895                                       32)
2896         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2897         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2898                                                   is_inside=0)
2899
2900         self.initiate_tcp_session(self.pg0, self.pg1)
2901
2902         # close the session from inside
2903         try:
2904             # FIN packet in -> out
2905             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2906                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2907                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2908                      flags="F"))
2909             self.pg0.add_stream(p)
2910             self.pg_enable_capture(self.pg_interfaces)
2911             self.pg_start()
2912             self.pg1.get_capture(1)
2913
2914             pkts = []
2915
2916             # ACK packet out -> in
2917             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2918                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2919                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2920                      flags="A"))
2921             pkts.append(p)
2922
2923             # FIN packet out -> in
2924             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2925                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2926                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2927                      flags="F"))
2928             pkts.append(p)
2929
2930             self.pg1.add_stream(pkts)
2931             self.pg_enable_capture(self.pg_interfaces)
2932             self.pg_start()
2933             self.pg0.get_capture(2)
2934
2935             # ACK packet in -> out
2936             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2937                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2938                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2939                      flags="A"))
2940             self.pg0.add_stream(p)
2941             self.pg_enable_capture(self.pg_interfaces)
2942             self.pg_start()
2943             self.pg1.get_capture(1)
2944
2945             # Check if deterministic NAT44 closed the session
2946             dms = self.vapi.nat_det_map_dump()
2947             self.assertEqual(0, dms[0].ses_num)
2948         except:
2949             self.logger.error("TCP session termination failed")
2950             raise
2951
2952     def test_tcp_session_close_detection_out(self):
2953         """ Deterministic NAT TCP session close from outside network """
2954         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2955                                       32,
2956                                       socket.inet_aton(self.nat_addr),
2957                                       32)
2958         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2959         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2960                                                   is_inside=0)
2961
2962         self.initiate_tcp_session(self.pg0, self.pg1)
2963
2964         # close the session from outside
2965         try:
2966             # FIN packet out -> in
2967             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2968                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2969                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2970                      flags="F"))
2971             self.pg1.add_stream(p)
2972             self.pg_enable_capture(self.pg_interfaces)
2973             self.pg_start()
2974             self.pg0.get_capture(1)
2975
2976             pkts = []
2977
2978             # ACK packet in -> out
2979             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2980                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2981                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2982                      flags="A"))
2983             pkts.append(p)
2984
2985             # ACK packet in -> out
2986             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2987                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2988                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2989                      flags="F"))
2990             pkts.append(p)
2991
2992             self.pg0.add_stream(pkts)
2993             self.pg_enable_capture(self.pg_interfaces)
2994             self.pg_start()
2995             self.pg1.get_capture(2)
2996
2997             # ACK packet out -> in
2998             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2999                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3000                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3001                      flags="A"))
3002             self.pg1.add_stream(p)
3003             self.pg_enable_capture(self.pg_interfaces)
3004             self.pg_start()
3005             self.pg0.get_capture(1)
3006
3007             # Check if deterministic NAT44 closed the session
3008             dms = self.vapi.nat_det_map_dump()
3009             self.assertEqual(0, dms[0].ses_num)
3010         except:
3011             self.logger.error("TCP session termination failed")
3012             raise
3013
3014     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3015     def test_session_timeout(self):
3016         """ Deterministic NAT session timeouts """
3017         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3018                                       32,
3019                                       socket.inet_aton(self.nat_addr),
3020                                       32)
3021         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3022         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3023                                                   is_inside=0)
3024
3025         self.initiate_tcp_session(self.pg0, self.pg1)
3026         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3027         pkts = self.create_stream_in(self.pg0, self.pg1)
3028         self.pg0.add_stream(pkts)
3029         self.pg_enable_capture(self.pg_interfaces)
3030         self.pg_start()
3031         capture = self.pg1.get_capture(len(pkts))
3032         sleep(15)
3033
3034         dms = self.vapi.nat_det_map_dump()
3035         self.assertEqual(0, dms[0].ses_num)
3036
3037     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3038     def test_session_limit_per_user(self):
3039         """ Deterministic NAT maximum sessions per user limit """
3040         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3041                                       32,
3042                                       socket.inet_aton(self.nat_addr),
3043                                       32)
3044         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3045         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3046                                                   is_inside=0)
3047         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3048                                      src_address=self.pg2.local_ip4n,
3049                                      path_mtu=512,
3050                                      template_interval=10)
3051         self.vapi.nat_ipfix()
3052
3053         pkts = []
3054         for port in range(1025, 2025):
3055             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3056                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3057                  UDP(sport=port, dport=port))
3058             pkts.append(p)
3059
3060         self.pg0.add_stream(pkts)
3061         self.pg_enable_capture(self.pg_interfaces)
3062         self.pg_start()
3063         capture = self.pg1.get_capture(len(pkts))
3064
3065         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3066              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3067              UDP(sport=3001, dport=3002))
3068         self.pg0.add_stream(p)
3069         self.pg_enable_capture(self.pg_interfaces)
3070         self.pg_start()
3071         capture = self.pg1.assert_nothing_captured()
3072
3073         # verify ICMP error packet
3074         capture = self.pg0.get_capture(1)
3075         p = capture[0]
3076         self.assertTrue(p.haslayer(ICMP))
3077         icmp = p[ICMP]
3078         self.assertEqual(icmp.type, 3)
3079         self.assertEqual(icmp.code, 1)
3080         self.assertTrue(icmp.haslayer(IPerror))
3081         inner_ip = icmp[IPerror]
3082         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3083         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3084
3085         dms = self.vapi.nat_det_map_dump()
3086
3087         self.assertEqual(1000, dms[0].ses_num)
3088
3089         # verify IPFIX logging
3090         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3091         sleep(1)
3092         capture = self.pg2.get_capture(2)
3093         ipfix = IPFIXDecoder()
3094         # first load template
3095         for p in capture:
3096             self.assertTrue(p.haslayer(IPFIX))
3097             if p.haslayer(Template):
3098                 ipfix.add_template(p.getlayer(Template))
3099         # verify events in data set
3100         for p in capture:
3101             if p.haslayer(Data):
3102                 data = ipfix.decode_data_set(p.getlayer(Set))
3103                 self.verify_ipfix_max_entries_per_user(data)
3104
3105     def clear_nat_det(self):
3106         """
3107         Clear deterministic NAT configuration.
3108         """
3109         self.vapi.nat_ipfix(enable=0)
3110         self.vapi.nat_det_set_timeouts()
3111         deterministic_mappings = self.vapi.nat_det_map_dump()
3112         for dsm in deterministic_mappings:
3113             self.vapi.nat_det_add_del_map(dsm.in_addr,
3114                                           dsm.in_plen,
3115                                           dsm.out_addr,
3116                                           dsm.out_plen,
3117                                           is_add=0)
3118
3119         interfaces = self.vapi.nat44_interface_dump()
3120         for intf in interfaces:
3121             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3122                                                       intf.is_inside,
3123                                                       is_add=0)
3124
3125     def tearDown(self):
3126         super(TestDeterministicNAT, self).tearDown()
3127         if not self.vpp_dead:
3128             self.logger.info(self.vapi.cli("show nat44 detail"))
3129             self.clear_nat_det()
3130
3131
3132 class TestNAT64(MethodHolder):
3133     """ NAT64 Test Cases """
3134
3135     @classmethod
3136     def setUpClass(cls):
3137         super(TestNAT64, cls).setUpClass()
3138
3139         try:
3140             cls.tcp_port_in = 6303
3141             cls.tcp_port_out = 6303
3142             cls.udp_port_in = 6304
3143             cls.udp_port_out = 6304
3144             cls.icmp_id_in = 6305
3145             cls.icmp_id_out = 6305
3146             cls.nat_addr = '10.0.0.3'
3147             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3148             cls.vrf1_id = 10
3149             cls.vrf1_nat_addr = '10.0.10.3'
3150             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3151                                                    cls.vrf1_nat_addr)
3152
3153             cls.create_pg_interfaces(range(4))
3154             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3155             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3156             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3157
3158             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3159
3160             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3161
3162             cls.pg0.generate_remote_hosts(2)
3163
3164             for i in cls.ip6_interfaces:
3165                 i.admin_up()
3166                 i.config_ip6()
3167                 i.configure_ipv6_neighbors()
3168
3169             for i in cls.ip4_interfaces:
3170                 i.admin_up()
3171                 i.config_ip4()
3172                 i.resolve_arp()
3173
3174             cls.pg3.admin_up()
3175             cls.pg3.config_ip4()
3176             cls.pg3.resolve_arp()
3177             cls.pg3.config_ip6()
3178             cls.pg3.configure_ipv6_neighbors()
3179
3180         except Exception:
3181             super(TestNAT64, cls).tearDownClass()
3182             raise
3183
3184     def test_pool(self):
3185         """ Add/delete address to NAT64 pool """
3186         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3187
3188         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3189
3190         addresses = self.vapi.nat64_pool_addr_dump()
3191         self.assertEqual(len(addresses), 1)
3192         self.assertEqual(addresses[0].address, nat_addr)
3193
3194         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3195
3196         addresses = self.vapi.nat64_pool_addr_dump()
3197         self.assertEqual(len(addresses), 0)
3198
3199     def test_interface(self):
3200         """ Enable/disable NAT64 feature on the interface """
3201         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3202         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3203
3204         interfaces = self.vapi.nat64_interface_dump()
3205         self.assertEqual(len(interfaces), 2)
3206         pg0_found = False
3207         pg1_found = False
3208         for intf in interfaces:
3209             if intf.sw_if_index == self.pg0.sw_if_index:
3210                 self.assertEqual(intf.is_inside, 1)
3211                 pg0_found = True
3212             elif intf.sw_if_index == self.pg1.sw_if_index:
3213                 self.assertEqual(intf.is_inside, 0)
3214                 pg1_found = True
3215         self.assertTrue(pg0_found)
3216         self.assertTrue(pg1_found)
3217
3218         features = self.vapi.cli("show interface features pg0")
3219         self.assertNotEqual(features.find('nat64-in2out'), -1)
3220         features = self.vapi.cli("show interface features pg1")
3221         self.assertNotEqual(features.find('nat64-out2in'), -1)
3222
3223         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3224         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3225
3226         interfaces = self.vapi.nat64_interface_dump()
3227         self.assertEqual(len(interfaces), 0)
3228
3229     def test_static_bib(self):
3230         """ Add/delete static BIB entry """
3231         in_addr = socket.inet_pton(socket.AF_INET6,
3232                                    '2001:db8:85a3::8a2e:370:7334')
3233         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3234         in_port = 1234
3235         out_port = 5678
3236         proto = IP_PROTOS.tcp
3237
3238         self.vapi.nat64_add_del_static_bib(in_addr,
3239                                            out_addr,
3240                                            in_port,
3241                                            out_port,
3242                                            proto)
3243         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3244         static_bib_num = 0
3245         for bibe in bib:
3246             if bibe.is_static:
3247                 static_bib_num += 1
3248                 self.assertEqual(bibe.i_addr, in_addr)
3249                 self.assertEqual(bibe.o_addr, out_addr)
3250                 self.assertEqual(bibe.i_port, in_port)
3251                 self.assertEqual(bibe.o_port, out_port)
3252         self.assertEqual(static_bib_num, 1)
3253
3254         self.vapi.nat64_add_del_static_bib(in_addr,
3255                                            out_addr,
3256                                            in_port,
3257                                            out_port,
3258                                            proto,
3259                                            is_add=0)
3260         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3261         static_bib_num = 0
3262         for bibe in bib:
3263             if bibe.is_static:
3264                 static_bib_num += 1
3265         self.assertEqual(static_bib_num, 0)
3266
3267     def test_set_timeouts(self):
3268         """ Set NAT64 timeouts """
3269         # verify default values
3270         timeouts = self.vapi.nat64_get_timeouts()
3271         self.assertEqual(timeouts.udp, 300)
3272         self.assertEqual(timeouts.icmp, 60)
3273         self.assertEqual(timeouts.tcp_trans, 240)
3274         self.assertEqual(timeouts.tcp_est, 7440)
3275         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3276
3277         # set and verify custom values
3278         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3279                                      tcp_est=7450, tcp_incoming_syn=10)
3280         timeouts = self.vapi.nat64_get_timeouts()
3281         self.assertEqual(timeouts.udp, 200)
3282         self.assertEqual(timeouts.icmp, 30)
3283         self.assertEqual(timeouts.tcp_trans, 250)
3284         self.assertEqual(timeouts.tcp_est, 7450)
3285         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3286
3287     def test_dynamic(self):
3288         """ NAT64 dynamic translation test """
3289         self.tcp_port_in = 6303
3290         self.udp_port_in = 6304
3291         self.icmp_id_in = 6305
3292
3293         ses_num_start = self.nat64_get_ses_num()
3294
3295         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3296                                                 self.nat_addr_n)
3297         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3298         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3299
3300         # in2out
3301         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3302         self.pg0.add_stream(pkts)
3303         self.pg_enable_capture(self.pg_interfaces)
3304         self.pg_start()
3305         capture = self.pg1.get_capture(len(pkts))
3306         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3307                                 dst_ip=self.pg1.remote_ip4)
3308
3309         # out2in
3310         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3311         self.pg1.add_stream(pkts)
3312         self.pg_enable_capture(self.pg_interfaces)
3313         self.pg_start()
3314         capture = self.pg0.get_capture(len(pkts))
3315         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3316         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3317
3318         # in2out
3319         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3320         self.pg0.add_stream(pkts)
3321         self.pg_enable_capture(self.pg_interfaces)
3322         self.pg_start()
3323         capture = self.pg1.get_capture(len(pkts))
3324         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3325                                 dst_ip=self.pg1.remote_ip4)
3326
3327         # out2in
3328         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3329         self.pg1.add_stream(pkts)
3330         self.pg_enable_capture(self.pg_interfaces)
3331         self.pg_start()
3332         capture = self.pg0.get_capture(len(pkts))
3333         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3334
3335         ses_num_end = self.nat64_get_ses_num()
3336
3337         self.assertEqual(ses_num_end - ses_num_start, 3)
3338
3339         # tenant with specific VRF
3340         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3341                                                 self.vrf1_nat_addr_n,
3342                                                 vrf_id=self.vrf1_id)
3343         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3344
3345         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3346         self.pg2.add_stream(pkts)
3347         self.pg_enable_capture(self.pg_interfaces)
3348         self.pg_start()
3349         capture = self.pg1.get_capture(len(pkts))
3350         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3351                                 dst_ip=self.pg1.remote_ip4)
3352
3353         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3354         self.pg1.add_stream(pkts)
3355         self.pg_enable_capture(self.pg_interfaces)
3356         self.pg_start()
3357         capture = self.pg2.get_capture(len(pkts))
3358         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3359
3360     def test_static(self):
3361         """ NAT64 static translation test """
3362         self.tcp_port_in = 60303
3363         self.udp_port_in = 60304
3364         self.icmp_id_in = 60305
3365         self.tcp_port_out = 60303
3366         self.udp_port_out = 60304
3367         self.icmp_id_out = 60305
3368
3369         ses_num_start = self.nat64_get_ses_num()
3370
3371         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3372                                                 self.nat_addr_n)
3373         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3374         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3375
3376         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3377                                            self.nat_addr_n,
3378                                            self.tcp_port_in,
3379                                            self.tcp_port_out,
3380                                            IP_PROTOS.tcp)
3381         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3382                                            self.nat_addr_n,
3383                                            self.udp_port_in,
3384                                            self.udp_port_out,
3385                                            IP_PROTOS.udp)
3386         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3387                                            self.nat_addr_n,
3388                                            self.icmp_id_in,
3389                                            self.icmp_id_out,
3390                                            IP_PROTOS.icmp)
3391
3392         # in2out
3393         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3394         self.pg0.add_stream(pkts)
3395         self.pg_enable_capture(self.pg_interfaces)
3396         self.pg_start()
3397         capture = self.pg1.get_capture(len(pkts))
3398         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3399                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3400
3401         # out2in
3402         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3403         self.pg1.add_stream(pkts)
3404         self.pg_enable_capture(self.pg_interfaces)
3405         self.pg_start()
3406         capture = self.pg0.get_capture(len(pkts))
3407         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3408         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3409
3410         ses_num_end = self.nat64_get_ses_num()
3411
3412         self.assertEqual(ses_num_end - ses_num_start, 3)
3413
3414     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3415     def test_session_timeout(self):
3416         """ NAT64 session timeout """
3417         self.icmp_id_in = 1234
3418         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3419                                                 self.nat_addr_n)
3420         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3421         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3422         self.vapi.nat64_set_timeouts(icmp=5)
3423
3424         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3425         self.pg0.add_stream(pkts)
3426         self.pg_enable_capture(self.pg_interfaces)
3427         self.pg_start()
3428         capture = self.pg1.get_capture(len(pkts))
3429
3430         ses_num_before_timeout = self.nat64_get_ses_num()
3431
3432         sleep(15)
3433
3434         # ICMP session after timeout
3435         ses_num_after_timeout = self.nat64_get_ses_num()
3436         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3437
3438     def test_icmp_error(self):
3439         """ NAT64 ICMP Error message translation """
3440         self.tcp_port_in = 6303
3441         self.udp_port_in = 6304
3442         self.icmp_id_in = 6305
3443
3444         ses_num_start = self.nat64_get_ses_num()
3445
3446         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3447                                                 self.nat_addr_n)
3448         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3449         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3450
3451         # send some packets to create sessions
3452         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3453         self.pg0.add_stream(pkts)
3454         self.pg_enable_capture(self.pg_interfaces)
3455         self.pg_start()
3456         capture_ip4 = self.pg1.get_capture(len(pkts))
3457         self.verify_capture_out(capture_ip4,
3458                                 nat_ip=self.nat_addr,
3459                                 dst_ip=self.pg1.remote_ip4)
3460
3461         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3462         self.pg1.add_stream(pkts)
3463         self.pg_enable_capture(self.pg_interfaces)
3464         self.pg_start()
3465         capture_ip6 = self.pg0.get_capture(len(pkts))
3466         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3467         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3468                                    self.pg0.remote_ip6)
3469
3470         # in2out
3471         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3472                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3473                 ICMPv6DestUnreach(code=1) /
3474                 packet[IPv6] for packet in capture_ip6]
3475         self.pg0.add_stream(pkts)
3476         self.pg_enable_capture(self.pg_interfaces)
3477         self.pg_start()
3478         capture = self.pg1.get_capture(len(pkts))
3479         for packet in capture:
3480             try:
3481                 self.assertEqual(packet[IP].src, self.nat_addr)
3482                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3483                 self.assertEqual(packet[ICMP].type, 3)
3484                 self.assertEqual(packet[ICMP].code, 13)
3485                 inner = packet[IPerror]
3486                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3487                 self.assertEqual(inner.dst, self.nat_addr)
3488                 self.check_icmp_checksum(packet)
3489                 if inner.haslayer(TCPerror):
3490                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3491                 elif inner.haslayer(UDPerror):
3492                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3493                 else:
3494                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3495             except:
3496                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3497                 raise
3498
3499         # out2in
3500         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3501                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3502                 ICMP(type=3, code=13) /
3503                 packet[IP] for packet in capture_ip4]
3504         self.pg1.add_stream(pkts)
3505         self.pg_enable_capture(self.pg_interfaces)
3506         self.pg_start()
3507         capture = self.pg0.get_capture(len(pkts))
3508         for packet in capture:
3509             try:
3510                 self.assertEqual(packet[IPv6].src, ip.src)
3511                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3512                 icmp = packet[ICMPv6DestUnreach]
3513                 self.assertEqual(icmp.code, 1)
3514                 inner = icmp[IPerror6]
3515                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3516                 self.assertEqual(inner.dst, ip.src)
3517                 self.check_icmpv6_checksum(packet)
3518                 if inner.haslayer(TCPerror):
3519                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3520                 elif inner.haslayer(UDPerror):
3521                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3522                 else:
3523                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3524                                      self.icmp_id_in)
3525             except:
3526                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3527                 raise
3528
3529     def test_hairpinning(self):
3530         """ NAT64 hairpinning """
3531
3532         client = self.pg0.remote_hosts[0]
3533         server = self.pg0.remote_hosts[1]
3534         server_tcp_in_port = 22
3535         server_tcp_out_port = 4022
3536         server_udp_in_port = 23
3537         server_udp_out_port = 4023
3538         client_tcp_in_port = 1234
3539         client_udp_in_port = 1235
3540         client_tcp_out_port = 0
3541         client_udp_out_port = 0
3542         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3543         nat_addr_ip6 = ip.src
3544
3545         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3546                                                 self.nat_addr_n)
3547         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3548         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3549
3550         self.vapi.nat64_add_del_static_bib(server.ip6n,
3551                                            self.nat_addr_n,
3552                                            server_tcp_in_port,
3553                                            server_tcp_out_port,
3554                                            IP_PROTOS.tcp)
3555         self.vapi.nat64_add_del_static_bib(server.ip6n,
3556                                            self.nat_addr_n,
3557                                            server_udp_in_port,
3558                                            server_udp_out_port,
3559                                            IP_PROTOS.udp)
3560
3561         # client to server
3562         pkts = []
3563         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3564              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3565              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3566         pkts.append(p)
3567         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3568              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3569              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3570         pkts.append(p)
3571         self.pg0.add_stream(pkts)
3572         self.pg_enable_capture(self.pg_interfaces)
3573         self.pg_start()
3574         capture = self.pg0.get_capture(len(pkts))
3575         for packet in capture:
3576             try:
3577                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3578                 self.assertEqual(packet[IPv6].dst, server.ip6)
3579                 if packet.haslayer(TCP):
3580                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3581                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3582                     self.check_tcp_checksum(packet)
3583                     client_tcp_out_port = packet[TCP].sport
3584                 else:
3585                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3586                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3587                     self.check_udp_checksum(packet)
3588                     client_udp_out_port = packet[UDP].sport
3589             except:
3590                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3591                 raise
3592
3593         # server to client
3594         pkts = []
3595         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3596              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3597              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3598         pkts.append(p)
3599         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3600              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3601              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3602         pkts.append(p)
3603         self.pg0.add_stream(pkts)
3604         self.pg_enable_capture(self.pg_interfaces)
3605         self.pg_start()
3606         capture = self.pg0.get_capture(len(pkts))
3607         for packet in capture:
3608             try:
3609                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3610                 self.assertEqual(packet[IPv6].dst, client.ip6)
3611                 if packet.haslayer(TCP):
3612                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3613                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3614                     self.check_tcp_checksum(packet)
3615                 else:
3616                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3617                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3618                     self.check_udp_checksum(packet)
3619             except:
3620                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3621                 raise
3622
3623         # ICMP error
3624         pkts = []
3625         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3626                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3627                 ICMPv6DestUnreach(code=1) /
3628                 packet[IPv6] for packet in capture]
3629         self.pg0.add_stream(pkts)
3630         self.pg_enable_capture(self.pg_interfaces)
3631         self.pg_start()
3632         capture = self.pg0.get_capture(len(pkts))
3633         for packet in capture:
3634             try:
3635                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3636                 self.assertEqual(packet[IPv6].dst, server.ip6)
3637                 icmp = packet[ICMPv6DestUnreach]
3638                 self.assertEqual(icmp.code, 1)
3639                 inner = icmp[IPerror6]
3640                 self.assertEqual(inner.src, server.ip6)
3641                 self.assertEqual(inner.dst, nat_addr_ip6)
3642                 self.check_icmpv6_checksum(packet)
3643                 if inner.haslayer(TCPerror):
3644                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3645                     self.assertEqual(inner[TCPerror].dport,
3646                                      client_tcp_out_port)
3647                 else:
3648                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3649                     self.assertEqual(inner[UDPerror].dport,
3650                                      client_udp_out_port)
3651             except:
3652                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3653                 raise
3654
3655     def test_prefix(self):
3656         """ NAT64 Network-Specific Prefix """
3657
3658         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3659                                                 self.nat_addr_n)
3660         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3661         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3662         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3663                                                 self.vrf1_nat_addr_n,
3664                                                 vrf_id=self.vrf1_id)
3665         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3666
3667         # Add global prefix
3668         global_pref64 = "2001:db8::"
3669         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3670         global_pref64_len = 32
3671         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3672
3673         prefix = self.vapi.nat64_prefix_dump()
3674         self.assertEqual(len(prefix), 1)
3675         self.assertEqual(prefix[0].prefix, global_pref64_n)
3676         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3677         self.assertEqual(prefix[0].vrf_id, 0)
3678
3679         # Add tenant specific prefix
3680         vrf1_pref64 = "2001:db8:122:300::"
3681         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3682         vrf1_pref64_len = 56
3683         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3684                                        vrf1_pref64_len,
3685                                        vrf_id=self.vrf1_id)
3686         prefix = self.vapi.nat64_prefix_dump()
3687         self.assertEqual(len(prefix), 2)
3688
3689         # Global prefix
3690         pkts = self.create_stream_in_ip6(self.pg0,
3691                                          self.pg1,
3692                                          pref=global_pref64,
3693                                          plen=global_pref64_len)
3694         self.pg0.add_stream(pkts)
3695         self.pg_enable_capture(self.pg_interfaces)
3696         self.pg_start()
3697         capture = self.pg1.get_capture(len(pkts))
3698         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3699                                 dst_ip=self.pg1.remote_ip4)
3700
3701         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3702         self.pg1.add_stream(pkts)
3703         self.pg_enable_capture(self.pg_interfaces)
3704         self.pg_start()
3705         capture = self.pg0.get_capture(len(pkts))
3706         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3707                                   global_pref64,
3708                                   global_pref64_len)
3709         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3710
3711         # Tenant specific prefix
3712         pkts = self.create_stream_in_ip6(self.pg2,
3713                                          self.pg1,
3714                                          pref=vrf1_pref64,
3715                                          plen=vrf1_pref64_len)
3716         self.pg2.add_stream(pkts)
3717         self.pg_enable_capture(self.pg_interfaces)
3718         self.pg_start()
3719         capture = self.pg1.get_capture(len(pkts))
3720         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3721                                 dst_ip=self.pg1.remote_ip4)
3722
3723         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3724         self.pg1.add_stream(pkts)
3725         self.pg_enable_capture(self.pg_interfaces)
3726         self.pg_start()
3727         capture = self.pg2.get_capture(len(pkts))
3728         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3729                                   vrf1_pref64,
3730                                   vrf1_pref64_len)
3731         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3732
3733     def test_unknown_proto(self):
3734         """ NAT64 translate packet with unknown protocol """
3735
3736         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3737                                                 self.nat_addr_n)
3738         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3739         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3740         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3741
3742         # in2out
3743         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3744              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3745              TCP(sport=self.tcp_port_in, dport=20))
3746         self.pg0.add_stream(p)
3747         self.pg_enable_capture(self.pg_interfaces)
3748         self.pg_start()
3749         p = self.pg1.get_capture(1)
3750
3751         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3752              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3753              GRE() /
3754              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3755              TCP(sport=1234, dport=1234))
3756         self.pg0.add_stream(p)
3757         self.pg_enable_capture(self.pg_interfaces)
3758         self.pg_start()
3759         p = self.pg1.get_capture(1)
3760         packet = p[0]
3761         try:
3762             self.assertEqual(packet[IP].src, self.nat_addr)
3763             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3764             self.assertTrue(packet.haslayer(GRE))
3765             self.check_ip_checksum(packet)
3766         except:
3767             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3768             raise
3769
3770         # out2in
3771         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3772              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3773              GRE() /
3774              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3775              TCP(sport=1234, dport=1234))
3776         self.pg1.add_stream(p)
3777         self.pg_enable_capture(self.pg_interfaces)
3778         self.pg_start()
3779         p = self.pg0.get_capture(1)
3780         packet = p[0]
3781         try:
3782             self.assertEqual(packet[IPv6].src, remote_ip6)
3783             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3784             self.assertEqual(packet[IPv6].nh, 47)
3785         except:
3786             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3787             raise
3788
3789     def test_hairpinning_unknown_proto(self):
3790         """ NAT64 translate packet with unknown protocol - hairpinning """
3791
3792         client = self.pg0.remote_hosts[0]
3793         server = self.pg0.remote_hosts[1]
3794         server_tcp_in_port = 22
3795         server_tcp_out_port = 4022
3796         client_tcp_in_port = 1234
3797         client_tcp_out_port = 1235
3798         server_nat_ip = "10.0.0.100"
3799         client_nat_ip = "10.0.0.110"
3800         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3801         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3802         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3803         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3804
3805         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3806                                                 client_nat_ip_n)
3807         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3808         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3809
3810         self.vapi.nat64_add_del_static_bib(server.ip6n,
3811                                            server_nat_ip_n,
3812                                            server_tcp_in_port,
3813                                            server_tcp_out_port,
3814                                            IP_PROTOS.tcp)
3815
3816         self.vapi.nat64_add_del_static_bib(server.ip6n,
3817                                            server_nat_ip_n,
3818                                            0,
3819                                            0,
3820                                            IP_PROTOS.gre)
3821
3822         self.vapi.nat64_add_del_static_bib(client.ip6n,
3823                                            client_nat_ip_n,
3824                                            client_tcp_in_port,
3825                                            client_tcp_out_port,
3826                                            IP_PROTOS.tcp)
3827
3828         # client to server
3829         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3830              IPv6(src=client.ip6, dst=server_nat_ip6) /
3831              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3832         self.pg0.add_stream(p)
3833         self.pg_enable_capture(self.pg_interfaces)
3834         self.pg_start()
3835         p = self.pg0.get_capture(1)
3836
3837         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3838              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3839              GRE() /
3840              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3841              TCP(sport=1234, dport=1234))
3842         self.pg0.add_stream(p)
3843         self.pg_enable_capture(self.pg_interfaces)
3844         self.pg_start()
3845         p = self.pg0.get_capture(1)
3846         packet = p[0]
3847         try:
3848             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3849             self.assertEqual(packet[IPv6].dst, server.ip6)
3850             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3851         except:
3852             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3853             raise
3854
3855         # server to client
3856         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3857              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3858              GRE() /
3859              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3860              TCP(sport=1234, dport=1234))
3861         self.pg0.add_stream(p)
3862         self.pg_enable_capture(self.pg_interfaces)
3863         self.pg_start()
3864         p = self.pg0.get_capture(1)
3865         packet = p[0]
3866         try:
3867             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3868             self.assertEqual(packet[IPv6].dst, client.ip6)
3869             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3870         except:
3871             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3872             raise
3873
3874     def test_one_armed_nat64(self):
3875         """ One armed NAT64 """
3876         external_port = 0
3877         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3878                                            '64:ff9b::',
3879                                            96)
3880
3881         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3882                                                 self.nat_addr_n)
3883         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3884         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3885
3886         # in2out
3887         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3888              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3889              TCP(sport=12345, dport=80))
3890         self.pg3.add_stream(p)
3891         self.pg_enable_capture(self.pg_interfaces)
3892         self.pg_start()
3893         capture = self.pg3.get_capture(1)
3894         p = capture[0]
3895         try:
3896             ip = p[IP]
3897             tcp = p[TCP]
3898             self.assertEqual(ip.src, self.nat_addr)
3899             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3900             self.assertNotEqual(tcp.sport, 12345)
3901             external_port = tcp.sport
3902             self.assertEqual(tcp.dport, 80)
3903             self.check_tcp_checksum(p)
3904             self.check_ip_checksum(p)
3905         except:
3906             self.logger.error(ppp("Unexpected or invalid packet:", p))
3907             raise
3908
3909         # out2in
3910         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3911              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3912              TCP(sport=80, dport=external_port))
3913         self.pg3.add_stream(p)
3914         self.pg_enable_capture(self.pg_interfaces)
3915         self.pg_start()
3916         capture = self.pg3.get_capture(1)
3917         p = capture[0]
3918         try:
3919             ip = p[IPv6]
3920             tcp = p[TCP]
3921             self.assertEqual(ip.src, remote_host_ip6)
3922             self.assertEqual(ip.dst, self.pg3.remote_ip6)
3923             self.assertEqual(tcp.sport, 80)
3924             self.assertEqual(tcp.dport, 12345)
3925             self.check_tcp_checksum(p)
3926         except:
3927             self.logger.error(ppp("Unexpected or invalid packet:", p))
3928             raise
3929
3930     def nat64_get_ses_num(self):
3931         """
3932         Return number of active NAT64 sessions.
3933         """
3934         st = self.vapi.nat64_st_dump()
3935         return len(st)
3936
3937     def clear_nat64(self):
3938         """
3939         Clear NAT64 configuration.
3940         """
3941         self.vapi.nat64_set_timeouts()
3942
3943         interfaces = self.vapi.nat64_interface_dump()
3944         for intf in interfaces:
3945             if intf.is_inside > 1:
3946                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3947                                                   0,
3948                                                   is_add=0)
3949             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3950                                               intf.is_inside,
3951                                               is_add=0)
3952
3953         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3954         for bibe in bib:
3955             if bibe.is_static:
3956                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3957                                                    bibe.o_addr,
3958                                                    bibe.i_port,
3959                                                    bibe.o_port,
3960                                                    bibe.proto,
3961                                                    bibe.vrf_id,
3962                                                    is_add=0)
3963
3964         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3965         for bibe in bib:
3966             if bibe.is_static:
3967                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3968                                                    bibe.o_addr,
3969                                                    bibe.i_port,
3970                                                    bibe.o_port,
3971                                                    bibe.proto,
3972                                                    bibe.vrf_id,
3973                                                    is_add=0)
3974
3975         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3976         for bibe in bib:
3977             if bibe.is_static:
3978                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3979                                                    bibe.o_addr,
3980                                                    bibe.i_port,
3981                                                    bibe.o_port,
3982                                                    bibe.proto,
3983                                                    bibe.vrf_id,
3984                                                    is_add=0)
3985
3986         adresses = self.vapi.nat64_pool_addr_dump()
3987         for addr in adresses:
3988             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3989                                                     addr.address,
3990                                                     vrf_id=addr.vrf_id,
3991                                                     is_add=0)
3992
3993         prefixes = self.vapi.nat64_prefix_dump()
3994         for prefix in prefixes:
3995             self.vapi.nat64_add_del_prefix(prefix.prefix,
3996                                            prefix.prefix_len,
3997                                            vrf_id=prefix.vrf_id,
3998                                            is_add=0)
3999
4000     def tearDown(self):
4001         super(TestNAT64, self).tearDown()
4002         if not self.vpp_dead:
4003             self.logger.info(self.vapi.cli("show nat64 pool"))
4004             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4005             self.logger.info(self.vapi.cli("show nat64 prefix"))
4006             self.logger.info(self.vapi.cli("show nat64 bib all"))
4007             self.logger.info(self.vapi.cli("show nat64 session table all"))
4008             self.clear_nat64()
4009
4010 if __name__ == '__main__':
4011     unittest.main(testRunner=VppTestRunner)