One armed NAT (VPP-1035)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19
20
21 class MethodHolder(VppTestCase):
22     """ NAT create capture and verify method holder """
23
24     @classmethod
25     def setUpClass(cls):
26         super(MethodHolder, cls).setUpClass()
27
28     def tearDown(self):
29         super(MethodHolder, self).tearDown()
30
31     def check_ip_checksum(self, pkt):
32         """
33         Check IP checksum of the packet
34
35         :param pkt: Packet to check IP checksum
36         """
37         new = pkt.__class__(str(pkt))
38         del new['IP'].chksum
39         new = new.__class__(str(new))
40         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41
42     def check_tcp_checksum(self, pkt):
43         """
44         Check TCP checksum in IP packet
45
46         :param pkt: Packet to check TCP checksum
47         """
48         new = pkt.__class__(str(pkt))
49         del new['TCP'].chksum
50         new = new.__class__(str(new))
51         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52
53     def check_udp_checksum(self, pkt):
54         """
55         Check UDP checksum in IP packet
56
57         :param pkt: Packet to check UDP checksum
58         """
59         new = pkt.__class__(str(pkt))
60         del new['UDP'].chksum
61         new = new.__class__(str(new))
62         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63
64     def check_icmp_errror_embedded(self, pkt):
65         """
66         Check ICMP error embeded packet checksum
67
68         :param pkt: Packet to check ICMP error embeded packet checksum
69         """
70         if pkt.haslayer(IPerror):
71             new = pkt.__class__(str(pkt))
72             del new['IPerror'].chksum
73             new = new.__class__(str(new))
74             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75
76         if pkt.haslayer(TCPerror):
77             new = pkt.__class__(str(pkt))
78             del new['TCPerror'].chksum
79             new = new.__class__(str(new))
80             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81
82         if pkt.haslayer(UDPerror):
83             if pkt['UDPerror'].chksum != 0:
84                 new = pkt.__class__(str(pkt))
85                 del new['UDPerror'].chksum
86                 new = new.__class__(str(new))
87                 self.assertEqual(new['UDPerror'].chksum,
88                                  pkt['UDPerror'].chksum)
89
90         if pkt.haslayer(ICMPerror):
91             del new['ICMPerror'].chksum
92             new = new.__class__(str(new))
93             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94
95     def check_icmp_checksum(self, pkt):
96         """
97         Check ICMP checksum in IPv4 packet
98
99         :param pkt: Packet to check ICMP checksum
100         """
101         new = pkt.__class__(str(pkt))
102         del new['ICMP'].chksum
103         new = new.__class__(str(new))
104         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105         if pkt.haslayer(IPerror):
106             self.check_icmp_errror_embedded(pkt)
107
108     def check_icmpv6_checksum(self, pkt):
109         """
110         Check ICMPv6 checksum in IPv4 packet
111
112         :param pkt: Packet to check ICMPv6 checksum
113         """
114         new = pkt.__class__(str(pkt))
115         if pkt.haslayer(ICMPv6DestUnreach):
116             del new['ICMPv6DestUnreach'].cksum
117             new = new.__class__(str(new))
118             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119                              pkt['ICMPv6DestUnreach'].cksum)
120             self.check_icmp_errror_embedded(pkt)
121         if pkt.haslayer(ICMPv6EchoRequest):
122             del new['ICMPv6EchoRequest'].cksum
123             new = new.__class__(str(new))
124             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125                              pkt['ICMPv6EchoRequest'].cksum)
126         if pkt.haslayer(ICMPv6EchoReply):
127             del new['ICMPv6EchoReply'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoReply'].cksum,
130                              pkt['ICMPv6EchoReply'].cksum)
131
132     def create_stream_in(self, in_if, out_if, ttl=64):
133         """
134         Create packet stream for inside network
135
136         :param in_if: Inside interface
137         :param out_if: Outside interface
138         :param ttl: TTL of generated packets
139         """
140         pkts = []
141         # TCP
142         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144              TCP(sport=self.tcp_port_in, dport=20))
145         pkts.append(p)
146
147         # UDP
148         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150              UDP(sport=self.udp_port_in, dport=20))
151         pkts.append(p)
152
153         # ICMP
154         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156              ICMP(id=self.icmp_id_in, type='echo-request'))
157         pkts.append(p)
158
159         return pkts
160
161     def compose_ip6(self, ip4, pref, plen):
162         """
163         Compose IPv4-embedded IPv6 addresses
164
165         :param ip4: IPv4 address
166         :param pref: IPv6 prefix
167         :param plen: IPv6 prefix length
168         :returns: IPv4-embedded IPv6 addresses
169         """
170         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
172         if plen == 32:
173             pref_n[4] = ip4_n[0]
174             pref_n[5] = ip4_n[1]
175             pref_n[6] = ip4_n[2]
176             pref_n[7] = ip4_n[3]
177         elif plen == 40:
178             pref_n[5] = ip4_n[0]
179             pref_n[6] = ip4_n[1]
180             pref_n[7] = ip4_n[2]
181             pref_n[9] = ip4_n[3]
182         elif plen == 48:
183             pref_n[6] = ip4_n[0]
184             pref_n[7] = ip4_n[1]
185             pref_n[9] = ip4_n[2]
186             pref_n[10] = ip4_n[3]
187         elif plen == 56:
188             pref_n[7] = ip4_n[0]
189             pref_n[9] = ip4_n[1]
190             pref_n[10] = ip4_n[2]
191             pref_n[11] = ip4_n[3]
192         elif plen == 64:
193             pref_n[9] = ip4_n[0]
194             pref_n[10] = ip4_n[1]
195             pref_n[11] = ip4_n[2]
196             pref_n[12] = ip4_n[3]
197         elif plen == 96:
198             pref_n[12] = ip4_n[0]
199             pref_n[13] = ip4_n[1]
200             pref_n[14] = ip4_n[2]
201             pref_n[15] = ip4_n[3]
202         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203
204     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205         """
206         Create IPv6 packet stream for inside network
207
208         :param in_if: Inside interface
209         :param out_if: Outside interface
210         :param ttl: Hop Limit of generated packets
211         :param pref: NAT64 prefix
212         :param plen: NAT64 prefix length
213         """
214         pkts = []
215         if pref is None:
216             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217         else:
218             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
219
220         # TCP
221         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223              TCP(sport=self.tcp_port_in, dport=20))
224         pkts.append(p)
225
226         # UDP
227         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229              UDP(sport=self.udp_port_in, dport=20))
230         pkts.append(p)
231
232         # ICMP
233         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235              ICMPv6EchoRequest(id=self.icmp_id_in))
236         pkts.append(p)
237
238         return pkts
239
240     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241         """
242         Create packet stream for outside network
243
244         :param out_if: Outside interface
245         :param dst_ip: Destination IP address (Default use global NAT address)
246         :param ttl: TTL of generated packets
247         """
248         if dst_ip is None:
249             dst_ip = self.nat_addr
250         pkts = []
251         # TCP
252         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254              TCP(dport=self.tcp_port_out, sport=20))
255         pkts.append(p)
256
257         # UDP
258         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260              UDP(dport=self.udp_port_out, sport=20))
261         pkts.append(p)
262
263         # ICMP
264         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266              ICMP(id=self.icmp_id_out, type='echo-reply'))
267         pkts.append(p)
268
269         return pkts
270
271     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272                            packet_num=3, dst_ip=None):
273         """
274         Verify captured packets on outside network
275
276         :param capture: Captured packets
277         :param nat_ip: Translated IP address (Default use global NAT address)
278         :param same_port: Sorce port number is not translated (Default False)
279         :param packet_num: Expected number of packets (Default 3)
280         :param dst_ip: Destination IP address (Default do not verify)
281         """
282         if nat_ip is None:
283             nat_ip = self.nat_addr
284         self.assertEqual(packet_num, len(capture))
285         for packet in capture:
286             try:
287                 self.check_ip_checksum(packet)
288                 self.assertEqual(packet[IP].src, nat_ip)
289                 if dst_ip is not None:
290                     self.assertEqual(packet[IP].dst, dst_ip)
291                 if packet.haslayer(TCP):
292                     if same_port:
293                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
294                     else:
295                         self.assertNotEqual(
296                             packet[TCP].sport, self.tcp_port_in)
297                     self.tcp_port_out = packet[TCP].sport
298                     self.check_tcp_checksum(packet)
299                 elif packet.haslayer(UDP):
300                     if same_port:
301                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
302                     else:
303                         self.assertNotEqual(
304                             packet[UDP].sport, self.udp_port_in)
305                     self.udp_port_out = packet[UDP].sport
306                 else:
307                     if same_port:
308                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309                     else:
310                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311                     self.icmp_id_out = packet[ICMP].id
312                     self.check_icmp_checksum(packet)
313             except:
314                 self.logger.error(ppp("Unexpected or invalid packet "
315                                       "(outside network):", packet))
316                 raise
317
318     def verify_capture_in(self, capture, in_if, packet_num=3):
319         """
320         Verify captured packets on inside network
321
322         :param capture: Captured packets
323         :param in_if: Inside interface
324         :param packet_num: Expected number of packets (Default 3)
325         """
326         self.assertEqual(packet_num, len(capture))
327         for packet in capture:
328             try:
329                 self.check_ip_checksum(packet)
330                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331                 if packet.haslayer(TCP):
332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333                     self.check_tcp_checksum(packet)
334                 elif packet.haslayer(UDP):
335                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
336                 else:
337                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338                     self.check_icmp_checksum(packet)
339             except:
340                 self.logger.error(ppp("Unexpected or invalid packet "
341                                       "(inside network):", packet))
342                 raise
343
344     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345         """
346         Verify captured IPv6 packets on inside network
347
348         :param capture: Captured packets
349         :param src_ip: Source IP
350         :param dst_ip: Destination IP address
351         :param packet_num: Expected number of packets (Default 3)
352         """
353         self.assertEqual(packet_num, len(capture))
354         for packet in capture:
355             try:
356                 self.assertEqual(packet[IPv6].src, src_ip)
357                 self.assertEqual(packet[IPv6].dst, dst_ip)
358                 if packet.haslayer(TCP):
359                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360                     self.check_tcp_checksum(packet)
361                 elif packet.haslayer(UDP):
362                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
363                     self.check_udp_checksum(packet)
364                 else:
365                     self.assertEqual(packet[ICMPv6EchoReply].id,
366                                      self.icmp_id_in)
367                     self.check_icmpv6_checksum(packet)
368             except:
369                 self.logger.error(ppp("Unexpected or invalid packet "
370                                       "(inside network):", packet))
371                 raise
372
373     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374         """
375         Verify captured packet that don't have to be translated
376
377         :param capture: Captured packets
378         :param ingress_if: Ingress interface
379         :param egress_if: Egress interface
380         """
381         for packet in capture:
382             try:
383                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397                                             packet_num=3, icmp_type=11):
398         """
399         Verify captured packets with ICMP errors on outside network
400
401         :param capture: Captured packets
402         :param src_ip: Translated IP address or IP address of VPP
403                        (Default use global NAT address)
404         :param packet_num: Expected number of packets (Default 3)
405         :param icmp_type: Type of error ICMP packet
406                           we are expecting (Default 11)
407         """
408         if src_ip is None:
409             src_ip = self.nat_addr
410         self.assertEqual(packet_num, len(capture))
411         for packet in capture:
412             try:
413                 self.assertEqual(packet[IP].src, src_ip)
414                 self.assertTrue(packet.haslayer(ICMP))
415                 icmp = packet[ICMP]
416                 self.assertEqual(icmp.type, icmp_type)
417                 self.assertTrue(icmp.haslayer(IPerror))
418                 inner_ip = icmp[IPerror]
419                 if inner_ip.haslayer(TCPerror):
420                     self.assertEqual(inner_ip[TCPerror].dport,
421                                      self.tcp_port_out)
422                 elif inner_ip.haslayer(UDPerror):
423                     self.assertEqual(inner_ip[UDPerror].dport,
424                                      self.udp_port_out)
425                 else:
426                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427             except:
428                 self.logger.error(ppp("Unexpected or invalid packet "
429                                       "(outside network):", packet))
430                 raise
431
432     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
433                                            icmp_type=11):
434         """
435         Verify captured packets with ICMP errors on inside network
436
437         :param capture: Captured packets
438         :param in_if: Inside interface
439         :param packet_num: Expected number of packets (Default 3)
440         :param icmp_type: Type of error ICMP packet
441                           we are expecting (Default 11)
442         """
443         self.assertEqual(packet_num, len(capture))
444         for packet in capture:
445             try:
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 self.assertTrue(packet.haslayer(ICMP))
448                 icmp = packet[ICMP]
449                 self.assertEqual(icmp.type, icmp_type)
450                 self.assertTrue(icmp.haslayer(IPerror))
451                 inner_ip = icmp[IPerror]
452                 if inner_ip.haslayer(TCPerror):
453                     self.assertEqual(inner_ip[TCPerror].sport,
454                                      self.tcp_port_in)
455                 elif inner_ip.haslayer(UDPerror):
456                     self.assertEqual(inner_ip[UDPerror].sport,
457                                      self.udp_port_in)
458                 else:
459                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460             except:
461                 self.logger.error(ppp("Unexpected or invalid packet "
462                                       "(inside network):", packet))
463                 raise
464
465     def verify_ipfix_nat44_ses(self, data):
466         """
467         Verify IPFIX NAT44 session create/delete event
468
469         :param data: Decoded IPFIX data records
470         """
471         nat44_ses_create_num = 0
472         nat44_ses_delete_num = 0
473         self.assertEqual(6, len(data))
474         for record in data:
475             # natEvent
476             self.assertIn(ord(record[230]), [4, 5])
477             if ord(record[230]) == 4:
478                 nat44_ses_create_num += 1
479             else:
480                 nat44_ses_delete_num += 1
481             # sourceIPv4Address
482             self.assertEqual(self.pg0.remote_ip4n, record[8])
483             # postNATSourceIPv4Address
484             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
485                              record[225])
486             # ingressVRFID
487             self.assertEqual(struct.pack("!I", 0), record[234])
488             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489             if IP_PROTOS.icmp == ord(record[4]):
490                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492                                  record[227])
493             elif IP_PROTOS.tcp == ord(record[4]):
494                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495                                  record[7])
496                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497                                  record[227])
498             elif IP_PROTOS.udp == ord(record[4]):
499                 self.assertEqual(struct.pack("!H", self.udp_port_in),
500                                  record[7])
501                 self.assertEqual(struct.pack("!H", self.udp_port_out),
502                                  record[227])
503             else:
504                 self.fail("Invalid protocol")
505         self.assertEqual(3, nat44_ses_create_num)
506         self.assertEqual(3, nat44_ses_delete_num)
507
508     def verify_ipfix_addr_exhausted(self, data):
509         """
510         Verify IPFIX NAT addresses event
511
512         :param data: Decoded IPFIX data records
513         """
514         self.assertEqual(1, len(data))
515         record = data[0]
516         # natEvent
517         self.assertEqual(ord(record[230]), 3)
518         # natPoolID
519         self.assertEqual(struct.pack("!I", 0), record[283])
520
521
522 class TestNAT44(MethodHolder):
523     """ NAT44 Test Cases """
524
525     @classmethod
526     def setUpClass(cls):
527         super(TestNAT44, cls).setUpClass()
528
529         try:
530             cls.tcp_port_in = 6303
531             cls.tcp_port_out = 6303
532             cls.udp_port_in = 6304
533             cls.udp_port_out = 6304
534             cls.icmp_id_in = 6305
535             cls.icmp_id_out = 6305
536             cls.nat_addr = '10.0.0.3'
537             cls.ipfix_src_port = 4739
538             cls.ipfix_domain_id = 1
539
540             cls.create_pg_interfaces(range(10))
541             cls.interfaces = list(cls.pg_interfaces[0:4])
542
543             for i in cls.interfaces:
544                 i.admin_up()
545                 i.config_ip4()
546                 i.resolve_arp()
547
548             cls.pg0.generate_remote_hosts(3)
549             cls.pg0.configure_ipv4_neighbors()
550
551             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552             cls.vapi.ip_table_add_del(10, is_add=1)
553             cls.vapi.ip_table_add_del(20, is_add=1)
554
555             cls.pg4._local_ip4 = "172.16.255.1"
556             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
557             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
558             cls.pg4.set_table_ip4(10)
559             cls.pg5._local_ip4 = "172.17.255.3"
560             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
561             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
562             cls.pg5.set_table_ip4(10)
563             cls.pg6._local_ip4 = "172.16.255.1"
564             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
565             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
566             cls.pg6.set_table_ip4(20)
567             for i in cls.overlapping_interfaces:
568                 i.config_ip4()
569                 i.admin_up()
570                 i.resolve_arp()
571
572             cls.pg7.admin_up()
573             cls.pg8.admin_up()
574
575             cls.pg9.generate_remote_hosts(2)
576             cls.pg9.config_ip4()
577             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
578             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
579                                                   ip_addr_n,
580                                                   24)
581             cls.pg9.admin_up()
582             cls.pg9.resolve_arp()
583             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
584             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
585             cls.pg9.resolve_arp()
586
587         except Exception:
588             super(TestNAT44, cls).tearDownClass()
589             raise
590
591     def clear_nat44(self):
592         """
593         Clear NAT44 configuration.
594         """
595         # I found no elegant way to do this
596         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
597                                    dst_address_length=32,
598                                    next_hop_address=self.pg7.remote_ip4n,
599                                    next_hop_sw_if_index=self.pg7.sw_if_index,
600                                    is_add=0)
601         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
602                                    dst_address_length=32,
603                                    next_hop_address=self.pg8.remote_ip4n,
604                                    next_hop_sw_if_index=self.pg8.sw_if_index,
605                                    is_add=0)
606
607         for intf in [self.pg7, self.pg8]:
608             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
609             for n in neighbors:
610                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
611                                               n.mac_address,
612                                               n.ip_address,
613                                               is_add=0)
614
615         if self.pg7.has_ip4_config:
616             self.pg7.unconfig_ip4()
617
618         interfaces = self.vapi.nat44_interface_addr_dump()
619         for intf in interfaces:
620             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
621
622         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
623                             domain_id=self.ipfix_domain_id)
624         self.ipfix_src_port = 4739
625         self.ipfix_domain_id = 1
626
627         interfaces = self.vapi.nat44_interface_dump()
628         for intf in interfaces:
629             if intf.is_inside > 1:
630                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
631                                                           0,
632                                                           is_add=0)
633             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
634                                                       intf.is_inside,
635                                                       is_add=0)
636
637         interfaces = self.vapi.nat44_interface_output_feature_dump()
638         for intf in interfaces:
639             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
640                                                              intf.is_inside,
641                                                              is_add=0)
642
643         static_mappings = self.vapi.nat44_static_mapping_dump()
644         for sm in static_mappings:
645             self.vapi.nat44_add_del_static_mapping(
646                 sm.local_ip_address,
647                 sm.external_ip_address,
648                 local_port=sm.local_port,
649                 external_port=sm.external_port,
650                 addr_only=sm.addr_only,
651                 vrf_id=sm.vrf_id,
652                 protocol=sm.protocol,
653                 is_add=0)
654
655         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
656         for lb_sm in lb_static_mappings:
657             self.vapi.nat44_add_del_lb_static_mapping(
658                 lb_sm.external_addr,
659                 lb_sm.external_port,
660                 lb_sm.protocol,
661                 lb_sm.vrf_id,
662                 is_add=0)
663
664         adresses = self.vapi.nat44_address_dump()
665         for addr in adresses:
666             self.vapi.nat44_add_del_address_range(addr.ip_address,
667                                                   addr.ip_address,
668                                                   is_add=0)
669
670     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
671                                  local_port=0, external_port=0, vrf_id=0,
672                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
673                                  proto=0):
674         """
675         Add/delete NAT44 static mapping
676
677         :param local_ip: Local IP address
678         :param external_ip: External IP address
679         :param local_port: Local port number (Optional)
680         :param external_port: External port number (Optional)
681         :param vrf_id: VRF ID (Default 0)
682         :param is_add: 1 if add, 0 if delete (Default add)
683         :param external_sw_if_index: External interface instead of IP address
684         :param proto: IP protocol (Mandatory if port specified)
685         """
686         addr_only = 1
687         if local_port and external_port:
688             addr_only = 0
689         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
690         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
691         self.vapi.nat44_add_del_static_mapping(
692             l_ip,
693             e_ip,
694             external_sw_if_index,
695             local_port,
696             external_port,
697             addr_only,
698             vrf_id,
699             proto,
700             is_add)
701
702     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
703         """
704         Add/delete NAT44 address
705
706         :param ip: IP address
707         :param is_add: 1 if add, 0 if delete (Default add)
708         """
709         nat_addr = socket.inet_pton(socket.AF_INET, ip)
710         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
711                                               vrf_id=vrf_id)
712
713     def test_dynamic(self):
714         """ NAT44 dynamic translation test """
715
716         self.nat44_add_address(self.nat_addr)
717         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
718         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
719                                                   is_inside=0)
720
721         # in2out
722         pkts = self.create_stream_in(self.pg0, self.pg1)
723         self.pg0.add_stream(pkts)
724         self.pg_enable_capture(self.pg_interfaces)
725         self.pg_start()
726         capture = self.pg1.get_capture(len(pkts))
727         self.verify_capture_out(capture)
728
729         # out2in
730         pkts = self.create_stream_out(self.pg1)
731         self.pg1.add_stream(pkts)
732         self.pg_enable_capture(self.pg_interfaces)
733         self.pg_start()
734         capture = self.pg0.get_capture(len(pkts))
735         self.verify_capture_in(capture, self.pg0)
736
737     def test_dynamic_icmp_errors_in2out_ttl_1(self):
738         """ NAT44 handling of client packets with TTL=1 """
739
740         self.nat44_add_address(self.nat_addr)
741         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
742         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
743                                                   is_inside=0)
744
745         # Client side - generate traffic
746         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
747         self.pg0.add_stream(pkts)
748         self.pg_enable_capture(self.pg_interfaces)
749         self.pg_start()
750
751         # Client side - verify ICMP type 11 packets
752         capture = self.pg0.get_capture(len(pkts))
753         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
754
755     def test_dynamic_icmp_errors_out2in_ttl_1(self):
756         """ NAT44 handling of server packets with TTL=1 """
757
758         self.nat44_add_address(self.nat_addr)
759         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
760         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
761                                                   is_inside=0)
762
763         # Client side - create sessions
764         pkts = self.create_stream_in(self.pg0, self.pg1)
765         self.pg0.add_stream(pkts)
766         self.pg_enable_capture(self.pg_interfaces)
767         self.pg_start()
768
769         # Server side - generate traffic
770         capture = self.pg1.get_capture(len(pkts))
771         self.verify_capture_out(capture)
772         pkts = self.create_stream_out(self.pg1, ttl=1)
773         self.pg1.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776
777         # Server side - verify ICMP type 11 packets
778         capture = self.pg1.get_capture(len(pkts))
779         self.verify_capture_out_with_icmp_errors(capture,
780                                                  src_ip=self.pg1.local_ip4)
781
782     def test_dynamic_icmp_errors_in2out_ttl_2(self):
783         """ NAT44 handling of error responses to client packets with TTL=2 """
784
785         self.nat44_add_address(self.nat_addr)
786         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
787         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
788                                                   is_inside=0)
789
790         # Client side - generate traffic
791         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
792         self.pg0.add_stream(pkts)
793         self.pg_enable_capture(self.pg_interfaces)
794         self.pg_start()
795
796         # Server side - simulate ICMP type 11 response
797         capture = self.pg1.get_capture(len(pkts))
798         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
799                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
800                 ICMP(type=11) / packet[IP] for packet in capture]
801         self.pg1.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Client side - verify ICMP type 11 packets
806         capture = self.pg0.get_capture(len(pkts))
807         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
808
809     def test_dynamic_icmp_errors_out2in_ttl_2(self):
810         """ NAT44 handling of error responses to server packets with TTL=2 """
811
812         self.nat44_add_address(self.nat_addr)
813         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
814         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
815                                                   is_inside=0)
816
817         # Client side - create sessions
818         pkts = self.create_stream_in(self.pg0, self.pg1)
819         self.pg0.add_stream(pkts)
820         self.pg_enable_capture(self.pg_interfaces)
821         self.pg_start()
822
823         # Server side - generate traffic
824         capture = self.pg1.get_capture(len(pkts))
825         self.verify_capture_out(capture)
826         pkts = self.create_stream_out(self.pg1, ttl=2)
827         self.pg1.add_stream(pkts)
828         self.pg_enable_capture(self.pg_interfaces)
829         self.pg_start()
830
831         # Client side - simulate ICMP type 11 response
832         capture = self.pg0.get_capture(len(pkts))
833         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
834                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
835                 ICMP(type=11) / packet[IP] for packet in capture]
836         self.pg0.add_stream(pkts)
837         self.pg_enable_capture(self.pg_interfaces)
838         self.pg_start()
839
840         # Server side - verify ICMP type 11 packets
841         capture = self.pg1.get_capture(len(pkts))
842         self.verify_capture_out_with_icmp_errors(capture)
843
844     def test_ping_out_interface_from_outside(self):
845         """ Ping NAT44 out interface from outside network """
846
847         self.nat44_add_address(self.nat_addr)
848         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
849         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
850                                                   is_inside=0)
851
852         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
853              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
854              ICMP(id=self.icmp_id_out, type='echo-request'))
855         pkts = [p]
856         self.pg1.add_stream(pkts)
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pg_start()
859         capture = self.pg1.get_capture(len(pkts))
860         self.assertEqual(1, len(capture))
861         packet = capture[0]
862         try:
863             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
864             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
865             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
866             self.assertEqual(packet[ICMP].type, 0)  # echo reply
867         except:
868             self.logger.error(ppp("Unexpected or invalid packet "
869                                   "(outside network):", packet))
870             raise
871
872     def test_ping_internal_host_from_outside(self):
873         """ Ping internal host from outside network """
874
875         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
876         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
877         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
878                                                   is_inside=0)
879
880         # out2in
881         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
882                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
883                ICMP(id=self.icmp_id_out, type='echo-request'))
884         self.pg1.add_stream(pkt)
885         self.pg_enable_capture(self.pg_interfaces)
886         self.pg_start()
887         capture = self.pg0.get_capture(1)
888         self.verify_capture_in(capture, self.pg0, packet_num=1)
889         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
890
891         # in2out
892         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
893                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
894                ICMP(id=self.icmp_id_in, type='echo-reply'))
895         self.pg0.add_stream(pkt)
896         self.pg_enable_capture(self.pg_interfaces)
897         self.pg_start()
898         capture = self.pg1.get_capture(1)
899         self.verify_capture_out(capture, same_port=True, packet_num=1)
900         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
901
902     def test_static_in(self):
903         """ 1:1 NAT initialized from inside network """
904
905         nat_ip = "10.0.0.10"
906         self.tcp_port_out = 6303
907         self.udp_port_out = 6304
908         self.icmp_id_out = 6305
909
910         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
911         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
912         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
913                                                   is_inside=0)
914
915         # in2out
916         pkts = self.create_stream_in(self.pg0, self.pg1)
917         self.pg0.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg1.get_capture(len(pkts))
921         self.verify_capture_out(capture, nat_ip, True)
922
923         # out2in
924         pkts = self.create_stream_out(self.pg1, nat_ip)
925         self.pg1.add_stream(pkts)
926         self.pg_enable_capture(self.pg_interfaces)
927         self.pg_start()
928         capture = self.pg0.get_capture(len(pkts))
929         self.verify_capture_in(capture, self.pg0)
930
931     def test_static_out(self):
932         """ 1:1 NAT initialized from outside network """
933
934         nat_ip = "10.0.0.20"
935         self.tcp_port_out = 6303
936         self.udp_port_out = 6304
937         self.icmp_id_out = 6305
938
939         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
940         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
942                                                   is_inside=0)
943
944         # out2in
945         pkts = self.create_stream_out(self.pg1, nat_ip)
946         self.pg1.add_stream(pkts)
947         self.pg_enable_capture(self.pg_interfaces)
948         self.pg_start()
949         capture = self.pg0.get_capture(len(pkts))
950         self.verify_capture_in(capture, self.pg0)
951
952         # in2out
953         pkts = self.create_stream_in(self.pg0, self.pg1)
954         self.pg0.add_stream(pkts)
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pg_start()
957         capture = self.pg1.get_capture(len(pkts))
958         self.verify_capture_out(capture, nat_ip, True)
959
960     def test_static_with_port_in(self):
961         """ 1:1 NAPT initialized from inside network """
962
963         self.tcp_port_out = 3606
964         self.udp_port_out = 3607
965         self.icmp_id_out = 3608
966
967         self.nat44_add_address(self.nat_addr)
968         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
969                                       self.tcp_port_in, self.tcp_port_out,
970                                       proto=IP_PROTOS.tcp)
971         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
972                                       self.udp_port_in, self.udp_port_out,
973                                       proto=IP_PROTOS.udp)
974         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
975                                       self.icmp_id_in, self.icmp_id_out,
976                                       proto=IP_PROTOS.icmp)
977         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
978         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
979                                                   is_inside=0)
980
981         # in2out
982         pkts = self.create_stream_in(self.pg0, self.pg1)
983         self.pg0.add_stream(pkts)
984         self.pg_enable_capture(self.pg_interfaces)
985         self.pg_start()
986         capture = self.pg1.get_capture(len(pkts))
987         self.verify_capture_out(capture)
988
989         # out2in
990         pkts = self.create_stream_out(self.pg1)
991         self.pg1.add_stream(pkts)
992         self.pg_enable_capture(self.pg_interfaces)
993         self.pg_start()
994         capture = self.pg0.get_capture(len(pkts))
995         self.verify_capture_in(capture, self.pg0)
996
997     def test_static_with_port_out(self):
998         """ 1:1 NAPT initialized from outside network """
999
1000         self.tcp_port_out = 30606
1001         self.udp_port_out = 30607
1002         self.icmp_id_out = 30608
1003
1004         self.nat44_add_address(self.nat_addr)
1005         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1006                                       self.tcp_port_in, self.tcp_port_out,
1007                                       proto=IP_PROTOS.tcp)
1008         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1009                                       self.udp_port_in, self.udp_port_out,
1010                                       proto=IP_PROTOS.udp)
1011         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1012                                       self.icmp_id_in, self.icmp_id_out,
1013                                       proto=IP_PROTOS.icmp)
1014         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1015         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1016                                                   is_inside=0)
1017
1018         # out2in
1019         pkts = self.create_stream_out(self.pg1)
1020         self.pg1.add_stream(pkts)
1021         self.pg_enable_capture(self.pg_interfaces)
1022         self.pg_start()
1023         capture = self.pg0.get_capture(len(pkts))
1024         self.verify_capture_in(capture, self.pg0)
1025
1026         # in2out
1027         pkts = self.create_stream_in(self.pg0, self.pg1)
1028         self.pg0.add_stream(pkts)
1029         self.pg_enable_capture(self.pg_interfaces)
1030         self.pg_start()
1031         capture = self.pg1.get_capture(len(pkts))
1032         self.verify_capture_out(capture)
1033
1034     def test_static_vrf_aware(self):
1035         """ 1:1 NAT VRF awareness """
1036
1037         nat_ip1 = "10.0.0.30"
1038         nat_ip2 = "10.0.0.40"
1039         self.tcp_port_out = 6303
1040         self.udp_port_out = 6304
1041         self.icmp_id_out = 6305
1042
1043         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1044                                       vrf_id=10)
1045         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1046                                       vrf_id=10)
1047         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1048                                                   is_inside=0)
1049         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1050         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1051
1052         # inside interface VRF match NAT44 static mapping VRF
1053         pkts = self.create_stream_in(self.pg4, self.pg3)
1054         self.pg4.add_stream(pkts)
1055         self.pg_enable_capture(self.pg_interfaces)
1056         self.pg_start()
1057         capture = self.pg3.get_capture(len(pkts))
1058         self.verify_capture_out(capture, nat_ip1, True)
1059
1060         # inside interface VRF don't match NAT44 static mapping VRF (packets
1061         # are dropped)
1062         pkts = self.create_stream_in(self.pg0, self.pg3)
1063         self.pg0.add_stream(pkts)
1064         self.pg_enable_capture(self.pg_interfaces)
1065         self.pg_start()
1066         self.pg3.assert_nothing_captured()
1067
1068     def test_static_lb(self):
1069         """ NAT44 local service load balancing """
1070         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1071         external_port = 80
1072         local_port = 8080
1073         server1 = self.pg0.remote_hosts[0]
1074         server2 = self.pg0.remote_hosts[1]
1075
1076         locals = [{'addr': server1.ip4n,
1077                    'port': local_port,
1078                    'probability': 70},
1079                   {'addr': server2.ip4n,
1080                    'port': local_port,
1081                    'probability': 30}]
1082
1083         self.nat44_add_address(self.nat_addr)
1084         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1085                                                   external_port,
1086                                                   IP_PROTOS.tcp,
1087                                                   local_num=len(locals),
1088                                                   locals=locals)
1089         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1090         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1091                                                   is_inside=0)
1092
1093         # from client to service
1094         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1095              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1096              TCP(sport=12345, dport=external_port))
1097         self.pg1.add_stream(p)
1098         self.pg_enable_capture(self.pg_interfaces)
1099         self.pg_start()
1100         capture = self.pg0.get_capture(1)
1101         p = capture[0]
1102         server = None
1103         try:
1104             ip = p[IP]
1105             tcp = p[TCP]
1106             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1107             if ip.dst == server1.ip4:
1108                 server = server1
1109             else:
1110                 server = server2
1111             self.assertEqual(tcp.dport, local_port)
1112             self.check_tcp_checksum(p)
1113             self.check_ip_checksum(p)
1114         except:
1115             self.logger.error(ppp("Unexpected or invalid packet:", p))
1116             raise
1117
1118         # from service back to client
1119         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1120              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1121              TCP(sport=local_port, dport=12345))
1122         self.pg0.add_stream(p)
1123         self.pg_enable_capture(self.pg_interfaces)
1124         self.pg_start()
1125         capture = self.pg1.get_capture(1)
1126         p = capture[0]
1127         try:
1128             ip = p[IP]
1129             tcp = p[TCP]
1130             self.assertEqual(ip.src, self.nat_addr)
1131             self.assertEqual(tcp.sport, external_port)
1132             self.check_tcp_checksum(p)
1133             self.check_ip_checksum(p)
1134         except:
1135             self.logger.error(ppp("Unexpected or invalid packet:", p))
1136             raise
1137
1138         # multiple clients
1139         server1_n = 0
1140         server2_n = 0
1141         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1142         pkts = []
1143         for client in clients:
1144             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1145                  IP(src=client, dst=self.nat_addr) /
1146                  TCP(sport=12345, dport=external_port))
1147             pkts.append(p)
1148         self.pg1.add_stream(pkts)
1149         self.pg_enable_capture(self.pg_interfaces)
1150         self.pg_start()
1151         capture = self.pg0.get_capture(len(pkts))
1152         for p in capture:
1153             if p[IP].dst == server1.ip4:
1154                 server1_n += 1
1155             else:
1156                 server2_n += 1
1157         self.assertTrue(server1_n > server2_n)
1158
1159     def test_multiple_inside_interfaces(self):
1160         """ NAT44 multiple non-overlapping address space inside interfaces """
1161
1162         self.nat44_add_address(self.nat_addr)
1163         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1164         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1165         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1166                                                   is_inside=0)
1167
1168         # between two NAT44 inside interfaces (no translation)
1169         pkts = self.create_stream_in(self.pg0, self.pg1)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173         capture = self.pg1.get_capture(len(pkts))
1174         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1175
1176         # from NAT44 inside to interface without NAT44 feature (no translation)
1177         pkts = self.create_stream_in(self.pg0, self.pg2)
1178         self.pg0.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181         capture = self.pg2.get_capture(len(pkts))
1182         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1183
1184         # in2out 1st interface
1185         pkts = self.create_stream_in(self.pg0, self.pg3)
1186         self.pg0.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg3.get_capture(len(pkts))
1190         self.verify_capture_out(capture)
1191
1192         # out2in 1st interface
1193         pkts = self.create_stream_out(self.pg3)
1194         self.pg3.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197         capture = self.pg0.get_capture(len(pkts))
1198         self.verify_capture_in(capture, self.pg0)
1199
1200         # in2out 2nd interface
1201         pkts = self.create_stream_in(self.pg1, self.pg3)
1202         self.pg1.add_stream(pkts)
1203         self.pg_enable_capture(self.pg_interfaces)
1204         self.pg_start()
1205         capture = self.pg3.get_capture(len(pkts))
1206         self.verify_capture_out(capture)
1207
1208         # out2in 2nd interface
1209         pkts = self.create_stream_out(self.pg3)
1210         self.pg3.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213         capture = self.pg1.get_capture(len(pkts))
1214         self.verify_capture_in(capture, self.pg1)
1215
1216     def test_inside_overlapping_interfaces(self):
1217         """ NAT44 multiple inside interfaces with overlapping address space """
1218
1219         static_nat_ip = "10.0.0.10"
1220         self.nat44_add_address(self.nat_addr)
1221         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1222                                                   is_inside=0)
1223         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1224         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1225         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1226         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1227                                       vrf_id=20)
1228
1229         # between NAT44 inside interfaces with same VRF (no translation)
1230         pkts = self.create_stream_in(self.pg4, self.pg5)
1231         self.pg4.add_stream(pkts)
1232         self.pg_enable_capture(self.pg_interfaces)
1233         self.pg_start()
1234         capture = self.pg5.get_capture(len(pkts))
1235         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1236
1237         # between NAT44 inside interfaces with different VRF (hairpinning)
1238         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1239              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1240              TCP(sport=1234, dport=5678))
1241         self.pg4.add_stream(p)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244         capture = self.pg6.get_capture(1)
1245         p = capture[0]
1246         try:
1247             ip = p[IP]
1248             tcp = p[TCP]
1249             self.assertEqual(ip.src, self.nat_addr)
1250             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1251             self.assertNotEqual(tcp.sport, 1234)
1252             self.assertEqual(tcp.dport, 5678)
1253         except:
1254             self.logger.error(ppp("Unexpected or invalid packet:", p))
1255             raise
1256
1257         # in2out 1st interface
1258         pkts = self.create_stream_in(self.pg4, self.pg3)
1259         self.pg4.add_stream(pkts)
1260         self.pg_enable_capture(self.pg_interfaces)
1261         self.pg_start()
1262         capture = self.pg3.get_capture(len(pkts))
1263         self.verify_capture_out(capture)
1264
1265         # out2in 1st interface
1266         pkts = self.create_stream_out(self.pg3)
1267         self.pg3.add_stream(pkts)
1268         self.pg_enable_capture(self.pg_interfaces)
1269         self.pg_start()
1270         capture = self.pg4.get_capture(len(pkts))
1271         self.verify_capture_in(capture, self.pg4)
1272
1273         # in2out 2nd interface
1274         pkts = self.create_stream_in(self.pg5, self.pg3)
1275         self.pg5.add_stream(pkts)
1276         self.pg_enable_capture(self.pg_interfaces)
1277         self.pg_start()
1278         capture = self.pg3.get_capture(len(pkts))
1279         self.verify_capture_out(capture)
1280
1281         # out2in 2nd interface
1282         pkts = self.create_stream_out(self.pg3)
1283         self.pg3.add_stream(pkts)
1284         self.pg_enable_capture(self.pg_interfaces)
1285         self.pg_start()
1286         capture = self.pg5.get_capture(len(pkts))
1287         self.verify_capture_in(capture, self.pg5)
1288
1289         # pg5 session dump
1290         addresses = self.vapi.nat44_address_dump()
1291         self.assertEqual(len(addresses), 1)
1292         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1293         self.assertEqual(len(sessions), 3)
1294         for session in sessions:
1295             self.assertFalse(session.is_static)
1296             self.assertEqual(session.inside_ip_address[0:4],
1297                              self.pg5.remote_ip4n)
1298             self.assertEqual(session.outside_ip_address,
1299                              addresses[0].ip_address)
1300         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1301         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1302         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1303         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1304         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1305         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1306         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1307         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1308         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1309
1310         # in2out 3rd interface
1311         pkts = self.create_stream_in(self.pg6, self.pg3)
1312         self.pg6.add_stream(pkts)
1313         self.pg_enable_capture(self.pg_interfaces)
1314         self.pg_start()
1315         capture = self.pg3.get_capture(len(pkts))
1316         self.verify_capture_out(capture, static_nat_ip, True)
1317
1318         # out2in 3rd interface
1319         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1320         self.pg3.add_stream(pkts)
1321         self.pg_enable_capture(self.pg_interfaces)
1322         self.pg_start()
1323         capture = self.pg6.get_capture(len(pkts))
1324         self.verify_capture_in(capture, self.pg6)
1325
1326         # general user and session dump verifications
1327         users = self.vapi.nat44_user_dump()
1328         self.assertTrue(len(users) >= 3)
1329         addresses = self.vapi.nat44_address_dump()
1330         self.assertEqual(len(addresses), 1)
1331         for user in users:
1332             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1333                                                          user.vrf_id)
1334             for session in sessions:
1335                 self.assertEqual(user.ip_address, session.inside_ip_address)
1336                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1337                 self.assertTrue(session.protocol in
1338                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1339                                  IP_PROTOS.icmp])
1340
1341         # pg4 session dump
1342         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1343         self.assertTrue(len(sessions) >= 4)
1344         for session in sessions:
1345             self.assertFalse(session.is_static)
1346             self.assertEqual(session.inside_ip_address[0:4],
1347                              self.pg4.remote_ip4n)
1348             self.assertEqual(session.outside_ip_address,
1349                              addresses[0].ip_address)
1350
1351         # pg6 session dump
1352         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1353         self.assertTrue(len(sessions) >= 3)
1354         for session in sessions:
1355             self.assertTrue(session.is_static)
1356             self.assertEqual(session.inside_ip_address[0:4],
1357                              self.pg6.remote_ip4n)
1358             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1359                              map(int, static_nat_ip.split('.')))
1360             self.assertTrue(session.inside_port in
1361                             [self.tcp_port_in, self.udp_port_in,
1362                              self.icmp_id_in])
1363
1364     def test_hairpinning(self):
1365         """ NAT44 hairpinning - 1:1 NAPT """
1366
1367         host = self.pg0.remote_hosts[0]
1368         server = self.pg0.remote_hosts[1]
1369         host_in_port = 1234
1370         host_out_port = 0
1371         server_in_port = 5678
1372         server_out_port = 8765
1373
1374         self.nat44_add_address(self.nat_addr)
1375         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1376         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1377                                                   is_inside=0)
1378         # add static mapping for server
1379         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1380                                       server_in_port, server_out_port,
1381                                       proto=IP_PROTOS.tcp)
1382
1383         # send packet from host to server
1384         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1385              IP(src=host.ip4, dst=self.nat_addr) /
1386              TCP(sport=host_in_port, dport=server_out_port))
1387         self.pg0.add_stream(p)
1388         self.pg_enable_capture(self.pg_interfaces)
1389         self.pg_start()
1390         capture = self.pg0.get_capture(1)
1391         p = capture[0]
1392         try:
1393             ip = p[IP]
1394             tcp = p[TCP]
1395             self.assertEqual(ip.src, self.nat_addr)
1396             self.assertEqual(ip.dst, server.ip4)
1397             self.assertNotEqual(tcp.sport, host_in_port)
1398             self.assertEqual(tcp.dport, server_in_port)
1399             self.check_tcp_checksum(p)
1400             host_out_port = tcp.sport
1401         except:
1402             self.logger.error(ppp("Unexpected or invalid packet:", p))
1403             raise
1404
1405         # send reply from server to host
1406         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1407              IP(src=server.ip4, dst=self.nat_addr) /
1408              TCP(sport=server_in_port, dport=host_out_port))
1409         self.pg0.add_stream(p)
1410         self.pg_enable_capture(self.pg_interfaces)
1411         self.pg_start()
1412         capture = self.pg0.get_capture(1)
1413         p = capture[0]
1414         try:
1415             ip = p[IP]
1416             tcp = p[TCP]
1417             self.assertEqual(ip.src, self.nat_addr)
1418             self.assertEqual(ip.dst, host.ip4)
1419             self.assertEqual(tcp.sport, server_out_port)
1420             self.assertEqual(tcp.dport, host_in_port)
1421             self.check_tcp_checksum(p)
1422         except:
1423             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1424             raise
1425
1426     def test_hairpinning2(self):
1427         """ NAT44 hairpinning - 1:1 NAT"""
1428
1429         server1_nat_ip = "10.0.0.10"
1430         server2_nat_ip = "10.0.0.11"
1431         host = self.pg0.remote_hosts[0]
1432         server1 = self.pg0.remote_hosts[1]
1433         server2 = self.pg0.remote_hosts[2]
1434         server_tcp_port = 22
1435         server_udp_port = 20
1436
1437         self.nat44_add_address(self.nat_addr)
1438         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1439         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1440                                                   is_inside=0)
1441
1442         # add static mapping for servers
1443         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1444         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1445
1446         # host to server1
1447         pkts = []
1448         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1449              IP(src=host.ip4, dst=server1_nat_ip) /
1450              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1451         pkts.append(p)
1452         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1453              IP(src=host.ip4, dst=server1_nat_ip) /
1454              UDP(sport=self.udp_port_in, dport=server_udp_port))
1455         pkts.append(p)
1456         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1457              IP(src=host.ip4, dst=server1_nat_ip) /
1458              ICMP(id=self.icmp_id_in, type='echo-request'))
1459         pkts.append(p)
1460         self.pg0.add_stream(pkts)
1461         self.pg_enable_capture(self.pg_interfaces)
1462         self.pg_start()
1463         capture = self.pg0.get_capture(len(pkts))
1464         for packet in capture:
1465             try:
1466                 self.assertEqual(packet[IP].src, self.nat_addr)
1467                 self.assertEqual(packet[IP].dst, server1.ip4)
1468                 if packet.haslayer(TCP):
1469                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1470                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1471                     self.tcp_port_out = packet[TCP].sport
1472                     self.check_tcp_checksum(packet)
1473                 elif packet.haslayer(UDP):
1474                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1475                     self.assertEqual(packet[UDP].dport, server_udp_port)
1476                     self.udp_port_out = packet[UDP].sport
1477                 else:
1478                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1479                     self.icmp_id_out = packet[ICMP].id
1480             except:
1481                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1482                 raise
1483
1484         # server1 to host
1485         pkts = []
1486         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1487              IP(src=server1.ip4, dst=self.nat_addr) /
1488              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1489         pkts.append(p)
1490         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1491              IP(src=server1.ip4, dst=self.nat_addr) /
1492              UDP(sport=server_udp_port, dport=self.udp_port_out))
1493         pkts.append(p)
1494         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1495              IP(src=server1.ip4, dst=self.nat_addr) /
1496              ICMP(id=self.icmp_id_out, type='echo-reply'))
1497         pkts.append(p)
1498         self.pg0.add_stream(pkts)
1499         self.pg_enable_capture(self.pg_interfaces)
1500         self.pg_start()
1501         capture = self.pg0.get_capture(len(pkts))
1502         for packet in capture:
1503             try:
1504                 self.assertEqual(packet[IP].src, server1_nat_ip)
1505                 self.assertEqual(packet[IP].dst, host.ip4)
1506                 if packet.haslayer(TCP):
1507                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1508                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1509                     self.check_tcp_checksum(packet)
1510                 elif packet.haslayer(UDP):
1511                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1512                     self.assertEqual(packet[UDP].sport, server_udp_port)
1513                 else:
1514                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1515             except:
1516                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1517                 raise
1518
1519         # server2 to server1
1520         pkts = []
1521         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1522              IP(src=server2.ip4, dst=server1_nat_ip) /
1523              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1524         pkts.append(p)
1525         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1526              IP(src=server2.ip4, dst=server1_nat_ip) /
1527              UDP(sport=self.udp_port_in, dport=server_udp_port))
1528         pkts.append(p)
1529         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1530              IP(src=server2.ip4, dst=server1_nat_ip) /
1531              ICMP(id=self.icmp_id_in, type='echo-request'))
1532         pkts.append(p)
1533         self.pg0.add_stream(pkts)
1534         self.pg_enable_capture(self.pg_interfaces)
1535         self.pg_start()
1536         capture = self.pg0.get_capture(len(pkts))
1537         for packet in capture:
1538             try:
1539                 self.assertEqual(packet[IP].src, server2_nat_ip)
1540                 self.assertEqual(packet[IP].dst, server1.ip4)
1541                 if packet.haslayer(TCP):
1542                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1543                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1544                     self.tcp_port_out = packet[TCP].sport
1545                     self.check_tcp_checksum(packet)
1546                 elif packet.haslayer(UDP):
1547                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1548                     self.assertEqual(packet[UDP].dport, server_udp_port)
1549                     self.udp_port_out = packet[UDP].sport
1550                 else:
1551                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1552                     self.icmp_id_out = packet[ICMP].id
1553             except:
1554                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1555                 raise
1556
1557         # server1 to server2
1558         pkts = []
1559         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1560              IP(src=server1.ip4, dst=server2_nat_ip) /
1561              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1562         pkts.append(p)
1563         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1564              IP(src=server1.ip4, dst=server2_nat_ip) /
1565              UDP(sport=server_udp_port, dport=self.udp_port_out))
1566         pkts.append(p)
1567         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1568              IP(src=server1.ip4, dst=server2_nat_ip) /
1569              ICMP(id=self.icmp_id_out, type='echo-reply'))
1570         pkts.append(p)
1571         self.pg0.add_stream(pkts)
1572         self.pg_enable_capture(self.pg_interfaces)
1573         self.pg_start()
1574         capture = self.pg0.get_capture(len(pkts))
1575         for packet in capture:
1576             try:
1577                 self.assertEqual(packet[IP].src, server1_nat_ip)
1578                 self.assertEqual(packet[IP].dst, server2.ip4)
1579                 if packet.haslayer(TCP):
1580                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1581                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1582                     self.check_tcp_checksum(packet)
1583                 elif packet.haslayer(UDP):
1584                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1585                     self.assertEqual(packet[UDP].sport, server_udp_port)
1586                 else:
1587                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1588             except:
1589                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1590                 raise
1591
1592     def test_max_translations_per_user(self):
1593         """ MAX translations per user - recycle the least recently used """
1594
1595         self.nat44_add_address(self.nat_addr)
1596         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1597         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1598                                                   is_inside=0)
1599
1600         # get maximum number of translations per user
1601         nat44_config = self.vapi.nat_show_config()
1602
1603         # send more than maximum number of translations per user packets
1604         pkts_num = nat44_config.max_translations_per_user + 5
1605         pkts = []
1606         for port in range(0, pkts_num):
1607             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1608                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1609                  TCP(sport=1025 + port))
1610             pkts.append(p)
1611         self.pg0.add_stream(pkts)
1612         self.pg_enable_capture(self.pg_interfaces)
1613         self.pg_start()
1614
1615         # verify number of translated packet
1616         self.pg1.get_capture(pkts_num)
1617
1618     def test_interface_addr(self):
1619         """ Acquire NAT44 addresses from interface """
1620         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1621
1622         # no address in NAT pool
1623         adresses = self.vapi.nat44_address_dump()
1624         self.assertEqual(0, len(adresses))
1625
1626         # configure interface address and check NAT address pool
1627         self.pg7.config_ip4()
1628         adresses = self.vapi.nat44_address_dump()
1629         self.assertEqual(1, len(adresses))
1630         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1631
1632         # remove interface address and check NAT address pool
1633         self.pg7.unconfig_ip4()
1634         adresses = self.vapi.nat44_address_dump()
1635         self.assertEqual(0, len(adresses))
1636
1637     def test_interface_addr_static_mapping(self):
1638         """ Static mapping with addresses from interface """
1639         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1640         self.nat44_add_static_mapping(
1641             '1.2.3.4',
1642             external_sw_if_index=self.pg7.sw_if_index)
1643
1644         # static mappings with external interface
1645         static_mappings = self.vapi.nat44_static_mapping_dump()
1646         self.assertEqual(1, len(static_mappings))
1647         self.assertEqual(self.pg7.sw_if_index,
1648                          static_mappings[0].external_sw_if_index)
1649
1650         # configure interface address and check static mappings
1651         self.pg7.config_ip4()
1652         static_mappings = self.vapi.nat44_static_mapping_dump()
1653         self.assertEqual(1, len(static_mappings))
1654         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1655                          self.pg7.local_ip4n)
1656         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1657
1658         # remove interface address and check static mappings
1659         self.pg7.unconfig_ip4()
1660         static_mappings = self.vapi.nat44_static_mapping_dump()
1661         self.assertEqual(0, len(static_mappings))
1662
1663     def test_ipfix_nat44_sess(self):
1664         """ IPFIX logging NAT44 session created/delted """
1665         self.ipfix_domain_id = 10
1666         self.ipfix_src_port = 20202
1667         colector_port = 30303
1668         bind_layers(UDP, IPFIX, dport=30303)
1669         self.nat44_add_address(self.nat_addr)
1670         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1671         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1672                                                   is_inside=0)
1673         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1674                                      src_address=self.pg3.local_ip4n,
1675                                      path_mtu=512,
1676                                      template_interval=10,
1677                                      collector_port=colector_port)
1678         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1679                             src_port=self.ipfix_src_port)
1680
1681         pkts = self.create_stream_in(self.pg0, self.pg1)
1682         self.pg0.add_stream(pkts)
1683         self.pg_enable_capture(self.pg_interfaces)
1684         self.pg_start()
1685         capture = self.pg1.get_capture(len(pkts))
1686         self.verify_capture_out(capture)
1687         self.nat44_add_address(self.nat_addr, is_add=0)
1688         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1689         capture = self.pg3.get_capture(3)
1690         ipfix = IPFIXDecoder()
1691         # first load template
1692         for p in capture:
1693             self.assertTrue(p.haslayer(IPFIX))
1694             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1695             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1696             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1697             self.assertEqual(p[UDP].dport, colector_port)
1698             self.assertEqual(p[IPFIX].observationDomainID,
1699                              self.ipfix_domain_id)
1700             if p.haslayer(Template):
1701                 ipfix.add_template(p.getlayer(Template))
1702         # verify events in data set
1703         for p in capture:
1704             if p.haslayer(Data):
1705                 data = ipfix.decode_data_set(p.getlayer(Set))
1706                 self.verify_ipfix_nat44_ses(data)
1707
1708     def test_ipfix_addr_exhausted(self):
1709         """ IPFIX logging NAT addresses exhausted """
1710         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1711         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1712                                                   is_inside=0)
1713         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1714                                      src_address=self.pg3.local_ip4n,
1715                                      path_mtu=512,
1716                                      template_interval=10)
1717         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1718                             src_port=self.ipfix_src_port)
1719
1720         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1721              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1722              TCP(sport=3025))
1723         self.pg0.add_stream(p)
1724         self.pg_enable_capture(self.pg_interfaces)
1725         self.pg_start()
1726         capture = self.pg1.get_capture(0)
1727         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1728         capture = self.pg3.get_capture(3)
1729         ipfix = IPFIXDecoder()
1730         # first load template
1731         for p in capture:
1732             self.assertTrue(p.haslayer(IPFIX))
1733             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1734             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1735             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1736             self.assertEqual(p[UDP].dport, 4739)
1737             self.assertEqual(p[IPFIX].observationDomainID,
1738                              self.ipfix_domain_id)
1739             if p.haslayer(Template):
1740                 ipfix.add_template(p.getlayer(Template))
1741         # verify events in data set
1742         for p in capture:
1743             if p.haslayer(Data):
1744                 data = ipfix.decode_data_set(p.getlayer(Set))
1745                 self.verify_ipfix_addr_exhausted(data)
1746
1747     def test_pool_addr_fib(self):
1748         """ NAT44 add pool addresses to FIB """
1749         static_addr = '10.0.0.10'
1750         self.nat44_add_address(self.nat_addr)
1751         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1752         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1753                                                   is_inside=0)
1754         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1755
1756         # NAT44 address
1757         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1758              ARP(op=ARP.who_has, pdst=self.nat_addr,
1759                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1760         self.pg1.add_stream(p)
1761         self.pg_enable_capture(self.pg_interfaces)
1762         self.pg_start()
1763         capture = self.pg1.get_capture(1)
1764         self.assertTrue(capture[0].haslayer(ARP))
1765         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1766
1767         # 1:1 NAT address
1768         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1769              ARP(op=ARP.who_has, pdst=static_addr,
1770                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1771         self.pg1.add_stream(p)
1772         self.pg_enable_capture(self.pg_interfaces)
1773         self.pg_start()
1774         capture = self.pg1.get_capture(1)
1775         self.assertTrue(capture[0].haslayer(ARP))
1776         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1777
1778         # send ARP to non-NAT44 interface
1779         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1780              ARP(op=ARP.who_has, pdst=self.nat_addr,
1781                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1782         self.pg2.add_stream(p)
1783         self.pg_enable_capture(self.pg_interfaces)
1784         self.pg_start()
1785         capture = self.pg1.get_capture(0)
1786
1787         # remove addresses and verify
1788         self.nat44_add_address(self.nat_addr, is_add=0)
1789         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1790                                       is_add=0)
1791
1792         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1793              ARP(op=ARP.who_has, pdst=self.nat_addr,
1794                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1795         self.pg1.add_stream(p)
1796         self.pg_enable_capture(self.pg_interfaces)
1797         self.pg_start()
1798         capture = self.pg1.get_capture(0)
1799
1800         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1801              ARP(op=ARP.who_has, pdst=static_addr,
1802                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1803         self.pg1.add_stream(p)
1804         self.pg_enable_capture(self.pg_interfaces)
1805         self.pg_start()
1806         capture = self.pg1.get_capture(0)
1807
1808     def test_vrf_mode(self):
1809         """ NAT44 tenant VRF aware address pool mode """
1810
1811         vrf_id1 = 1
1812         vrf_id2 = 2
1813         nat_ip1 = "10.0.0.10"
1814         nat_ip2 = "10.0.0.11"
1815
1816         self.pg0.unconfig_ip4()
1817         self.pg1.unconfig_ip4()
1818         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1819         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1820         self.pg0.set_table_ip4(vrf_id1)
1821         self.pg1.set_table_ip4(vrf_id2)
1822         self.pg0.config_ip4()
1823         self.pg1.config_ip4()
1824
1825         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1826         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1827         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1828         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1829         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1830                                                   is_inside=0)
1831
1832         # first VRF
1833         pkts = self.create_stream_in(self.pg0, self.pg2)
1834         self.pg0.add_stream(pkts)
1835         self.pg_enable_capture(self.pg_interfaces)
1836         self.pg_start()
1837         capture = self.pg2.get_capture(len(pkts))
1838         self.verify_capture_out(capture, nat_ip1)
1839
1840         # second VRF
1841         pkts = self.create_stream_in(self.pg1, self.pg2)
1842         self.pg1.add_stream(pkts)
1843         self.pg_enable_capture(self.pg_interfaces)
1844         self.pg_start()
1845         capture = self.pg2.get_capture(len(pkts))
1846         self.verify_capture_out(capture, nat_ip2)
1847
1848         self.pg0.unconfig_ip4()
1849         self.pg1.unconfig_ip4()
1850         self.pg0.set_table_ip4(0)
1851         self.pg1.set_table_ip4(0)
1852         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1853         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1854
1855     def test_vrf_feature_independent(self):
1856         """ NAT44 tenant VRF independent address pool mode """
1857
1858         nat_ip1 = "10.0.0.10"
1859         nat_ip2 = "10.0.0.11"
1860
1861         self.nat44_add_address(nat_ip1)
1862         self.nat44_add_address(nat_ip2)
1863         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1864         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1865         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1866                                                   is_inside=0)
1867
1868         # first VRF
1869         pkts = self.create_stream_in(self.pg0, self.pg2)
1870         self.pg0.add_stream(pkts)
1871         self.pg_enable_capture(self.pg_interfaces)
1872         self.pg_start()
1873         capture = self.pg2.get_capture(len(pkts))
1874         self.verify_capture_out(capture, nat_ip1)
1875
1876         # second VRF
1877         pkts = self.create_stream_in(self.pg1, self.pg2)
1878         self.pg1.add_stream(pkts)
1879         self.pg_enable_capture(self.pg_interfaces)
1880         self.pg_start()
1881         capture = self.pg2.get_capture(len(pkts))
1882         self.verify_capture_out(capture, nat_ip1)
1883
1884     def test_dynamic_ipless_interfaces(self):
1885         """ NAT44 interfaces without configured IP address """
1886
1887         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1888                                       self.pg7.remote_mac,
1889                                       self.pg7.remote_ip4n,
1890                                       is_static=1)
1891         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1892                                       self.pg8.remote_mac,
1893                                       self.pg8.remote_ip4n,
1894                                       is_static=1)
1895
1896         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1897                                    dst_address_length=32,
1898                                    next_hop_address=self.pg7.remote_ip4n,
1899                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1900         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1901                                    dst_address_length=32,
1902                                    next_hop_address=self.pg8.remote_ip4n,
1903                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1904
1905         self.nat44_add_address(self.nat_addr)
1906         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1907         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1908                                                   is_inside=0)
1909
1910         # in2out
1911         pkts = self.create_stream_in(self.pg7, self.pg8)
1912         self.pg7.add_stream(pkts)
1913         self.pg_enable_capture(self.pg_interfaces)
1914         self.pg_start()
1915         capture = self.pg8.get_capture(len(pkts))
1916         self.verify_capture_out(capture)
1917
1918         # out2in
1919         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1920         self.pg8.add_stream(pkts)
1921         self.pg_enable_capture(self.pg_interfaces)
1922         self.pg_start()
1923         capture = self.pg7.get_capture(len(pkts))
1924         self.verify_capture_in(capture, self.pg7)
1925
1926     def test_static_ipless_interfaces(self):
1927         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1928
1929         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1930                                       self.pg7.remote_mac,
1931                                       self.pg7.remote_ip4n,
1932                                       is_static=1)
1933         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1934                                       self.pg8.remote_mac,
1935                                       self.pg8.remote_ip4n,
1936                                       is_static=1)
1937
1938         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1939                                    dst_address_length=32,
1940                                    next_hop_address=self.pg7.remote_ip4n,
1941                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1942         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1943                                    dst_address_length=32,
1944                                    next_hop_address=self.pg8.remote_ip4n,
1945                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1946
1947         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1948         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1949         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1950                                                   is_inside=0)
1951
1952         # out2in
1953         pkts = self.create_stream_out(self.pg8)
1954         self.pg8.add_stream(pkts)
1955         self.pg_enable_capture(self.pg_interfaces)
1956         self.pg_start()
1957         capture = self.pg7.get_capture(len(pkts))
1958         self.verify_capture_in(capture, self.pg7)
1959
1960         # in2out
1961         pkts = self.create_stream_in(self.pg7, self.pg8)
1962         self.pg7.add_stream(pkts)
1963         self.pg_enable_capture(self.pg_interfaces)
1964         self.pg_start()
1965         capture = self.pg8.get_capture(len(pkts))
1966         self.verify_capture_out(capture, self.nat_addr, True)
1967
1968     def test_static_with_port_ipless_interfaces(self):
1969         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1970
1971         self.tcp_port_out = 30606
1972         self.udp_port_out = 30607
1973         self.icmp_id_out = 30608
1974
1975         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1976                                       self.pg7.remote_mac,
1977                                       self.pg7.remote_ip4n,
1978                                       is_static=1)
1979         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1980                                       self.pg8.remote_mac,
1981                                       self.pg8.remote_ip4n,
1982                                       is_static=1)
1983
1984         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1985                                    dst_address_length=32,
1986                                    next_hop_address=self.pg7.remote_ip4n,
1987                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1988         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1989                                    dst_address_length=32,
1990                                    next_hop_address=self.pg8.remote_ip4n,
1991                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1992
1993         self.nat44_add_address(self.nat_addr)
1994         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1995                                       self.tcp_port_in, self.tcp_port_out,
1996                                       proto=IP_PROTOS.tcp)
1997         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1998                                       self.udp_port_in, self.udp_port_out,
1999                                       proto=IP_PROTOS.udp)
2000         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2001                                       self.icmp_id_in, self.icmp_id_out,
2002                                       proto=IP_PROTOS.icmp)
2003         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2004         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2005                                                   is_inside=0)
2006
2007         # out2in
2008         pkts = self.create_stream_out(self.pg8)
2009         self.pg8.add_stream(pkts)
2010         self.pg_enable_capture(self.pg_interfaces)
2011         self.pg_start()
2012         capture = self.pg7.get_capture(len(pkts))
2013         self.verify_capture_in(capture, self.pg7)
2014
2015         # in2out
2016         pkts = self.create_stream_in(self.pg7, self.pg8)
2017         self.pg7.add_stream(pkts)
2018         self.pg_enable_capture(self.pg_interfaces)
2019         self.pg_start()
2020         capture = self.pg8.get_capture(len(pkts))
2021         self.verify_capture_out(capture)
2022
2023     def test_static_unknown_proto(self):
2024         """ 1:1 NAT translate packet with unknown protocol """
2025         nat_ip = "10.0.0.10"
2026         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2027         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2028         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2029                                                   is_inside=0)
2030
2031         # in2out
2032         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2033              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2034              GRE() /
2035              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2036              TCP(sport=1234, dport=1234))
2037         self.pg0.add_stream(p)
2038         self.pg_enable_capture(self.pg_interfaces)
2039         self.pg_start()
2040         p = self.pg1.get_capture(1)
2041         packet = p[0]
2042         try:
2043             self.assertEqual(packet[IP].src, nat_ip)
2044             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2045             self.assertTrue(packet.haslayer(GRE))
2046             self.check_ip_checksum(packet)
2047         except:
2048             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2049             raise
2050
2051         # out2in
2052         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2053              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2054              GRE() /
2055              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2056              TCP(sport=1234, dport=1234))
2057         self.pg1.add_stream(p)
2058         self.pg_enable_capture(self.pg_interfaces)
2059         self.pg_start()
2060         p = self.pg0.get_capture(1)
2061         packet = p[0]
2062         try:
2063             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2064             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2065             self.assertTrue(packet.haslayer(GRE))
2066             self.check_ip_checksum(packet)
2067         except:
2068             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2069             raise
2070
2071     def test_hairpinning_static_unknown_proto(self):
2072         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2073
2074         host = self.pg0.remote_hosts[0]
2075         server = self.pg0.remote_hosts[1]
2076
2077         host_nat_ip = "10.0.0.10"
2078         server_nat_ip = "10.0.0.11"
2079
2080         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2081         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2082         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2083         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2084                                                   is_inside=0)
2085
2086         # host to server
2087         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2088              IP(src=host.ip4, dst=server_nat_ip) /
2089              GRE() /
2090              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2091              TCP(sport=1234, dport=1234))
2092         self.pg0.add_stream(p)
2093         self.pg_enable_capture(self.pg_interfaces)
2094         self.pg_start()
2095         p = self.pg0.get_capture(1)
2096         packet = p[0]
2097         try:
2098             self.assertEqual(packet[IP].src, host_nat_ip)
2099             self.assertEqual(packet[IP].dst, server.ip4)
2100             self.assertTrue(packet.haslayer(GRE))
2101             self.check_ip_checksum(packet)
2102         except:
2103             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2104             raise
2105
2106         # server to host
2107         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2108              IP(src=server.ip4, dst=host_nat_ip) /
2109              GRE() /
2110              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2111              TCP(sport=1234, dport=1234))
2112         self.pg0.add_stream(p)
2113         self.pg_enable_capture(self.pg_interfaces)
2114         self.pg_start()
2115         p = self.pg0.get_capture(1)
2116         packet = p[0]
2117         try:
2118             self.assertEqual(packet[IP].src, server_nat_ip)
2119             self.assertEqual(packet[IP].dst, host.ip4)
2120             self.assertTrue(packet.haslayer(GRE))
2121             self.check_ip_checksum(packet)
2122         except:
2123             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2124             raise
2125
2126     def test_unknown_proto(self):
2127         """ NAT44 translate packet with unknown protocol """
2128         self.nat44_add_address(self.nat_addr)
2129         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2130         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2131                                                   is_inside=0)
2132
2133         # in2out
2134         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2135              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2136              TCP(sport=self.tcp_port_in, dport=20))
2137         self.pg0.add_stream(p)
2138         self.pg_enable_capture(self.pg_interfaces)
2139         self.pg_start()
2140         p = self.pg1.get_capture(1)
2141
2142         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2143              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2144              GRE() /
2145              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2146              TCP(sport=1234, dport=1234))
2147         self.pg0.add_stream(p)
2148         self.pg_enable_capture(self.pg_interfaces)
2149         self.pg_start()
2150         p = self.pg1.get_capture(1)
2151         packet = p[0]
2152         try:
2153             self.assertEqual(packet[IP].src, self.nat_addr)
2154             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2155             self.assertTrue(packet.haslayer(GRE))
2156             self.check_ip_checksum(packet)
2157         except:
2158             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2159             raise
2160
2161         # out2in
2162         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2163              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2164              GRE() /
2165              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2166              TCP(sport=1234, dport=1234))
2167         self.pg1.add_stream(p)
2168         self.pg_enable_capture(self.pg_interfaces)
2169         self.pg_start()
2170         p = self.pg0.get_capture(1)
2171         packet = p[0]
2172         try:
2173             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2174             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2175             self.assertTrue(packet.haslayer(GRE))
2176             self.check_ip_checksum(packet)
2177         except:
2178             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2179             raise
2180
2181     def test_hairpinning_unknown_proto(self):
2182         """ NAT44 translate packet with unknown protocol - hairpinning """
2183         host = self.pg0.remote_hosts[0]
2184         server = self.pg0.remote_hosts[1]
2185         host_in_port = 1234
2186         host_out_port = 0
2187         server_in_port = 5678
2188         server_out_port = 8765
2189         server_nat_ip = "10.0.0.11"
2190
2191         self.nat44_add_address(self.nat_addr)
2192         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2193         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2194                                                   is_inside=0)
2195
2196         # add static mapping for server
2197         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2198
2199         # host to server
2200         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2201              IP(src=host.ip4, dst=server_nat_ip) /
2202              TCP(sport=host_in_port, dport=server_out_port))
2203         self.pg0.add_stream(p)
2204         self.pg_enable_capture(self.pg_interfaces)
2205         self.pg_start()
2206         capture = self.pg0.get_capture(1)
2207
2208         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2209              IP(src=host.ip4, dst=server_nat_ip) /
2210              GRE() /
2211              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2212              TCP(sport=1234, dport=1234))
2213         self.pg0.add_stream(p)
2214         self.pg_enable_capture(self.pg_interfaces)
2215         self.pg_start()
2216         p = self.pg0.get_capture(1)
2217         packet = p[0]
2218         try:
2219             self.assertEqual(packet[IP].src, self.nat_addr)
2220             self.assertEqual(packet[IP].dst, server.ip4)
2221             self.assertTrue(packet.haslayer(GRE))
2222             self.check_ip_checksum(packet)
2223         except:
2224             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2225             raise
2226
2227         # server to host
2228         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2229              IP(src=server.ip4, dst=self.nat_addr) /
2230              GRE() /
2231              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2232              TCP(sport=1234, dport=1234))
2233         self.pg0.add_stream(p)
2234         self.pg_enable_capture(self.pg_interfaces)
2235         self.pg_start()
2236         p = self.pg0.get_capture(1)
2237         packet = p[0]
2238         try:
2239             self.assertEqual(packet[IP].src, server_nat_ip)
2240             self.assertEqual(packet[IP].dst, host.ip4)
2241             self.assertTrue(packet.haslayer(GRE))
2242             self.check_ip_checksum(packet)
2243         except:
2244             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2245             raise
2246
2247     def test_output_feature(self):
2248         """ NAT44 interface output feature (in2out postrouting) """
2249         self.nat44_add_address(self.nat_addr)
2250         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2251         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2252         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2253                                                          is_inside=0)
2254
2255         # in2out
2256         pkts = self.create_stream_in(self.pg0, self.pg3)
2257         self.pg0.add_stream(pkts)
2258         self.pg_enable_capture(self.pg_interfaces)
2259         self.pg_start()
2260         capture = self.pg3.get_capture(len(pkts))
2261         self.verify_capture_out(capture)
2262
2263         # out2in
2264         pkts = self.create_stream_out(self.pg3)
2265         self.pg3.add_stream(pkts)
2266         self.pg_enable_capture(self.pg_interfaces)
2267         self.pg_start()
2268         capture = self.pg0.get_capture(len(pkts))
2269         self.verify_capture_in(capture, self.pg0)
2270
2271         # from non-NAT interface to NAT inside interface
2272         pkts = self.create_stream_in(self.pg2, self.pg0)
2273         self.pg2.add_stream(pkts)
2274         self.pg_enable_capture(self.pg_interfaces)
2275         self.pg_start()
2276         capture = self.pg0.get_capture(len(pkts))
2277         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2278
2279     def test_output_feature_vrf_aware(self):
2280         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2281         nat_ip_vrf10 = "10.0.0.10"
2282         nat_ip_vrf20 = "10.0.0.20"
2283
2284         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2285                                    dst_address_length=32,
2286                                    next_hop_address=self.pg3.remote_ip4n,
2287                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2288                                    table_id=10)
2289         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2290                                    dst_address_length=32,
2291                                    next_hop_address=self.pg3.remote_ip4n,
2292                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2293                                    table_id=20)
2294
2295         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2296         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2297         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2298         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2299         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2300                                                          is_inside=0)
2301
2302         # in2out VRF 10
2303         pkts = self.create_stream_in(self.pg4, self.pg3)
2304         self.pg4.add_stream(pkts)
2305         self.pg_enable_capture(self.pg_interfaces)
2306         self.pg_start()
2307         capture = self.pg3.get_capture(len(pkts))
2308         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2309
2310         # out2in VRF 10
2311         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2312         self.pg3.add_stream(pkts)
2313         self.pg_enable_capture(self.pg_interfaces)
2314         self.pg_start()
2315         capture = self.pg4.get_capture(len(pkts))
2316         self.verify_capture_in(capture, self.pg4)
2317
2318         # in2out VRF 20
2319         pkts = self.create_stream_in(self.pg6, self.pg3)
2320         self.pg6.add_stream(pkts)
2321         self.pg_enable_capture(self.pg_interfaces)
2322         self.pg_start()
2323         capture = self.pg3.get_capture(len(pkts))
2324         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2325
2326         # out2in VRF 20
2327         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2328         self.pg3.add_stream(pkts)
2329         self.pg_enable_capture(self.pg_interfaces)
2330         self.pg_start()
2331         capture = self.pg6.get_capture(len(pkts))
2332         self.verify_capture_in(capture, self.pg6)
2333
2334     def test_output_feature_hairpinning(self):
2335         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2336         host = self.pg0.remote_hosts[0]
2337         server = self.pg0.remote_hosts[1]
2338         host_in_port = 1234
2339         host_out_port = 0
2340         server_in_port = 5678
2341         server_out_port = 8765
2342
2343         self.nat44_add_address(self.nat_addr)
2344         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2345         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2346                                                          is_inside=0)
2347
2348         # add static mapping for server
2349         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2350                                       server_in_port, server_out_port,
2351                                       proto=IP_PROTOS.tcp)
2352
2353         # send packet from host to server
2354         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2355              IP(src=host.ip4, dst=self.nat_addr) /
2356              TCP(sport=host_in_port, dport=server_out_port))
2357         self.pg0.add_stream(p)
2358         self.pg_enable_capture(self.pg_interfaces)
2359         self.pg_start()
2360         capture = self.pg0.get_capture(1)
2361         p = capture[0]
2362         try:
2363             ip = p[IP]
2364             tcp = p[TCP]
2365             self.assertEqual(ip.src, self.nat_addr)
2366             self.assertEqual(ip.dst, server.ip4)
2367             self.assertNotEqual(tcp.sport, host_in_port)
2368             self.assertEqual(tcp.dport, server_in_port)
2369             self.check_tcp_checksum(p)
2370             host_out_port = tcp.sport
2371         except:
2372             self.logger.error(ppp("Unexpected or invalid packet:", p))
2373             raise
2374
2375         # send reply from server to host
2376         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2377              IP(src=server.ip4, dst=self.nat_addr) /
2378              TCP(sport=server_in_port, dport=host_out_port))
2379         self.pg0.add_stream(p)
2380         self.pg_enable_capture(self.pg_interfaces)
2381         self.pg_start()
2382         capture = self.pg0.get_capture(1)
2383         p = capture[0]
2384         try:
2385             ip = p[IP]
2386             tcp = p[TCP]
2387             self.assertEqual(ip.src, self.nat_addr)
2388             self.assertEqual(ip.dst, host.ip4)
2389             self.assertEqual(tcp.sport, server_out_port)
2390             self.assertEqual(tcp.dport, host_in_port)
2391             self.check_tcp_checksum(p)
2392         except:
2393             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2394             raise
2395
2396     def test_one_armed_nat44(self):
2397         """ One armed NAT44 """
2398         remote_host = self.pg9.remote_hosts[0]
2399         local_host = self.pg9.remote_hosts[1]
2400         external_port = 0
2401
2402         self.nat44_add_address(self.nat_addr)
2403         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2404         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2405                                                   is_inside=0)
2406
2407         # in2out
2408         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2409              IP(src=local_host.ip4, dst=remote_host.ip4) /
2410              TCP(sport=12345, dport=80))
2411         self.pg9.add_stream(p)
2412         self.pg_enable_capture(self.pg_interfaces)
2413         self.pg_start()
2414         capture = self.pg9.get_capture(1)
2415         p = capture[0]
2416         try:
2417             ip = p[IP]
2418             tcp = p[TCP]
2419             self.assertEqual(ip.src, self.nat_addr)
2420             self.assertEqual(ip.dst, remote_host.ip4)
2421             self.assertNotEqual(tcp.sport, 12345)
2422             external_port = tcp.sport
2423             self.assertEqual(tcp.dport, 80)
2424             self.check_tcp_checksum(p)
2425             self.check_ip_checksum(p)
2426         except:
2427             self.logger.error(ppp("Unexpected or invalid packet:", p))
2428             raise
2429
2430         # out2in
2431         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2432              IP(src=remote_host.ip4, dst=self.nat_addr) /
2433              TCP(sport=80, dport=external_port))
2434         self.pg9.add_stream(p)
2435         self.pg_enable_capture(self.pg_interfaces)
2436         self.pg_start()
2437         capture = self.pg9.get_capture(1)
2438         p = capture[0]
2439         try:
2440             ip = p[IP]
2441             tcp = p[TCP]
2442             self.assertEqual(ip.src, remote_host.ip4)
2443             self.assertEqual(ip.dst, local_host.ip4)
2444             self.assertEqual(tcp.sport, 80)
2445             self.assertEqual(tcp.dport, 12345)
2446             self.check_tcp_checksum(p)
2447             self.check_ip_checksum(p)
2448         except:
2449             self.logger.error(ppp("Unexpected or invalid packet:", p))
2450             raise
2451
2452     def tearDown(self):
2453         super(TestNAT44, self).tearDown()
2454         if not self.vpp_dead:
2455             self.logger.info(self.vapi.cli("show nat44 verbose"))
2456             self.clear_nat44()
2457
2458
2459 class TestDeterministicNAT(MethodHolder):
2460     """ Deterministic NAT Test Cases """
2461
2462     @classmethod
2463     def setUpConstants(cls):
2464         super(TestDeterministicNAT, cls).setUpConstants()
2465         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2466
2467     @classmethod
2468     def setUpClass(cls):
2469         super(TestDeterministicNAT, cls).setUpClass()
2470
2471         try:
2472             cls.tcp_port_in = 6303
2473             cls.tcp_external_port = 6303
2474             cls.udp_port_in = 6304
2475             cls.udp_external_port = 6304
2476             cls.icmp_id_in = 6305
2477             cls.nat_addr = '10.0.0.3'
2478
2479             cls.create_pg_interfaces(range(3))
2480             cls.interfaces = list(cls.pg_interfaces)
2481
2482             for i in cls.interfaces:
2483                 i.admin_up()
2484                 i.config_ip4()
2485                 i.resolve_arp()
2486
2487             cls.pg0.generate_remote_hosts(2)
2488             cls.pg0.configure_ipv4_neighbors()
2489
2490         except Exception:
2491             super(TestDeterministicNAT, cls).tearDownClass()
2492             raise
2493
2494     def create_stream_in(self, in_if, out_if, ttl=64):
2495         """
2496         Create packet stream for inside network
2497
2498         :param in_if: Inside interface
2499         :param out_if: Outside interface
2500         :param ttl: TTL of generated packets
2501         """
2502         pkts = []
2503         # TCP
2504         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2505              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2506              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2507         pkts.append(p)
2508
2509         # UDP
2510         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2511              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2512              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2513         pkts.append(p)
2514
2515         # ICMP
2516         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2517              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2518              ICMP(id=self.icmp_id_in, type='echo-request'))
2519         pkts.append(p)
2520
2521         return pkts
2522
2523     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2524         """
2525         Create packet stream for outside network
2526
2527         :param out_if: Outside interface
2528         :param dst_ip: Destination IP address (Default use global NAT address)
2529         :param ttl: TTL of generated packets
2530         """
2531         if dst_ip is None:
2532             dst_ip = self.nat_addr
2533         pkts = []
2534         # TCP
2535         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2536              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2537              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2538         pkts.append(p)
2539
2540         # UDP
2541         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2542              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2543              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2544         pkts.append(p)
2545
2546         # ICMP
2547         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2548              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2549              ICMP(id=self.icmp_external_id, type='echo-reply'))
2550         pkts.append(p)
2551
2552         return pkts
2553
2554     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2555         """
2556         Verify captured packets on outside network
2557
2558         :param capture: Captured packets
2559         :param nat_ip: Translated IP address (Default use global NAT address)
2560         :param same_port: Sorce port number is not translated (Default False)
2561         :param packet_num: Expected number of packets (Default 3)
2562         """
2563         if nat_ip is None:
2564             nat_ip = self.nat_addr
2565         self.assertEqual(packet_num, len(capture))
2566         for packet in capture:
2567             try:
2568                 self.assertEqual(packet[IP].src, nat_ip)
2569                 if packet.haslayer(TCP):
2570                     self.tcp_port_out = packet[TCP].sport
2571                 elif packet.haslayer(UDP):
2572                     self.udp_port_out = packet[UDP].sport
2573                 else:
2574                     self.icmp_external_id = packet[ICMP].id
2575             except:
2576                 self.logger.error(ppp("Unexpected or invalid packet "
2577                                       "(outside network):", packet))
2578                 raise
2579
2580     def initiate_tcp_session(self, in_if, out_if):
2581         """
2582         Initiates TCP session
2583
2584         :param in_if: Inside interface
2585         :param out_if: Outside interface
2586         """
2587         try:
2588             # SYN packet in->out
2589             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2590                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2591                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2592                      flags="S"))
2593             in_if.add_stream(p)
2594             self.pg_enable_capture(self.pg_interfaces)
2595             self.pg_start()
2596             capture = out_if.get_capture(1)
2597             p = capture[0]
2598             self.tcp_port_out = p[TCP].sport
2599
2600             # SYN + ACK packet out->in
2601             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2602                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2603                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2604                      flags="SA"))
2605             out_if.add_stream(p)
2606             self.pg_enable_capture(self.pg_interfaces)
2607             self.pg_start()
2608             in_if.get_capture(1)
2609
2610             # ACK packet in->out
2611             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2612                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2613                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2614                      flags="A"))
2615             in_if.add_stream(p)
2616             self.pg_enable_capture(self.pg_interfaces)
2617             self.pg_start()
2618             out_if.get_capture(1)
2619
2620         except:
2621             self.logger.error("TCP 3 way handshake failed")
2622             raise
2623
2624     def verify_ipfix_max_entries_per_user(self, data):
2625         """
2626         Verify IPFIX maximum entries per user exceeded event
2627
2628         :param data: Decoded IPFIX data records
2629         """
2630         self.assertEqual(1, len(data))
2631         record = data[0]
2632         # natEvent
2633         self.assertEqual(ord(record[230]), 13)
2634         # natQuotaExceededEvent
2635         self.assertEqual('\x03\x00\x00\x00', record[466])
2636         # sourceIPv4Address
2637         self.assertEqual(self.pg0.remote_ip4n, record[8])
2638
2639     def test_deterministic_mode(self):
2640         """ NAT plugin run deterministic mode """
2641         in_addr = '172.16.255.0'
2642         out_addr = '172.17.255.50'
2643         in_addr_t = '172.16.255.20'
2644         in_addr_n = socket.inet_aton(in_addr)
2645         out_addr_n = socket.inet_aton(out_addr)
2646         in_addr_t_n = socket.inet_aton(in_addr_t)
2647         in_plen = 24
2648         out_plen = 32
2649
2650         nat_config = self.vapi.nat_show_config()
2651         self.assertEqual(1, nat_config.deterministic)
2652
2653         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2654
2655         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2656         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2657         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2658         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2659
2660         deterministic_mappings = self.vapi.nat_det_map_dump()
2661         self.assertEqual(len(deterministic_mappings), 1)
2662         dsm = deterministic_mappings[0]
2663         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2664         self.assertEqual(in_plen, dsm.in_plen)
2665         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2666         self.assertEqual(out_plen, dsm.out_plen)
2667
2668         self.clear_nat_det()
2669         deterministic_mappings = self.vapi.nat_det_map_dump()
2670         self.assertEqual(len(deterministic_mappings), 0)
2671
2672     def test_set_timeouts(self):
2673         """ Set deterministic NAT timeouts """
2674         timeouts_before = self.vapi.nat_det_get_timeouts()
2675
2676         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2677                                        timeouts_before.tcp_established + 10,
2678                                        timeouts_before.tcp_transitory + 10,
2679                                        timeouts_before.icmp + 10)
2680
2681         timeouts_after = self.vapi.nat_det_get_timeouts()
2682
2683         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2684         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2685         self.assertNotEqual(timeouts_before.tcp_established,
2686                             timeouts_after.tcp_established)
2687         self.assertNotEqual(timeouts_before.tcp_transitory,
2688                             timeouts_after.tcp_transitory)
2689
2690     def test_det_in(self):
2691         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2692
2693         nat_ip = "10.0.0.10"
2694
2695         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2696                                       32,
2697                                       socket.inet_aton(nat_ip),
2698                                       32)
2699         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2700         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2701                                                   is_inside=0)
2702
2703         # in2out
2704         pkts = self.create_stream_in(self.pg0, self.pg1)
2705         self.pg0.add_stream(pkts)
2706         self.pg_enable_capture(self.pg_interfaces)
2707         self.pg_start()
2708         capture = self.pg1.get_capture(len(pkts))
2709         self.verify_capture_out(capture, nat_ip)
2710
2711         # out2in
2712         pkts = self.create_stream_out(self.pg1, nat_ip)
2713         self.pg1.add_stream(pkts)
2714         self.pg_enable_capture(self.pg_interfaces)
2715         self.pg_start()
2716         capture = self.pg0.get_capture(len(pkts))
2717         self.verify_capture_in(capture, self.pg0)
2718
2719         # session dump test
2720         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2721         self.assertEqual(len(sessions), 3)
2722
2723         # TCP session
2724         s = sessions[0]
2725         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2726         self.assertEqual(s.in_port, self.tcp_port_in)
2727         self.assertEqual(s.out_port, self.tcp_port_out)
2728         self.assertEqual(s.ext_port, self.tcp_external_port)
2729
2730         # UDP session
2731         s = sessions[1]
2732         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2733         self.assertEqual(s.in_port, self.udp_port_in)
2734         self.assertEqual(s.out_port, self.udp_port_out)
2735         self.assertEqual(s.ext_port, self.udp_external_port)
2736
2737         # ICMP session
2738         s = sessions[2]
2739         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2740         self.assertEqual(s.in_port, self.icmp_id_in)
2741         self.assertEqual(s.out_port, self.icmp_external_id)
2742
2743     def test_multiple_users(self):
2744         """ Deterministic NAT multiple users """
2745
2746         nat_ip = "10.0.0.10"
2747         port_in = 80
2748         external_port = 6303
2749
2750         host0 = self.pg0.remote_hosts[0]
2751         host1 = self.pg0.remote_hosts[1]
2752
2753         self.vapi.nat_det_add_del_map(host0.ip4n,
2754                                       24,
2755                                       socket.inet_aton(nat_ip),
2756                                       32)
2757         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2758         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2759                                                   is_inside=0)
2760
2761         # host0 to out
2762         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2763              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2764              TCP(sport=port_in, dport=external_port))
2765         self.pg0.add_stream(p)
2766         self.pg_enable_capture(self.pg_interfaces)
2767         self.pg_start()
2768         capture = self.pg1.get_capture(1)
2769         p = capture[0]
2770         try:
2771             ip = p[IP]
2772             tcp = p[TCP]
2773             self.assertEqual(ip.src, nat_ip)
2774             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2775             self.assertEqual(tcp.dport, external_port)
2776             port_out0 = tcp.sport
2777         except:
2778             self.logger.error(ppp("Unexpected or invalid packet:", p))
2779             raise
2780
2781         # host1 to out
2782         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2783              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2784              TCP(sport=port_in, dport=external_port))
2785         self.pg0.add_stream(p)
2786         self.pg_enable_capture(self.pg_interfaces)
2787         self.pg_start()
2788         capture = self.pg1.get_capture(1)
2789         p = capture[0]
2790         try:
2791             ip = p[IP]
2792             tcp = p[TCP]
2793             self.assertEqual(ip.src, nat_ip)
2794             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2795             self.assertEqual(tcp.dport, external_port)
2796             port_out1 = tcp.sport
2797         except:
2798             self.logger.error(ppp("Unexpected or invalid packet:", p))
2799             raise
2800
2801         dms = self.vapi.nat_det_map_dump()
2802         self.assertEqual(1, len(dms))
2803         self.assertEqual(2, dms[0].ses_num)
2804
2805         # out to host0
2806         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2807              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2808              TCP(sport=external_port, dport=port_out0))
2809         self.pg1.add_stream(p)
2810         self.pg_enable_capture(self.pg_interfaces)
2811         self.pg_start()
2812         capture = self.pg0.get_capture(1)
2813         p = capture[0]
2814         try:
2815             ip = p[IP]
2816             tcp = p[TCP]
2817             self.assertEqual(ip.src, self.pg1.remote_ip4)
2818             self.assertEqual(ip.dst, host0.ip4)
2819             self.assertEqual(tcp.dport, port_in)
2820             self.assertEqual(tcp.sport, external_port)
2821         except:
2822             self.logger.error(ppp("Unexpected or invalid packet:", p))
2823             raise
2824
2825         # out to host1
2826         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2827              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2828              TCP(sport=external_port, dport=port_out1))
2829         self.pg1.add_stream(p)
2830         self.pg_enable_capture(self.pg_interfaces)
2831         self.pg_start()
2832         capture = self.pg0.get_capture(1)
2833         p = capture[0]
2834         try:
2835             ip = p[IP]
2836             tcp = p[TCP]
2837             self.assertEqual(ip.src, self.pg1.remote_ip4)
2838             self.assertEqual(ip.dst, host1.ip4)
2839             self.assertEqual(tcp.dport, port_in)
2840             self.assertEqual(tcp.sport, external_port)
2841         except:
2842             self.logger.error(ppp("Unexpected or invalid packet", p))
2843             raise
2844
2845         # session close api test
2846         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2847                                             port_out1,
2848                                             self.pg1.remote_ip4n,
2849                                             external_port)
2850         dms = self.vapi.nat_det_map_dump()
2851         self.assertEqual(dms[0].ses_num, 1)
2852
2853         self.vapi.nat_det_close_session_in(host0.ip4n,
2854                                            port_in,
2855                                            self.pg1.remote_ip4n,
2856                                            external_port)
2857         dms = self.vapi.nat_det_map_dump()
2858         self.assertEqual(dms[0].ses_num, 0)
2859
2860     def test_tcp_session_close_detection_in(self):
2861         """ Deterministic NAT TCP session close from inside network """
2862         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2863                                       32,
2864                                       socket.inet_aton(self.nat_addr),
2865                                       32)
2866         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2867         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2868                                                   is_inside=0)
2869
2870         self.initiate_tcp_session(self.pg0, self.pg1)
2871
2872         # close the session from inside
2873         try:
2874             # FIN packet in -> out
2875             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2876                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2877                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2878                      flags="F"))
2879             self.pg0.add_stream(p)
2880             self.pg_enable_capture(self.pg_interfaces)
2881             self.pg_start()
2882             self.pg1.get_capture(1)
2883
2884             pkts = []
2885
2886             # ACK packet out -> in
2887             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2888                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2889                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2890                      flags="A"))
2891             pkts.append(p)
2892
2893             # FIN packet out -> in
2894             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2895                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2896                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2897                      flags="F"))
2898             pkts.append(p)
2899
2900             self.pg1.add_stream(pkts)
2901             self.pg_enable_capture(self.pg_interfaces)
2902             self.pg_start()
2903             self.pg0.get_capture(2)
2904
2905             # ACK packet in -> out
2906             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2907                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2908                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2909                      flags="A"))
2910             self.pg0.add_stream(p)
2911             self.pg_enable_capture(self.pg_interfaces)
2912             self.pg_start()
2913             self.pg1.get_capture(1)
2914
2915             # Check if deterministic NAT44 closed the session
2916             dms = self.vapi.nat_det_map_dump()
2917             self.assertEqual(0, dms[0].ses_num)
2918         except:
2919             self.logger.error("TCP session termination failed")
2920             raise
2921
2922     def test_tcp_session_close_detection_out(self):
2923         """ Deterministic NAT TCP session close from outside network """
2924         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2925                                       32,
2926                                       socket.inet_aton(self.nat_addr),
2927                                       32)
2928         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2929         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2930                                                   is_inside=0)
2931
2932         self.initiate_tcp_session(self.pg0, self.pg1)
2933
2934         # close the session from outside
2935         try:
2936             # FIN packet out -> in
2937             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2938                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2939                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2940                      flags="F"))
2941             self.pg1.add_stream(p)
2942             self.pg_enable_capture(self.pg_interfaces)
2943             self.pg_start()
2944             self.pg0.get_capture(1)
2945
2946             pkts = []
2947
2948             # ACK packet in -> out
2949             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2950                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2951                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2952                      flags="A"))
2953             pkts.append(p)
2954
2955             # ACK packet in -> out
2956             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2957                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2958                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2959                      flags="F"))
2960             pkts.append(p)
2961
2962             self.pg0.add_stream(pkts)
2963             self.pg_enable_capture(self.pg_interfaces)
2964             self.pg_start()
2965             self.pg1.get_capture(2)
2966
2967             # ACK packet out -> in
2968             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2969                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2970                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2971                      flags="A"))
2972             self.pg1.add_stream(p)
2973             self.pg_enable_capture(self.pg_interfaces)
2974             self.pg_start()
2975             self.pg0.get_capture(1)
2976
2977             # Check if deterministic NAT44 closed the session
2978             dms = self.vapi.nat_det_map_dump()
2979             self.assertEqual(0, dms[0].ses_num)
2980         except:
2981             self.logger.error("TCP session termination failed")
2982             raise
2983
2984     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2985     def test_session_timeout(self):
2986         """ Deterministic NAT session timeouts """
2987         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2988                                       32,
2989                                       socket.inet_aton(self.nat_addr),
2990                                       32)
2991         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2992         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2993                                                   is_inside=0)
2994
2995         self.initiate_tcp_session(self.pg0, self.pg1)
2996         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2997         pkts = self.create_stream_in(self.pg0, self.pg1)
2998         self.pg0.add_stream(pkts)
2999         self.pg_enable_capture(self.pg_interfaces)
3000         self.pg_start()
3001         capture = self.pg1.get_capture(len(pkts))
3002         sleep(15)
3003
3004         dms = self.vapi.nat_det_map_dump()
3005         self.assertEqual(0, dms[0].ses_num)
3006
3007     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3008     def test_session_limit_per_user(self):
3009         """ Deterministic NAT maximum sessions per user limit """
3010         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3011                                       32,
3012                                       socket.inet_aton(self.nat_addr),
3013                                       32)
3014         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3015         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3016                                                   is_inside=0)
3017         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3018                                      src_address=self.pg2.local_ip4n,
3019                                      path_mtu=512,
3020                                      template_interval=10)
3021         self.vapi.nat_ipfix()
3022
3023         pkts = []
3024         for port in range(1025, 2025):
3025             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3026                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3027                  UDP(sport=port, dport=port))
3028             pkts.append(p)
3029
3030         self.pg0.add_stream(pkts)
3031         self.pg_enable_capture(self.pg_interfaces)
3032         self.pg_start()
3033         capture = self.pg1.get_capture(len(pkts))
3034
3035         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3036              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3037              UDP(sport=3001, dport=3002))
3038         self.pg0.add_stream(p)
3039         self.pg_enable_capture(self.pg_interfaces)
3040         self.pg_start()
3041         capture = self.pg1.assert_nothing_captured()
3042
3043         # verify ICMP error packet
3044         capture = self.pg0.get_capture(1)
3045         p = capture[0]
3046         self.assertTrue(p.haslayer(ICMP))
3047         icmp = p[ICMP]
3048         self.assertEqual(icmp.type, 3)
3049         self.assertEqual(icmp.code, 1)
3050         self.assertTrue(icmp.haslayer(IPerror))
3051         inner_ip = icmp[IPerror]
3052         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3053         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3054
3055         dms = self.vapi.nat_det_map_dump()
3056
3057         self.assertEqual(1000, dms[0].ses_num)
3058
3059         # verify IPFIX logging
3060         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3061         sleep(1)
3062         capture = self.pg2.get_capture(2)
3063         ipfix = IPFIXDecoder()
3064         # first load template
3065         for p in capture:
3066             self.assertTrue(p.haslayer(IPFIX))
3067             if p.haslayer(Template):
3068                 ipfix.add_template(p.getlayer(Template))
3069         # verify events in data set
3070         for p in capture:
3071             if p.haslayer(Data):
3072                 data = ipfix.decode_data_set(p.getlayer(Set))
3073                 self.verify_ipfix_max_entries_per_user(data)
3074
3075     def clear_nat_det(self):
3076         """
3077         Clear deterministic NAT configuration.
3078         """
3079         self.vapi.nat_ipfix(enable=0)
3080         self.vapi.nat_det_set_timeouts()
3081         deterministic_mappings = self.vapi.nat_det_map_dump()
3082         for dsm in deterministic_mappings:
3083             self.vapi.nat_det_add_del_map(dsm.in_addr,
3084                                           dsm.in_plen,
3085                                           dsm.out_addr,
3086                                           dsm.out_plen,
3087                                           is_add=0)
3088
3089         interfaces = self.vapi.nat44_interface_dump()
3090         for intf in interfaces:
3091             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3092                                                       intf.is_inside,
3093                                                       is_add=0)
3094
3095     def tearDown(self):
3096         super(TestDeterministicNAT, self).tearDown()
3097         if not self.vpp_dead:
3098             self.logger.info(self.vapi.cli("show nat44 detail"))
3099             self.clear_nat_det()
3100
3101
3102 class TestNAT64(MethodHolder):
3103     """ NAT64 Test Cases """
3104
3105     @classmethod
3106     def setUpClass(cls):
3107         super(TestNAT64, cls).setUpClass()
3108
3109         try:
3110             cls.tcp_port_in = 6303
3111             cls.tcp_port_out = 6303
3112             cls.udp_port_in = 6304
3113             cls.udp_port_out = 6304
3114             cls.icmp_id_in = 6305
3115             cls.icmp_id_out = 6305
3116             cls.nat_addr = '10.0.0.3'
3117             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3118             cls.vrf1_id = 10
3119             cls.vrf1_nat_addr = '10.0.10.3'
3120             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3121                                                    cls.vrf1_nat_addr)
3122
3123             cls.create_pg_interfaces(range(4))
3124             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3125             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3126             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3127
3128             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3129
3130             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3131
3132             cls.pg0.generate_remote_hosts(2)
3133
3134             for i in cls.ip6_interfaces:
3135                 i.admin_up()
3136                 i.config_ip6()
3137                 i.configure_ipv6_neighbors()
3138
3139             for i in cls.ip4_interfaces:
3140                 i.admin_up()
3141                 i.config_ip4()
3142                 i.resolve_arp()
3143
3144             cls.pg3.admin_up()
3145             cls.pg3.config_ip4()
3146             cls.pg3.resolve_arp()
3147             cls.pg3.config_ip6()
3148             cls.pg3.configure_ipv6_neighbors()
3149
3150         except Exception:
3151             super(TestNAT64, cls).tearDownClass()
3152             raise
3153
3154     def test_pool(self):
3155         """ Add/delete address to NAT64 pool """
3156         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3157
3158         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3159
3160         addresses = self.vapi.nat64_pool_addr_dump()
3161         self.assertEqual(len(addresses), 1)
3162         self.assertEqual(addresses[0].address, nat_addr)
3163
3164         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3165
3166         addresses = self.vapi.nat64_pool_addr_dump()
3167         self.assertEqual(len(addresses), 0)
3168
3169     def test_interface(self):
3170         """ Enable/disable NAT64 feature on the interface """
3171         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3172         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3173
3174         interfaces = self.vapi.nat64_interface_dump()
3175         self.assertEqual(len(interfaces), 2)
3176         pg0_found = False
3177         pg1_found = False
3178         for intf in interfaces:
3179             if intf.sw_if_index == self.pg0.sw_if_index:
3180                 self.assertEqual(intf.is_inside, 1)
3181                 pg0_found = True
3182             elif intf.sw_if_index == self.pg1.sw_if_index:
3183                 self.assertEqual(intf.is_inside, 0)
3184                 pg1_found = True
3185         self.assertTrue(pg0_found)
3186         self.assertTrue(pg1_found)
3187
3188         features = self.vapi.cli("show interface features pg0")
3189         self.assertNotEqual(features.find('nat64-in2out'), -1)
3190         features = self.vapi.cli("show interface features pg1")
3191         self.assertNotEqual(features.find('nat64-out2in'), -1)
3192
3193         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3194         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3195
3196         interfaces = self.vapi.nat64_interface_dump()
3197         self.assertEqual(len(interfaces), 0)
3198
3199     def test_static_bib(self):
3200         """ Add/delete static BIB entry """
3201         in_addr = socket.inet_pton(socket.AF_INET6,
3202                                    '2001:db8:85a3::8a2e:370:7334')
3203         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3204         in_port = 1234
3205         out_port = 5678
3206         proto = IP_PROTOS.tcp
3207
3208         self.vapi.nat64_add_del_static_bib(in_addr,
3209                                            out_addr,
3210                                            in_port,
3211                                            out_port,
3212                                            proto)
3213         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3214         static_bib_num = 0
3215         for bibe in bib:
3216             if bibe.is_static:
3217                 static_bib_num += 1
3218                 self.assertEqual(bibe.i_addr, in_addr)
3219                 self.assertEqual(bibe.o_addr, out_addr)
3220                 self.assertEqual(bibe.i_port, in_port)
3221                 self.assertEqual(bibe.o_port, out_port)
3222         self.assertEqual(static_bib_num, 1)
3223
3224         self.vapi.nat64_add_del_static_bib(in_addr,
3225                                            out_addr,
3226                                            in_port,
3227                                            out_port,
3228                                            proto,
3229                                            is_add=0)
3230         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3231         static_bib_num = 0
3232         for bibe in bib:
3233             if bibe.is_static:
3234                 static_bib_num += 1
3235         self.assertEqual(static_bib_num, 0)
3236
3237     def test_set_timeouts(self):
3238         """ Set NAT64 timeouts """
3239         # verify default values
3240         timeouts = self.vapi.nat64_get_timeouts()
3241         self.assertEqual(timeouts.udp, 300)
3242         self.assertEqual(timeouts.icmp, 60)
3243         self.assertEqual(timeouts.tcp_trans, 240)
3244         self.assertEqual(timeouts.tcp_est, 7440)
3245         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3246
3247         # set and verify custom values
3248         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3249                                      tcp_est=7450, tcp_incoming_syn=10)
3250         timeouts = self.vapi.nat64_get_timeouts()
3251         self.assertEqual(timeouts.udp, 200)
3252         self.assertEqual(timeouts.icmp, 30)
3253         self.assertEqual(timeouts.tcp_trans, 250)
3254         self.assertEqual(timeouts.tcp_est, 7450)
3255         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3256
3257     def test_dynamic(self):
3258         """ NAT64 dynamic translation test """
3259         self.tcp_port_in = 6303
3260         self.udp_port_in = 6304
3261         self.icmp_id_in = 6305
3262
3263         ses_num_start = self.nat64_get_ses_num()
3264
3265         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3266                                                 self.nat_addr_n)
3267         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3268         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3269
3270         # in2out
3271         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3272         self.pg0.add_stream(pkts)
3273         self.pg_enable_capture(self.pg_interfaces)
3274         self.pg_start()
3275         capture = self.pg1.get_capture(len(pkts))
3276         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3277                                 dst_ip=self.pg1.remote_ip4)
3278
3279         # out2in
3280         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3281         self.pg1.add_stream(pkts)
3282         self.pg_enable_capture(self.pg_interfaces)
3283         self.pg_start()
3284         capture = self.pg0.get_capture(len(pkts))
3285         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3286         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3287
3288         # in2out
3289         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3290         self.pg0.add_stream(pkts)
3291         self.pg_enable_capture(self.pg_interfaces)
3292         self.pg_start()
3293         capture = self.pg1.get_capture(len(pkts))
3294         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3295                                 dst_ip=self.pg1.remote_ip4)
3296
3297         # out2in
3298         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3299         self.pg1.add_stream(pkts)
3300         self.pg_enable_capture(self.pg_interfaces)
3301         self.pg_start()
3302         capture = self.pg0.get_capture(len(pkts))
3303         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3304
3305         ses_num_end = self.nat64_get_ses_num()
3306
3307         self.assertEqual(ses_num_end - ses_num_start, 3)
3308
3309         # tenant with specific VRF
3310         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3311                                                 self.vrf1_nat_addr_n,
3312                                                 vrf_id=self.vrf1_id)
3313         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3314
3315         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3316         self.pg2.add_stream(pkts)
3317         self.pg_enable_capture(self.pg_interfaces)
3318         self.pg_start()
3319         capture = self.pg1.get_capture(len(pkts))
3320         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3321                                 dst_ip=self.pg1.remote_ip4)
3322
3323         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3324         self.pg1.add_stream(pkts)
3325         self.pg_enable_capture(self.pg_interfaces)
3326         self.pg_start()
3327         capture = self.pg2.get_capture(len(pkts))
3328         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3329
3330     def test_static(self):
3331         """ NAT64 static translation test """
3332         self.tcp_port_in = 60303
3333         self.udp_port_in = 60304
3334         self.icmp_id_in = 60305
3335         self.tcp_port_out = 60303
3336         self.udp_port_out = 60304
3337         self.icmp_id_out = 60305
3338
3339         ses_num_start = self.nat64_get_ses_num()
3340
3341         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3342                                                 self.nat_addr_n)
3343         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3344         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3345
3346         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3347                                            self.nat_addr_n,
3348                                            self.tcp_port_in,
3349                                            self.tcp_port_out,
3350                                            IP_PROTOS.tcp)
3351         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3352                                            self.nat_addr_n,
3353                                            self.udp_port_in,
3354                                            self.udp_port_out,
3355                                            IP_PROTOS.udp)
3356         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3357                                            self.nat_addr_n,
3358                                            self.icmp_id_in,
3359                                            self.icmp_id_out,
3360                                            IP_PROTOS.icmp)
3361
3362         # in2out
3363         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3364         self.pg0.add_stream(pkts)
3365         self.pg_enable_capture(self.pg_interfaces)
3366         self.pg_start()
3367         capture = self.pg1.get_capture(len(pkts))
3368         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3369                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3370
3371         # out2in
3372         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3373         self.pg1.add_stream(pkts)
3374         self.pg_enable_capture(self.pg_interfaces)
3375         self.pg_start()
3376         capture = self.pg0.get_capture(len(pkts))
3377         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3378         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3379
3380         ses_num_end = self.nat64_get_ses_num()
3381
3382         self.assertEqual(ses_num_end - ses_num_start, 3)
3383
3384     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3385     def test_session_timeout(self):
3386         """ NAT64 session timeout """
3387         self.icmp_id_in = 1234
3388         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3389                                                 self.nat_addr_n)
3390         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3391         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3392         self.vapi.nat64_set_timeouts(icmp=5)
3393
3394         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3395         self.pg0.add_stream(pkts)
3396         self.pg_enable_capture(self.pg_interfaces)
3397         self.pg_start()
3398         capture = self.pg1.get_capture(len(pkts))
3399
3400         ses_num_before_timeout = self.nat64_get_ses_num()
3401
3402         sleep(15)
3403
3404         # ICMP session after timeout
3405         ses_num_after_timeout = self.nat64_get_ses_num()
3406         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3407
3408     def test_icmp_error(self):
3409         """ NAT64 ICMP Error message translation """
3410         self.tcp_port_in = 6303
3411         self.udp_port_in = 6304
3412         self.icmp_id_in = 6305
3413
3414         ses_num_start = self.nat64_get_ses_num()
3415
3416         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3417                                                 self.nat_addr_n)
3418         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3419         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3420
3421         # send some packets to create sessions
3422         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3423         self.pg0.add_stream(pkts)
3424         self.pg_enable_capture(self.pg_interfaces)
3425         self.pg_start()
3426         capture_ip4 = self.pg1.get_capture(len(pkts))
3427         self.verify_capture_out(capture_ip4,
3428                                 nat_ip=self.nat_addr,
3429                                 dst_ip=self.pg1.remote_ip4)
3430
3431         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3432         self.pg1.add_stream(pkts)
3433         self.pg_enable_capture(self.pg_interfaces)
3434         self.pg_start()
3435         capture_ip6 = self.pg0.get_capture(len(pkts))
3436         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3437         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3438                                    self.pg0.remote_ip6)
3439
3440         # in2out
3441         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3442                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3443                 ICMPv6DestUnreach(code=1) /
3444                 packet[IPv6] for packet in capture_ip6]
3445         self.pg0.add_stream(pkts)
3446         self.pg_enable_capture(self.pg_interfaces)
3447         self.pg_start()
3448         capture = self.pg1.get_capture(len(pkts))
3449         for packet in capture:
3450             try:
3451                 self.assertEqual(packet[IP].src, self.nat_addr)
3452                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3453                 self.assertEqual(packet[ICMP].type, 3)
3454                 self.assertEqual(packet[ICMP].code, 13)
3455                 inner = packet[IPerror]
3456                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3457                 self.assertEqual(inner.dst, self.nat_addr)
3458                 self.check_icmp_checksum(packet)
3459                 if inner.haslayer(TCPerror):
3460                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3461                 elif inner.haslayer(UDPerror):
3462                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3463                 else:
3464                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3465             except:
3466                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3467                 raise
3468
3469         # out2in
3470         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3471                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3472                 ICMP(type=3, code=13) /
3473                 packet[IP] for packet in capture_ip4]
3474         self.pg1.add_stream(pkts)
3475         self.pg_enable_capture(self.pg_interfaces)
3476         self.pg_start()
3477         capture = self.pg0.get_capture(len(pkts))
3478         for packet in capture:
3479             try:
3480                 self.assertEqual(packet[IPv6].src, ip.src)
3481                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3482                 icmp = packet[ICMPv6DestUnreach]
3483                 self.assertEqual(icmp.code, 1)
3484                 inner = icmp[IPerror6]
3485                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3486                 self.assertEqual(inner.dst, ip.src)
3487                 self.check_icmpv6_checksum(packet)
3488                 if inner.haslayer(TCPerror):
3489                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3490                 elif inner.haslayer(UDPerror):
3491                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3492                 else:
3493                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3494                                      self.icmp_id_in)
3495             except:
3496                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3497                 raise
3498
3499     def test_hairpinning(self):
3500         """ NAT64 hairpinning """
3501
3502         client = self.pg0.remote_hosts[0]
3503         server = self.pg0.remote_hosts[1]
3504         server_tcp_in_port = 22
3505         server_tcp_out_port = 4022
3506         server_udp_in_port = 23
3507         server_udp_out_port = 4023
3508         client_tcp_in_port = 1234
3509         client_udp_in_port = 1235
3510         client_tcp_out_port = 0
3511         client_udp_out_port = 0
3512         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3513         nat_addr_ip6 = ip.src
3514
3515         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3516                                                 self.nat_addr_n)
3517         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3518         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3519
3520         self.vapi.nat64_add_del_static_bib(server.ip6n,
3521                                            self.nat_addr_n,
3522                                            server_tcp_in_port,
3523                                            server_tcp_out_port,
3524                                            IP_PROTOS.tcp)
3525         self.vapi.nat64_add_del_static_bib(server.ip6n,
3526                                            self.nat_addr_n,
3527                                            server_udp_in_port,
3528                                            server_udp_out_port,
3529                                            IP_PROTOS.udp)
3530
3531         # client to server
3532         pkts = []
3533         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3534              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3535              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3536         pkts.append(p)
3537         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3538              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3539              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3540         pkts.append(p)
3541         self.pg0.add_stream(pkts)
3542         self.pg_enable_capture(self.pg_interfaces)
3543         self.pg_start()
3544         capture = self.pg0.get_capture(len(pkts))
3545         for packet in capture:
3546             try:
3547                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3548                 self.assertEqual(packet[IPv6].dst, server.ip6)
3549                 if packet.haslayer(TCP):
3550                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3551                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3552                     self.check_tcp_checksum(packet)
3553                     client_tcp_out_port = packet[TCP].sport
3554                 else:
3555                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3556                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3557                     self.check_udp_checksum(packet)
3558                     client_udp_out_port = packet[UDP].sport
3559             except:
3560                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3561                 raise
3562
3563         # server to client
3564         pkts = []
3565         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3566              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3567              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3568         pkts.append(p)
3569         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3570              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3571              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3572         pkts.append(p)
3573         self.pg0.add_stream(pkts)
3574         self.pg_enable_capture(self.pg_interfaces)
3575         self.pg_start()
3576         capture = self.pg0.get_capture(len(pkts))
3577         for packet in capture:
3578             try:
3579                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3580                 self.assertEqual(packet[IPv6].dst, client.ip6)
3581                 if packet.haslayer(TCP):
3582                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3583                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3584                     self.check_tcp_checksum(packet)
3585                 else:
3586                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3587                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3588                     self.check_udp_checksum(packet)
3589             except:
3590                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3591                 raise
3592
3593         # ICMP error
3594         pkts = []
3595         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3596                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3597                 ICMPv6DestUnreach(code=1) /
3598                 packet[IPv6] for packet in capture]
3599         self.pg0.add_stream(pkts)
3600         self.pg_enable_capture(self.pg_interfaces)
3601         self.pg_start()
3602         capture = self.pg0.get_capture(len(pkts))
3603         for packet in capture:
3604             try:
3605                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3606                 self.assertEqual(packet[IPv6].dst, server.ip6)
3607                 icmp = packet[ICMPv6DestUnreach]
3608                 self.assertEqual(icmp.code, 1)
3609                 inner = icmp[IPerror6]
3610                 self.assertEqual(inner.src, server.ip6)
3611                 self.assertEqual(inner.dst, nat_addr_ip6)
3612                 self.check_icmpv6_checksum(packet)
3613                 if inner.haslayer(TCPerror):
3614                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3615                     self.assertEqual(inner[TCPerror].dport,
3616                                      client_tcp_out_port)
3617                 else:
3618                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3619                     self.assertEqual(inner[UDPerror].dport,
3620                                      client_udp_out_port)
3621             except:
3622                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3623                 raise
3624
3625     def test_prefix(self):
3626         """ NAT64 Network-Specific Prefix """
3627
3628         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3629                                                 self.nat_addr_n)
3630         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3631         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3632         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3633                                                 self.vrf1_nat_addr_n,
3634                                                 vrf_id=self.vrf1_id)
3635         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3636
3637         # Add global prefix
3638         global_pref64 = "2001:db8::"
3639         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3640         global_pref64_len = 32
3641         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3642
3643         prefix = self.vapi.nat64_prefix_dump()
3644         self.assertEqual(len(prefix), 1)
3645         self.assertEqual(prefix[0].prefix, global_pref64_n)
3646         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3647         self.assertEqual(prefix[0].vrf_id, 0)
3648
3649         # Add tenant specific prefix
3650         vrf1_pref64 = "2001:db8:122:300::"
3651         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3652         vrf1_pref64_len = 56
3653         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3654                                        vrf1_pref64_len,
3655                                        vrf_id=self.vrf1_id)
3656         prefix = self.vapi.nat64_prefix_dump()
3657         self.assertEqual(len(prefix), 2)
3658
3659         # Global prefix
3660         pkts = self.create_stream_in_ip6(self.pg0,
3661                                          self.pg1,
3662                                          pref=global_pref64,
3663                                          plen=global_pref64_len)
3664         self.pg0.add_stream(pkts)
3665         self.pg_enable_capture(self.pg_interfaces)
3666         self.pg_start()
3667         capture = self.pg1.get_capture(len(pkts))
3668         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3669                                 dst_ip=self.pg1.remote_ip4)
3670
3671         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3672         self.pg1.add_stream(pkts)
3673         self.pg_enable_capture(self.pg_interfaces)
3674         self.pg_start()
3675         capture = self.pg0.get_capture(len(pkts))
3676         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3677                                   global_pref64,
3678                                   global_pref64_len)
3679         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3680
3681         # Tenant specific prefix
3682         pkts = self.create_stream_in_ip6(self.pg2,
3683                                          self.pg1,
3684                                          pref=vrf1_pref64,
3685                                          plen=vrf1_pref64_len)
3686         self.pg2.add_stream(pkts)
3687         self.pg_enable_capture(self.pg_interfaces)
3688         self.pg_start()
3689         capture = self.pg1.get_capture(len(pkts))
3690         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3691                                 dst_ip=self.pg1.remote_ip4)
3692
3693         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3694         self.pg1.add_stream(pkts)
3695         self.pg_enable_capture(self.pg_interfaces)
3696         self.pg_start()
3697         capture = self.pg2.get_capture(len(pkts))
3698         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3699                                   vrf1_pref64,
3700                                   vrf1_pref64_len)
3701         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3702
3703     def test_unknown_proto(self):
3704         """ NAT64 translate packet with unknown protocol """
3705
3706         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3707                                                 self.nat_addr_n)
3708         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3709         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3710         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3711
3712         # in2out
3713         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3714              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3715              TCP(sport=self.tcp_port_in, dport=20))
3716         self.pg0.add_stream(p)
3717         self.pg_enable_capture(self.pg_interfaces)
3718         self.pg_start()
3719         p = self.pg1.get_capture(1)
3720
3721         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3722              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3723              GRE() /
3724              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3725              TCP(sport=1234, dport=1234))
3726         self.pg0.add_stream(p)
3727         self.pg_enable_capture(self.pg_interfaces)
3728         self.pg_start()
3729         p = self.pg1.get_capture(1)
3730         packet = p[0]
3731         try:
3732             self.assertEqual(packet[IP].src, self.nat_addr)
3733             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3734             self.assertTrue(packet.haslayer(GRE))
3735             self.check_ip_checksum(packet)
3736         except:
3737             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3738             raise
3739
3740         # out2in
3741         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3742              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3743              GRE() /
3744              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3745              TCP(sport=1234, dport=1234))
3746         self.pg1.add_stream(p)
3747         self.pg_enable_capture(self.pg_interfaces)
3748         self.pg_start()
3749         p = self.pg0.get_capture(1)
3750         packet = p[0]
3751         try:
3752             self.assertEqual(packet[IPv6].src, remote_ip6)
3753             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3754             self.assertEqual(packet[IPv6].nh, 47)
3755         except:
3756             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3757             raise
3758
3759     def test_hairpinning_unknown_proto(self):
3760         """ NAT64 translate packet with unknown protocol - hairpinning """
3761
3762         client = self.pg0.remote_hosts[0]
3763         server = self.pg0.remote_hosts[1]
3764         server_tcp_in_port = 22
3765         server_tcp_out_port = 4022
3766         client_tcp_in_port = 1234
3767         client_tcp_out_port = 1235
3768         server_nat_ip = "10.0.0.100"
3769         client_nat_ip = "10.0.0.110"
3770         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3771         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3772         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3773         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3774
3775         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3776                                                 client_nat_ip_n)
3777         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3778         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3779
3780         self.vapi.nat64_add_del_static_bib(server.ip6n,
3781                                            server_nat_ip_n,
3782                                            server_tcp_in_port,
3783                                            server_tcp_out_port,
3784                                            IP_PROTOS.tcp)
3785
3786         self.vapi.nat64_add_del_static_bib(server.ip6n,
3787                                            server_nat_ip_n,
3788                                            0,
3789                                            0,
3790                                            IP_PROTOS.gre)
3791
3792         self.vapi.nat64_add_del_static_bib(client.ip6n,
3793                                            client_nat_ip_n,
3794                                            client_tcp_in_port,
3795                                            client_tcp_out_port,
3796                                            IP_PROTOS.tcp)
3797
3798         # client to server
3799         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3800              IPv6(src=client.ip6, dst=server_nat_ip6) /
3801              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3802         self.pg0.add_stream(p)
3803         self.pg_enable_capture(self.pg_interfaces)
3804         self.pg_start()
3805         p = self.pg0.get_capture(1)
3806
3807         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3808              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3809              GRE() /
3810              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3811              TCP(sport=1234, dport=1234))
3812         self.pg0.add_stream(p)
3813         self.pg_enable_capture(self.pg_interfaces)
3814         self.pg_start()
3815         p = self.pg0.get_capture(1)
3816         packet = p[0]
3817         try:
3818             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3819             self.assertEqual(packet[IPv6].dst, server.ip6)
3820             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3821         except:
3822             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3823             raise
3824
3825         # server to client
3826         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3827              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3828              GRE() /
3829              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3830              TCP(sport=1234, dport=1234))
3831         self.pg0.add_stream(p)
3832         self.pg_enable_capture(self.pg_interfaces)
3833         self.pg_start()
3834         p = self.pg0.get_capture(1)
3835         packet = p[0]
3836         try:
3837             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3838             self.assertEqual(packet[IPv6].dst, client.ip6)
3839             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3840         except:
3841             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3842             raise
3843
3844     def test_one_armed_nat64(self):
3845         """ One armed NAT64 """
3846         external_port = 0
3847         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3848                                            '64:ff9b::',
3849                                            96)
3850
3851         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3852                                                 self.nat_addr_n)
3853         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3854         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3855
3856         # in2out
3857         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3858              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3859              TCP(sport=12345, dport=80))
3860         self.pg3.add_stream(p)
3861         self.pg_enable_capture(self.pg_interfaces)
3862         self.pg_start()
3863         capture = self.pg3.get_capture(1)
3864         p = capture[0]
3865         try:
3866             ip = p[IP]
3867             tcp = p[TCP]
3868             self.assertEqual(ip.src, self.nat_addr)
3869             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3870             self.assertNotEqual(tcp.sport, 12345)
3871             external_port = tcp.sport
3872             self.assertEqual(tcp.dport, 80)
3873             self.check_tcp_checksum(p)
3874             self.check_ip_checksum(p)
3875         except:
3876             self.logger.error(ppp("Unexpected or invalid packet:", p))
3877             raise
3878
3879         # out2in
3880         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3881              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3882              TCP(sport=80, dport=external_port))
3883         self.pg3.add_stream(p)
3884         self.pg_enable_capture(self.pg_interfaces)
3885         self.pg_start()
3886         capture = self.pg3.get_capture(1)
3887         p = capture[0]
3888         try:
3889             ip = p[IPv6]
3890             tcp = p[TCP]
3891             self.assertEqual(ip.src, remote_host_ip6)
3892             self.assertEqual(ip.dst, self.pg3.remote_ip6)
3893             self.assertEqual(tcp.sport, 80)
3894             self.assertEqual(tcp.dport, 12345)
3895             self.check_tcp_checksum(p)
3896         except:
3897             self.logger.error(ppp("Unexpected or invalid packet:", p))
3898             raise
3899
3900     def nat64_get_ses_num(self):
3901         """
3902         Return number of active NAT64 sessions.
3903         """
3904         st = self.vapi.nat64_st_dump()
3905         return len(st)
3906
3907     def clear_nat64(self):
3908         """
3909         Clear NAT64 configuration.
3910         """
3911         self.vapi.nat64_set_timeouts()
3912
3913         interfaces = self.vapi.nat64_interface_dump()
3914         for intf in interfaces:
3915             if intf.is_inside > 1:
3916                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3917                                                   0,
3918                                                   is_add=0)
3919             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3920                                               intf.is_inside,
3921                                               is_add=0)
3922
3923         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3924         for bibe in bib:
3925             if bibe.is_static:
3926                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3927                                                    bibe.o_addr,
3928                                                    bibe.i_port,
3929                                                    bibe.o_port,
3930                                                    bibe.proto,
3931                                                    bibe.vrf_id,
3932                                                    is_add=0)
3933
3934         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3935         for bibe in bib:
3936             if bibe.is_static:
3937                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3938                                                    bibe.o_addr,
3939                                                    bibe.i_port,
3940                                                    bibe.o_port,
3941                                                    bibe.proto,
3942                                                    bibe.vrf_id,
3943                                                    is_add=0)
3944
3945         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3946         for bibe in bib:
3947             if bibe.is_static:
3948                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3949                                                    bibe.o_addr,
3950                                                    bibe.i_port,
3951                                                    bibe.o_port,
3952                                                    bibe.proto,
3953                                                    bibe.vrf_id,
3954                                                    is_add=0)
3955
3956         adresses = self.vapi.nat64_pool_addr_dump()
3957         for addr in adresses:
3958             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3959                                                     addr.address,
3960                                                     vrf_id=addr.vrf_id,
3961                                                     is_add=0)
3962
3963         prefixes = self.vapi.nat64_prefix_dump()
3964         for prefix in prefixes:
3965             self.vapi.nat64_add_del_prefix(prefix.prefix,
3966                                            prefix.prefix_len,
3967                                            vrf_id=prefix.vrf_id,
3968                                            is_add=0)
3969
3970     def tearDown(self):
3971         super(TestNAT64, self).tearDown()
3972         if not self.vpp_dead:
3973             self.logger.info(self.vapi.cli("show nat64 pool"))
3974             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3975             self.logger.info(self.vapi.cli("show nat64 prefix"))
3976             self.logger.info(self.vapi.cli("show nat64 bib all"))
3977             self.logger.info(self.vapi.cli("show nat64 session table all"))
3978             self.clear_nat64()
3979
3980 if __name__ == '__main__':
3981     unittest.main(testRunner=VppTestRunner)