VPP-1033: Python API support arbitrary sized input parameters.
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19 from util import mactobinary
20
21
22 class MethodHolder(VppTestCase):
23     """ NAT create capture and verify method holder """
24
    @classmethod
    def setUpClass(cls):
        """ Delegate class level set-up to the parent test case class """
        super(MethodHolder, cls).setUpClass()
28
    def tearDown(self):
        """ Delegate per-test tear-down to the parent test case class """
        super(MethodHolder, self).tearDown()
31
32     def check_ip_checksum(self, pkt):
33         """
34         Check IP checksum of the packet
35
36         :param pkt: Packet to check IP checksum
37         """
38         new = pkt.__class__(str(pkt))
39         del new['IP'].chksum
40         new = new.__class__(str(new))
41         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
42
43     def check_tcp_checksum(self, pkt):
44         """
45         Check TCP checksum in IP packet
46
47         :param pkt: Packet to check TCP checksum
48         """
49         new = pkt.__class__(str(pkt))
50         del new['TCP'].chksum
51         new = new.__class__(str(new))
52         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
53
54     def check_udp_checksum(self, pkt):
55         """
56         Check UDP checksum in IP packet
57
58         :param pkt: Packet to check UDP checksum
59         """
60         new = pkt.__class__(str(pkt))
61         del new['UDP'].chksum
62         new = new.__class__(str(new))
63         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
64
65     def check_icmp_errror_embedded(self, pkt):
66         """
67         Check ICMP error embeded packet checksum
68
69         :param pkt: Packet to check ICMP error embeded packet checksum
70         """
71         if pkt.haslayer(IPerror):
72             new = pkt.__class__(str(pkt))
73             del new['IPerror'].chksum
74             new = new.__class__(str(new))
75             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
76
77         if pkt.haslayer(TCPerror):
78             new = pkt.__class__(str(pkt))
79             del new['TCPerror'].chksum
80             new = new.__class__(str(new))
81             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
82
83         if pkt.haslayer(UDPerror):
84             if pkt['UDPerror'].chksum != 0:
85                 new = pkt.__class__(str(pkt))
86                 del new['UDPerror'].chksum
87                 new = new.__class__(str(new))
88                 self.assertEqual(new['UDPerror'].chksum,
89                                  pkt['UDPerror'].chksum)
90
91         if pkt.haslayer(ICMPerror):
92             del new['ICMPerror'].chksum
93             new = new.__class__(str(new))
94             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
95
96     def check_icmp_checksum(self, pkt):
97         """
98         Check ICMP checksum in IPv4 packet
99
100         :param pkt: Packet to check ICMP checksum
101         """
102         new = pkt.__class__(str(pkt))
103         del new['ICMP'].chksum
104         new = new.__class__(str(new))
105         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
106         if pkt.haslayer(IPerror):
107             self.check_icmp_errror_embedded(pkt)
108
109     def check_icmpv6_checksum(self, pkt):
110         """
111         Check ICMPv6 checksum in IPv4 packet
112
113         :param pkt: Packet to check ICMPv6 checksum
114         """
115         new = pkt.__class__(str(pkt))
116         if pkt.haslayer(ICMPv6DestUnreach):
117             del new['ICMPv6DestUnreach'].cksum
118             new = new.__class__(str(new))
119             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
120                              pkt['ICMPv6DestUnreach'].cksum)
121             self.check_icmp_errror_embedded(pkt)
122         if pkt.haslayer(ICMPv6EchoRequest):
123             del new['ICMPv6EchoRequest'].cksum
124             new = new.__class__(str(new))
125             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
126                              pkt['ICMPv6EchoRequest'].cksum)
127         if pkt.haslayer(ICMPv6EchoReply):
128             del new['ICMPv6EchoReply'].cksum
129             new = new.__class__(str(new))
130             self.assertEqual(new['ICMPv6EchoReply'].cksum,
131                              pkt['ICMPv6EchoReply'].cksum)
132
133     def create_stream_in(self, in_if, out_if, ttl=64):
134         """
135         Create packet stream for inside network
136
137         :param in_if: Inside interface
138         :param out_if: Outside interface
139         :param ttl: TTL of generated packets
140         """
141         pkts = []
142         # TCP
143         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
144              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
145              TCP(sport=self.tcp_port_in, dport=20))
146         pkts.append(p)
147
148         # UDP
149         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
150              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
151              UDP(sport=self.udp_port_in, dport=20))
152         pkts.append(p)
153
154         # ICMP
155         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
156              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
157              ICMP(id=self.icmp_id_in, type='echo-request'))
158         pkts.append(p)
159
160         return pkts
161
162     def compose_ip6(self, ip4, pref, plen):
163         """
164         Compose IPv4-embedded IPv6 addresses
165
166         :param ip4: IPv4 address
167         :param pref: IPv6 prefix
168         :param plen: IPv6 prefix length
169         :returns: IPv4-embedded IPv6 addresses
170         """
171         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
172         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
173         if plen == 32:
174             pref_n[4] = ip4_n[0]
175             pref_n[5] = ip4_n[1]
176             pref_n[6] = ip4_n[2]
177             pref_n[7] = ip4_n[3]
178         elif plen == 40:
179             pref_n[5] = ip4_n[0]
180             pref_n[6] = ip4_n[1]
181             pref_n[7] = ip4_n[2]
182             pref_n[9] = ip4_n[3]
183         elif plen == 48:
184             pref_n[6] = ip4_n[0]
185             pref_n[7] = ip4_n[1]
186             pref_n[9] = ip4_n[2]
187             pref_n[10] = ip4_n[3]
188         elif plen == 56:
189             pref_n[7] = ip4_n[0]
190             pref_n[9] = ip4_n[1]
191             pref_n[10] = ip4_n[2]
192             pref_n[11] = ip4_n[3]
193         elif plen == 64:
194             pref_n[9] = ip4_n[0]
195             pref_n[10] = ip4_n[1]
196             pref_n[11] = ip4_n[2]
197             pref_n[12] = ip4_n[3]
198         elif plen == 96:
199             pref_n[12] = ip4_n[0]
200             pref_n[13] = ip4_n[1]
201             pref_n[14] = ip4_n[2]
202             pref_n[15] = ip4_n[3]
203         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
204
205     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
206         """
207         Create IPv6 packet stream for inside network
208
209         :param in_if: Inside interface
210         :param out_if: Outside interface
211         :param ttl: Hop Limit of generated packets
212         :param pref: NAT64 prefix
213         :param plen: NAT64 prefix length
214         """
215         pkts = []
216         if pref is None:
217             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
218         else:
219             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
220
221         # TCP
222         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
224              TCP(sport=self.tcp_port_in, dport=20))
225         pkts.append(p)
226
227         # UDP
228         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
229              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
230              UDP(sport=self.udp_port_in, dport=20))
231         pkts.append(p)
232
233         # ICMP
234         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
235              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
236              ICMPv6EchoRequest(id=self.icmp_id_in))
237         pkts.append(p)
238
239         return pkts
240
241     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
242         """
243         Create packet stream for outside network
244
245         :param out_if: Outside interface
246         :param dst_ip: Destination IP address (Default use global NAT address)
247         :param ttl: TTL of generated packets
248         """
249         if dst_ip is None:
250             dst_ip = self.nat_addr
251         pkts = []
252         # TCP
253         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255              TCP(dport=self.tcp_port_out, sport=20))
256         pkts.append(p)
257
258         # UDP
259         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261              UDP(dport=self.udp_port_out, sport=20))
262         pkts.append(p)
263
264         # ICMP
265         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267              ICMP(id=self.icmp_id_out, type='echo-reply'))
268         pkts.append(p)
269
270         return pkts
271
272     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
273                            packet_num=3, dst_ip=None):
274         """
275         Verify captured packets on outside network
276
277         :param capture: Captured packets
278         :param nat_ip: Translated IP address (Default use global NAT address)
279         :param same_port: Sorce port number is not translated (Default False)
280         :param packet_num: Expected number of packets (Default 3)
281         :param dst_ip: Destination IP address (Default do not verify)
282         """
283         if nat_ip is None:
284             nat_ip = self.nat_addr
285         self.assertEqual(packet_num, len(capture))
286         for packet in capture:
287             try:
288                 self.check_ip_checksum(packet)
289                 self.assertEqual(packet[IP].src, nat_ip)
290                 if dst_ip is not None:
291                     self.assertEqual(packet[IP].dst, dst_ip)
292                 if packet.haslayer(TCP):
293                     if same_port:
294                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
295                     else:
296                         self.assertNotEqual(
297                             packet[TCP].sport, self.tcp_port_in)
298                     self.tcp_port_out = packet[TCP].sport
299                     self.check_tcp_checksum(packet)
300                 elif packet.haslayer(UDP):
301                     if same_port:
302                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
303                     else:
304                         self.assertNotEqual(
305                             packet[UDP].sport, self.udp_port_in)
306                     self.udp_port_out = packet[UDP].sport
307                 else:
308                     if same_port:
309                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
310                     else:
311                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
312                     self.icmp_id_out = packet[ICMP].id
313                     self.check_icmp_checksum(packet)
314             except:
315                 self.logger.error(ppp("Unexpected or invalid packet "
316                                       "(outside network):", packet))
317                 raise
318
319     def verify_capture_in(self, capture, in_if, packet_num=3):
320         """
321         Verify captured packets on inside network
322
323         :param capture: Captured packets
324         :param in_if: Inside interface
325         :param packet_num: Expected number of packets (Default 3)
326         """
327         self.assertEqual(packet_num, len(capture))
328         for packet in capture:
329             try:
330                 self.check_ip_checksum(packet)
331                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
332                 if packet.haslayer(TCP):
333                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
334                     self.check_tcp_checksum(packet)
335                 elif packet.haslayer(UDP):
336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
337                 else:
338                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
339                     self.check_icmp_checksum(packet)
340             except:
341                 self.logger.error(ppp("Unexpected or invalid packet "
342                                       "(inside network):", packet))
343                 raise
344
345     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
346         """
347         Verify captured IPv6 packets on inside network
348
349         :param capture: Captured packets
350         :param src_ip: Source IP
351         :param dst_ip: Destination IP address
352         :param packet_num: Expected number of packets (Default 3)
353         """
354         self.assertEqual(packet_num, len(capture))
355         for packet in capture:
356             try:
357                 self.assertEqual(packet[IPv6].src, src_ip)
358                 self.assertEqual(packet[IPv6].dst, dst_ip)
359                 if packet.haslayer(TCP):
360                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
361                     self.check_tcp_checksum(packet)
362                 elif packet.haslayer(UDP):
363                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
364                     self.check_udp_checksum(packet)
365                 else:
366                     self.assertEqual(packet[ICMPv6EchoReply].id,
367                                      self.icmp_id_in)
368                     self.check_icmpv6_checksum(packet)
369             except:
370                 self.logger.error(ppp("Unexpected or invalid packet "
371                                       "(inside network):", packet))
372                 raise
373
374     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375         """
376         Verify captured packet that don't have to be translated
377
378         :param capture: Captured packets
379         :param ingress_if: Ingress interface
380         :param egress_if: Egress interface
381         """
382         for packet in capture:
383             try:
384                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
385                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
386                 if packet.haslayer(TCP):
387                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
388                 elif packet.haslayer(UDP):
389                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
390                 else:
391                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392             except:
393                 self.logger.error(ppp("Unexpected or invalid packet "
394                                       "(inside network):", packet))
395                 raise
396
397     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
398                                             packet_num=3, icmp_type=11):
399         """
400         Verify captured packets with ICMP errors on outside network
401
402         :param capture: Captured packets
403         :param src_ip: Translated IP address or IP address of VPP
404                        (Default use global NAT address)
405         :param packet_num: Expected number of packets (Default 3)
406         :param icmp_type: Type of error ICMP packet
407                           we are expecting (Default 11)
408         """
409         if src_ip is None:
410             src_ip = self.nat_addr
411         self.assertEqual(packet_num, len(capture))
412         for packet in capture:
413             try:
414                 self.assertEqual(packet[IP].src, src_ip)
415                 self.assertTrue(packet.haslayer(ICMP))
416                 icmp = packet[ICMP]
417                 self.assertEqual(icmp.type, icmp_type)
418                 self.assertTrue(icmp.haslayer(IPerror))
419                 inner_ip = icmp[IPerror]
420                 if inner_ip.haslayer(TCPerror):
421                     self.assertEqual(inner_ip[TCPerror].dport,
422                                      self.tcp_port_out)
423                 elif inner_ip.haslayer(UDPerror):
424                     self.assertEqual(inner_ip[UDPerror].dport,
425                                      self.udp_port_out)
426                 else:
427                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428             except:
429                 self.logger.error(ppp("Unexpected or invalid packet "
430                                       "(outside network):", packet))
431                 raise
432
433     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
434                                            icmp_type=11):
435         """
436         Verify captured packets with ICMP errors on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         :param icmp_type: Type of error ICMP packet
442                           we are expecting (Default 11)
443         """
444         self.assertEqual(packet_num, len(capture))
445         for packet in capture:
446             try:
447                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
448                 self.assertTrue(packet.haslayer(ICMP))
449                 icmp = packet[ICMP]
450                 self.assertEqual(icmp.type, icmp_type)
451                 self.assertTrue(icmp.haslayer(IPerror))
452                 inner_ip = icmp[IPerror]
453                 if inner_ip.haslayer(TCPerror):
454                     self.assertEqual(inner_ip[TCPerror].sport,
455                                      self.tcp_port_in)
456                 elif inner_ip.haslayer(UDPerror):
457                     self.assertEqual(inner_ip[UDPerror].sport,
458                                      self.udp_port_in)
459                 else:
460                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461             except:
462                 self.logger.error(ppp("Unexpected or invalid packet "
463                                       "(inside network):", packet))
464                 raise
465
466     def verify_ipfix_nat44_ses(self, data):
467         """
468         Verify IPFIX NAT44 session create/delete event
469
470         :param data: Decoded IPFIX data records
471         """
472         nat44_ses_create_num = 0
473         nat44_ses_delete_num = 0
474         self.assertEqual(6, len(data))
475         for record in data:
476             # natEvent
477             self.assertIn(ord(record[230]), [4, 5])
478             if ord(record[230]) == 4:
479                 nat44_ses_create_num += 1
480             else:
481                 nat44_ses_delete_num += 1
482             # sourceIPv4Address
483             self.assertEqual(self.pg0.remote_ip4n, record[8])
484             # postNATSourceIPv4Address
485             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
486                              record[225])
487             # ingressVRFID
488             self.assertEqual(struct.pack("!I", 0), record[234])
489             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
490             if IP_PROTOS.icmp == ord(record[4]):
491                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
492                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
493                                  record[227])
494             elif IP_PROTOS.tcp == ord(record[4]):
495                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
496                                  record[7])
497                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
498                                  record[227])
499             elif IP_PROTOS.udp == ord(record[4]):
500                 self.assertEqual(struct.pack("!H", self.udp_port_in),
501                                  record[7])
502                 self.assertEqual(struct.pack("!H", self.udp_port_out),
503                                  record[227])
504             else:
505                 self.fail("Invalid protocol")
506         self.assertEqual(3, nat44_ses_create_num)
507         self.assertEqual(3, nat44_ses_delete_num)
508
509     def verify_ipfix_addr_exhausted(self, data):
510         """
511         Verify IPFIX NAT addresses event
512
513         :param data: Decoded IPFIX data records
514         """
515         self.assertEqual(1, len(data))
516         record = data[0]
517         # natEvent
518         self.assertEqual(ord(record[230]), 3)
519         # natPoolID
520         self.assertEqual(struct.pack("!I", 0), record[283])
521
522
523 class TestNAT44(MethodHolder):
524     """ NAT44 Test Cases """
525
526     @classmethod
527     def setUpClass(cls):
528         super(TestNAT44, cls).setUpClass()
529
530         try:
531             cls.tcp_port_in = 6303
532             cls.tcp_port_out = 6303
533             cls.udp_port_in = 6304
534             cls.udp_port_out = 6304
535             cls.icmp_id_in = 6305
536             cls.icmp_id_out = 6305
537             cls.nat_addr = '10.0.0.3'
538             cls.ipfix_src_port = 4739
539             cls.ipfix_domain_id = 1
540
541             cls.create_pg_interfaces(range(10))
542             cls.interfaces = list(cls.pg_interfaces[0:4])
543
544             for i in cls.interfaces:
545                 i.admin_up()
546                 i.config_ip4()
547                 i.resolve_arp()
548
549             cls.pg0.generate_remote_hosts(3)
550             cls.pg0.configure_ipv4_neighbors()
551
552             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
553             cls.vapi.ip_table_add_del(10, is_add=1)
554             cls.vapi.ip_table_add_del(20, is_add=1)
555
556             cls.pg4._local_ip4 = "172.16.255.1"
557             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
559             cls.pg4.set_table_ip4(10)
560             cls.pg5._local_ip4 = "172.17.255.3"
561             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
563             cls.pg5.set_table_ip4(10)
564             cls.pg6._local_ip4 = "172.16.255.1"
565             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
566             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
567             cls.pg6.set_table_ip4(20)
568             for i in cls.overlapping_interfaces:
569                 i.config_ip4()
570                 i.admin_up()
571                 i.resolve_arp()
572
573             cls.pg7.admin_up()
574             cls.pg8.admin_up()
575
576             cls.pg9.generate_remote_hosts(2)
577             cls.pg9.config_ip4()
578             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
579             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
580                                                   ip_addr_n,
581                                                   24)
582             cls.pg9.admin_up()
583             cls.pg9.resolve_arp()
584             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
585             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
586             cls.pg9.resolve_arp()
587
588         except Exception:
589             super(TestNAT44, cls).tearDownClass()
590             raise
591
592     def clear_nat44(self):
593         """
594         Clear NAT44 configuration.
595         """
596         # I found no elegant way to do this
597         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
598                                    dst_address_length=32,
599                                    next_hop_address=self.pg7.remote_ip4n,
600                                    next_hop_sw_if_index=self.pg7.sw_if_index,
601                                    is_add=0)
602         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
603                                    dst_address_length=32,
604                                    next_hop_address=self.pg8.remote_ip4n,
605                                    next_hop_sw_if_index=self.pg8.sw_if_index,
606                                    is_add=0)
607
608         for intf in [self.pg7, self.pg8]:
609             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
610             for n in neighbors:
611                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
612                                               n.mac_address,
613                                               n.ip_address,
614                                               is_add=0)
615
616         if self.pg7.has_ip4_config:
617             self.pg7.unconfig_ip4()
618
619         interfaces = self.vapi.nat44_interface_addr_dump()
620         for intf in interfaces:
621             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
622
623         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
624                             domain_id=self.ipfix_domain_id)
625         self.ipfix_src_port = 4739
626         self.ipfix_domain_id = 1
627
628         interfaces = self.vapi.nat44_interface_dump()
629         for intf in interfaces:
630             if intf.is_inside > 1:
631                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
632                                                           0,
633                                                           is_add=0)
634             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
635                                                       intf.is_inside,
636                                                       is_add=0)
637
638         interfaces = self.vapi.nat44_interface_output_feature_dump()
639         for intf in interfaces:
640             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
641                                                              intf.is_inside,
642                                                              is_add=0)
643
644         static_mappings = self.vapi.nat44_static_mapping_dump()
645         for sm in static_mappings:
646             self.vapi.nat44_add_del_static_mapping(
647                 sm.local_ip_address,
648                 sm.external_ip_address,
649                 local_port=sm.local_port,
650                 external_port=sm.external_port,
651                 addr_only=sm.addr_only,
652                 vrf_id=sm.vrf_id,
653                 protocol=sm.protocol,
654                 is_add=0)
655
656         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
657         for lb_sm in lb_static_mappings:
658             self.vapi.nat44_add_del_lb_static_mapping(
659                 lb_sm.external_addr,
660                 lb_sm.external_port,
661                 lb_sm.protocol,
662                 lb_sm.vrf_id,
663                 is_add=0,
664                 local_num=0,
665                 locals=[])
666
667         adresses = self.vapi.nat44_address_dump()
668         for addr in adresses:
669             self.vapi.nat44_add_del_address_range(addr.ip_address,
670                                                   addr.ip_address,
671                                                   is_add=0)
672
673     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
674                                  local_port=0, external_port=0, vrf_id=0,
675                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
676                                  proto=0):
677         """
678         Add/delete NAT44 static mapping
679
680         :param local_ip: Local IP address
681         :param external_ip: External IP address
682         :param local_port: Local port number (Optional)
683         :param external_port: External port number (Optional)
684         :param vrf_id: VRF ID (Default 0)
685         :param is_add: 1 if add, 0 if delete (Default add)
686         :param external_sw_if_index: External interface instead of IP address
687         :param proto: IP protocol (Mandatory if port specified)
688         """
689         addr_only = 1
690         if local_port and external_port:
691             addr_only = 0
692         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
693         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
694         self.vapi.nat44_add_del_static_mapping(
695             l_ip,
696             e_ip,
697             external_sw_if_index,
698             local_port,
699             external_port,
700             addr_only,
701             vrf_id,
702             proto,
703             is_add)
704
705     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
706         """
707         Add/delete NAT44 address
708
709         :param ip: IP address
710         :param is_add: 1 if add, 0 if delete (Default add)
711         """
712         nat_addr = socket.inet_pton(socket.AF_INET, ip)
713         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
714                                               vrf_id=vrf_id)
715
716     def test_dynamic(self):
717         """ NAT44 dynamic translation test """
718
719         self.nat44_add_address(self.nat_addr)
720         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
721         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
722                                                   is_inside=0)
723
724         # in2out
725         pkts = self.create_stream_in(self.pg0, self.pg1)
726         self.pg0.add_stream(pkts)
727         self.pg_enable_capture(self.pg_interfaces)
728         self.pg_start()
729         capture = self.pg1.get_capture(len(pkts))
730         self.verify_capture_out(capture)
731
732         # out2in
733         pkts = self.create_stream_out(self.pg1)
734         self.pg1.add_stream(pkts)
735         self.pg_enable_capture(self.pg_interfaces)
736         self.pg_start()
737         capture = self.pg0.get_capture(len(pkts))
738         self.verify_capture_in(capture, self.pg0)
739
740     def test_dynamic_icmp_errors_in2out_ttl_1(self):
741         """ NAT44 handling of client packets with TTL=1 """
742
743         self.nat44_add_address(self.nat_addr)
744         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
745         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
746                                                   is_inside=0)
747
748         # Client side - generate traffic
749         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
750         self.pg0.add_stream(pkts)
751         self.pg_enable_capture(self.pg_interfaces)
752         self.pg_start()
753
754         # Client side - verify ICMP type 11 packets
755         capture = self.pg0.get_capture(len(pkts))
756         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
757
758     def test_dynamic_icmp_errors_out2in_ttl_1(self):
759         """ NAT44 handling of server packets with TTL=1 """
760
761         self.nat44_add_address(self.nat_addr)
762         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
763         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
764                                                   is_inside=0)
765
766         # Client side - create sessions
767         pkts = self.create_stream_in(self.pg0, self.pg1)
768         self.pg0.add_stream(pkts)
769         self.pg_enable_capture(self.pg_interfaces)
770         self.pg_start()
771
772         # Server side - generate traffic
773         capture = self.pg1.get_capture(len(pkts))
774         self.verify_capture_out(capture)
775         pkts = self.create_stream_out(self.pg1, ttl=1)
776         self.pg1.add_stream(pkts)
777         self.pg_enable_capture(self.pg_interfaces)
778         self.pg_start()
779
780         # Server side - verify ICMP type 11 packets
781         capture = self.pg1.get_capture(len(pkts))
782         self.verify_capture_out_with_icmp_errors(capture,
783                                                  src_ip=self.pg1.local_ip4)
784
785     def test_dynamic_icmp_errors_in2out_ttl_2(self):
786         """ NAT44 handling of error responses to client packets with TTL=2 """
787
788         self.nat44_add_address(self.nat_addr)
789         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
790         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
791                                                   is_inside=0)
792
793         # Client side - generate traffic
794         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
795         self.pg0.add_stream(pkts)
796         self.pg_enable_capture(self.pg_interfaces)
797         self.pg_start()
798
799         # Server side - simulate ICMP type 11 response
800         capture = self.pg1.get_capture(len(pkts))
801         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
802                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
803                 ICMP(type=11) / packet[IP] for packet in capture]
804         self.pg1.add_stream(pkts)
805         self.pg_enable_capture(self.pg_interfaces)
806         self.pg_start()
807
808         # Client side - verify ICMP type 11 packets
809         capture = self.pg0.get_capture(len(pkts))
810         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
811
812     def test_dynamic_icmp_errors_out2in_ttl_2(self):
813         """ NAT44 handling of error responses to server packets with TTL=2 """
814
815         self.nat44_add_address(self.nat_addr)
816         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
817         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
818                                                   is_inside=0)
819
820         # Client side - create sessions
821         pkts = self.create_stream_in(self.pg0, self.pg1)
822         self.pg0.add_stream(pkts)
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825
826         # Server side - generate traffic
827         capture = self.pg1.get_capture(len(pkts))
828         self.verify_capture_out(capture)
829         pkts = self.create_stream_out(self.pg1, ttl=2)
830         self.pg1.add_stream(pkts)
831         self.pg_enable_capture(self.pg_interfaces)
832         self.pg_start()
833
834         # Client side - simulate ICMP type 11 response
835         capture = self.pg0.get_capture(len(pkts))
836         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
837                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
838                 ICMP(type=11) / packet[IP] for packet in capture]
839         self.pg0.add_stream(pkts)
840         self.pg_enable_capture(self.pg_interfaces)
841         self.pg_start()
842
843         # Server side - verify ICMP type 11 packets
844         capture = self.pg1.get_capture(len(pkts))
845         self.verify_capture_out_with_icmp_errors(capture)
846
847     def test_ping_out_interface_from_outside(self):
848         """ Ping NAT44 out interface from outside network """
849
850         self.nat44_add_address(self.nat_addr)
851         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
852         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
853                                                   is_inside=0)
854
855         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
856              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
857              ICMP(id=self.icmp_id_out, type='echo-request'))
858         pkts = [p]
859         self.pg1.add_stream(pkts)
860         self.pg_enable_capture(self.pg_interfaces)
861         self.pg_start()
862         capture = self.pg1.get_capture(len(pkts))
863         self.assertEqual(1, len(capture))
864         packet = capture[0]
865         try:
866             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
867             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
868             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
869             self.assertEqual(packet[ICMP].type, 0)  # echo reply
870         except:
871             self.logger.error(ppp("Unexpected or invalid packet "
872                                   "(outside network):", packet))
873             raise
874
875     def test_ping_internal_host_from_outside(self):
876         """ Ping internal host from outside network """
877
878         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
879         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
880         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
881                                                   is_inside=0)
882
883         # out2in
884         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
885                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
886                ICMP(id=self.icmp_id_out, type='echo-request'))
887         self.pg1.add_stream(pkt)
888         self.pg_enable_capture(self.pg_interfaces)
889         self.pg_start()
890         capture = self.pg0.get_capture(1)
891         self.verify_capture_in(capture, self.pg0, packet_num=1)
892         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
893
894         # in2out
895         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
896                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
897                ICMP(id=self.icmp_id_in, type='echo-reply'))
898         self.pg0.add_stream(pkt)
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901         capture = self.pg1.get_capture(1)
902         self.verify_capture_out(capture, same_port=True, packet_num=1)
903         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
904
905     def test_static_in(self):
906         """ 1:1 NAT initialized from inside network """
907
908         nat_ip = "10.0.0.10"
909         self.tcp_port_out = 6303
910         self.udp_port_out = 6304
911         self.icmp_id_out = 6305
912
913         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
914         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
915         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
916                                                   is_inside=0)
917
918         # in2out
919         pkts = self.create_stream_in(self.pg0, self.pg1)
920         self.pg0.add_stream(pkts)
921         self.pg_enable_capture(self.pg_interfaces)
922         self.pg_start()
923         capture = self.pg1.get_capture(len(pkts))
924         self.verify_capture_out(capture, nat_ip, True)
925
926         # out2in
927         pkts = self.create_stream_out(self.pg1, nat_ip)
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931         capture = self.pg0.get_capture(len(pkts))
932         self.verify_capture_in(capture, self.pg0)
933
934     def test_static_out(self):
935         """ 1:1 NAT initialized from outside network """
936
937         nat_ip = "10.0.0.20"
938         self.tcp_port_out = 6303
939         self.udp_port_out = 6304
940         self.icmp_id_out = 6305
941
942         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
943         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
945                                                   is_inside=0)
946
947         # out2in
948         pkts = self.create_stream_out(self.pg1, nat_ip)
949         self.pg1.add_stream(pkts)
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952         capture = self.pg0.get_capture(len(pkts))
953         self.verify_capture_in(capture, self.pg0)
954
955         # in2out
956         pkts = self.create_stream_in(self.pg0, self.pg1)
957         self.pg0.add_stream(pkts)
958         self.pg_enable_capture(self.pg_interfaces)
959         self.pg_start()
960         capture = self.pg1.get_capture(len(pkts))
961         self.verify_capture_out(capture, nat_ip, True)
962
963     def test_static_with_port_in(self):
964         """ 1:1 NAPT initialized from inside network """
965
966         self.tcp_port_out = 3606
967         self.udp_port_out = 3607
968         self.icmp_id_out = 3608
969
970         self.nat44_add_address(self.nat_addr)
971         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972                                       self.tcp_port_in, self.tcp_port_out,
973                                       proto=IP_PROTOS.tcp)
974         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975                                       self.udp_port_in, self.udp_port_out,
976                                       proto=IP_PROTOS.udp)
977         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978                                       self.icmp_id_in, self.icmp_id_out,
979                                       proto=IP_PROTOS.icmp)
980         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
981         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
982                                                   is_inside=0)
983
984         # in2out
985         pkts = self.create_stream_in(self.pg0, self.pg1)
986         self.pg0.add_stream(pkts)
987         self.pg_enable_capture(self.pg_interfaces)
988         self.pg_start()
989         capture = self.pg1.get_capture(len(pkts))
990         self.verify_capture_out(capture)
991
992         # out2in
993         pkts = self.create_stream_out(self.pg1)
994         self.pg1.add_stream(pkts)
995         self.pg_enable_capture(self.pg_interfaces)
996         self.pg_start()
997         capture = self.pg0.get_capture(len(pkts))
998         self.verify_capture_in(capture, self.pg0)
999
1000     def test_static_with_port_out(self):
1001         """ 1:1 NAPT initialized from outside network """
1002
1003         self.tcp_port_out = 30606
1004         self.udp_port_out = 30607
1005         self.icmp_id_out = 30608
1006
1007         self.nat44_add_address(self.nat_addr)
1008         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009                                       self.tcp_port_in, self.tcp_port_out,
1010                                       proto=IP_PROTOS.tcp)
1011         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012                                       self.udp_port_in, self.udp_port_out,
1013                                       proto=IP_PROTOS.udp)
1014         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1015                                       self.icmp_id_in, self.icmp_id_out,
1016                                       proto=IP_PROTOS.icmp)
1017         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1018         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1019                                                   is_inside=0)
1020
1021         # out2in
1022         pkts = self.create_stream_out(self.pg1)
1023         self.pg1.add_stream(pkts)
1024         self.pg_enable_capture(self.pg_interfaces)
1025         self.pg_start()
1026         capture = self.pg0.get_capture(len(pkts))
1027         self.verify_capture_in(capture, self.pg0)
1028
1029         # in2out
1030         pkts = self.create_stream_in(self.pg0, self.pg1)
1031         self.pg0.add_stream(pkts)
1032         self.pg_enable_capture(self.pg_interfaces)
1033         self.pg_start()
1034         capture = self.pg1.get_capture(len(pkts))
1035         self.verify_capture_out(capture)
1036
1037     def test_static_vrf_aware(self):
1038         """ 1:1 NAT VRF awareness """
1039
1040         nat_ip1 = "10.0.0.30"
1041         nat_ip2 = "10.0.0.40"
1042         self.tcp_port_out = 6303
1043         self.udp_port_out = 6304
1044         self.icmp_id_out = 6305
1045
1046         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1047                                       vrf_id=10)
1048         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1049                                       vrf_id=10)
1050         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1051                                                   is_inside=0)
1052         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1053         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1054
1055         # inside interface VRF match NAT44 static mapping VRF
1056         pkts = self.create_stream_in(self.pg4, self.pg3)
1057         self.pg4.add_stream(pkts)
1058         self.pg_enable_capture(self.pg_interfaces)
1059         self.pg_start()
1060         capture = self.pg3.get_capture(len(pkts))
1061         self.verify_capture_out(capture, nat_ip1, True)
1062
1063         # inside interface VRF don't match NAT44 static mapping VRF (packets
1064         # are dropped)
1065         pkts = self.create_stream_in(self.pg0, self.pg3)
1066         self.pg0.add_stream(pkts)
1067         self.pg_enable_capture(self.pg_interfaces)
1068         self.pg_start()
1069         self.pg3.assert_nothing_captured()
1070
1071     def test_static_lb(self):
1072         """ NAT44 local service load balancing """
1073         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1074         external_port = 80
1075         local_port = 8080
1076         server1 = self.pg0.remote_hosts[0]
1077         server2 = self.pg0.remote_hosts[1]
1078
1079         locals = [{'addr': server1.ip4n,
1080                    'port': local_port,
1081                    'probability': 70},
1082                   {'addr': server2.ip4n,
1083                    'port': local_port,
1084                    'probability': 30}]
1085
1086         self.nat44_add_address(self.nat_addr)
1087         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1088                                                   external_port,
1089                                                   IP_PROTOS.tcp,
1090                                                   local_num=len(locals),
1091                                                   locals=locals)
1092         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1093         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1094                                                   is_inside=0)
1095
1096         # from client to service
1097         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1098              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1099              TCP(sport=12345, dport=external_port))
1100         self.pg1.add_stream(p)
1101         self.pg_enable_capture(self.pg_interfaces)
1102         self.pg_start()
1103         capture = self.pg0.get_capture(1)
1104         p = capture[0]
1105         server = None
1106         try:
1107             ip = p[IP]
1108             tcp = p[TCP]
1109             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1110             if ip.dst == server1.ip4:
1111                 server = server1
1112             else:
1113                 server = server2
1114             self.assertEqual(tcp.dport, local_port)
1115             self.check_tcp_checksum(p)
1116             self.check_ip_checksum(p)
1117         except:
1118             self.logger.error(ppp("Unexpected or invalid packet:", p))
1119             raise
1120
1121         # from service back to client
1122         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1123              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1124              TCP(sport=local_port, dport=12345))
1125         self.pg0.add_stream(p)
1126         self.pg_enable_capture(self.pg_interfaces)
1127         self.pg_start()
1128         capture = self.pg1.get_capture(1)
1129         p = capture[0]
1130         try:
1131             ip = p[IP]
1132             tcp = p[TCP]
1133             self.assertEqual(ip.src, self.nat_addr)
1134             self.assertEqual(tcp.sport, external_port)
1135             self.check_tcp_checksum(p)
1136             self.check_ip_checksum(p)
1137         except:
1138             self.logger.error(ppp("Unexpected or invalid packet:", p))
1139             raise
1140
1141         # multiple clients
1142         server1_n = 0
1143         server2_n = 0
1144         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1145         pkts = []
1146         for client in clients:
1147             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1148                  IP(src=client, dst=self.nat_addr) /
1149                  TCP(sport=12345, dport=external_port))
1150             pkts.append(p)
1151         self.pg1.add_stream(pkts)
1152         self.pg_enable_capture(self.pg_interfaces)
1153         self.pg_start()
1154         capture = self.pg0.get_capture(len(pkts))
1155         for p in capture:
1156             if p[IP].dst == server1.ip4:
1157                 server1_n += 1
1158             else:
1159                 server2_n += 1
1160         self.assertTrue(server1_n > server2_n)
1161
1162     def test_multiple_inside_interfaces(self):
1163         """ NAT44 multiple non-overlapping address space inside interfaces """
1164
1165         self.nat44_add_address(self.nat_addr)
1166         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1168         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1169                                                   is_inside=0)
1170
1171         # between two NAT44 inside interfaces (no translation)
1172         pkts = self.create_stream_in(self.pg0, self.pg1)
1173         self.pg0.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176         capture = self.pg1.get_capture(len(pkts))
1177         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1178
1179         # from NAT44 inside to interface without NAT44 feature (no translation)
1180         pkts = self.create_stream_in(self.pg0, self.pg2)
1181         self.pg0.add_stream(pkts)
1182         self.pg_enable_capture(self.pg_interfaces)
1183         self.pg_start()
1184         capture = self.pg2.get_capture(len(pkts))
1185         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1186
1187         # in2out 1st interface
1188         pkts = self.create_stream_in(self.pg0, self.pg3)
1189         self.pg0.add_stream(pkts)
1190         self.pg_enable_capture(self.pg_interfaces)
1191         self.pg_start()
1192         capture = self.pg3.get_capture(len(pkts))
1193         self.verify_capture_out(capture)
1194
1195         # out2in 1st interface
1196         pkts = self.create_stream_out(self.pg3)
1197         self.pg3.add_stream(pkts)
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pg_start()
1200         capture = self.pg0.get_capture(len(pkts))
1201         self.verify_capture_in(capture, self.pg0)
1202
1203         # in2out 2nd interface
1204         pkts = self.create_stream_in(self.pg1, self.pg3)
1205         self.pg1.add_stream(pkts)
1206         self.pg_enable_capture(self.pg_interfaces)
1207         self.pg_start()
1208         capture = self.pg3.get_capture(len(pkts))
1209         self.verify_capture_out(capture)
1210
1211         # out2in 2nd interface
1212         pkts = self.create_stream_out(self.pg3)
1213         self.pg3.add_stream(pkts)
1214         self.pg_enable_capture(self.pg_interfaces)
1215         self.pg_start()
1216         capture = self.pg1.get_capture(len(pkts))
1217         self.verify_capture_in(capture, self.pg1)
1218
1219     def test_inside_overlapping_interfaces(self):
1220         """ NAT44 multiple inside interfaces with overlapping address space """
1221
1222         static_nat_ip = "10.0.0.10"
1223         self.nat44_add_address(self.nat_addr)
1224         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1225                                                   is_inside=0)
1226         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1227         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1228         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1229         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1230                                       vrf_id=20)
1231
1232         # between NAT44 inside interfaces with same VRF (no translation)
1233         pkts = self.create_stream_in(self.pg4, self.pg5)
1234         self.pg4.add_stream(pkts)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg5.get_capture(len(pkts))
1238         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1239
1240         # between NAT44 inside interfaces with different VRF (hairpinning)
1241         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1242              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1243              TCP(sport=1234, dport=5678))
1244         self.pg4.add_stream(p)
1245         self.pg_enable_capture(self.pg_interfaces)
1246         self.pg_start()
1247         capture = self.pg6.get_capture(1)
1248         p = capture[0]
1249         try:
1250             ip = p[IP]
1251             tcp = p[TCP]
1252             self.assertEqual(ip.src, self.nat_addr)
1253             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1254             self.assertNotEqual(tcp.sport, 1234)
1255             self.assertEqual(tcp.dport, 5678)
1256         except:
1257             self.logger.error(ppp("Unexpected or invalid packet:", p))
1258             raise
1259
1260         # in2out 1st interface
1261         pkts = self.create_stream_in(self.pg4, self.pg3)
1262         self.pg4.add_stream(pkts)
1263         self.pg_enable_capture(self.pg_interfaces)
1264         self.pg_start()
1265         capture = self.pg3.get_capture(len(pkts))
1266         self.verify_capture_out(capture)
1267
1268         # out2in 1st interface
1269         pkts = self.create_stream_out(self.pg3)
1270         self.pg3.add_stream(pkts)
1271         self.pg_enable_capture(self.pg_interfaces)
1272         self.pg_start()
1273         capture = self.pg4.get_capture(len(pkts))
1274         self.verify_capture_in(capture, self.pg4)
1275
1276         # in2out 2nd interface
1277         pkts = self.create_stream_in(self.pg5, self.pg3)
1278         self.pg5.add_stream(pkts)
1279         self.pg_enable_capture(self.pg_interfaces)
1280         self.pg_start()
1281         capture = self.pg3.get_capture(len(pkts))
1282         self.verify_capture_out(capture)
1283
1284         # out2in 2nd interface
1285         pkts = self.create_stream_out(self.pg3)
1286         self.pg3.add_stream(pkts)
1287         self.pg_enable_capture(self.pg_interfaces)
1288         self.pg_start()
1289         capture = self.pg5.get_capture(len(pkts))
1290         self.verify_capture_in(capture, self.pg5)
1291
1292         # pg5 session dump
1293         addresses = self.vapi.nat44_address_dump()
1294         self.assertEqual(len(addresses), 1)
1295         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1296         self.assertEqual(len(sessions), 3)
1297         for session in sessions:
1298             self.assertFalse(session.is_static)
1299             self.assertEqual(session.inside_ip_address[0:4],
1300                              self.pg5.remote_ip4n)
1301             self.assertEqual(session.outside_ip_address,
1302                              addresses[0].ip_address)
1303         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1304         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1305         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1306         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1307         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1308         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1309         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1310         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1311         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1312
1313         # in2out 3rd interface
1314         pkts = self.create_stream_in(self.pg6, self.pg3)
1315         self.pg6.add_stream(pkts)
1316         self.pg_enable_capture(self.pg_interfaces)
1317         self.pg_start()
1318         capture = self.pg3.get_capture(len(pkts))
1319         self.verify_capture_out(capture, static_nat_ip, True)
1320
1321         # out2in 3rd interface
1322         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1323         self.pg3.add_stream(pkts)
1324         self.pg_enable_capture(self.pg_interfaces)
1325         self.pg_start()
1326         capture = self.pg6.get_capture(len(pkts))
1327         self.verify_capture_in(capture, self.pg6)
1328
1329         # general user and session dump verifications
1330         users = self.vapi.nat44_user_dump()
1331         self.assertTrue(len(users) >= 3)
1332         addresses = self.vapi.nat44_address_dump()
1333         self.assertEqual(len(addresses), 1)
1334         for user in users:
1335             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1336                                                          user.vrf_id)
1337             for session in sessions:
1338                 self.assertEqual(user.ip_address, session.inside_ip_address)
1339                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1340                 self.assertTrue(session.protocol in
1341                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1342                                  IP_PROTOS.icmp])
1343
1344         # pg4 session dump
1345         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1346         self.assertTrue(len(sessions) >= 4)
1347         for session in sessions:
1348             self.assertFalse(session.is_static)
1349             self.assertEqual(session.inside_ip_address[0:4],
1350                              self.pg4.remote_ip4n)
1351             self.assertEqual(session.outside_ip_address,
1352                              addresses[0].ip_address)
1353
1354         # pg6 session dump
1355         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1356         self.assertTrue(len(sessions) >= 3)
1357         for session in sessions:
1358             self.assertTrue(session.is_static)
1359             self.assertEqual(session.inside_ip_address[0:4],
1360                              self.pg6.remote_ip4n)
1361             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1362                              map(int, static_nat_ip.split('.')))
1363             self.assertTrue(session.inside_port in
1364                             [self.tcp_port_in, self.udp_port_in,
1365                              self.icmp_id_in])
1366
1367     def test_hairpinning(self):
1368         """ NAT44 hairpinning - 1:1 NAPT """
1369
1370         host = self.pg0.remote_hosts[0]
1371         server = self.pg0.remote_hosts[1]
1372         host_in_port = 1234
1373         host_out_port = 0
1374         server_in_port = 5678
1375         server_out_port = 8765
1376
1377         self.nat44_add_address(self.nat_addr)
1378         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1379         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1380                                                   is_inside=0)
1381         # add static mapping for server
1382         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1383                                       server_in_port, server_out_port,
1384                                       proto=IP_PROTOS.tcp)
1385
1386         # send packet from host to server
1387         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1388              IP(src=host.ip4, dst=self.nat_addr) /
1389              TCP(sport=host_in_port, dport=server_out_port))
1390         self.pg0.add_stream(p)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg0.get_capture(1)
1394         p = capture[0]
1395         try:
1396             ip = p[IP]
1397             tcp = p[TCP]
1398             self.assertEqual(ip.src, self.nat_addr)
1399             self.assertEqual(ip.dst, server.ip4)
1400             self.assertNotEqual(tcp.sport, host_in_port)
1401             self.assertEqual(tcp.dport, server_in_port)
1402             self.check_tcp_checksum(p)
1403             host_out_port = tcp.sport
1404         except:
1405             self.logger.error(ppp("Unexpected or invalid packet:", p))
1406             raise
1407
1408         # send reply from server to host
1409         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1410              IP(src=server.ip4, dst=self.nat_addr) /
1411              TCP(sport=server_in_port, dport=host_out_port))
1412         self.pg0.add_stream(p)
1413         self.pg_enable_capture(self.pg_interfaces)
1414         self.pg_start()
1415         capture = self.pg0.get_capture(1)
1416         p = capture[0]
1417         try:
1418             ip = p[IP]
1419             tcp = p[TCP]
1420             self.assertEqual(ip.src, self.nat_addr)
1421             self.assertEqual(ip.dst, host.ip4)
1422             self.assertEqual(tcp.sport, server_out_port)
1423             self.assertEqual(tcp.dport, host_in_port)
1424             self.check_tcp_checksum(p)
1425         except:
1426             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1427             raise
1428
1429     def test_hairpinning2(self):
1430         """ NAT44 hairpinning - 1:1 NAT"""
1431
1432         server1_nat_ip = "10.0.0.10"
1433         server2_nat_ip = "10.0.0.11"
1434         host = self.pg0.remote_hosts[0]
1435         server1 = self.pg0.remote_hosts[1]
1436         server2 = self.pg0.remote_hosts[2]
1437         server_tcp_port = 22
1438         server_udp_port = 20
1439
1440         self.nat44_add_address(self.nat_addr)
1441         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1442         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1443                                                   is_inside=0)
1444
1445         # add static mapping for servers
1446         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1447         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1448
1449         # host to server1
1450         pkts = []
1451         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1452              IP(src=host.ip4, dst=server1_nat_ip) /
1453              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1454         pkts.append(p)
1455         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1456              IP(src=host.ip4, dst=server1_nat_ip) /
1457              UDP(sport=self.udp_port_in, dport=server_udp_port))
1458         pkts.append(p)
1459         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1460              IP(src=host.ip4, dst=server1_nat_ip) /
1461              ICMP(id=self.icmp_id_in, type='echo-request'))
1462         pkts.append(p)
1463         self.pg0.add_stream(pkts)
1464         self.pg_enable_capture(self.pg_interfaces)
1465         self.pg_start()
1466         capture = self.pg0.get_capture(len(pkts))
1467         for packet in capture:
1468             try:
1469                 self.assertEqual(packet[IP].src, self.nat_addr)
1470                 self.assertEqual(packet[IP].dst, server1.ip4)
1471                 if packet.haslayer(TCP):
1472                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1473                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1474                     self.tcp_port_out = packet[TCP].sport
1475                     self.check_tcp_checksum(packet)
1476                 elif packet.haslayer(UDP):
1477                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1478                     self.assertEqual(packet[UDP].dport, server_udp_port)
1479                     self.udp_port_out = packet[UDP].sport
1480                 else:
1481                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1482                     self.icmp_id_out = packet[ICMP].id
1483             except:
1484                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1485                 raise
1486
1487         # server1 to host
1488         pkts = []
1489         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1490              IP(src=server1.ip4, dst=self.nat_addr) /
1491              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1492         pkts.append(p)
1493         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1494              IP(src=server1.ip4, dst=self.nat_addr) /
1495              UDP(sport=server_udp_port, dport=self.udp_port_out))
1496         pkts.append(p)
1497         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1498              IP(src=server1.ip4, dst=self.nat_addr) /
1499              ICMP(id=self.icmp_id_out, type='echo-reply'))
1500         pkts.append(p)
1501         self.pg0.add_stream(pkts)
1502         self.pg_enable_capture(self.pg_interfaces)
1503         self.pg_start()
1504         capture = self.pg0.get_capture(len(pkts))
1505         for packet in capture:
1506             try:
1507                 self.assertEqual(packet[IP].src, server1_nat_ip)
1508                 self.assertEqual(packet[IP].dst, host.ip4)
1509                 if packet.haslayer(TCP):
1510                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1511                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1512                     self.check_tcp_checksum(packet)
1513                 elif packet.haslayer(UDP):
1514                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1515                     self.assertEqual(packet[UDP].sport, server_udp_port)
1516                 else:
1517                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1518             except:
1519                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1520                 raise
1521
1522         # server2 to server1
1523         pkts = []
1524         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1525              IP(src=server2.ip4, dst=server1_nat_ip) /
1526              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1527         pkts.append(p)
1528         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1529              IP(src=server2.ip4, dst=server1_nat_ip) /
1530              UDP(sport=self.udp_port_in, dport=server_udp_port))
1531         pkts.append(p)
1532         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1533              IP(src=server2.ip4, dst=server1_nat_ip) /
1534              ICMP(id=self.icmp_id_in, type='echo-request'))
1535         pkts.append(p)
1536         self.pg0.add_stream(pkts)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(len(pkts))
1540         for packet in capture:
1541             try:
1542                 self.assertEqual(packet[IP].src, server2_nat_ip)
1543                 self.assertEqual(packet[IP].dst, server1.ip4)
1544                 if packet.haslayer(TCP):
1545                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1546                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1547                     self.tcp_port_out = packet[TCP].sport
1548                     self.check_tcp_checksum(packet)
1549                 elif packet.haslayer(UDP):
1550                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1551                     self.assertEqual(packet[UDP].dport, server_udp_port)
1552                     self.udp_port_out = packet[UDP].sport
1553                 else:
1554                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1555                     self.icmp_id_out = packet[ICMP].id
1556             except:
1557                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1558                 raise
1559
1560         # server1 to server2
1561         pkts = []
1562         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1563              IP(src=server1.ip4, dst=server2_nat_ip) /
1564              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1565         pkts.append(p)
1566         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1567              IP(src=server1.ip4, dst=server2_nat_ip) /
1568              UDP(sport=server_udp_port, dport=self.udp_port_out))
1569         pkts.append(p)
1570         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1571              IP(src=server1.ip4, dst=server2_nat_ip) /
1572              ICMP(id=self.icmp_id_out, type='echo-reply'))
1573         pkts.append(p)
1574         self.pg0.add_stream(pkts)
1575         self.pg_enable_capture(self.pg_interfaces)
1576         self.pg_start()
1577         capture = self.pg0.get_capture(len(pkts))
1578         for packet in capture:
1579             try:
1580                 self.assertEqual(packet[IP].src, server1_nat_ip)
1581                 self.assertEqual(packet[IP].dst, server2.ip4)
1582                 if packet.haslayer(TCP):
1583                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1584                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1585                     self.check_tcp_checksum(packet)
1586                 elif packet.haslayer(UDP):
1587                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1588                     self.assertEqual(packet[UDP].sport, server_udp_port)
1589                 else:
1590                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1591             except:
1592                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1593                 raise
1594
1595     def test_max_translations_per_user(self):
1596         """ MAX translations per user - recycle the least recently used """
1597
1598         self.nat44_add_address(self.nat_addr)
1599         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1600         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1601                                                   is_inside=0)
1602
1603         # get maximum number of translations per user
1604         nat44_config = self.vapi.nat_show_config()
1605
1606         # send more than maximum number of translations per user packets
1607         pkts_num = nat44_config.max_translations_per_user + 5
1608         pkts = []
1609         for port in range(0, pkts_num):
1610             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1611                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1612                  TCP(sport=1025 + port))
1613             pkts.append(p)
1614         self.pg0.add_stream(pkts)
1615         self.pg_enable_capture(self.pg_interfaces)
1616         self.pg_start()
1617
1618         # verify number of translated packet
1619         self.pg1.get_capture(pkts_num)
1620
1621     def test_interface_addr(self):
1622         """ Acquire NAT44 addresses from interface """
1623         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1624
1625         # no address in NAT pool
1626         adresses = self.vapi.nat44_address_dump()
1627         self.assertEqual(0, len(adresses))
1628
1629         # configure interface address and check NAT address pool
1630         self.pg7.config_ip4()
1631         adresses = self.vapi.nat44_address_dump()
1632         self.assertEqual(1, len(adresses))
1633         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1634
1635         # remove interface address and check NAT address pool
1636         self.pg7.unconfig_ip4()
1637         adresses = self.vapi.nat44_address_dump()
1638         self.assertEqual(0, len(adresses))
1639
1640     def test_interface_addr_static_mapping(self):
1641         """ Static mapping with addresses from interface """
1642         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1643         self.nat44_add_static_mapping(
1644             '1.2.3.4',
1645             external_sw_if_index=self.pg7.sw_if_index)
1646
1647         # static mappings with external interface
1648         static_mappings = self.vapi.nat44_static_mapping_dump()
1649         self.assertEqual(1, len(static_mappings))
1650         self.assertEqual(self.pg7.sw_if_index,
1651                          static_mappings[0].external_sw_if_index)
1652
1653         # configure interface address and check static mappings
1654         self.pg7.config_ip4()
1655         static_mappings = self.vapi.nat44_static_mapping_dump()
1656         self.assertEqual(1, len(static_mappings))
1657         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1658                          self.pg7.local_ip4n)
1659         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1660
1661         # remove interface address and check static mappings
1662         self.pg7.unconfig_ip4()
1663         static_mappings = self.vapi.nat44_static_mapping_dump()
1664         self.assertEqual(0, len(static_mappings))
1665
1666     def test_ipfix_nat44_sess(self):
1667         """ IPFIX logging NAT44 session created/delted """
1668         self.ipfix_domain_id = 10
1669         self.ipfix_src_port = 20202
1670         colector_port = 30303
1671         bind_layers(UDP, IPFIX, dport=30303)
1672         self.nat44_add_address(self.nat_addr)
1673         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1674         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1675                                                   is_inside=0)
1676         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1677                                      src_address=self.pg3.local_ip4n,
1678                                      path_mtu=512,
1679                                      template_interval=10,
1680                                      collector_port=colector_port)
1681         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1682                             src_port=self.ipfix_src_port)
1683
1684         pkts = self.create_stream_in(self.pg0, self.pg1)
1685         self.pg0.add_stream(pkts)
1686         self.pg_enable_capture(self.pg_interfaces)
1687         self.pg_start()
1688         capture = self.pg1.get_capture(len(pkts))
1689         self.verify_capture_out(capture)
1690         self.nat44_add_address(self.nat_addr, is_add=0)
1691         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1692         capture = self.pg3.get_capture(3)
1693         ipfix = IPFIXDecoder()
1694         # first load template
1695         for p in capture:
1696             self.assertTrue(p.haslayer(IPFIX))
1697             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1698             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1699             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1700             self.assertEqual(p[UDP].dport, colector_port)
1701             self.assertEqual(p[IPFIX].observationDomainID,
1702                              self.ipfix_domain_id)
1703             if p.haslayer(Template):
1704                 ipfix.add_template(p.getlayer(Template))
1705         # verify events in data set
1706         for p in capture:
1707             if p.haslayer(Data):
1708                 data = ipfix.decode_data_set(p.getlayer(Set))
1709                 self.verify_ipfix_nat44_ses(data)
1710
1711     def test_ipfix_addr_exhausted(self):
1712         """ IPFIX logging NAT addresses exhausted """
1713         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1714         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1715                                                   is_inside=0)
1716         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1717                                      src_address=self.pg3.local_ip4n,
1718                                      path_mtu=512,
1719                                      template_interval=10)
1720         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1721                             src_port=self.ipfix_src_port)
1722
1723         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1724              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1725              TCP(sport=3025))
1726         self.pg0.add_stream(p)
1727         self.pg_enable_capture(self.pg_interfaces)
1728         self.pg_start()
1729         capture = self.pg1.get_capture(0)
1730         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1731         capture = self.pg3.get_capture(3)
1732         ipfix = IPFIXDecoder()
1733         # first load template
1734         for p in capture:
1735             self.assertTrue(p.haslayer(IPFIX))
1736             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1737             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1738             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1739             self.assertEqual(p[UDP].dport, 4739)
1740             self.assertEqual(p[IPFIX].observationDomainID,
1741                              self.ipfix_domain_id)
1742             if p.haslayer(Template):
1743                 ipfix.add_template(p.getlayer(Template))
1744         # verify events in data set
1745         for p in capture:
1746             if p.haslayer(Data):
1747                 data = ipfix.decode_data_set(p.getlayer(Set))
1748                 self.verify_ipfix_addr_exhausted(data)
1749
1750     def test_pool_addr_fib(self):
1751         """ NAT44 add pool addresses to FIB """
1752         static_addr = '10.0.0.10'
1753         self.nat44_add_address(self.nat_addr)
1754         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1755         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1756                                                   is_inside=0)
1757         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1758
1759         # NAT44 address
1760         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1761              ARP(op=ARP.who_has, pdst=self.nat_addr,
1762                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1763         self.pg1.add_stream(p)
1764         self.pg_enable_capture(self.pg_interfaces)
1765         self.pg_start()
1766         capture = self.pg1.get_capture(1)
1767         self.assertTrue(capture[0].haslayer(ARP))
1768         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1769
1770         # 1:1 NAT address
1771         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1772              ARP(op=ARP.who_has, pdst=static_addr,
1773                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1774         self.pg1.add_stream(p)
1775         self.pg_enable_capture(self.pg_interfaces)
1776         self.pg_start()
1777         capture = self.pg1.get_capture(1)
1778         self.assertTrue(capture[0].haslayer(ARP))
1779         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1780
1781         # send ARP to non-NAT44 interface
1782         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783              ARP(op=ARP.who_has, pdst=self.nat_addr,
1784                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1785         self.pg2.add_stream(p)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg1.get_capture(0)
1789
1790         # remove addresses and verify
1791         self.nat44_add_address(self.nat_addr, is_add=0)
1792         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1793                                       is_add=0)
1794
1795         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1796              ARP(op=ARP.who_has, pdst=self.nat_addr,
1797                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1798         self.pg1.add_stream(p)
1799         self.pg_enable_capture(self.pg_interfaces)
1800         self.pg_start()
1801         capture = self.pg1.get_capture(0)
1802
1803         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1804              ARP(op=ARP.who_has, pdst=static_addr,
1805                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1806         self.pg1.add_stream(p)
1807         self.pg_enable_capture(self.pg_interfaces)
1808         self.pg_start()
1809         capture = self.pg1.get_capture(0)
1810
1811     def test_vrf_mode(self):
1812         """ NAT44 tenant VRF aware address pool mode """
1813
1814         vrf_id1 = 1
1815         vrf_id2 = 2
1816         nat_ip1 = "10.0.0.10"
1817         nat_ip2 = "10.0.0.11"
1818
1819         self.pg0.unconfig_ip4()
1820         self.pg1.unconfig_ip4()
1821         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1822         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1823         self.pg0.set_table_ip4(vrf_id1)
1824         self.pg1.set_table_ip4(vrf_id2)
1825         self.pg0.config_ip4()
1826         self.pg1.config_ip4()
1827
1828         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1829         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1830         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1831         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1832         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1833                                                   is_inside=0)
1834
1835         # first VRF
1836         pkts = self.create_stream_in(self.pg0, self.pg2)
1837         self.pg0.add_stream(pkts)
1838         self.pg_enable_capture(self.pg_interfaces)
1839         self.pg_start()
1840         capture = self.pg2.get_capture(len(pkts))
1841         self.verify_capture_out(capture, nat_ip1)
1842
1843         # second VRF
1844         pkts = self.create_stream_in(self.pg1, self.pg2)
1845         self.pg1.add_stream(pkts)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg2.get_capture(len(pkts))
1849         self.verify_capture_out(capture, nat_ip2)
1850
1851         self.pg0.unconfig_ip4()
1852         self.pg1.unconfig_ip4()
1853         self.pg0.set_table_ip4(0)
1854         self.pg1.set_table_ip4(0)
1855         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1856         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1857
1858     def test_vrf_feature_independent(self):
1859         """ NAT44 tenant VRF independent address pool mode """
1860
1861         nat_ip1 = "10.0.0.10"
1862         nat_ip2 = "10.0.0.11"
1863
1864         self.nat44_add_address(nat_ip1)
1865         self.nat44_add_address(nat_ip2)
1866         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1867         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1868         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1869                                                   is_inside=0)
1870
1871         # first VRF
1872         pkts = self.create_stream_in(self.pg0, self.pg2)
1873         self.pg0.add_stream(pkts)
1874         self.pg_enable_capture(self.pg_interfaces)
1875         self.pg_start()
1876         capture = self.pg2.get_capture(len(pkts))
1877         self.verify_capture_out(capture, nat_ip1)
1878
1879         # second VRF
1880         pkts = self.create_stream_in(self.pg1, self.pg2)
1881         self.pg1.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg2.get_capture(len(pkts))
1885         self.verify_capture_out(capture, nat_ip1)
1886
1887     def test_dynamic_ipless_interfaces(self):
1888         """ NAT44 interfaces without configured IP address """
1889
1890         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1891                                       mactobinary(self.pg7.remote_mac),
1892                                       self.pg7.remote_ip4n,
1893                                       is_static=1)
1894         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1895                                       mactobinary(self.pg8.remote_mac),
1896                                       self.pg8.remote_ip4n,
1897                                       is_static=1)
1898
1899         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1900                                    dst_address_length=32,
1901                                    next_hop_address=self.pg7.remote_ip4n,
1902                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1903         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1904                                    dst_address_length=32,
1905                                    next_hop_address=self.pg8.remote_ip4n,
1906                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1907
1908         self.nat44_add_address(self.nat_addr)
1909         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1910         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1911                                                   is_inside=0)
1912
1913         # in2out
1914         pkts = self.create_stream_in(self.pg7, self.pg8)
1915         self.pg7.add_stream(pkts)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg8.get_capture(len(pkts))
1919         self.verify_capture_out(capture)
1920
1921         # out2in
1922         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1923         self.pg8.add_stream(pkts)
1924         self.pg_enable_capture(self.pg_interfaces)
1925         self.pg_start()
1926         capture = self.pg7.get_capture(len(pkts))
1927         self.verify_capture_in(capture, self.pg7)
1928
1929     def test_static_ipless_interfaces(self):
1930         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1931
1932         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1933                                       mactobinary(self.pg7.remote_mac),
1934                                       self.pg7.remote_ip4n,
1935                                       is_static=1)
1936         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1937                                       mactobinary(self.pg8.remote_mac),
1938                                       self.pg8.remote_ip4n,
1939                                       is_static=1)
1940
1941         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1942                                    dst_address_length=32,
1943                                    next_hop_address=self.pg7.remote_ip4n,
1944                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1945         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1946                                    dst_address_length=32,
1947                                    next_hop_address=self.pg8.remote_ip4n,
1948                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1949
1950         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1951         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1952         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1953                                                   is_inside=0)
1954
1955         # out2in
1956         pkts = self.create_stream_out(self.pg8)
1957         self.pg8.add_stream(pkts)
1958         self.pg_enable_capture(self.pg_interfaces)
1959         self.pg_start()
1960         capture = self.pg7.get_capture(len(pkts))
1961         self.verify_capture_in(capture, self.pg7)
1962
1963         # in2out
1964         pkts = self.create_stream_in(self.pg7, self.pg8)
1965         self.pg7.add_stream(pkts)
1966         self.pg_enable_capture(self.pg_interfaces)
1967         self.pg_start()
1968         capture = self.pg8.get_capture(len(pkts))
1969         self.verify_capture_out(capture, self.nat_addr, True)
1970
1971     def test_static_with_port_ipless_interfaces(self):
1972         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1973
1974         self.tcp_port_out = 30606
1975         self.udp_port_out = 30607
1976         self.icmp_id_out = 30608
1977
1978         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1979                                       mactobinary(self.pg7.remote_mac),
1980                                       self.pg7.remote_ip4n,
1981                                       is_static=1)
1982         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1983                                       mactobinary(self.pg8.remote_mac),
1984                                       self.pg8.remote_ip4n,
1985                                       is_static=1)
1986
1987         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1988                                    dst_address_length=32,
1989                                    next_hop_address=self.pg7.remote_ip4n,
1990                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1991         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1992                                    dst_address_length=32,
1993                                    next_hop_address=self.pg8.remote_ip4n,
1994                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1995
1996         self.nat44_add_address(self.nat_addr)
1997         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998                                       self.tcp_port_in, self.tcp_port_out,
1999                                       proto=IP_PROTOS.tcp)
2000         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001                                       self.udp_port_in, self.udp_port_out,
2002                                       proto=IP_PROTOS.udp)
2003         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2004                                       self.icmp_id_in, self.icmp_id_out,
2005                                       proto=IP_PROTOS.icmp)
2006         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2007         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2008                                                   is_inside=0)
2009
2010         # out2in
2011         pkts = self.create_stream_out(self.pg8)
2012         self.pg8.add_stream(pkts)
2013         self.pg_enable_capture(self.pg_interfaces)
2014         self.pg_start()
2015         capture = self.pg7.get_capture(len(pkts))
2016         self.verify_capture_in(capture, self.pg7)
2017
2018         # in2out
2019         pkts = self.create_stream_in(self.pg7, self.pg8)
2020         self.pg7.add_stream(pkts)
2021         self.pg_enable_capture(self.pg_interfaces)
2022         self.pg_start()
2023         capture = self.pg8.get_capture(len(pkts))
2024         self.verify_capture_out(capture)
2025
2026     def test_static_unknown_proto(self):
2027         """ 1:1 NAT translate packet with unknown protocol """
2028         nat_ip = "10.0.0.10"
2029         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2030         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2031         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2032                                                   is_inside=0)
2033
2034         # in2out
2035         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2036              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2037              GRE() /
2038              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2039              TCP(sport=1234, dport=1234))
2040         self.pg0.add_stream(p)
2041         self.pg_enable_capture(self.pg_interfaces)
2042         self.pg_start()
2043         p = self.pg1.get_capture(1)
2044         packet = p[0]
2045         try:
2046             self.assertEqual(packet[IP].src, nat_ip)
2047             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2048             self.assertTrue(packet.haslayer(GRE))
2049             self.check_ip_checksum(packet)
2050         except:
2051             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2052             raise
2053
2054         # out2in
2055         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2056              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2057              GRE() /
2058              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2059              TCP(sport=1234, dport=1234))
2060         self.pg1.add_stream(p)
2061         self.pg_enable_capture(self.pg_interfaces)
2062         self.pg_start()
2063         p = self.pg0.get_capture(1)
2064         packet = p[0]
2065         try:
2066             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2067             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2068             self.assertTrue(packet.haslayer(GRE))
2069             self.check_ip_checksum(packet)
2070         except:
2071             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2072             raise
2073
2074     def test_hairpinning_static_unknown_proto(self):
2075         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2076
2077         host = self.pg0.remote_hosts[0]
2078         server = self.pg0.remote_hosts[1]
2079
2080         host_nat_ip = "10.0.0.10"
2081         server_nat_ip = "10.0.0.11"
2082
2083         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2084         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2085         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2086         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2087                                                   is_inside=0)
2088
2089         # host to server
2090         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2091              IP(src=host.ip4, dst=server_nat_ip) /
2092              GRE() /
2093              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2094              TCP(sport=1234, dport=1234))
2095         self.pg0.add_stream(p)
2096         self.pg_enable_capture(self.pg_interfaces)
2097         self.pg_start()
2098         p = self.pg0.get_capture(1)
2099         packet = p[0]
2100         try:
2101             self.assertEqual(packet[IP].src, host_nat_ip)
2102             self.assertEqual(packet[IP].dst, server.ip4)
2103             self.assertTrue(packet.haslayer(GRE))
2104             self.check_ip_checksum(packet)
2105         except:
2106             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2107             raise
2108
2109         # server to host
2110         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2111              IP(src=server.ip4, dst=host_nat_ip) /
2112              GRE() /
2113              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2114              TCP(sport=1234, dport=1234))
2115         self.pg0.add_stream(p)
2116         self.pg_enable_capture(self.pg_interfaces)
2117         self.pg_start()
2118         p = self.pg0.get_capture(1)
2119         packet = p[0]
2120         try:
2121             self.assertEqual(packet[IP].src, server_nat_ip)
2122             self.assertEqual(packet[IP].dst, host.ip4)
2123             self.assertTrue(packet.haslayer(GRE))
2124             self.check_ip_checksum(packet)
2125         except:
2126             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2127             raise
2128
2129     def test_unknown_proto(self):
2130         """ NAT44 translate packet with unknown protocol """
2131         self.nat44_add_address(self.nat_addr)
2132         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2133         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2134                                                   is_inside=0)
2135
2136         # in2out
2137         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2138              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2139              TCP(sport=self.tcp_port_in, dport=20))
2140         self.pg0.add_stream(p)
2141         self.pg_enable_capture(self.pg_interfaces)
2142         self.pg_start()
2143         p = self.pg1.get_capture(1)
2144
2145         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2146              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2147              GRE() /
2148              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2149              TCP(sport=1234, dport=1234))
2150         self.pg0.add_stream(p)
2151         self.pg_enable_capture(self.pg_interfaces)
2152         self.pg_start()
2153         p = self.pg1.get_capture(1)
2154         packet = p[0]
2155         try:
2156             self.assertEqual(packet[IP].src, self.nat_addr)
2157             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2158             self.assertTrue(packet.haslayer(GRE))
2159             self.check_ip_checksum(packet)
2160         except:
2161             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2162             raise
2163
2164         # out2in
2165         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2166              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2167              GRE() /
2168              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2169              TCP(sport=1234, dport=1234))
2170         self.pg1.add_stream(p)
2171         self.pg_enable_capture(self.pg_interfaces)
2172         self.pg_start()
2173         p = self.pg0.get_capture(1)
2174         packet = p[0]
2175         try:
2176             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2177             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2178             self.assertTrue(packet.haslayer(GRE))
2179             self.check_ip_checksum(packet)
2180         except:
2181             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2182             raise
2183
2184     def test_hairpinning_unknown_proto(self):
2185         """ NAT44 translate packet with unknown protocol - hairpinning """
2186         host = self.pg0.remote_hosts[0]
2187         server = self.pg0.remote_hosts[1]
2188         host_in_port = 1234
2189         host_out_port = 0
2190         server_in_port = 5678
2191         server_out_port = 8765
2192         server_nat_ip = "10.0.0.11"
2193
2194         self.nat44_add_address(self.nat_addr)
2195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2197                                                   is_inside=0)
2198
2199         # add static mapping for server
2200         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2201
2202         # host to server
2203         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2204              IP(src=host.ip4, dst=server_nat_ip) /
2205              TCP(sport=host_in_port, dport=server_out_port))
2206         self.pg0.add_stream(p)
2207         self.pg_enable_capture(self.pg_interfaces)
2208         self.pg_start()
2209         capture = self.pg0.get_capture(1)
2210
2211         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2212              IP(src=host.ip4, dst=server_nat_ip) /
2213              GRE() /
2214              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2215              TCP(sport=1234, dport=1234))
2216         self.pg0.add_stream(p)
2217         self.pg_enable_capture(self.pg_interfaces)
2218         self.pg_start()
2219         p = self.pg0.get_capture(1)
2220         packet = p[0]
2221         try:
2222             self.assertEqual(packet[IP].src, self.nat_addr)
2223             self.assertEqual(packet[IP].dst, server.ip4)
2224             self.assertTrue(packet.haslayer(GRE))
2225             self.check_ip_checksum(packet)
2226         except:
2227             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2228             raise
2229
2230         # server to host
2231         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2232              IP(src=server.ip4, dst=self.nat_addr) /
2233              GRE() /
2234              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2235              TCP(sport=1234, dport=1234))
2236         self.pg0.add_stream(p)
2237         self.pg_enable_capture(self.pg_interfaces)
2238         self.pg_start()
2239         p = self.pg0.get_capture(1)
2240         packet = p[0]
2241         try:
2242             self.assertEqual(packet[IP].src, server_nat_ip)
2243             self.assertEqual(packet[IP].dst, host.ip4)
2244             self.assertTrue(packet.haslayer(GRE))
2245             self.check_ip_checksum(packet)
2246         except:
2247             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2248             raise
2249
2250     def test_output_feature(self):
2251         """ NAT44 interface output feature (in2out postrouting) """
2252         self.nat44_add_address(self.nat_addr)
2253         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2254         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2255         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2256                                                          is_inside=0)
2257
2258         # in2out
2259         pkts = self.create_stream_in(self.pg0, self.pg3)
2260         self.pg0.add_stream(pkts)
2261         self.pg_enable_capture(self.pg_interfaces)
2262         self.pg_start()
2263         capture = self.pg3.get_capture(len(pkts))
2264         self.verify_capture_out(capture)
2265
2266         # out2in
2267         pkts = self.create_stream_out(self.pg3)
2268         self.pg3.add_stream(pkts)
2269         self.pg_enable_capture(self.pg_interfaces)
2270         self.pg_start()
2271         capture = self.pg0.get_capture(len(pkts))
2272         self.verify_capture_in(capture, self.pg0)
2273
2274         # from non-NAT interface to NAT inside interface
2275         pkts = self.create_stream_in(self.pg2, self.pg0)
2276         self.pg2.add_stream(pkts)
2277         self.pg_enable_capture(self.pg_interfaces)
2278         self.pg_start()
2279         capture = self.pg0.get_capture(len(pkts))
2280         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2281
2282     def test_output_feature_vrf_aware(self):
2283         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2284         nat_ip_vrf10 = "10.0.0.10"
2285         nat_ip_vrf20 = "10.0.0.20"
2286
2287         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2288                                    dst_address_length=32,
2289                                    next_hop_address=self.pg3.remote_ip4n,
2290                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2291                                    table_id=10)
2292         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2293                                    dst_address_length=32,
2294                                    next_hop_address=self.pg3.remote_ip4n,
2295                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2296                                    table_id=20)
2297
2298         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2299         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2300         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2301         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2302         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2303                                                          is_inside=0)
2304
2305         # in2out VRF 10
2306         pkts = self.create_stream_in(self.pg4, self.pg3)
2307         self.pg4.add_stream(pkts)
2308         self.pg_enable_capture(self.pg_interfaces)
2309         self.pg_start()
2310         capture = self.pg3.get_capture(len(pkts))
2311         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2312
2313         # out2in VRF 10
2314         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2315         self.pg3.add_stream(pkts)
2316         self.pg_enable_capture(self.pg_interfaces)
2317         self.pg_start()
2318         capture = self.pg4.get_capture(len(pkts))
2319         self.verify_capture_in(capture, self.pg4)
2320
2321         # in2out VRF 20
2322         pkts = self.create_stream_in(self.pg6, self.pg3)
2323         self.pg6.add_stream(pkts)
2324         self.pg_enable_capture(self.pg_interfaces)
2325         self.pg_start()
2326         capture = self.pg3.get_capture(len(pkts))
2327         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2328
2329         # out2in VRF 20
2330         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2331         self.pg3.add_stream(pkts)
2332         self.pg_enable_capture(self.pg_interfaces)
2333         self.pg_start()
2334         capture = self.pg6.get_capture(len(pkts))
2335         self.verify_capture_in(capture, self.pg6)
2336
2337     def test_output_feature_hairpinning(self):
2338         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2339         host = self.pg0.remote_hosts[0]
2340         server = self.pg0.remote_hosts[1]
2341         host_in_port = 1234
2342         host_out_port = 0
2343         server_in_port = 5678
2344         server_out_port = 8765
2345
2346         self.nat44_add_address(self.nat_addr)
2347         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2348         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2349                                                          is_inside=0)
2350
2351         # add static mapping for server
2352         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2353                                       server_in_port, server_out_port,
2354                                       proto=IP_PROTOS.tcp)
2355
2356         # send packet from host to server
2357         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2358              IP(src=host.ip4, dst=self.nat_addr) /
2359              TCP(sport=host_in_port, dport=server_out_port))
2360         self.pg0.add_stream(p)
2361         self.pg_enable_capture(self.pg_interfaces)
2362         self.pg_start()
2363         capture = self.pg0.get_capture(1)
2364         p = capture[0]
2365         try:
2366             ip = p[IP]
2367             tcp = p[TCP]
2368             self.assertEqual(ip.src, self.nat_addr)
2369             self.assertEqual(ip.dst, server.ip4)
2370             self.assertNotEqual(tcp.sport, host_in_port)
2371             self.assertEqual(tcp.dport, server_in_port)
2372             self.check_tcp_checksum(p)
2373             host_out_port = tcp.sport
2374         except:
2375             self.logger.error(ppp("Unexpected or invalid packet:", p))
2376             raise
2377
2378         # send reply from server to host
2379         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2380              IP(src=server.ip4, dst=self.nat_addr) /
2381              TCP(sport=server_in_port, dport=host_out_port))
2382         self.pg0.add_stream(p)
2383         self.pg_enable_capture(self.pg_interfaces)
2384         self.pg_start()
2385         capture = self.pg0.get_capture(1)
2386         p = capture[0]
2387         try:
2388             ip = p[IP]
2389             tcp = p[TCP]
2390             self.assertEqual(ip.src, self.nat_addr)
2391             self.assertEqual(ip.dst, host.ip4)
2392             self.assertEqual(tcp.sport, server_out_port)
2393             self.assertEqual(tcp.dport, host_in_port)
2394             self.check_tcp_checksum(p)
2395         except:
2396             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2397             raise
2398
2399     def test_one_armed_nat44(self):
2400         """ One armed NAT44 """
2401         remote_host = self.pg9.remote_hosts[0]
2402         local_host = self.pg9.remote_hosts[1]
2403         external_port = 0
2404
2405         self.nat44_add_address(self.nat_addr)
2406         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2407         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2408                                                   is_inside=0)
2409
2410         # in2out
2411         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2412              IP(src=local_host.ip4, dst=remote_host.ip4) /
2413              TCP(sport=12345, dport=80))
2414         self.pg9.add_stream(p)
2415         self.pg_enable_capture(self.pg_interfaces)
2416         self.pg_start()
2417         capture = self.pg9.get_capture(1)
2418         p = capture[0]
2419         try:
2420             ip = p[IP]
2421             tcp = p[TCP]
2422             self.assertEqual(ip.src, self.nat_addr)
2423             self.assertEqual(ip.dst, remote_host.ip4)
2424             self.assertNotEqual(tcp.sport, 12345)
2425             external_port = tcp.sport
2426             self.assertEqual(tcp.dport, 80)
2427             self.check_tcp_checksum(p)
2428             self.check_ip_checksum(p)
2429         except:
2430             self.logger.error(ppp("Unexpected or invalid packet:", p))
2431             raise
2432
2433         # out2in
2434         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2435              IP(src=remote_host.ip4, dst=self.nat_addr) /
2436              TCP(sport=80, dport=external_port))
2437         self.pg9.add_stream(p)
2438         self.pg_enable_capture(self.pg_interfaces)
2439         self.pg_start()
2440         capture = self.pg9.get_capture(1)
2441         p = capture[0]
2442         try:
2443             ip = p[IP]
2444             tcp = p[TCP]
2445             self.assertEqual(ip.src, remote_host.ip4)
2446             self.assertEqual(ip.dst, local_host.ip4)
2447             self.assertEqual(tcp.sport, 80)
2448             self.assertEqual(tcp.dport, 12345)
2449             self.check_tcp_checksum(p)
2450             self.check_ip_checksum(p)
2451         except:
2452             self.logger.error(ppp("Unexpected or invalid packet:", p))
2453             raise
2454
2455     def tearDown(self):
2456         super(TestNAT44, self).tearDown()
2457         if not self.vpp_dead:
2458             self.logger.info(self.vapi.cli("show nat44 verbose"))
2459             self.clear_nat44()
2460
2461
2462 class TestDeterministicNAT(MethodHolder):
2463     """ Deterministic NAT Test Cases """
2464
2465     @classmethod
2466     def setUpConstants(cls):
2467         super(TestDeterministicNAT, cls).setUpConstants()
2468         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2469
2470     @classmethod
2471     def setUpClass(cls):
2472         super(TestDeterministicNAT, cls).setUpClass()
2473
2474         try:
2475             cls.tcp_port_in = 6303
2476             cls.tcp_external_port = 6303
2477             cls.udp_port_in = 6304
2478             cls.udp_external_port = 6304
2479             cls.icmp_id_in = 6305
2480             cls.nat_addr = '10.0.0.3'
2481
2482             cls.create_pg_interfaces(range(3))
2483             cls.interfaces = list(cls.pg_interfaces)
2484
2485             for i in cls.interfaces:
2486                 i.admin_up()
2487                 i.config_ip4()
2488                 i.resolve_arp()
2489
2490             cls.pg0.generate_remote_hosts(2)
2491             cls.pg0.configure_ipv4_neighbors()
2492
2493         except Exception:
2494             super(TestDeterministicNAT, cls).tearDownClass()
2495             raise
2496
2497     def create_stream_in(self, in_if, out_if, ttl=64):
2498         """
2499         Create packet stream for inside network
2500
2501         :param in_if: Inside interface
2502         :param out_if: Outside interface
2503         :param ttl: TTL of generated packets
2504         """
2505         pkts = []
2506         # TCP
2507         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2508              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2509              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2510         pkts.append(p)
2511
2512         # UDP
2513         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2514              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2515              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2516         pkts.append(p)
2517
2518         # ICMP
2519         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2520              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2521              ICMP(id=self.icmp_id_in, type='echo-request'))
2522         pkts.append(p)
2523
2524         return pkts
2525
2526     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2527         """
2528         Create packet stream for outside network
2529
2530         :param out_if: Outside interface
2531         :param dst_ip: Destination IP address (Default use global NAT address)
2532         :param ttl: TTL of generated packets
2533         """
2534         if dst_ip is None:
2535             dst_ip = self.nat_addr
2536         pkts = []
2537         # TCP
2538         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2539              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2540              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2541         pkts.append(p)
2542
2543         # UDP
2544         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2545              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2546              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2547         pkts.append(p)
2548
2549         # ICMP
2550         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2551              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2552              ICMP(id=self.icmp_external_id, type='echo-reply'))
2553         pkts.append(p)
2554
2555         return pkts
2556
2557     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2558         """
2559         Verify captured packets on outside network
2560
2561         :param capture: Captured packets
2562         :param nat_ip: Translated IP address (Default use global NAT address)
2563         :param same_port: Sorce port number is not translated (Default False)
2564         :param packet_num: Expected number of packets (Default 3)
2565         """
2566         if nat_ip is None:
2567             nat_ip = self.nat_addr
2568         self.assertEqual(packet_num, len(capture))
2569         for packet in capture:
2570             try:
2571                 self.assertEqual(packet[IP].src, nat_ip)
2572                 if packet.haslayer(TCP):
2573                     self.tcp_port_out = packet[TCP].sport
2574                 elif packet.haslayer(UDP):
2575                     self.udp_port_out = packet[UDP].sport
2576                 else:
2577                     self.icmp_external_id = packet[ICMP].id
2578             except:
2579                 self.logger.error(ppp("Unexpected or invalid packet "
2580                                       "(outside network):", packet))
2581                 raise
2582
2583     def initiate_tcp_session(self, in_if, out_if):
2584         """
2585         Initiates TCP session
2586
2587         :param in_if: Inside interface
2588         :param out_if: Outside interface
2589         """
2590         try:
2591             # SYN packet in->out
2592             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2593                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2594                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2595                      flags="S"))
2596             in_if.add_stream(p)
2597             self.pg_enable_capture(self.pg_interfaces)
2598             self.pg_start()
2599             capture = out_if.get_capture(1)
2600             p = capture[0]
2601             self.tcp_port_out = p[TCP].sport
2602
2603             # SYN + ACK packet out->in
2604             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2605                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2606                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2607                      flags="SA"))
2608             out_if.add_stream(p)
2609             self.pg_enable_capture(self.pg_interfaces)
2610             self.pg_start()
2611             in_if.get_capture(1)
2612
2613             # ACK packet in->out
2614             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2615                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2616                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2617                      flags="A"))
2618             in_if.add_stream(p)
2619             self.pg_enable_capture(self.pg_interfaces)
2620             self.pg_start()
2621             out_if.get_capture(1)
2622
2623         except:
2624             self.logger.error("TCP 3 way handshake failed")
2625             raise
2626
2627     def verify_ipfix_max_entries_per_user(self, data):
2628         """
2629         Verify IPFIX maximum entries per user exceeded event
2630
2631         :param data: Decoded IPFIX data records
2632         """
2633         self.assertEqual(1, len(data))
2634         record = data[0]
2635         # natEvent
2636         self.assertEqual(ord(record[230]), 13)
2637         # natQuotaExceededEvent
2638         self.assertEqual('\x03\x00\x00\x00', record[466])
2639         # sourceIPv4Address
2640         self.assertEqual(self.pg0.remote_ip4n, record[8])
2641
2642     def test_deterministic_mode(self):
2643         """ NAT plugin run deterministic mode """
2644         in_addr = '172.16.255.0'
2645         out_addr = '172.17.255.50'
2646         in_addr_t = '172.16.255.20'
2647         in_addr_n = socket.inet_aton(in_addr)
2648         out_addr_n = socket.inet_aton(out_addr)
2649         in_addr_t_n = socket.inet_aton(in_addr_t)
2650         in_plen = 24
2651         out_plen = 32
2652
2653         nat_config = self.vapi.nat_show_config()
2654         self.assertEqual(1, nat_config.deterministic)
2655
2656         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2657
2658         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2659         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2660         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2661         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2662
2663         deterministic_mappings = self.vapi.nat_det_map_dump()
2664         self.assertEqual(len(deterministic_mappings), 1)
2665         dsm = deterministic_mappings[0]
2666         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2667         self.assertEqual(in_plen, dsm.in_plen)
2668         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2669         self.assertEqual(out_plen, dsm.out_plen)
2670
2671         self.clear_nat_det()
2672         deterministic_mappings = self.vapi.nat_det_map_dump()
2673         self.assertEqual(len(deterministic_mappings), 0)
2674
2675     def test_set_timeouts(self):
2676         """ Set deterministic NAT timeouts """
2677         timeouts_before = self.vapi.nat_det_get_timeouts()
2678
2679         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2680                                        timeouts_before.tcp_established + 10,
2681                                        timeouts_before.tcp_transitory + 10,
2682                                        timeouts_before.icmp + 10)
2683
2684         timeouts_after = self.vapi.nat_det_get_timeouts()
2685
2686         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2687         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2688         self.assertNotEqual(timeouts_before.tcp_established,
2689                             timeouts_after.tcp_established)
2690         self.assertNotEqual(timeouts_before.tcp_transitory,
2691                             timeouts_after.tcp_transitory)
2692
2693     def test_det_in(self):
2694         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2695
2696         nat_ip = "10.0.0.10"
2697
2698         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2699                                       32,
2700                                       socket.inet_aton(nat_ip),
2701                                       32)
2702         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2703         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2704                                                   is_inside=0)
2705
2706         # in2out
2707         pkts = self.create_stream_in(self.pg0, self.pg1)
2708         self.pg0.add_stream(pkts)
2709         self.pg_enable_capture(self.pg_interfaces)
2710         self.pg_start()
2711         capture = self.pg1.get_capture(len(pkts))
2712         self.verify_capture_out(capture, nat_ip)
2713
2714         # out2in
2715         pkts = self.create_stream_out(self.pg1, nat_ip)
2716         self.pg1.add_stream(pkts)
2717         self.pg_enable_capture(self.pg_interfaces)
2718         self.pg_start()
2719         capture = self.pg0.get_capture(len(pkts))
2720         self.verify_capture_in(capture, self.pg0)
2721
2722         # session dump test
2723         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2724         self.assertEqual(len(sessions), 3)
2725
2726         # TCP session
2727         s = sessions[0]
2728         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2729         self.assertEqual(s.in_port, self.tcp_port_in)
2730         self.assertEqual(s.out_port, self.tcp_port_out)
2731         self.assertEqual(s.ext_port, self.tcp_external_port)
2732
2733         # UDP session
2734         s = sessions[1]
2735         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2736         self.assertEqual(s.in_port, self.udp_port_in)
2737         self.assertEqual(s.out_port, self.udp_port_out)
2738         self.assertEqual(s.ext_port, self.udp_external_port)
2739
2740         # ICMP session
2741         s = sessions[2]
2742         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2743         self.assertEqual(s.in_port, self.icmp_id_in)
2744         self.assertEqual(s.out_port, self.icmp_external_id)
2745
2746     def test_multiple_users(self):
2747         """ Deterministic NAT multiple users """
2748
2749         nat_ip = "10.0.0.10"
2750         port_in = 80
2751         external_port = 6303
2752
2753         host0 = self.pg0.remote_hosts[0]
2754         host1 = self.pg0.remote_hosts[1]
2755
2756         self.vapi.nat_det_add_del_map(host0.ip4n,
2757                                       24,
2758                                       socket.inet_aton(nat_ip),
2759                                       32)
2760         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2761         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2762                                                   is_inside=0)
2763
2764         # host0 to out
2765         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2766              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2767              TCP(sport=port_in, dport=external_port))
2768         self.pg0.add_stream(p)
2769         self.pg_enable_capture(self.pg_interfaces)
2770         self.pg_start()
2771         capture = self.pg1.get_capture(1)
2772         p = capture[0]
2773         try:
2774             ip = p[IP]
2775             tcp = p[TCP]
2776             self.assertEqual(ip.src, nat_ip)
2777             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2778             self.assertEqual(tcp.dport, external_port)
2779             port_out0 = tcp.sport
2780         except:
2781             self.logger.error(ppp("Unexpected or invalid packet:", p))
2782             raise
2783
2784         # host1 to out
2785         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2786              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2787              TCP(sport=port_in, dport=external_port))
2788         self.pg0.add_stream(p)
2789         self.pg_enable_capture(self.pg_interfaces)
2790         self.pg_start()
2791         capture = self.pg1.get_capture(1)
2792         p = capture[0]
2793         try:
2794             ip = p[IP]
2795             tcp = p[TCP]
2796             self.assertEqual(ip.src, nat_ip)
2797             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2798             self.assertEqual(tcp.dport, external_port)
2799             port_out1 = tcp.sport
2800         except:
2801             self.logger.error(ppp("Unexpected or invalid packet:", p))
2802             raise
2803
2804         dms = self.vapi.nat_det_map_dump()
2805         self.assertEqual(1, len(dms))
2806         self.assertEqual(2, dms[0].ses_num)
2807
2808         # out to host0
2809         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2810              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2811              TCP(sport=external_port, dport=port_out0))
2812         self.pg1.add_stream(p)
2813         self.pg_enable_capture(self.pg_interfaces)
2814         self.pg_start()
2815         capture = self.pg0.get_capture(1)
2816         p = capture[0]
2817         try:
2818             ip = p[IP]
2819             tcp = p[TCP]
2820             self.assertEqual(ip.src, self.pg1.remote_ip4)
2821             self.assertEqual(ip.dst, host0.ip4)
2822             self.assertEqual(tcp.dport, port_in)
2823             self.assertEqual(tcp.sport, external_port)
2824         except:
2825             self.logger.error(ppp("Unexpected or invalid packet:", p))
2826             raise
2827
2828         # out to host1
2829         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2830              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2831              TCP(sport=external_port, dport=port_out1))
2832         self.pg1.add_stream(p)
2833         self.pg_enable_capture(self.pg_interfaces)
2834         self.pg_start()
2835         capture = self.pg0.get_capture(1)
2836         p = capture[0]
2837         try:
2838             ip = p[IP]
2839             tcp = p[TCP]
2840             self.assertEqual(ip.src, self.pg1.remote_ip4)
2841             self.assertEqual(ip.dst, host1.ip4)
2842             self.assertEqual(tcp.dport, port_in)
2843             self.assertEqual(tcp.sport, external_port)
2844         except:
2845             self.logger.error(ppp("Unexpected or invalid packet", p))
2846             raise
2847
2848         # session close api test
2849         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2850                                             port_out1,
2851                                             self.pg1.remote_ip4n,
2852                                             external_port)
2853         dms = self.vapi.nat_det_map_dump()
2854         self.assertEqual(dms[0].ses_num, 1)
2855
2856         self.vapi.nat_det_close_session_in(host0.ip4n,
2857                                            port_in,
2858                                            self.pg1.remote_ip4n,
2859                                            external_port)
2860         dms = self.vapi.nat_det_map_dump()
2861         self.assertEqual(dms[0].ses_num, 0)
2862
2863     def test_tcp_session_close_detection_in(self):
2864         """ Deterministic NAT TCP session close from inside network """
2865         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2866                                       32,
2867                                       socket.inet_aton(self.nat_addr),
2868                                       32)
2869         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2870         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2871                                                   is_inside=0)
2872
2873         self.initiate_tcp_session(self.pg0, self.pg1)
2874
2875         # close the session from inside
2876         try:
2877             # FIN packet in -> out
2878             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2879                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2880                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2881                      flags="F"))
2882             self.pg0.add_stream(p)
2883             self.pg_enable_capture(self.pg_interfaces)
2884             self.pg_start()
2885             self.pg1.get_capture(1)
2886
2887             pkts = []
2888
2889             # ACK packet out -> in
2890             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2891                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2892                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2893                      flags="A"))
2894             pkts.append(p)
2895
2896             # FIN packet out -> in
2897             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2898                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2899                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2900                      flags="F"))
2901             pkts.append(p)
2902
2903             self.pg1.add_stream(pkts)
2904             self.pg_enable_capture(self.pg_interfaces)
2905             self.pg_start()
2906             self.pg0.get_capture(2)
2907
2908             # ACK packet in -> out
2909             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2910                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2911                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2912                      flags="A"))
2913             self.pg0.add_stream(p)
2914             self.pg_enable_capture(self.pg_interfaces)
2915             self.pg_start()
2916             self.pg1.get_capture(1)
2917
2918             # Check if deterministic NAT44 closed the session
2919             dms = self.vapi.nat_det_map_dump()
2920             self.assertEqual(0, dms[0].ses_num)
2921         except:
2922             self.logger.error("TCP session termination failed")
2923             raise
2924
2925     def test_tcp_session_close_detection_out(self):
2926         """ Deterministic NAT TCP session close from outside network """
2927         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2928                                       32,
2929                                       socket.inet_aton(self.nat_addr),
2930                                       32)
2931         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2932         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2933                                                   is_inside=0)
2934
2935         self.initiate_tcp_session(self.pg0, self.pg1)
2936
2937         # close the session from outside
2938         try:
2939             # FIN packet out -> in
2940             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2941                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2942                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2943                      flags="F"))
2944             self.pg1.add_stream(p)
2945             self.pg_enable_capture(self.pg_interfaces)
2946             self.pg_start()
2947             self.pg0.get_capture(1)
2948
2949             pkts = []
2950
2951             # ACK packet in -> out
2952             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2953                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2954                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2955                      flags="A"))
2956             pkts.append(p)
2957
2958             # ACK packet in -> out
2959             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2960                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2961                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2962                      flags="F"))
2963             pkts.append(p)
2964
2965             self.pg0.add_stream(pkts)
2966             self.pg_enable_capture(self.pg_interfaces)
2967             self.pg_start()
2968             self.pg1.get_capture(2)
2969
2970             # ACK packet out -> in
2971             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2972                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2973                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2974                      flags="A"))
2975             self.pg1.add_stream(p)
2976             self.pg_enable_capture(self.pg_interfaces)
2977             self.pg_start()
2978             self.pg0.get_capture(1)
2979
2980             # Check if deterministic NAT44 closed the session
2981             dms = self.vapi.nat_det_map_dump()
2982             self.assertEqual(0, dms[0].ses_num)
2983         except:
2984             self.logger.error("TCP session termination failed")
2985             raise
2986
2987     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2988     def test_session_timeout(self):
2989         """ Deterministic NAT session timeouts """
2990         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2991                                       32,
2992                                       socket.inet_aton(self.nat_addr),
2993                                       32)
2994         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2995         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2996                                                   is_inside=0)
2997
2998         self.initiate_tcp_session(self.pg0, self.pg1)
2999         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3000         pkts = self.create_stream_in(self.pg0, self.pg1)
3001         self.pg0.add_stream(pkts)
3002         self.pg_enable_capture(self.pg_interfaces)
3003         self.pg_start()
3004         capture = self.pg1.get_capture(len(pkts))
3005         sleep(15)
3006
3007         dms = self.vapi.nat_det_map_dump()
3008         self.assertEqual(0, dms[0].ses_num)
3009
3010     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3011     def test_session_limit_per_user(self):
3012         """ Deterministic NAT maximum sessions per user limit """
3013         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3014                                       32,
3015                                       socket.inet_aton(self.nat_addr),
3016                                       32)
3017         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3018         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3019                                                   is_inside=0)
3020         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3021                                      src_address=self.pg2.local_ip4n,
3022                                      path_mtu=512,
3023                                      template_interval=10)
3024         self.vapi.nat_ipfix()
3025
3026         pkts = []
3027         for port in range(1025, 2025):
3028             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3029                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3030                  UDP(sport=port, dport=port))
3031             pkts.append(p)
3032
3033         self.pg0.add_stream(pkts)
3034         self.pg_enable_capture(self.pg_interfaces)
3035         self.pg_start()
3036         capture = self.pg1.get_capture(len(pkts))
3037
3038         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3039              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3040              UDP(sport=3001, dport=3002))
3041         self.pg0.add_stream(p)
3042         self.pg_enable_capture(self.pg_interfaces)
3043         self.pg_start()
3044         capture = self.pg1.assert_nothing_captured()
3045
3046         # verify ICMP error packet
3047         capture = self.pg0.get_capture(1)
3048         p = capture[0]
3049         self.assertTrue(p.haslayer(ICMP))
3050         icmp = p[ICMP]
3051         self.assertEqual(icmp.type, 3)
3052         self.assertEqual(icmp.code, 1)
3053         self.assertTrue(icmp.haslayer(IPerror))
3054         inner_ip = icmp[IPerror]
3055         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3056         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3057
3058         dms = self.vapi.nat_det_map_dump()
3059
3060         self.assertEqual(1000, dms[0].ses_num)
3061
3062         # verify IPFIX logging
3063         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3064         sleep(1)
3065         capture = self.pg2.get_capture(2)
3066         ipfix = IPFIXDecoder()
3067         # first load template
3068         for p in capture:
3069             self.assertTrue(p.haslayer(IPFIX))
3070             if p.haslayer(Template):
3071                 ipfix.add_template(p.getlayer(Template))
3072         # verify events in data set
3073         for p in capture:
3074             if p.haslayer(Data):
3075                 data = ipfix.decode_data_set(p.getlayer(Set))
3076                 self.verify_ipfix_max_entries_per_user(data)
3077
3078     def clear_nat_det(self):
3079         """
3080         Clear deterministic NAT configuration.
3081         """
3082         self.vapi.nat_ipfix(enable=0)
3083         self.vapi.nat_det_set_timeouts()
3084         deterministic_mappings = self.vapi.nat_det_map_dump()
3085         for dsm in deterministic_mappings:
3086             self.vapi.nat_det_add_del_map(dsm.in_addr,
3087                                           dsm.in_plen,
3088                                           dsm.out_addr,
3089                                           dsm.out_plen,
3090                                           is_add=0)
3091
3092         interfaces = self.vapi.nat44_interface_dump()
3093         for intf in interfaces:
3094             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3095                                                       intf.is_inside,
3096                                                       is_add=0)
3097
3098     def tearDown(self):
3099         super(TestDeterministicNAT, self).tearDown()
3100         if not self.vpp_dead:
3101             self.logger.info(self.vapi.cli("show nat44 detail"))
3102             self.clear_nat_det()
3103
3104
3105 class TestNAT64(MethodHolder):
3106     """ NAT64 Test Cases """
3107
3108     @classmethod
3109     def setUpClass(cls):
3110         super(TestNAT64, cls).setUpClass()
3111
3112         try:
3113             cls.tcp_port_in = 6303
3114             cls.tcp_port_out = 6303
3115             cls.udp_port_in = 6304
3116             cls.udp_port_out = 6304
3117             cls.icmp_id_in = 6305
3118             cls.icmp_id_out = 6305
3119             cls.nat_addr = '10.0.0.3'
3120             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3121             cls.vrf1_id = 10
3122             cls.vrf1_nat_addr = '10.0.10.3'
3123             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3124                                                    cls.vrf1_nat_addr)
3125
3126             cls.create_pg_interfaces(range(4))
3127             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3128             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3129             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3130
3131             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3132
3133             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3134
3135             cls.pg0.generate_remote_hosts(2)
3136
3137             for i in cls.ip6_interfaces:
3138                 i.admin_up()
3139                 i.config_ip6()
3140                 i.configure_ipv6_neighbors()
3141
3142             for i in cls.ip4_interfaces:
3143                 i.admin_up()
3144                 i.config_ip4()
3145                 i.resolve_arp()
3146
3147             cls.pg3.admin_up()
3148             cls.pg3.config_ip4()
3149             cls.pg3.resolve_arp()
3150             cls.pg3.config_ip6()
3151             cls.pg3.configure_ipv6_neighbors()
3152
3153         except Exception:
3154             super(TestNAT64, cls).tearDownClass()
3155             raise
3156
3157     def test_pool(self):
3158         """ Add/delete address to NAT64 pool """
3159         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3160
3161         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3162
3163         addresses = self.vapi.nat64_pool_addr_dump()
3164         self.assertEqual(len(addresses), 1)
3165         self.assertEqual(addresses[0].address, nat_addr)
3166
3167         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3168
3169         addresses = self.vapi.nat64_pool_addr_dump()
3170         self.assertEqual(len(addresses), 0)
3171
3172     def test_interface(self):
3173         """ Enable/disable NAT64 feature on the interface """
3174         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3175         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3176
3177         interfaces = self.vapi.nat64_interface_dump()
3178         self.assertEqual(len(interfaces), 2)
3179         pg0_found = False
3180         pg1_found = False
3181         for intf in interfaces:
3182             if intf.sw_if_index == self.pg0.sw_if_index:
3183                 self.assertEqual(intf.is_inside, 1)
3184                 pg0_found = True
3185             elif intf.sw_if_index == self.pg1.sw_if_index:
3186                 self.assertEqual(intf.is_inside, 0)
3187                 pg1_found = True
3188         self.assertTrue(pg0_found)
3189         self.assertTrue(pg1_found)
3190
3191         features = self.vapi.cli("show interface features pg0")
3192         self.assertNotEqual(features.find('nat64-in2out'), -1)
3193         features = self.vapi.cli("show interface features pg1")
3194         self.assertNotEqual(features.find('nat64-out2in'), -1)
3195
3196         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3197         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3198
3199         interfaces = self.vapi.nat64_interface_dump()
3200         self.assertEqual(len(interfaces), 0)
3201
3202     def test_static_bib(self):
3203         """ Add/delete static BIB entry """
3204         in_addr = socket.inet_pton(socket.AF_INET6,
3205                                    '2001:db8:85a3::8a2e:370:7334')
3206         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3207         in_port = 1234
3208         out_port = 5678
3209         proto = IP_PROTOS.tcp
3210
3211         self.vapi.nat64_add_del_static_bib(in_addr,
3212                                            out_addr,
3213                                            in_port,
3214                                            out_port,
3215                                            proto)
3216         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3217         static_bib_num = 0
3218         for bibe in bib:
3219             if bibe.is_static:
3220                 static_bib_num += 1
3221                 self.assertEqual(bibe.i_addr, in_addr)
3222                 self.assertEqual(bibe.o_addr, out_addr)
3223                 self.assertEqual(bibe.i_port, in_port)
3224                 self.assertEqual(bibe.o_port, out_port)
3225         self.assertEqual(static_bib_num, 1)
3226
3227         self.vapi.nat64_add_del_static_bib(in_addr,
3228                                            out_addr,
3229                                            in_port,
3230                                            out_port,
3231                                            proto,
3232                                            is_add=0)
3233         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3234         static_bib_num = 0
3235         for bibe in bib:
3236             if bibe.is_static:
3237                 static_bib_num += 1
3238         self.assertEqual(static_bib_num, 0)
3239
3240     def test_set_timeouts(self):
3241         """ Set NAT64 timeouts """
3242         # verify default values
3243         timeouts = self.vapi.nat64_get_timeouts()
3244         self.assertEqual(timeouts.udp, 300)
3245         self.assertEqual(timeouts.icmp, 60)
3246         self.assertEqual(timeouts.tcp_trans, 240)
3247         self.assertEqual(timeouts.tcp_est, 7440)
3248         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3249
3250         # set and verify custom values
3251         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3252                                      tcp_est=7450, tcp_incoming_syn=10)
3253         timeouts = self.vapi.nat64_get_timeouts()
3254         self.assertEqual(timeouts.udp, 200)
3255         self.assertEqual(timeouts.icmp, 30)
3256         self.assertEqual(timeouts.tcp_trans, 250)
3257         self.assertEqual(timeouts.tcp_est, 7450)
3258         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3259
3260     def test_dynamic(self):
3261         """ NAT64 dynamic translation test """
3262         self.tcp_port_in = 6303
3263         self.udp_port_in = 6304
3264         self.icmp_id_in = 6305
3265
3266         ses_num_start = self.nat64_get_ses_num()
3267
3268         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3269                                                 self.nat_addr_n)
3270         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3271         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3272
3273         # in2out
3274         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3275         self.pg0.add_stream(pkts)
3276         self.pg_enable_capture(self.pg_interfaces)
3277         self.pg_start()
3278         capture = self.pg1.get_capture(len(pkts))
3279         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3280                                 dst_ip=self.pg1.remote_ip4)
3281
3282         # out2in
3283         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3284         self.pg1.add_stream(pkts)
3285         self.pg_enable_capture(self.pg_interfaces)
3286         self.pg_start()
3287         capture = self.pg0.get_capture(len(pkts))
3288         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3289         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3290
3291         # in2out
3292         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3293         self.pg0.add_stream(pkts)
3294         self.pg_enable_capture(self.pg_interfaces)
3295         self.pg_start()
3296         capture = self.pg1.get_capture(len(pkts))
3297         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3298                                 dst_ip=self.pg1.remote_ip4)
3299
3300         # out2in
3301         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3302         self.pg1.add_stream(pkts)
3303         self.pg_enable_capture(self.pg_interfaces)
3304         self.pg_start()
3305         capture = self.pg0.get_capture(len(pkts))
3306         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3307
3308         ses_num_end = self.nat64_get_ses_num()
3309
3310         self.assertEqual(ses_num_end - ses_num_start, 3)
3311
3312         # tenant with specific VRF
3313         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3314                                                 self.vrf1_nat_addr_n,
3315                                                 vrf_id=self.vrf1_id)
3316         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3317
3318         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3319         self.pg2.add_stream(pkts)
3320         self.pg_enable_capture(self.pg_interfaces)
3321         self.pg_start()
3322         capture = self.pg1.get_capture(len(pkts))
3323         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3324                                 dst_ip=self.pg1.remote_ip4)
3325
3326         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3327         self.pg1.add_stream(pkts)
3328         self.pg_enable_capture(self.pg_interfaces)
3329         self.pg_start()
3330         capture = self.pg2.get_capture(len(pkts))
3331         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3332
3333     def test_static(self):
3334         """ NAT64 static translation test """
3335         self.tcp_port_in = 60303
3336         self.udp_port_in = 60304
3337         self.icmp_id_in = 60305
3338         self.tcp_port_out = 60303
3339         self.udp_port_out = 60304
3340         self.icmp_id_out = 60305
3341
3342         ses_num_start = self.nat64_get_ses_num()
3343
3344         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3345                                                 self.nat_addr_n)
3346         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3347         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3348
3349         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3350                                            self.nat_addr_n,
3351                                            self.tcp_port_in,
3352                                            self.tcp_port_out,
3353                                            IP_PROTOS.tcp)
3354         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3355                                            self.nat_addr_n,
3356                                            self.udp_port_in,
3357                                            self.udp_port_out,
3358                                            IP_PROTOS.udp)
3359         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3360                                            self.nat_addr_n,
3361                                            self.icmp_id_in,
3362                                            self.icmp_id_out,
3363                                            IP_PROTOS.icmp)
3364
3365         # in2out
3366         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3367         self.pg0.add_stream(pkts)
3368         self.pg_enable_capture(self.pg_interfaces)
3369         self.pg_start()
3370         capture = self.pg1.get_capture(len(pkts))
3371         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3372                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3373
3374         # out2in
3375         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3376         self.pg1.add_stream(pkts)
3377         self.pg_enable_capture(self.pg_interfaces)
3378         self.pg_start()
3379         capture = self.pg0.get_capture(len(pkts))
3380         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3381         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3382
3383         ses_num_end = self.nat64_get_ses_num()
3384
3385         self.assertEqual(ses_num_end - ses_num_start, 3)
3386
3387     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3388     def test_session_timeout(self):
3389         """ NAT64 session timeout """
3390         self.icmp_id_in = 1234
3391         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3392                                                 self.nat_addr_n)
3393         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3394         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3395         self.vapi.nat64_set_timeouts(icmp=5)
3396
3397         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3398         self.pg0.add_stream(pkts)
3399         self.pg_enable_capture(self.pg_interfaces)
3400         self.pg_start()
3401         capture = self.pg1.get_capture(len(pkts))
3402
3403         ses_num_before_timeout = self.nat64_get_ses_num()
3404
3405         sleep(15)
3406
3407         # ICMP session after timeout
3408         ses_num_after_timeout = self.nat64_get_ses_num()
3409         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3410
3411     def test_icmp_error(self):
3412         """ NAT64 ICMP Error message translation """
3413         self.tcp_port_in = 6303
3414         self.udp_port_in = 6304
3415         self.icmp_id_in = 6305
3416
3417         ses_num_start = self.nat64_get_ses_num()
3418
3419         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3420                                                 self.nat_addr_n)
3421         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3422         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3423
3424         # send some packets to create sessions
3425         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3426         self.pg0.add_stream(pkts)
3427         self.pg_enable_capture(self.pg_interfaces)
3428         self.pg_start()
3429         capture_ip4 = self.pg1.get_capture(len(pkts))
3430         self.verify_capture_out(capture_ip4,
3431                                 nat_ip=self.nat_addr,
3432                                 dst_ip=self.pg1.remote_ip4)
3433
3434         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3435         self.pg1.add_stream(pkts)
3436         self.pg_enable_capture(self.pg_interfaces)
3437         self.pg_start()
3438         capture_ip6 = self.pg0.get_capture(len(pkts))
3439         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3440         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3441                                    self.pg0.remote_ip6)
3442
3443         # in2out
3444         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3445                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3446                 ICMPv6DestUnreach(code=1) /
3447                 packet[IPv6] for packet in capture_ip6]
3448         self.pg0.add_stream(pkts)
3449         self.pg_enable_capture(self.pg_interfaces)
3450         self.pg_start()
3451         capture = self.pg1.get_capture(len(pkts))
3452         for packet in capture:
3453             try:
3454                 self.assertEqual(packet[IP].src, self.nat_addr)
3455                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3456                 self.assertEqual(packet[ICMP].type, 3)
3457                 self.assertEqual(packet[ICMP].code, 13)
3458                 inner = packet[IPerror]
3459                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3460                 self.assertEqual(inner.dst, self.nat_addr)
3461                 self.check_icmp_checksum(packet)
3462                 if inner.haslayer(TCPerror):
3463                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3464                 elif inner.haslayer(UDPerror):
3465                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3466                 else:
3467                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3468             except:
3469                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3470                 raise
3471
3472         # out2in
3473         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3474                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3475                 ICMP(type=3, code=13) /
3476                 packet[IP] for packet in capture_ip4]
3477         self.pg1.add_stream(pkts)
3478         self.pg_enable_capture(self.pg_interfaces)
3479         self.pg_start()
3480         capture = self.pg0.get_capture(len(pkts))
3481         for packet in capture:
3482             try:
3483                 self.assertEqual(packet[IPv6].src, ip.src)
3484                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3485                 icmp = packet[ICMPv6DestUnreach]
3486                 self.assertEqual(icmp.code, 1)
3487                 inner = icmp[IPerror6]
3488                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3489                 self.assertEqual(inner.dst, ip.src)
3490                 self.check_icmpv6_checksum(packet)
3491                 if inner.haslayer(TCPerror):
3492                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3493                 elif inner.haslayer(UDPerror):
3494                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3495                 else:
3496                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3497                                      self.icmp_id_in)
3498             except:
3499                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3500                 raise
3501
3502     def test_hairpinning(self):
3503         """ NAT64 hairpinning """
3504
3505         client = self.pg0.remote_hosts[0]
3506         server = self.pg0.remote_hosts[1]
3507         server_tcp_in_port = 22
3508         server_tcp_out_port = 4022
3509         server_udp_in_port = 23
3510         server_udp_out_port = 4023
3511         client_tcp_in_port = 1234
3512         client_udp_in_port = 1235
3513         client_tcp_out_port = 0
3514         client_udp_out_port = 0
3515         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3516         nat_addr_ip6 = ip.src
3517
3518         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3519                                                 self.nat_addr_n)
3520         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3521         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3522
3523         self.vapi.nat64_add_del_static_bib(server.ip6n,
3524                                            self.nat_addr_n,
3525                                            server_tcp_in_port,
3526                                            server_tcp_out_port,
3527                                            IP_PROTOS.tcp)
3528         self.vapi.nat64_add_del_static_bib(server.ip6n,
3529                                            self.nat_addr_n,
3530                                            server_udp_in_port,
3531                                            server_udp_out_port,
3532                                            IP_PROTOS.udp)
3533
3534         # client to server
3535         pkts = []
3536         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3537              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3538              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3539         pkts.append(p)
3540         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3541              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3542              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3543         pkts.append(p)
3544         self.pg0.add_stream(pkts)
3545         self.pg_enable_capture(self.pg_interfaces)
3546         self.pg_start()
3547         capture = self.pg0.get_capture(len(pkts))
3548         for packet in capture:
3549             try:
3550                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3551                 self.assertEqual(packet[IPv6].dst, server.ip6)
3552                 if packet.haslayer(TCP):
3553                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3554                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3555                     self.check_tcp_checksum(packet)
3556                     client_tcp_out_port = packet[TCP].sport
3557                 else:
3558                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3559                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3560                     self.check_udp_checksum(packet)
3561                     client_udp_out_port = packet[UDP].sport
3562             except:
3563                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3564                 raise
3565
3566         # server to client
3567         pkts = []
3568         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3569              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3570              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3571         pkts.append(p)
3572         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3573              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3574              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3575         pkts.append(p)
3576         self.pg0.add_stream(pkts)
3577         self.pg_enable_capture(self.pg_interfaces)
3578         self.pg_start()
3579         capture = self.pg0.get_capture(len(pkts))
3580         for packet in capture:
3581             try:
3582                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3583                 self.assertEqual(packet[IPv6].dst, client.ip6)
3584                 if packet.haslayer(TCP):
3585                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3586                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3587                     self.check_tcp_checksum(packet)
3588                 else:
3589                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3590                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3591                     self.check_udp_checksum(packet)
3592             except:
3593                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3594                 raise
3595
3596         # ICMP error
3597         pkts = []
3598         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3599                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3600                 ICMPv6DestUnreach(code=1) /
3601                 packet[IPv6] for packet in capture]
3602         self.pg0.add_stream(pkts)
3603         self.pg_enable_capture(self.pg_interfaces)
3604         self.pg_start()
3605         capture = self.pg0.get_capture(len(pkts))
3606         for packet in capture:
3607             try:
3608                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3609                 self.assertEqual(packet[IPv6].dst, server.ip6)
3610                 icmp = packet[ICMPv6DestUnreach]
3611                 self.assertEqual(icmp.code, 1)
3612                 inner = icmp[IPerror6]
3613                 self.assertEqual(inner.src, server.ip6)
3614                 self.assertEqual(inner.dst, nat_addr_ip6)
3615                 self.check_icmpv6_checksum(packet)
3616                 if inner.haslayer(TCPerror):
3617                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3618                     self.assertEqual(inner[TCPerror].dport,
3619                                      client_tcp_out_port)
3620                 else:
3621                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3622                     self.assertEqual(inner[UDPerror].dport,
3623                                      client_udp_out_port)
3624             except:
3625                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3626                 raise
3627
3628     def test_prefix(self):
3629         """ NAT64 Network-Specific Prefix """
3630
3631         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3632                                                 self.nat_addr_n)
3633         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3634         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3635         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3636                                                 self.vrf1_nat_addr_n,
3637                                                 vrf_id=self.vrf1_id)
3638         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3639
3640         # Add global prefix
3641         global_pref64 = "2001:db8::"
3642         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3643         global_pref64_len = 32
3644         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3645
3646         prefix = self.vapi.nat64_prefix_dump()
3647         self.assertEqual(len(prefix), 1)
3648         self.assertEqual(prefix[0].prefix, global_pref64_n)
3649         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3650         self.assertEqual(prefix[0].vrf_id, 0)
3651
3652         # Add tenant specific prefix
3653         vrf1_pref64 = "2001:db8:122:300::"
3654         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3655         vrf1_pref64_len = 56
3656         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3657                                        vrf1_pref64_len,
3658                                        vrf_id=self.vrf1_id)
3659         prefix = self.vapi.nat64_prefix_dump()
3660         self.assertEqual(len(prefix), 2)
3661
3662         # Global prefix
3663         pkts = self.create_stream_in_ip6(self.pg0,
3664                                          self.pg1,
3665                                          pref=global_pref64,
3666                                          plen=global_pref64_len)
3667         self.pg0.add_stream(pkts)
3668         self.pg_enable_capture(self.pg_interfaces)
3669         self.pg_start()
3670         capture = self.pg1.get_capture(len(pkts))
3671         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3672                                 dst_ip=self.pg1.remote_ip4)
3673
3674         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3675         self.pg1.add_stream(pkts)
3676         self.pg_enable_capture(self.pg_interfaces)
3677         self.pg_start()
3678         capture = self.pg0.get_capture(len(pkts))
3679         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3680                                   global_pref64,
3681                                   global_pref64_len)
3682         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3683
3684         # Tenant specific prefix
3685         pkts = self.create_stream_in_ip6(self.pg2,
3686                                          self.pg1,
3687                                          pref=vrf1_pref64,
3688                                          plen=vrf1_pref64_len)
3689         self.pg2.add_stream(pkts)
3690         self.pg_enable_capture(self.pg_interfaces)
3691         self.pg_start()
3692         capture = self.pg1.get_capture(len(pkts))
3693         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3694                                 dst_ip=self.pg1.remote_ip4)
3695
3696         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3697         self.pg1.add_stream(pkts)
3698         self.pg_enable_capture(self.pg_interfaces)
3699         self.pg_start()
3700         capture = self.pg2.get_capture(len(pkts))
3701         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3702                                   vrf1_pref64,
3703                                   vrf1_pref64_len)
3704         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3705
3706     def test_unknown_proto(self):
3707         """ NAT64 translate packet with unknown protocol """
3708
3709         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3710                                                 self.nat_addr_n)
3711         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3712         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3713         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3714
3715         # in2out
3716         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3717              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3718              TCP(sport=self.tcp_port_in, dport=20))
3719         self.pg0.add_stream(p)
3720         self.pg_enable_capture(self.pg_interfaces)
3721         self.pg_start()
3722         p = self.pg1.get_capture(1)
3723
3724         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3725              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3726              GRE() /
3727              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3728              TCP(sport=1234, dport=1234))
3729         self.pg0.add_stream(p)
3730         self.pg_enable_capture(self.pg_interfaces)
3731         self.pg_start()
3732         p = self.pg1.get_capture(1)
3733         packet = p[0]
3734         try:
3735             self.assertEqual(packet[IP].src, self.nat_addr)
3736             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3737             self.assertTrue(packet.haslayer(GRE))
3738             self.check_ip_checksum(packet)
3739         except:
3740             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3741             raise
3742
3743         # out2in
3744         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3745              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3746              GRE() /
3747              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3748              TCP(sport=1234, dport=1234))
3749         self.pg1.add_stream(p)
3750         self.pg_enable_capture(self.pg_interfaces)
3751         self.pg_start()
3752         p = self.pg0.get_capture(1)
3753         packet = p[0]
3754         try:
3755             self.assertEqual(packet[IPv6].src, remote_ip6)
3756             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3757             self.assertEqual(packet[IPv6].nh, 47)
3758         except:
3759             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3760             raise
3761
3762     def test_hairpinning_unknown_proto(self):
3763         """ NAT64 translate packet with unknown protocol - hairpinning """
3764
3765         client = self.pg0.remote_hosts[0]
3766         server = self.pg0.remote_hosts[1]
3767         server_tcp_in_port = 22
3768         server_tcp_out_port = 4022
3769         client_tcp_in_port = 1234
3770         client_tcp_out_port = 1235
3771         server_nat_ip = "10.0.0.100"
3772         client_nat_ip = "10.0.0.110"
3773         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3774         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3775         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3776         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3777
3778         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3779                                                 client_nat_ip_n)
3780         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3781         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3782
3783         self.vapi.nat64_add_del_static_bib(server.ip6n,
3784                                            server_nat_ip_n,
3785                                            server_tcp_in_port,
3786                                            server_tcp_out_port,
3787                                            IP_PROTOS.tcp)
3788
3789         self.vapi.nat64_add_del_static_bib(server.ip6n,
3790                                            server_nat_ip_n,
3791                                            0,
3792                                            0,
3793                                            IP_PROTOS.gre)
3794
3795         self.vapi.nat64_add_del_static_bib(client.ip6n,
3796                                            client_nat_ip_n,
3797                                            client_tcp_in_port,
3798                                            client_tcp_out_port,
3799                                            IP_PROTOS.tcp)
3800
3801         # client to server
3802         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3803              IPv6(src=client.ip6, dst=server_nat_ip6) /
3804              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3805         self.pg0.add_stream(p)
3806         self.pg_enable_capture(self.pg_interfaces)
3807         self.pg_start()
3808         p = self.pg0.get_capture(1)
3809
3810         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3811              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3812              GRE() /
3813              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3814              TCP(sport=1234, dport=1234))
3815         self.pg0.add_stream(p)
3816         self.pg_enable_capture(self.pg_interfaces)
3817         self.pg_start()
3818         p = self.pg0.get_capture(1)
3819         packet = p[0]
3820         try:
3821             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3822             self.assertEqual(packet[IPv6].dst, server.ip6)
3823             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3824         except:
3825             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3826             raise
3827
3828         # server to client
3829         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3830              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3831              GRE() /
3832              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3833              TCP(sport=1234, dport=1234))
3834         self.pg0.add_stream(p)
3835         self.pg_enable_capture(self.pg_interfaces)
3836         self.pg_start()
3837         p = self.pg0.get_capture(1)
3838         packet = p[0]
3839         try:
3840             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3841             self.assertEqual(packet[IPv6].dst, client.ip6)
3842             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3843         except:
3844             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3845             raise
3846
3847     def test_one_armed_nat64(self):
3848         """ One armed NAT64 """
3849         external_port = 0
3850         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3851                                            '64:ff9b::',
3852                                            96)
3853
3854         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3855                                                 self.nat_addr_n)
3856         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3857         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3858
3859         # in2out
3860         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3861              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3862              TCP(sport=12345, dport=80))
3863         self.pg3.add_stream(p)
3864         self.pg_enable_capture(self.pg_interfaces)
3865         self.pg_start()
3866         capture = self.pg3.get_capture(1)
3867         p = capture[0]
3868         try:
3869             ip = p[IP]
3870             tcp = p[TCP]
3871             self.assertEqual(ip.src, self.nat_addr)
3872             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3873             self.assertNotEqual(tcp.sport, 12345)
3874             external_port = tcp.sport
3875             self.assertEqual(tcp.dport, 80)
3876             self.check_tcp_checksum(p)
3877             self.check_ip_checksum(p)
3878         except:
3879             self.logger.error(ppp("Unexpected or invalid packet:", p))
3880             raise
3881
3882         # out2in
3883         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3884              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3885              TCP(sport=80, dport=external_port))
3886         self.pg3.add_stream(p)
3887         self.pg_enable_capture(self.pg_interfaces)
3888         self.pg_start()
3889         capture = self.pg3.get_capture(1)
3890         p = capture[0]
3891         try:
3892             ip = p[IPv6]
3893             tcp = p[TCP]
3894             self.assertEqual(ip.src, remote_host_ip6)
3895             self.assertEqual(ip.dst, self.pg3.remote_ip6)
3896             self.assertEqual(tcp.sport, 80)
3897             self.assertEqual(tcp.dport, 12345)
3898             self.check_tcp_checksum(p)
3899         except:
3900             self.logger.error(ppp("Unexpected or invalid packet:", p))
3901             raise
3902
3903     def nat64_get_ses_num(self):
3904         """
3905         Return number of active NAT64 sessions.
3906         """
3907         st = self.vapi.nat64_st_dump()
3908         return len(st)
3909
3910     def clear_nat64(self):
3911         """
3912         Clear NAT64 configuration.
3913         """
3914         self.vapi.nat64_set_timeouts()
3915
3916         interfaces = self.vapi.nat64_interface_dump()
3917         for intf in interfaces:
3918             if intf.is_inside > 1:
3919                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3920                                                   0,
3921                                                   is_add=0)
3922             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3923                                               intf.is_inside,
3924                                               is_add=0)
3925
3926         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3927         for bibe in bib:
3928             if bibe.is_static:
3929                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3930                                                    bibe.o_addr,
3931                                                    bibe.i_port,
3932                                                    bibe.o_port,
3933                                                    bibe.proto,
3934                                                    bibe.vrf_id,
3935                                                    is_add=0)
3936
3937         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3938         for bibe in bib:
3939             if bibe.is_static:
3940                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3941                                                    bibe.o_addr,
3942                                                    bibe.i_port,
3943                                                    bibe.o_port,
3944                                                    bibe.proto,
3945                                                    bibe.vrf_id,
3946                                                    is_add=0)
3947
3948         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3949         for bibe in bib:
3950             if bibe.is_static:
3951                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3952                                                    bibe.o_addr,
3953                                                    bibe.i_port,
3954                                                    bibe.o_port,
3955                                                    bibe.proto,
3956                                                    bibe.vrf_id,
3957                                                    is_add=0)
3958
3959         adresses = self.vapi.nat64_pool_addr_dump()
3960         for addr in adresses:
3961             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3962                                                     addr.address,
3963                                                     vrf_id=addr.vrf_id,
3964                                                     is_add=0)
3965
3966         prefixes = self.vapi.nat64_prefix_dump()
3967         for prefix in prefixes:
3968             self.vapi.nat64_add_del_prefix(prefix.prefix,
3969                                            prefix.prefix_len,
3970                                            vrf_id=prefix.vrf_id,
3971                                            is_add=0)
3972
3973     def tearDown(self):
3974         super(TestNAT64, self).tearDown()
3975         if not self.vpp_dead:
3976             self.logger.info(self.vapi.cli("show nat64 pool"))
3977             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3978             self.logger.info(self.vapi.cli("show nat64 prefix"))
3979             self.logger.info(self.vapi.cli("show nat64 bib all"))
3980             self.logger.info(self.vapi.cli("show nat64 session table all"))
3981             self.clear_nat64()
3982
# When the module is executed as a script, run its tests through unittest
# using the project's VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)