NAT: Destination NAT44 with load-balancing (VPP-954)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19
20
21 class MethodHolder(VppTestCase):
22     """ NAT create capture and verify method holder """
23
24     @classmethod
25     def setUpClass(cls):
26         super(MethodHolder, cls).setUpClass()
27
28     def tearDown(self):
29         super(MethodHolder, self).tearDown()
30
31     def check_ip_checksum(self, pkt):
32         """
33         Check IP checksum of the packet
34
35         :param pkt: Packet to check IP checksum
36         """
37         new = pkt.__class__(str(pkt))
38         del new['IP'].chksum
39         new = new.__class__(str(new))
40         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41
42     def check_tcp_checksum(self, pkt):
43         """
44         Check TCP checksum in IP packet
45
46         :param pkt: Packet to check TCP checksum
47         """
48         new = pkt.__class__(str(pkt))
49         del new['TCP'].chksum
50         new = new.__class__(str(new))
51         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52
53     def check_udp_checksum(self, pkt):
54         """
55         Check UDP checksum in IP packet
56
57         :param pkt: Packet to check UDP checksum
58         """
59         new = pkt.__class__(str(pkt))
60         del new['UDP'].chksum
61         new = new.__class__(str(new))
62         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63
64     def check_icmp_errror_embedded(self, pkt):
65         """
66         Check ICMP error embeded packet checksum
67
68         :param pkt: Packet to check ICMP error embeded packet checksum
69         """
70         if pkt.haslayer(IPerror):
71             new = pkt.__class__(str(pkt))
72             del new['IPerror'].chksum
73             new = new.__class__(str(new))
74             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75
76         if pkt.haslayer(TCPerror):
77             new = pkt.__class__(str(pkt))
78             del new['TCPerror'].chksum
79             new = new.__class__(str(new))
80             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81
82         if pkt.haslayer(UDPerror):
83             if pkt['UDPerror'].chksum != 0:
84                 new = pkt.__class__(str(pkt))
85                 del new['UDPerror'].chksum
86                 new = new.__class__(str(new))
87                 self.assertEqual(new['UDPerror'].chksum,
88                                  pkt['UDPerror'].chksum)
89
90         if pkt.haslayer(ICMPerror):
91             del new['ICMPerror'].chksum
92             new = new.__class__(str(new))
93             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94
95     def check_icmp_checksum(self, pkt):
96         """
97         Check ICMP checksum in IPv4 packet
98
99         :param pkt: Packet to check ICMP checksum
100         """
101         new = pkt.__class__(str(pkt))
102         del new['ICMP'].chksum
103         new = new.__class__(str(new))
104         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105         if pkt.haslayer(IPerror):
106             self.check_icmp_errror_embedded(pkt)
107
108     def check_icmpv6_checksum(self, pkt):
109         """
110         Check ICMPv6 checksum in IPv4 packet
111
112         :param pkt: Packet to check ICMPv6 checksum
113         """
114         new = pkt.__class__(str(pkt))
115         if pkt.haslayer(ICMPv6DestUnreach):
116             del new['ICMPv6DestUnreach'].cksum
117             new = new.__class__(str(new))
118             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119                              pkt['ICMPv6DestUnreach'].cksum)
120             self.check_icmp_errror_embedded(pkt)
121         if pkt.haslayer(ICMPv6EchoRequest):
122             del new['ICMPv6EchoRequest'].cksum
123             new = new.__class__(str(new))
124             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125                              pkt['ICMPv6EchoRequest'].cksum)
126         if pkt.haslayer(ICMPv6EchoReply):
127             del new['ICMPv6EchoReply'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoReply'].cksum,
130                              pkt['ICMPv6EchoReply'].cksum)
131
132     def create_stream_in(self, in_if, out_if, ttl=64):
133         """
134         Create packet stream for inside network
135
136         :param in_if: Inside interface
137         :param out_if: Outside interface
138         :param ttl: TTL of generated packets
139         """
140         pkts = []
141         # TCP
142         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144              TCP(sport=self.tcp_port_in, dport=20))
145         pkts.append(p)
146
147         # UDP
148         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150              UDP(sport=self.udp_port_in, dport=20))
151         pkts.append(p)
152
153         # ICMP
154         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156              ICMP(id=self.icmp_id_in, type='echo-request'))
157         pkts.append(p)
158
159         return pkts
160
161     def compose_ip6(self, ip4, pref, plen):
162         """
163         Compose IPv4-embedded IPv6 addresses
164
165         :param ip4: IPv4 address
166         :param pref: IPv6 prefix
167         :param plen: IPv6 prefix length
168         :returns: IPv4-embedded IPv6 addresses
169         """
170         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
172         if plen == 32:
173             pref_n[4] = ip4_n[0]
174             pref_n[5] = ip4_n[1]
175             pref_n[6] = ip4_n[2]
176             pref_n[7] = ip4_n[3]
177         elif plen == 40:
178             pref_n[5] = ip4_n[0]
179             pref_n[6] = ip4_n[1]
180             pref_n[7] = ip4_n[2]
181             pref_n[9] = ip4_n[3]
182         elif plen == 48:
183             pref_n[6] = ip4_n[0]
184             pref_n[7] = ip4_n[1]
185             pref_n[9] = ip4_n[2]
186             pref_n[10] = ip4_n[3]
187         elif plen == 56:
188             pref_n[7] = ip4_n[0]
189             pref_n[9] = ip4_n[1]
190             pref_n[10] = ip4_n[2]
191             pref_n[11] = ip4_n[3]
192         elif plen == 64:
193             pref_n[9] = ip4_n[0]
194             pref_n[10] = ip4_n[1]
195             pref_n[11] = ip4_n[2]
196             pref_n[12] = ip4_n[3]
197         elif plen == 96:
198             pref_n[12] = ip4_n[0]
199             pref_n[13] = ip4_n[1]
200             pref_n[14] = ip4_n[2]
201             pref_n[15] = ip4_n[3]
202         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203
204     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205         """
206         Create IPv6 packet stream for inside network
207
208         :param in_if: Inside interface
209         :param out_if: Outside interface
210         :param ttl: Hop Limit of generated packets
211         :param pref: NAT64 prefix
212         :param plen: NAT64 prefix length
213         """
214         pkts = []
215         if pref is None:
216             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217         else:
218             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
219
220         # TCP
221         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223              TCP(sport=self.tcp_port_in, dport=20))
224         pkts.append(p)
225
226         # UDP
227         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229              UDP(sport=self.udp_port_in, dport=20))
230         pkts.append(p)
231
232         # ICMP
233         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235              ICMPv6EchoRequest(id=self.icmp_id_in))
236         pkts.append(p)
237
238         return pkts
239
240     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241         """
242         Create packet stream for outside network
243
244         :param out_if: Outside interface
245         :param dst_ip: Destination IP address (Default use global NAT address)
246         :param ttl: TTL of generated packets
247         """
248         if dst_ip is None:
249             dst_ip = self.nat_addr
250         pkts = []
251         # TCP
252         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254              TCP(dport=self.tcp_port_out, sport=20))
255         pkts.append(p)
256
257         # UDP
258         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260              UDP(dport=self.udp_port_out, sport=20))
261         pkts.append(p)
262
263         # ICMP
264         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266              ICMP(id=self.icmp_id_out, type='echo-reply'))
267         pkts.append(p)
268
269         return pkts
270
271     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272                            packet_num=3, dst_ip=None):
273         """
274         Verify captured packets on outside network
275
276         :param capture: Captured packets
277         :param nat_ip: Translated IP address (Default use global NAT address)
278         :param same_port: Sorce port number is not translated (Default False)
279         :param packet_num: Expected number of packets (Default 3)
280         :param dst_ip: Destination IP address (Default do not verify)
281         """
282         if nat_ip is None:
283             nat_ip = self.nat_addr
284         self.assertEqual(packet_num, len(capture))
285         for packet in capture:
286             try:
287                 self.check_ip_checksum(packet)
288                 self.assertEqual(packet[IP].src, nat_ip)
289                 if dst_ip is not None:
290                     self.assertEqual(packet[IP].dst, dst_ip)
291                 if packet.haslayer(TCP):
292                     if same_port:
293                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
294                     else:
295                         self.assertNotEqual(
296                             packet[TCP].sport, self.tcp_port_in)
297                     self.tcp_port_out = packet[TCP].sport
298                     self.check_tcp_checksum(packet)
299                 elif packet.haslayer(UDP):
300                     if same_port:
301                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
302                     else:
303                         self.assertNotEqual(
304                             packet[UDP].sport, self.udp_port_in)
305                     self.udp_port_out = packet[UDP].sport
306                 else:
307                     if same_port:
308                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309                     else:
310                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311                     self.icmp_id_out = packet[ICMP].id
312                     self.check_icmp_checksum(packet)
313             except:
314                 self.logger.error(ppp("Unexpected or invalid packet "
315                                       "(outside network):", packet))
316                 raise
317
318     def verify_capture_in(self, capture, in_if, packet_num=3):
319         """
320         Verify captured packets on inside network
321
322         :param capture: Captured packets
323         :param in_if: Inside interface
324         :param packet_num: Expected number of packets (Default 3)
325         """
326         self.assertEqual(packet_num, len(capture))
327         for packet in capture:
328             try:
329                 self.check_ip_checksum(packet)
330                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331                 if packet.haslayer(TCP):
332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333                     self.check_tcp_checksum(packet)
334                 elif packet.haslayer(UDP):
335                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
336                 else:
337                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338                     self.check_icmp_checksum(packet)
339             except:
340                 self.logger.error(ppp("Unexpected or invalid packet "
341                                       "(inside network):", packet))
342                 raise
343
344     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345         """
346         Verify captured IPv6 packets on inside network
347
348         :param capture: Captured packets
349         :param src_ip: Source IP
350         :param dst_ip: Destination IP address
351         :param packet_num: Expected number of packets (Default 3)
352         """
353         self.assertEqual(packet_num, len(capture))
354         for packet in capture:
355             try:
356                 self.assertEqual(packet[IPv6].src, src_ip)
357                 self.assertEqual(packet[IPv6].dst, dst_ip)
358                 if packet.haslayer(TCP):
359                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360                     self.check_tcp_checksum(packet)
361                 elif packet.haslayer(UDP):
362                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
363                     self.check_udp_checksum(packet)
364                 else:
365                     self.assertEqual(packet[ICMPv6EchoReply].id,
366                                      self.icmp_id_in)
367                     self.check_icmpv6_checksum(packet)
368             except:
369                 self.logger.error(ppp("Unexpected or invalid packet "
370                                       "(inside network):", packet))
371                 raise
372
373     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374         """
375         Verify captured packet that don't have to be translated
376
377         :param capture: Captured packets
378         :param ingress_if: Ingress interface
379         :param egress_if: Egress interface
380         """
381         for packet in capture:
382             try:
383                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397                                             packet_num=3, icmp_type=11):
398         """
399         Verify captured packets with ICMP errors on outside network
400
401         :param capture: Captured packets
402         :param src_ip: Translated IP address or IP address of VPP
403                        (Default use global NAT address)
404         :param packet_num: Expected number of packets (Default 3)
405         :param icmp_type: Type of error ICMP packet
406                           we are expecting (Default 11)
407         """
408         if src_ip is None:
409             src_ip = self.nat_addr
410         self.assertEqual(packet_num, len(capture))
411         for packet in capture:
412             try:
413                 self.assertEqual(packet[IP].src, src_ip)
414                 self.assertTrue(packet.haslayer(ICMP))
415                 icmp = packet[ICMP]
416                 self.assertEqual(icmp.type, icmp_type)
417                 self.assertTrue(icmp.haslayer(IPerror))
418                 inner_ip = icmp[IPerror]
419                 if inner_ip.haslayer(TCPerror):
420                     self.assertEqual(inner_ip[TCPerror].dport,
421                                      self.tcp_port_out)
422                 elif inner_ip.haslayer(UDPerror):
423                     self.assertEqual(inner_ip[UDPerror].dport,
424                                      self.udp_port_out)
425                 else:
426                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427             except:
428                 self.logger.error(ppp("Unexpected or invalid packet "
429                                       "(outside network):", packet))
430                 raise
431
432     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
433                                            icmp_type=11):
434         """
435         Verify captured packets with ICMP errors on inside network
436
437         :param capture: Captured packets
438         :param in_if: Inside interface
439         :param packet_num: Expected number of packets (Default 3)
440         :param icmp_type: Type of error ICMP packet
441                           we are expecting (Default 11)
442         """
443         self.assertEqual(packet_num, len(capture))
444         for packet in capture:
445             try:
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 self.assertTrue(packet.haslayer(ICMP))
448                 icmp = packet[ICMP]
449                 self.assertEqual(icmp.type, icmp_type)
450                 self.assertTrue(icmp.haslayer(IPerror))
451                 inner_ip = icmp[IPerror]
452                 if inner_ip.haslayer(TCPerror):
453                     self.assertEqual(inner_ip[TCPerror].sport,
454                                      self.tcp_port_in)
455                 elif inner_ip.haslayer(UDPerror):
456                     self.assertEqual(inner_ip[UDPerror].sport,
457                                      self.udp_port_in)
458                 else:
459                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460             except:
461                 self.logger.error(ppp("Unexpected or invalid packet "
462                                       "(inside network):", packet))
463                 raise
464
465     def verify_ipfix_nat44_ses(self, data):
466         """
467         Verify IPFIX NAT44 session create/delete event
468
469         :param data: Decoded IPFIX data records
470         """
471         nat44_ses_create_num = 0
472         nat44_ses_delete_num = 0
473         self.assertEqual(6, len(data))
474         for record in data:
475             # natEvent
476             self.assertIn(ord(record[230]), [4, 5])
477             if ord(record[230]) == 4:
478                 nat44_ses_create_num += 1
479             else:
480                 nat44_ses_delete_num += 1
481             # sourceIPv4Address
482             self.assertEqual(self.pg0.remote_ip4n, record[8])
483             # postNATSourceIPv4Address
484             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
485                              record[225])
486             # ingressVRFID
487             self.assertEqual(struct.pack("!I", 0), record[234])
488             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489             if IP_PROTOS.icmp == ord(record[4]):
490                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492                                  record[227])
493             elif IP_PROTOS.tcp == ord(record[4]):
494                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495                                  record[7])
496                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497                                  record[227])
498             elif IP_PROTOS.udp == ord(record[4]):
499                 self.assertEqual(struct.pack("!H", self.udp_port_in),
500                                  record[7])
501                 self.assertEqual(struct.pack("!H", self.udp_port_out),
502                                  record[227])
503             else:
504                 self.fail("Invalid protocol")
505         self.assertEqual(3, nat44_ses_create_num)
506         self.assertEqual(3, nat44_ses_delete_num)
507
508     def verify_ipfix_addr_exhausted(self, data):
509         """
510         Verify IPFIX NAT addresses event
511
512         :param data: Decoded IPFIX data records
513         """
514         self.assertEqual(1, len(data))
515         record = data[0]
516         # natEvent
517         self.assertEqual(ord(record[230]), 3)
518         # natPoolID
519         self.assertEqual(struct.pack("!I", 0), record[283])
520
521
522 class TestNAT44(MethodHolder):
523     """ NAT44 Test Cases """
524
525     @classmethod
526     def setUpClass(cls):
527         super(TestNAT44, cls).setUpClass()
528
529         try:
530             cls.tcp_port_in = 6303
531             cls.tcp_port_out = 6303
532             cls.udp_port_in = 6304
533             cls.udp_port_out = 6304
534             cls.icmp_id_in = 6305
535             cls.icmp_id_out = 6305
536             cls.nat_addr = '10.0.0.3'
537             cls.ipfix_src_port = 4739
538             cls.ipfix_domain_id = 1
539
540             cls.create_pg_interfaces(range(9))
541             cls.interfaces = list(cls.pg_interfaces[0:4])
542
543             for i in cls.interfaces:
544                 i.admin_up()
545                 i.config_ip4()
546                 i.resolve_arp()
547
548             cls.pg0.generate_remote_hosts(3)
549             cls.pg0.configure_ipv4_neighbors()
550
551             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552
553             cls.pg4._local_ip4 = "172.16.255.1"
554             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
555             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
556             cls.pg4.set_table_ip4(10)
557             cls.pg5._local_ip4 = "172.17.255.3"
558             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
560             cls.pg5.set_table_ip4(10)
561             cls.pg6._local_ip4 = "172.16.255.1"
562             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
564             cls.pg6.set_table_ip4(20)
565             for i in cls.overlapping_interfaces:
566                 i.config_ip4()
567                 i.admin_up()
568                 i.resolve_arp()
569
570             cls.pg7.admin_up()
571             cls.pg8.admin_up()
572
573         except Exception:
574             super(TestNAT44, cls).tearDownClass()
575             raise
576
577     def clear_nat44(self):
578         """
579         Clear NAT44 configuration.
580         """
581         # I found no elegant way to do this
582         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
583                                    dst_address_length=32,
584                                    next_hop_address=self.pg7.remote_ip4n,
585                                    next_hop_sw_if_index=self.pg7.sw_if_index,
586                                    is_add=0)
587         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
588                                    dst_address_length=32,
589                                    next_hop_address=self.pg8.remote_ip4n,
590                                    next_hop_sw_if_index=self.pg8.sw_if_index,
591                                    is_add=0)
592
593         for intf in [self.pg7, self.pg8]:
594             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
595             for n in neighbors:
596                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
597                                               n.mac_address,
598                                               n.ip_address,
599                                               is_add=0)
600
601         if self.pg7.has_ip4_config:
602             self.pg7.unconfig_ip4()
603
604         interfaces = self.vapi.nat44_interface_addr_dump()
605         for intf in interfaces:
606             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
607
608         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
609                             domain_id=self.ipfix_domain_id)
610         self.ipfix_src_port = 4739
611         self.ipfix_domain_id = 1
612
613         interfaces = self.vapi.nat44_interface_dump()
614         for intf in interfaces:
615             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
616                                                       intf.is_inside,
617                                                       is_add=0)
618
619         interfaces = self.vapi.nat44_interface_output_feature_dump()
620         for intf in interfaces:
621             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
622                                                              intf.is_inside,
623                                                              is_add=0)
624
625         static_mappings = self.vapi.nat44_static_mapping_dump()
626         for sm in static_mappings:
627             self.vapi.nat44_add_del_static_mapping(
628                 sm.local_ip_address,
629                 sm.external_ip_address,
630                 local_port=sm.local_port,
631                 external_port=sm.external_port,
632                 addr_only=sm.addr_only,
633                 vrf_id=sm.vrf_id,
634                 protocol=sm.protocol,
635                 is_add=0)
636
637         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
638         for lb_sm in lb_static_mappings:
639             self.vapi.nat44_add_del_lb_static_mapping(
640                 lb_sm.external_addr,
641                 lb_sm.external_port,
642                 lb_sm.protocol,
643                 lb_sm.vrf_id,
644                 is_add=0)
645
646         adresses = self.vapi.nat44_address_dump()
647         for addr in adresses:
648             self.vapi.nat44_add_del_address_range(addr.ip_address,
649                                                   addr.ip_address,
650                                                   is_add=0)
651
652     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
653                                  local_port=0, external_port=0, vrf_id=0,
654                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
655                                  proto=0):
656         """
657         Add/delete NAT44 static mapping
658
659         :param local_ip: Local IP address
660         :param external_ip: External IP address
661         :param local_port: Local port number (Optional)
662         :param external_port: External port number (Optional)
663         :param vrf_id: VRF ID (Default 0)
664         :param is_add: 1 if add, 0 if delete (Default add)
665         :param external_sw_if_index: External interface instead of IP address
666         :param proto: IP protocol (Mandatory if port specified)
667         """
668         addr_only = 1
669         if local_port and external_port:
670             addr_only = 0
671         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
672         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
673         self.vapi.nat44_add_del_static_mapping(
674             l_ip,
675             e_ip,
676             external_sw_if_index,
677             local_port,
678             external_port,
679             addr_only,
680             vrf_id,
681             proto,
682             is_add)
683
684     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
685         """
686         Add/delete NAT44 address
687
688         :param ip: IP address
689         :param is_add: 1 if add, 0 if delete (Default add)
690         """
691         nat_addr = socket.inet_pton(socket.AF_INET, ip)
692         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
693                                               vrf_id=vrf_id)
694
695     def test_dynamic(self):
696         """ NAT44 dynamic translation test """
697
698         self.nat44_add_address(self.nat_addr)
699         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
700         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
701                                                   is_inside=0)
702
703         # in2out
704         pkts = self.create_stream_in(self.pg0, self.pg1)
705         self.pg0.add_stream(pkts)
706         self.pg_enable_capture(self.pg_interfaces)
707         self.pg_start()
708         capture = self.pg1.get_capture(len(pkts))
709         self.verify_capture_out(capture)
710
711         # out2in
712         pkts = self.create_stream_out(self.pg1)
713         self.pg1.add_stream(pkts)
714         self.pg_enable_capture(self.pg_interfaces)
715         self.pg_start()
716         capture = self.pg0.get_capture(len(pkts))
717         self.verify_capture_in(capture, self.pg0)
718
719     def test_dynamic_icmp_errors_in2out_ttl_1(self):
720         """ NAT44 handling of client packets with TTL=1 """
721
722         self.nat44_add_address(self.nat_addr)
723         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
724         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
725                                                   is_inside=0)
726
727         # Client side - generate traffic
728         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
729         self.pg0.add_stream(pkts)
730         self.pg_enable_capture(self.pg_interfaces)
731         self.pg_start()
732
733         # Client side - verify ICMP type 11 packets
734         capture = self.pg0.get_capture(len(pkts))
735         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
736
737     def test_dynamic_icmp_errors_out2in_ttl_1(self):
738         """ NAT44 handling of server packets with TTL=1 """
739
740         self.nat44_add_address(self.nat_addr)
741         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
742         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
743                                                   is_inside=0)
744
745         # Client side - create sessions
746         pkts = self.create_stream_in(self.pg0, self.pg1)
747         self.pg0.add_stream(pkts)
748         self.pg_enable_capture(self.pg_interfaces)
749         self.pg_start()
750
751         # Server side - generate traffic
752         capture = self.pg1.get_capture(len(pkts))
753         self.verify_capture_out(capture)
754         pkts = self.create_stream_out(self.pg1, ttl=1)
755         self.pg1.add_stream(pkts)
756         self.pg_enable_capture(self.pg_interfaces)
757         self.pg_start()
758
759         # Server side - verify ICMP type 11 packets
760         capture = self.pg1.get_capture(len(pkts))
761         self.verify_capture_out_with_icmp_errors(capture,
762                                                  src_ip=self.pg1.local_ip4)
763
764     def test_dynamic_icmp_errors_in2out_ttl_2(self):
765         """ NAT44 handling of error responses to client packets with TTL=2 """
766
767         self.nat44_add_address(self.nat_addr)
768         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
769         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
770                                                   is_inside=0)
771
772         # Client side - generate traffic
773         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
774         self.pg0.add_stream(pkts)
775         self.pg_enable_capture(self.pg_interfaces)
776         self.pg_start()
777
778         # Server side - simulate ICMP type 11 response
779         capture = self.pg1.get_capture(len(pkts))
780         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
781                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
782                 ICMP(type=11) / packet[IP] for packet in capture]
783         self.pg1.add_stream(pkts)
784         self.pg_enable_capture(self.pg_interfaces)
785         self.pg_start()
786
787         # Client side - verify ICMP type 11 packets
788         capture = self.pg0.get_capture(len(pkts))
789         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
790
791     def test_dynamic_icmp_errors_out2in_ttl_2(self):
792         """ NAT44 handling of error responses to server packets with TTL=2 """
793
794         self.nat44_add_address(self.nat_addr)
795         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
796         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
797                                                   is_inside=0)
798
799         # Client side - create sessions
800         pkts = self.create_stream_in(self.pg0, self.pg1)
801         self.pg0.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Server side - generate traffic
806         capture = self.pg1.get_capture(len(pkts))
807         self.verify_capture_out(capture)
808         pkts = self.create_stream_out(self.pg1, ttl=2)
809         self.pg1.add_stream(pkts)
810         self.pg_enable_capture(self.pg_interfaces)
811         self.pg_start()
812
813         # Client side - simulate ICMP type 11 response
814         capture = self.pg0.get_capture(len(pkts))
815         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
816                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
817                 ICMP(type=11) / packet[IP] for packet in capture]
818         self.pg0.add_stream(pkts)
819         self.pg_enable_capture(self.pg_interfaces)
820         self.pg_start()
821
822         # Server side - verify ICMP type 11 packets
823         capture = self.pg1.get_capture(len(pkts))
824         self.verify_capture_out_with_icmp_errors(capture)
825
826     def test_ping_out_interface_from_outside(self):
827         """ Ping NAT44 out interface from outside network """
828
829         self.nat44_add_address(self.nat_addr)
830         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
831         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
832                                                   is_inside=0)
833
834         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
835              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
836              ICMP(id=self.icmp_id_out, type='echo-request'))
837         pkts = [p]
838         self.pg1.add_stream(pkts)
839         self.pg_enable_capture(self.pg_interfaces)
840         self.pg_start()
841         capture = self.pg1.get_capture(len(pkts))
842         self.assertEqual(1, len(capture))
843         packet = capture[0]
844         try:
845             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
846             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
847             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
848             self.assertEqual(packet[ICMP].type, 0)  # echo reply
849         except:
850             self.logger.error(ppp("Unexpected or invalid packet "
851                                   "(outside network):", packet))
852             raise
853
854     def test_ping_internal_host_from_outside(self):
855         """ Ping internal host from outside network """
856
857         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
858         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
859         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
860                                                   is_inside=0)
861
862         # out2in
863         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
864                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
865                ICMP(id=self.icmp_id_out, type='echo-request'))
866         self.pg1.add_stream(pkt)
867         self.pg_enable_capture(self.pg_interfaces)
868         self.pg_start()
869         capture = self.pg0.get_capture(1)
870         self.verify_capture_in(capture, self.pg0, packet_num=1)
871         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
872
873         # in2out
874         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
875                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
876                ICMP(id=self.icmp_id_in, type='echo-reply'))
877         self.pg0.add_stream(pkt)
878         self.pg_enable_capture(self.pg_interfaces)
879         self.pg_start()
880         capture = self.pg1.get_capture(1)
881         self.verify_capture_out(capture, same_port=True, packet_num=1)
882         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
883
884     def test_static_in(self):
885         """ 1:1 NAT initialized from inside network """
886
887         nat_ip = "10.0.0.10"
888         self.tcp_port_out = 6303
889         self.udp_port_out = 6304
890         self.icmp_id_out = 6305
891
892         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
893         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
894         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
895                                                   is_inside=0)
896
897         # in2out
898         pkts = self.create_stream_in(self.pg0, self.pg1)
899         self.pg0.add_stream(pkts)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(len(pkts))
903         self.verify_capture_out(capture, nat_ip, True)
904
905         # out2in
906         pkts = self.create_stream_out(self.pg1, nat_ip)
907         self.pg1.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg0.get_capture(len(pkts))
911         self.verify_capture_in(capture, self.pg0)
912
913     def test_static_out(self):
914         """ 1:1 NAT initialized from outside network """
915
916         nat_ip = "10.0.0.20"
917         self.tcp_port_out = 6303
918         self.udp_port_out = 6304
919         self.icmp_id_out = 6305
920
921         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
922         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
923         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
924                                                   is_inside=0)
925
926         # out2in
927         pkts = self.create_stream_out(self.pg1, nat_ip)
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931         capture = self.pg0.get_capture(len(pkts))
932         self.verify_capture_in(capture, self.pg0)
933
934         # in2out
935         pkts = self.create_stream_in(self.pg0, self.pg1)
936         self.pg0.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg1.get_capture(len(pkts))
940         self.verify_capture_out(capture, nat_ip, True)
941
942     def test_static_with_port_in(self):
943         """ 1:1 NAPT initialized from inside network """
944
945         self.tcp_port_out = 3606
946         self.udp_port_out = 3607
947         self.icmp_id_out = 3608
948
949         self.nat44_add_address(self.nat_addr)
950         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
951                                       self.tcp_port_in, self.tcp_port_out,
952                                       proto=IP_PROTOS.tcp)
953         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
954                                       self.udp_port_in, self.udp_port_out,
955                                       proto=IP_PROTOS.udp)
956         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
957                                       self.icmp_id_in, self.icmp_id_out,
958                                       proto=IP_PROTOS.icmp)
959         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
960         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
961                                                   is_inside=0)
962
963         # in2out
964         pkts = self.create_stream_in(self.pg0, self.pg1)
965         self.pg0.add_stream(pkts)
966         self.pg_enable_capture(self.pg_interfaces)
967         self.pg_start()
968         capture = self.pg1.get_capture(len(pkts))
969         self.verify_capture_out(capture)
970
971         # out2in
972         pkts = self.create_stream_out(self.pg1)
973         self.pg1.add_stream(pkts)
974         self.pg_enable_capture(self.pg_interfaces)
975         self.pg_start()
976         capture = self.pg0.get_capture(len(pkts))
977         self.verify_capture_in(capture, self.pg0)
978
979     def test_static_with_port_out(self):
980         """ 1:1 NAPT initialized from outside network """
981
982         self.tcp_port_out = 30606
983         self.udp_port_out = 30607
984         self.icmp_id_out = 30608
985
986         self.nat44_add_address(self.nat_addr)
987         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
988                                       self.tcp_port_in, self.tcp_port_out,
989                                       proto=IP_PROTOS.tcp)
990         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
991                                       self.udp_port_in, self.udp_port_out,
992                                       proto=IP_PROTOS.udp)
993         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
994                                       self.icmp_id_in, self.icmp_id_out,
995                                       proto=IP_PROTOS.icmp)
996         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
997         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
998                                                   is_inside=0)
999
1000         # out2in
1001         pkts = self.create_stream_out(self.pg1)
1002         self.pg1.add_stream(pkts)
1003         self.pg_enable_capture(self.pg_interfaces)
1004         self.pg_start()
1005         capture = self.pg0.get_capture(len(pkts))
1006         self.verify_capture_in(capture, self.pg0)
1007
1008         # in2out
1009         pkts = self.create_stream_in(self.pg0, self.pg1)
1010         self.pg0.add_stream(pkts)
1011         self.pg_enable_capture(self.pg_interfaces)
1012         self.pg_start()
1013         capture = self.pg1.get_capture(len(pkts))
1014         self.verify_capture_out(capture)
1015
1016     def test_static_vrf_aware(self):
1017         """ 1:1 NAT VRF awareness """
1018
1019         nat_ip1 = "10.0.0.30"
1020         nat_ip2 = "10.0.0.40"
1021         self.tcp_port_out = 6303
1022         self.udp_port_out = 6304
1023         self.icmp_id_out = 6305
1024
1025         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1026                                       vrf_id=10)
1027         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1028                                       vrf_id=10)
1029         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1030                                                   is_inside=0)
1031         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1032         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1033
1034         # inside interface VRF match NAT44 static mapping VRF
1035         pkts = self.create_stream_in(self.pg4, self.pg3)
1036         self.pg4.add_stream(pkts)
1037         self.pg_enable_capture(self.pg_interfaces)
1038         self.pg_start()
1039         capture = self.pg3.get_capture(len(pkts))
1040         self.verify_capture_out(capture, nat_ip1, True)
1041
1042         # inside interface VRF don't match NAT44 static mapping VRF (packets
1043         # are dropped)
1044         pkts = self.create_stream_in(self.pg0, self.pg3)
1045         self.pg0.add_stream(pkts)
1046         self.pg_enable_capture(self.pg_interfaces)
1047         self.pg_start()
1048         self.pg3.assert_nothing_captured()
1049
1050     def test_static_lb(self):
1051         """ NAT44 local service load balancing """
1052         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1053         external_port = 80
1054         local_port = 8080
1055         server1 = self.pg0.remote_hosts[0]
1056         server2 = self.pg0.remote_hosts[1]
1057
1058         locals = [{'addr': server1.ip4n,
1059                    'port': local_port,
1060                    'probability': 70},
1061                   {'addr': server2.ip4n,
1062                    'port': local_port,
1063                    'probability': 30}]
1064
1065         self.nat44_add_address(self.nat_addr)
1066         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1067                                                   external_port,
1068                                                   IP_PROTOS.tcp,
1069                                                   local_num=len(locals),
1070                                                   locals=locals)
1071         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1073                                                   is_inside=0)
1074
1075         # from client to service
1076         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1077              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1078              TCP(sport=12345, dport=external_port))
1079         self.pg1.add_stream(p)
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pg_start()
1082         capture = self.pg0.get_capture(1)
1083         p = capture[0]
1084         server = None
1085         try:
1086             ip = p[IP]
1087             tcp = p[TCP]
1088             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1089             if ip.dst == server1.ip4:
1090                 server = server1
1091             else:
1092                 server = server2
1093             self.assertEqual(tcp.dport, local_port)
1094             self.check_tcp_checksum(p)
1095             self.check_ip_checksum(p)
1096         except:
1097             self.logger.error(ppp("Unexpected or invalid packet:", p))
1098             raise
1099
1100         # from service back to client
1101         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1102              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1103              TCP(sport=local_port, dport=12345))
1104         self.pg0.add_stream(p)
1105         self.pg_enable_capture(self.pg_interfaces)
1106         self.pg_start()
1107         capture = self.pg1.get_capture(1)
1108         p = capture[0]
1109         try:
1110             ip = p[IP]
1111             tcp = p[TCP]
1112             self.assertEqual(ip.src, self.nat_addr)
1113             self.assertEqual(tcp.sport, external_port)
1114             self.check_tcp_checksum(p)
1115             self.check_ip_checksum(p)
1116         except:
1117             self.logger.error(ppp("Unexpected or invalid packet:", p))
1118             raise
1119
1120         # multiple clients
1121         server1_n = 0
1122         server2_n = 0
1123         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1124         pkts = []
1125         for client in clients:
1126             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1127                  IP(src=client, dst=self.nat_addr) /
1128                  TCP(sport=12345, dport=external_port))
1129             pkts.append(p)
1130         self.pg1.add_stream(pkts)
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133         capture = self.pg0.get_capture(len(pkts))
1134         for p in capture:
1135             if p[IP].dst == server1.ip4:
1136                 server1_n += 1
1137             else:
1138                 server2_n += 1
1139         self.assertTrue(server1_n > server2_n)
1140
1141     def test_multiple_inside_interfaces(self):
1142         """ NAT44 multiple non-overlapping address space inside interfaces """
1143
1144         self.nat44_add_address(self.nat_addr)
1145         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1146         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1147         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1148                                                   is_inside=0)
1149
1150         # between two NAT44 inside interfaces (no translation)
1151         pkts = self.create_stream_in(self.pg0, self.pg1)
1152         self.pg0.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155         capture = self.pg1.get_capture(len(pkts))
1156         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1157
1158         # from NAT44 inside to interface without NAT44 feature (no translation)
1159         pkts = self.create_stream_in(self.pg0, self.pg2)
1160         self.pg0.add_stream(pkts)
1161         self.pg_enable_capture(self.pg_interfaces)
1162         self.pg_start()
1163         capture = self.pg2.get_capture(len(pkts))
1164         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1165
1166         # in2out 1st interface
1167         pkts = self.create_stream_in(self.pg0, self.pg3)
1168         self.pg0.add_stream(pkts)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171         capture = self.pg3.get_capture(len(pkts))
1172         self.verify_capture_out(capture)
1173
1174         # out2in 1st interface
1175         pkts = self.create_stream_out(self.pg3)
1176         self.pg3.add_stream(pkts)
1177         self.pg_enable_capture(self.pg_interfaces)
1178         self.pg_start()
1179         capture = self.pg0.get_capture(len(pkts))
1180         self.verify_capture_in(capture, self.pg0)
1181
1182         # in2out 2nd interface
1183         pkts = self.create_stream_in(self.pg1, self.pg3)
1184         self.pg1.add_stream(pkts)
1185         self.pg_enable_capture(self.pg_interfaces)
1186         self.pg_start()
1187         capture = self.pg3.get_capture(len(pkts))
1188         self.verify_capture_out(capture)
1189
1190         # out2in 2nd interface
1191         pkts = self.create_stream_out(self.pg3)
1192         self.pg3.add_stream(pkts)
1193         self.pg_enable_capture(self.pg_interfaces)
1194         self.pg_start()
1195         capture = self.pg1.get_capture(len(pkts))
1196         self.verify_capture_in(capture, self.pg1)
1197
1198     def test_inside_overlapping_interfaces(self):
1199         """ NAT44 multiple inside interfaces with overlapping address space """
1200
1201         static_nat_ip = "10.0.0.10"
1202         self.nat44_add_address(self.nat_addr)
1203         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1204                                                   is_inside=0)
1205         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1206         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1207         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1208         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1209                                       vrf_id=20)
1210
1211         # between NAT44 inside interfaces with same VRF (no translation)
1212         pkts = self.create_stream_in(self.pg4, self.pg5)
1213         self.pg4.add_stream(pkts)
1214         self.pg_enable_capture(self.pg_interfaces)
1215         self.pg_start()
1216         capture = self.pg5.get_capture(len(pkts))
1217         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1218
1219         # between NAT44 inside interfaces with different VRF (hairpinning)
1220         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1221              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1222              TCP(sport=1234, dport=5678))
1223         self.pg4.add_stream(p)
1224         self.pg_enable_capture(self.pg_interfaces)
1225         self.pg_start()
1226         capture = self.pg6.get_capture(1)
1227         p = capture[0]
1228         try:
1229             ip = p[IP]
1230             tcp = p[TCP]
1231             self.assertEqual(ip.src, self.nat_addr)
1232             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1233             self.assertNotEqual(tcp.sport, 1234)
1234             self.assertEqual(tcp.dport, 5678)
1235         except:
1236             self.logger.error(ppp("Unexpected or invalid packet:", p))
1237             raise
1238
1239         # in2out 1st interface
1240         pkts = self.create_stream_in(self.pg4, self.pg3)
1241         self.pg4.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244         capture = self.pg3.get_capture(len(pkts))
1245         self.verify_capture_out(capture)
1246
1247         # out2in 1st interface
1248         pkts = self.create_stream_out(self.pg3)
1249         self.pg3.add_stream(pkts)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg4.get_capture(len(pkts))
1253         self.verify_capture_in(capture, self.pg4)
1254
1255         # in2out 2nd interface
1256         pkts = self.create_stream_in(self.pg5, self.pg3)
1257         self.pg5.add_stream(pkts)
1258         self.pg_enable_capture(self.pg_interfaces)
1259         self.pg_start()
1260         capture = self.pg3.get_capture(len(pkts))
1261         self.verify_capture_out(capture)
1262
1263         # out2in 2nd interface
1264         pkts = self.create_stream_out(self.pg3)
1265         self.pg3.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg5.get_capture(len(pkts))
1269         self.verify_capture_in(capture, self.pg5)
1270
1271         # pg5 session dump
1272         addresses = self.vapi.nat44_address_dump()
1273         self.assertEqual(len(addresses), 1)
1274         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1275         self.assertEqual(len(sessions), 3)
1276         for session in sessions:
1277             self.assertFalse(session.is_static)
1278             self.assertEqual(session.inside_ip_address[0:4],
1279                              self.pg5.remote_ip4n)
1280             self.assertEqual(session.outside_ip_address,
1281                              addresses[0].ip_address)
1282         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1283         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1284         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1285         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1286         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1287         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1288         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1289         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1290         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1291
1292         # in2out 3rd interface
1293         pkts = self.create_stream_in(self.pg6, self.pg3)
1294         self.pg6.add_stream(pkts)
1295         self.pg_enable_capture(self.pg_interfaces)
1296         self.pg_start()
1297         capture = self.pg3.get_capture(len(pkts))
1298         self.verify_capture_out(capture, static_nat_ip, True)
1299
1300         # out2in 3rd interface
1301         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1302         self.pg3.add_stream(pkts)
1303         self.pg_enable_capture(self.pg_interfaces)
1304         self.pg_start()
1305         capture = self.pg6.get_capture(len(pkts))
1306         self.verify_capture_in(capture, self.pg6)
1307
1308         # general user and session dump verifications
1309         users = self.vapi.nat44_user_dump()
1310         self.assertTrue(len(users) >= 3)
1311         addresses = self.vapi.nat44_address_dump()
1312         self.assertEqual(len(addresses), 1)
1313         for user in users:
1314             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1315                                                          user.vrf_id)
1316             for session in sessions:
1317                 self.assertEqual(user.ip_address, session.inside_ip_address)
1318                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1319                 self.assertTrue(session.protocol in
1320                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1321                                  IP_PROTOS.icmp])
1322
1323         # pg4 session dump
1324         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1325         self.assertTrue(len(sessions) >= 4)
1326         for session in sessions:
1327             self.assertFalse(session.is_static)
1328             self.assertEqual(session.inside_ip_address[0:4],
1329                              self.pg4.remote_ip4n)
1330             self.assertEqual(session.outside_ip_address,
1331                              addresses[0].ip_address)
1332
1333         # pg6 session dump
1334         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1335         self.assertTrue(len(sessions) >= 3)
1336         for session in sessions:
1337             self.assertTrue(session.is_static)
1338             self.assertEqual(session.inside_ip_address[0:4],
1339                              self.pg6.remote_ip4n)
1340             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1341                              map(int, static_nat_ip.split('.')))
1342             self.assertTrue(session.inside_port in
1343                             [self.tcp_port_in, self.udp_port_in,
1344                              self.icmp_id_in])
1345
1346     def test_hairpinning(self):
1347         """ NAT44 hairpinning - 1:1 NAPT """
1348
1349         host = self.pg0.remote_hosts[0]
1350         server = self.pg0.remote_hosts[1]
1351         host_in_port = 1234
1352         host_out_port = 0
1353         server_in_port = 5678
1354         server_out_port = 8765
1355
1356         self.nat44_add_address(self.nat_addr)
1357         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1358         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1359                                                   is_inside=0)
1360         # add static mapping for server
1361         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1362                                       server_in_port, server_out_port,
1363                                       proto=IP_PROTOS.tcp)
1364
1365         # send packet from host to server
1366         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1367              IP(src=host.ip4, dst=self.nat_addr) /
1368              TCP(sport=host_in_port, dport=server_out_port))
1369         self.pg0.add_stream(p)
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pg_start()
1372         capture = self.pg0.get_capture(1)
1373         p = capture[0]
1374         try:
1375             ip = p[IP]
1376             tcp = p[TCP]
1377             self.assertEqual(ip.src, self.nat_addr)
1378             self.assertEqual(ip.dst, server.ip4)
1379             self.assertNotEqual(tcp.sport, host_in_port)
1380             self.assertEqual(tcp.dport, server_in_port)
1381             self.check_tcp_checksum(p)
1382             host_out_port = tcp.sport
1383         except:
1384             self.logger.error(ppp("Unexpected or invalid packet:", p))
1385             raise
1386
1387         # send reply from server to host
1388         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1389              IP(src=server.ip4, dst=self.nat_addr) /
1390              TCP(sport=server_in_port, dport=host_out_port))
1391         self.pg0.add_stream(p)
1392         self.pg_enable_capture(self.pg_interfaces)
1393         self.pg_start()
1394         capture = self.pg0.get_capture(1)
1395         p = capture[0]
1396         try:
1397             ip = p[IP]
1398             tcp = p[TCP]
1399             self.assertEqual(ip.src, self.nat_addr)
1400             self.assertEqual(ip.dst, host.ip4)
1401             self.assertEqual(tcp.sport, server_out_port)
1402             self.assertEqual(tcp.dport, host_in_port)
1403             self.check_tcp_checksum(p)
1404         except:
1405             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1406             raise
1407
1408     def test_hairpinning2(self):
1409         """ NAT44 hairpinning - 1:1 NAT"""
1410
1411         server1_nat_ip = "10.0.0.10"
1412         server2_nat_ip = "10.0.0.11"
1413         host = self.pg0.remote_hosts[0]
1414         server1 = self.pg0.remote_hosts[1]
1415         server2 = self.pg0.remote_hosts[2]
1416         server_tcp_port = 22
1417         server_udp_port = 20
1418
1419         self.nat44_add_address(self.nat_addr)
1420         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1421         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1422                                                   is_inside=0)
1423
1424         # add static mapping for servers
1425         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1426         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1427
1428         # host to server1
1429         pkts = []
1430         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1431              IP(src=host.ip4, dst=server1_nat_ip) /
1432              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1433         pkts.append(p)
1434         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1435              IP(src=host.ip4, dst=server1_nat_ip) /
1436              UDP(sport=self.udp_port_in, dport=server_udp_port))
1437         pkts.append(p)
1438         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1439              IP(src=host.ip4, dst=server1_nat_ip) /
1440              ICMP(id=self.icmp_id_in, type='echo-request'))
1441         pkts.append(p)
1442         self.pg0.add_stream(pkts)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445         capture = self.pg0.get_capture(len(pkts))
1446         for packet in capture:
1447             try:
1448                 self.assertEqual(packet[IP].src, self.nat_addr)
1449                 self.assertEqual(packet[IP].dst, server1.ip4)
1450                 if packet.haslayer(TCP):
1451                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1452                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1453                     self.tcp_port_out = packet[TCP].sport
1454                     self.check_tcp_checksum(packet)
1455                 elif packet.haslayer(UDP):
1456                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1457                     self.assertEqual(packet[UDP].dport, server_udp_port)
1458                     self.udp_port_out = packet[UDP].sport
1459                 else:
1460                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1461                     self.icmp_id_out = packet[ICMP].id
1462             except:
1463                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1464                 raise
1465
1466         # server1 to host
1467         pkts = []
1468         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1469              IP(src=server1.ip4, dst=self.nat_addr) /
1470              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1471         pkts.append(p)
1472         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1473              IP(src=server1.ip4, dst=self.nat_addr) /
1474              UDP(sport=server_udp_port, dport=self.udp_port_out))
1475         pkts.append(p)
1476         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1477              IP(src=server1.ip4, dst=self.nat_addr) /
1478              ICMP(id=self.icmp_id_out, type='echo-reply'))
1479         pkts.append(p)
1480         self.pg0.add_stream(pkts)
1481         self.pg_enable_capture(self.pg_interfaces)
1482         self.pg_start()
1483         capture = self.pg0.get_capture(len(pkts))
1484         for packet in capture:
1485             try:
1486                 self.assertEqual(packet[IP].src, server1_nat_ip)
1487                 self.assertEqual(packet[IP].dst, host.ip4)
1488                 if packet.haslayer(TCP):
1489                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1490                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1491                     self.check_tcp_checksum(packet)
1492                 elif packet.haslayer(UDP):
1493                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1494                     self.assertEqual(packet[UDP].sport, server_udp_port)
1495                 else:
1496                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1497             except:
1498                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1499                 raise
1500
1501         # server2 to server1
1502         pkts = []
1503         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1504              IP(src=server2.ip4, dst=server1_nat_ip) /
1505              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1506         pkts.append(p)
1507         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1508              IP(src=server2.ip4, dst=server1_nat_ip) /
1509              UDP(sport=self.udp_port_in, dport=server_udp_port))
1510         pkts.append(p)
1511         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1512              IP(src=server2.ip4, dst=server1_nat_ip) /
1513              ICMP(id=self.icmp_id_in, type='echo-request'))
1514         pkts.append(p)
1515         self.pg0.add_stream(pkts)
1516         self.pg_enable_capture(self.pg_interfaces)
1517         self.pg_start()
1518         capture = self.pg0.get_capture(len(pkts))
1519         for packet in capture:
1520             try:
1521                 self.assertEqual(packet[IP].src, server2_nat_ip)
1522                 self.assertEqual(packet[IP].dst, server1.ip4)
1523                 if packet.haslayer(TCP):
1524                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1525                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1526                     self.tcp_port_out = packet[TCP].sport
1527                     self.check_tcp_checksum(packet)
1528                 elif packet.haslayer(UDP):
1529                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1530                     self.assertEqual(packet[UDP].dport, server_udp_port)
1531                     self.udp_port_out = packet[UDP].sport
1532                 else:
1533                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1534                     self.icmp_id_out = packet[ICMP].id
1535             except:
1536                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1537                 raise
1538
1539         # server1 to server2
1540         pkts = []
1541         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1542              IP(src=server1.ip4, dst=server2_nat_ip) /
1543              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1544         pkts.append(p)
1545         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1546              IP(src=server1.ip4, dst=server2_nat_ip) /
1547              UDP(sport=server_udp_port, dport=self.udp_port_out))
1548         pkts.append(p)
1549         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550              IP(src=server1.ip4, dst=server2_nat_ip) /
1551              ICMP(id=self.icmp_id_out, type='echo-reply'))
1552         pkts.append(p)
1553         self.pg0.add_stream(pkts)
1554         self.pg_enable_capture(self.pg_interfaces)
1555         self.pg_start()
1556         capture = self.pg0.get_capture(len(pkts))
1557         for packet in capture:
1558             try:
1559                 self.assertEqual(packet[IP].src, server1_nat_ip)
1560                 self.assertEqual(packet[IP].dst, server2.ip4)
1561                 if packet.haslayer(TCP):
1562                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1563                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1564                     self.check_tcp_checksum(packet)
1565                 elif packet.haslayer(UDP):
1566                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1567                     self.assertEqual(packet[UDP].sport, server_udp_port)
1568                 else:
1569                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1570             except:
1571                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1572                 raise
1573
1574     def test_max_translations_per_user(self):
1575         """ MAX translations per user - recycle the least recently used """
1576
1577         self.nat44_add_address(self.nat_addr)
1578         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1579         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1580                                                   is_inside=0)
1581
1582         # get maximum number of translations per user
1583         nat44_config = self.vapi.nat_show_config()
1584
1585         # send more than maximum number of translations per user packets
1586         pkts_num = nat44_config.max_translations_per_user + 5
1587         pkts = []
1588         for port in range(0, pkts_num):
1589             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1590                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1591                  TCP(sport=1025 + port))
1592             pkts.append(p)
1593         self.pg0.add_stream(pkts)
1594         self.pg_enable_capture(self.pg_interfaces)
1595         self.pg_start()
1596
1597         # verify number of translated packet
1598         self.pg1.get_capture(pkts_num)
1599
1600     def test_interface_addr(self):
1601         """ Acquire NAT44 addresses from interface """
1602         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1603
1604         # no address in NAT pool
1605         adresses = self.vapi.nat44_address_dump()
1606         self.assertEqual(0, len(adresses))
1607
1608         # configure interface address and check NAT address pool
1609         self.pg7.config_ip4()
1610         adresses = self.vapi.nat44_address_dump()
1611         self.assertEqual(1, len(adresses))
1612         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1613
1614         # remove interface address and check NAT address pool
1615         self.pg7.unconfig_ip4()
1616         adresses = self.vapi.nat44_address_dump()
1617         self.assertEqual(0, len(adresses))
1618
1619     def test_interface_addr_static_mapping(self):
1620         """ Static mapping with addresses from interface """
1621         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1622         self.nat44_add_static_mapping(
1623             '1.2.3.4',
1624             external_sw_if_index=self.pg7.sw_if_index)
1625
1626         # static mappings with external interface
1627         static_mappings = self.vapi.nat44_static_mapping_dump()
1628         self.assertEqual(1, len(static_mappings))
1629         self.assertEqual(self.pg7.sw_if_index,
1630                          static_mappings[0].external_sw_if_index)
1631
1632         # configure interface address and check static mappings
1633         self.pg7.config_ip4()
1634         static_mappings = self.vapi.nat44_static_mapping_dump()
1635         self.assertEqual(1, len(static_mappings))
1636         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1637                          self.pg7.local_ip4n)
1638         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1639
1640         # remove interface address and check static mappings
1641         self.pg7.unconfig_ip4()
1642         static_mappings = self.vapi.nat44_static_mapping_dump()
1643         self.assertEqual(0, len(static_mappings))
1644
1645     def test_ipfix_nat44_sess(self):
1646         """ IPFIX logging NAT44 session created/delted """
1647         self.ipfix_domain_id = 10
1648         self.ipfix_src_port = 20202
1649         colector_port = 30303
1650         bind_layers(UDP, IPFIX, dport=30303)
1651         self.nat44_add_address(self.nat_addr)
1652         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1653         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1654                                                   is_inside=0)
1655         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1656                                      src_address=self.pg3.local_ip4n,
1657                                      path_mtu=512,
1658                                      template_interval=10,
1659                                      collector_port=colector_port)
1660         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1661                             src_port=self.ipfix_src_port)
1662
1663         pkts = self.create_stream_in(self.pg0, self.pg1)
1664         self.pg0.add_stream(pkts)
1665         self.pg_enable_capture(self.pg_interfaces)
1666         self.pg_start()
1667         capture = self.pg1.get_capture(len(pkts))
1668         self.verify_capture_out(capture)
1669         self.nat44_add_address(self.nat_addr, is_add=0)
1670         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1671         capture = self.pg3.get_capture(3)
1672         ipfix = IPFIXDecoder()
1673         # first load template
1674         for p in capture:
1675             self.assertTrue(p.haslayer(IPFIX))
1676             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1677             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1678             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1679             self.assertEqual(p[UDP].dport, colector_port)
1680             self.assertEqual(p[IPFIX].observationDomainID,
1681                              self.ipfix_domain_id)
1682             if p.haslayer(Template):
1683                 ipfix.add_template(p.getlayer(Template))
1684         # verify events in data set
1685         for p in capture:
1686             if p.haslayer(Data):
1687                 data = ipfix.decode_data_set(p.getlayer(Set))
1688                 self.verify_ipfix_nat44_ses(data)
1689
1690     def test_ipfix_addr_exhausted(self):
1691         """ IPFIX logging NAT addresses exhausted """
1692         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1693         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1694                                                   is_inside=0)
1695         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1696                                      src_address=self.pg3.local_ip4n,
1697                                      path_mtu=512,
1698                                      template_interval=10)
1699         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1700                             src_port=self.ipfix_src_port)
1701
1702         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1703              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1704              TCP(sport=3025))
1705         self.pg0.add_stream(p)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg1.get_capture(0)
1709         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1710         capture = self.pg3.get_capture(3)
1711         ipfix = IPFIXDecoder()
1712         # first load template
1713         for p in capture:
1714             self.assertTrue(p.haslayer(IPFIX))
1715             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1716             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1717             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1718             self.assertEqual(p[UDP].dport, 4739)
1719             self.assertEqual(p[IPFIX].observationDomainID,
1720                              self.ipfix_domain_id)
1721             if p.haslayer(Template):
1722                 ipfix.add_template(p.getlayer(Template))
1723         # verify events in data set
1724         for p in capture:
1725             if p.haslayer(Data):
1726                 data = ipfix.decode_data_set(p.getlayer(Set))
1727                 self.verify_ipfix_addr_exhausted(data)
1728
1729     def test_pool_addr_fib(self):
1730         """ NAT44 add pool addresses to FIB """
1731         static_addr = '10.0.0.10'
1732         self.nat44_add_address(self.nat_addr)
1733         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1734         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1735                                                   is_inside=0)
1736         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1737
1738         # NAT44 address
1739         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1740              ARP(op=ARP.who_has, pdst=self.nat_addr,
1741                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1742         self.pg1.add_stream(p)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg1.get_capture(1)
1746         self.assertTrue(capture[0].haslayer(ARP))
1747         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1748
1749         # 1:1 NAT address
1750         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1751              ARP(op=ARP.who_has, pdst=static_addr,
1752                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1753         self.pg1.add_stream(p)
1754         self.pg_enable_capture(self.pg_interfaces)
1755         self.pg_start()
1756         capture = self.pg1.get_capture(1)
1757         self.assertTrue(capture[0].haslayer(ARP))
1758         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1759
1760         # send ARP to non-NAT44 interface
1761         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1762              ARP(op=ARP.who_has, pdst=self.nat_addr,
1763                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1764         self.pg2.add_stream(p)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg1.get_capture(0)
1768
1769         # remove addresses and verify
1770         self.nat44_add_address(self.nat_addr, is_add=0)
1771         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1772                                       is_add=0)
1773
1774         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1775              ARP(op=ARP.who_has, pdst=self.nat_addr,
1776                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1777         self.pg1.add_stream(p)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg1.get_capture(0)
1781
1782         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1783              ARP(op=ARP.who_has, pdst=static_addr,
1784                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1785         self.pg1.add_stream(p)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg1.get_capture(0)
1789
1790     def test_vrf_mode(self):
1791         """ NAT44 tenant VRF aware address pool mode """
1792
1793         vrf_id1 = 1
1794         vrf_id2 = 2
1795         nat_ip1 = "10.0.0.10"
1796         nat_ip2 = "10.0.0.11"
1797
1798         self.pg0.unconfig_ip4()
1799         self.pg1.unconfig_ip4()
1800         self.pg0.set_table_ip4(vrf_id1)
1801         self.pg1.set_table_ip4(vrf_id2)
1802         self.pg0.config_ip4()
1803         self.pg1.config_ip4()
1804
1805         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1806         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1807         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1808         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1809         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1810                                                   is_inside=0)
1811
1812         # first VRF
1813         pkts = self.create_stream_in(self.pg0, self.pg2)
1814         self.pg0.add_stream(pkts)
1815         self.pg_enable_capture(self.pg_interfaces)
1816         self.pg_start()
1817         capture = self.pg2.get_capture(len(pkts))
1818         self.verify_capture_out(capture, nat_ip1)
1819
1820         # second VRF
1821         pkts = self.create_stream_in(self.pg1, self.pg2)
1822         self.pg1.add_stream(pkts)
1823         self.pg_enable_capture(self.pg_interfaces)
1824         self.pg_start()
1825         capture = self.pg2.get_capture(len(pkts))
1826         self.verify_capture_out(capture, nat_ip2)
1827
1828     def test_vrf_feature_independent(self):
1829         """ NAT44 tenant VRF independent address pool mode """
1830
1831         nat_ip1 = "10.0.0.10"
1832         nat_ip2 = "10.0.0.11"
1833
1834         self.nat44_add_address(nat_ip1)
1835         self.nat44_add_address(nat_ip2)
1836         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1837         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1838         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1839                                                   is_inside=0)
1840
1841         # first VRF
1842         pkts = self.create_stream_in(self.pg0, self.pg2)
1843         self.pg0.add_stream(pkts)
1844         self.pg_enable_capture(self.pg_interfaces)
1845         self.pg_start()
1846         capture = self.pg2.get_capture(len(pkts))
1847         self.verify_capture_out(capture, nat_ip1)
1848
1849         # second VRF
1850         pkts = self.create_stream_in(self.pg1, self.pg2)
1851         self.pg1.add_stream(pkts)
1852         self.pg_enable_capture(self.pg_interfaces)
1853         self.pg_start()
1854         capture = self.pg2.get_capture(len(pkts))
1855         self.verify_capture_out(capture, nat_ip1)
1856
1857     def test_dynamic_ipless_interfaces(self):
1858         """ NAT44 interfaces without configured IP address """
1859
1860         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1861                                       self.pg7.remote_mac,
1862                                       self.pg7.remote_ip4n,
1863                                       is_static=1)
1864         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1865                                       self.pg8.remote_mac,
1866                                       self.pg8.remote_ip4n,
1867                                       is_static=1)
1868
1869         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1870                                    dst_address_length=32,
1871                                    next_hop_address=self.pg7.remote_ip4n,
1872                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1873         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1874                                    dst_address_length=32,
1875                                    next_hop_address=self.pg8.remote_ip4n,
1876                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1877
1878         self.nat44_add_address(self.nat_addr)
1879         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1880         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1881                                                   is_inside=0)
1882
1883         # in2out
1884         pkts = self.create_stream_in(self.pg7, self.pg8)
1885         self.pg7.add_stream(pkts)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg8.get_capture(len(pkts))
1889         self.verify_capture_out(capture)
1890
1891         # out2in
1892         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1893         self.pg8.add_stream(pkts)
1894         self.pg_enable_capture(self.pg_interfaces)
1895         self.pg_start()
1896         capture = self.pg7.get_capture(len(pkts))
1897         self.verify_capture_in(capture, self.pg7)
1898
1899     def test_static_ipless_interfaces(self):
1900         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1901
1902         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1903                                       self.pg7.remote_mac,
1904                                       self.pg7.remote_ip4n,
1905                                       is_static=1)
1906         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1907                                       self.pg8.remote_mac,
1908                                       self.pg8.remote_ip4n,
1909                                       is_static=1)
1910
1911         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1912                                    dst_address_length=32,
1913                                    next_hop_address=self.pg7.remote_ip4n,
1914                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1915         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1916                                    dst_address_length=32,
1917                                    next_hop_address=self.pg8.remote_ip4n,
1918                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1919
1920         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1921         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1922         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1923                                                   is_inside=0)
1924
1925         # out2in
1926         pkts = self.create_stream_out(self.pg8)
1927         self.pg8.add_stream(pkts)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         capture = self.pg7.get_capture(len(pkts))
1931         self.verify_capture_in(capture, self.pg7)
1932
1933         # in2out
1934         pkts = self.create_stream_in(self.pg7, self.pg8)
1935         self.pg7.add_stream(pkts)
1936         self.pg_enable_capture(self.pg_interfaces)
1937         self.pg_start()
1938         capture = self.pg8.get_capture(len(pkts))
1939         self.verify_capture_out(capture, self.nat_addr, True)
1940
1941     def test_static_with_port_ipless_interfaces(self):
1942         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1943
1944         self.tcp_port_out = 30606
1945         self.udp_port_out = 30607
1946         self.icmp_id_out = 30608
1947
1948         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1949                                       self.pg7.remote_mac,
1950                                       self.pg7.remote_ip4n,
1951                                       is_static=1)
1952         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1953                                       self.pg8.remote_mac,
1954                                       self.pg8.remote_ip4n,
1955                                       is_static=1)
1956
1957         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1958                                    dst_address_length=32,
1959                                    next_hop_address=self.pg7.remote_ip4n,
1960                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1961         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1962                                    dst_address_length=32,
1963                                    next_hop_address=self.pg8.remote_ip4n,
1964                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1965
1966         self.nat44_add_address(self.nat_addr)
1967         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1968                                       self.tcp_port_in, self.tcp_port_out,
1969                                       proto=IP_PROTOS.tcp)
1970         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1971                                       self.udp_port_in, self.udp_port_out,
1972                                       proto=IP_PROTOS.udp)
1973         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1974                                       self.icmp_id_in, self.icmp_id_out,
1975                                       proto=IP_PROTOS.icmp)
1976         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1977         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1978                                                   is_inside=0)
1979
1980         # out2in
1981         pkts = self.create_stream_out(self.pg8)
1982         self.pg8.add_stream(pkts)
1983         self.pg_enable_capture(self.pg_interfaces)
1984         self.pg_start()
1985         capture = self.pg7.get_capture(len(pkts))
1986         self.verify_capture_in(capture, self.pg7)
1987
1988         # in2out
1989         pkts = self.create_stream_in(self.pg7, self.pg8)
1990         self.pg7.add_stream(pkts)
1991         self.pg_enable_capture(self.pg_interfaces)
1992         self.pg_start()
1993         capture = self.pg8.get_capture(len(pkts))
1994         self.verify_capture_out(capture)
1995
1996     def test_static_unknown_proto(self):
1997         """ 1:1 NAT translate packet with unknown protocol """
1998         nat_ip = "10.0.0.10"
1999         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2000         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2001         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2002                                                   is_inside=0)
2003
2004         # in2out
2005         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2006              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2007              GRE() /
2008              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2009              TCP(sport=1234, dport=1234))
2010         self.pg0.add_stream(p)
2011         self.pg_enable_capture(self.pg_interfaces)
2012         self.pg_start()
2013         p = self.pg1.get_capture(1)
2014         packet = p[0]
2015         try:
2016             self.assertEqual(packet[IP].src, nat_ip)
2017             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2018             self.assertTrue(packet.haslayer(GRE))
2019             self.check_ip_checksum(packet)
2020         except:
2021             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2022             raise
2023
2024         # out2in
2025         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2026              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2027              GRE() /
2028              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2029              TCP(sport=1234, dport=1234))
2030         self.pg1.add_stream(p)
2031         self.pg_enable_capture(self.pg_interfaces)
2032         self.pg_start()
2033         p = self.pg0.get_capture(1)
2034         packet = p[0]
2035         try:
2036             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2037             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2038             self.assertTrue(packet.haslayer(GRE))
2039             self.check_ip_checksum(packet)
2040         except:
2041             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2042             raise
2043
2044     def test_hairpinning_static_unknown_proto(self):
2045         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2046
2047         host = self.pg0.remote_hosts[0]
2048         server = self.pg0.remote_hosts[1]
2049
2050         host_nat_ip = "10.0.0.10"
2051         server_nat_ip = "10.0.0.11"
2052
2053         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2054         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2055         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2056         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2057                                                   is_inside=0)
2058
2059         # host to server
2060         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2061              IP(src=host.ip4, dst=server_nat_ip) /
2062              GRE() /
2063              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2064              TCP(sport=1234, dport=1234))
2065         self.pg0.add_stream(p)
2066         self.pg_enable_capture(self.pg_interfaces)
2067         self.pg_start()
2068         p = self.pg0.get_capture(1)
2069         packet = p[0]
2070         try:
2071             self.assertEqual(packet[IP].src, host_nat_ip)
2072             self.assertEqual(packet[IP].dst, server.ip4)
2073             self.assertTrue(packet.haslayer(GRE))
2074             self.check_ip_checksum(packet)
2075         except:
2076             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2077             raise
2078
2079         # server to host
2080         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2081              IP(src=server.ip4, dst=host_nat_ip) /
2082              GRE() /
2083              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2084              TCP(sport=1234, dport=1234))
2085         self.pg0.add_stream(p)
2086         self.pg_enable_capture(self.pg_interfaces)
2087         self.pg_start()
2088         p = self.pg0.get_capture(1)
2089         packet = p[0]
2090         try:
2091             self.assertEqual(packet[IP].src, server_nat_ip)
2092             self.assertEqual(packet[IP].dst, host.ip4)
2093             self.assertTrue(packet.haslayer(GRE))
2094             self.check_ip_checksum(packet)
2095         except:
2096             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2097             raise
2098
2099     def test_unknown_proto(self):
2100         """ NAT44 translate packet with unknown protocol """
2101         self.nat44_add_address(self.nat_addr)
2102         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2103         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2104                                                   is_inside=0)
2105
2106         # in2out
2107         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2108              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2109              TCP(sport=self.tcp_port_in, dport=20))
2110         self.pg0.add_stream(p)
2111         self.pg_enable_capture(self.pg_interfaces)
2112         self.pg_start()
2113         p = self.pg1.get_capture(1)
2114
2115         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2116              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2117              GRE() /
2118              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2119              TCP(sport=1234, dport=1234))
2120         self.pg0.add_stream(p)
2121         self.pg_enable_capture(self.pg_interfaces)
2122         self.pg_start()
2123         p = self.pg1.get_capture(1)
2124         packet = p[0]
2125         try:
2126             self.assertEqual(packet[IP].src, self.nat_addr)
2127             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2128             self.assertTrue(packet.haslayer(GRE))
2129             self.check_ip_checksum(packet)
2130         except:
2131             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2132             raise
2133
2134         # out2in
2135         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2136              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2137              GRE() /
2138              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2139              TCP(sport=1234, dport=1234))
2140         self.pg1.add_stream(p)
2141         self.pg_enable_capture(self.pg_interfaces)
2142         self.pg_start()
2143         p = self.pg0.get_capture(1)
2144         packet = p[0]
2145         try:
2146             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2147             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2148             self.assertTrue(packet.haslayer(GRE))
2149             self.check_ip_checksum(packet)
2150         except:
2151             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2152             raise
2153
2154     def test_hairpinning_unknown_proto(self):
2155         """ NAT44 translate packet with unknown protocol - hairpinning """
2156         host = self.pg0.remote_hosts[0]
2157         server = self.pg0.remote_hosts[1]
2158         host_in_port = 1234
2159         host_out_port = 0
2160         server_in_port = 5678
2161         server_out_port = 8765
2162         server_nat_ip = "10.0.0.11"
2163
2164         self.nat44_add_address(self.nat_addr)
2165         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2166         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2167                                                   is_inside=0)
2168
2169         # add static mapping for server
2170         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2171
2172         # host to server
2173         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2174              IP(src=host.ip4, dst=server_nat_ip) /
2175              TCP(sport=host_in_port, dport=server_out_port))
2176         self.pg0.add_stream(p)
2177         self.pg_enable_capture(self.pg_interfaces)
2178         self.pg_start()
2179         capture = self.pg0.get_capture(1)
2180
2181         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2182              IP(src=host.ip4, dst=server_nat_ip) /
2183              GRE() /
2184              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2185              TCP(sport=1234, dport=1234))
2186         self.pg0.add_stream(p)
2187         self.pg_enable_capture(self.pg_interfaces)
2188         self.pg_start()
2189         p = self.pg0.get_capture(1)
2190         packet = p[0]
2191         try:
2192             self.assertEqual(packet[IP].src, self.nat_addr)
2193             self.assertEqual(packet[IP].dst, server.ip4)
2194             self.assertTrue(packet.haslayer(GRE))
2195             self.check_ip_checksum(packet)
2196         except:
2197             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2198             raise
2199
2200         # server to host
2201         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2202              IP(src=server.ip4, dst=self.nat_addr) /
2203              GRE() /
2204              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2205              TCP(sport=1234, dport=1234))
2206         self.pg0.add_stream(p)
2207         self.pg_enable_capture(self.pg_interfaces)
2208         self.pg_start()
2209         p = self.pg0.get_capture(1)
2210         packet = p[0]
2211         try:
2212             self.assertEqual(packet[IP].src, server_nat_ip)
2213             self.assertEqual(packet[IP].dst, host.ip4)
2214             self.assertTrue(packet.haslayer(GRE))
2215             self.check_ip_checksum(packet)
2216         except:
2217             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2218             raise
2219
2220     def test_output_feature(self):
2221         """ NAT44 interface output feature (in2out postrouting) """
2222         self.nat44_add_address(self.nat_addr)
2223         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2224         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2225                                                          is_inside=0)
2226
2227         # in2out
2228         pkts = self.create_stream_in(self.pg0, self.pg1)
2229         self.pg0.add_stream(pkts)
2230         self.pg_enable_capture(self.pg_interfaces)
2231         self.pg_start()
2232         capture = self.pg1.get_capture(len(pkts))
2233         self.verify_capture_out(capture)
2234
2235         # out2in
2236         pkts = self.create_stream_out(self.pg1)
2237         self.pg1.add_stream(pkts)
2238         self.pg_enable_capture(self.pg_interfaces)
2239         self.pg_start()
2240         capture = self.pg0.get_capture(len(pkts))
2241         self.verify_capture_in(capture, self.pg0)
2242
2243     def test_output_feature_vrf_aware(self):
2244         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2245         nat_ip_vrf10 = "10.0.0.10"
2246         nat_ip_vrf20 = "10.0.0.20"
2247
2248         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2249                                    dst_address_length=32,
2250                                    next_hop_address=self.pg3.remote_ip4n,
2251                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2252                                    table_id=10)
2253         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2254                                    dst_address_length=32,
2255                                    next_hop_address=self.pg3.remote_ip4n,
2256                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2257                                    table_id=20)
2258
2259         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2260         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2261         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2262         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2263         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2264                                                          is_inside=0)
2265
2266         # in2out VRF 10
2267         pkts = self.create_stream_in(self.pg4, self.pg3)
2268         self.pg4.add_stream(pkts)
2269         self.pg_enable_capture(self.pg_interfaces)
2270         self.pg_start()
2271         capture = self.pg3.get_capture(len(pkts))
2272         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2273
2274         # out2in VRF 10
2275         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2276         self.pg3.add_stream(pkts)
2277         self.pg_enable_capture(self.pg_interfaces)
2278         self.pg_start()
2279         capture = self.pg4.get_capture(len(pkts))
2280         self.verify_capture_in(capture, self.pg4)
2281
2282         # in2out VRF 20
2283         pkts = self.create_stream_in(self.pg6, self.pg3)
2284         self.pg6.add_stream(pkts)
2285         self.pg_enable_capture(self.pg_interfaces)
2286         self.pg_start()
2287         capture = self.pg3.get_capture(len(pkts))
2288         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2289
2290         # out2in VRF 20
2291         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2292         self.pg3.add_stream(pkts)
2293         self.pg_enable_capture(self.pg_interfaces)
2294         self.pg_start()
2295         capture = self.pg6.get_capture(len(pkts))
2296         self.verify_capture_in(capture, self.pg6)
2297
2298     def test_output_feature_hairpinning(self):
2299         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2300         host = self.pg0.remote_hosts[0]
2301         server = self.pg0.remote_hosts[1]
2302         host_in_port = 1234
2303         host_out_port = 0
2304         server_in_port = 5678
2305         server_out_port = 8765
2306
2307         self.nat44_add_address(self.nat_addr)
2308         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2309         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2310                                                          is_inside=0)
2311
2312         # add static mapping for server
2313         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2314                                       server_in_port, server_out_port,
2315                                       proto=IP_PROTOS.tcp)
2316
2317         # send packet from host to server
2318         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2319              IP(src=host.ip4, dst=self.nat_addr) /
2320              TCP(sport=host_in_port, dport=server_out_port))
2321         self.pg0.add_stream(p)
2322         self.pg_enable_capture(self.pg_interfaces)
2323         self.pg_start()
2324         capture = self.pg0.get_capture(1)
2325         p = capture[0]
2326         try:
2327             ip = p[IP]
2328             tcp = p[TCP]
2329             self.assertEqual(ip.src, self.nat_addr)
2330             self.assertEqual(ip.dst, server.ip4)
2331             self.assertNotEqual(tcp.sport, host_in_port)
2332             self.assertEqual(tcp.dport, server_in_port)
2333             self.check_tcp_checksum(p)
2334             host_out_port = tcp.sport
2335         except:
2336             self.logger.error(ppp("Unexpected or invalid packet:", p))
2337             raise
2338
2339         # send reply from server to host
2340         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2341              IP(src=server.ip4, dst=self.nat_addr) /
2342              TCP(sport=server_in_port, dport=host_out_port))
2343         self.pg0.add_stream(p)
2344         self.pg_enable_capture(self.pg_interfaces)
2345         self.pg_start()
2346         capture = self.pg0.get_capture(1)
2347         p = capture[0]
2348         try:
2349             ip = p[IP]
2350             tcp = p[TCP]
2351             self.assertEqual(ip.src, self.nat_addr)
2352             self.assertEqual(ip.dst, host.ip4)
2353             self.assertEqual(tcp.sport, server_out_port)
2354             self.assertEqual(tcp.dport, host_in_port)
2355             self.check_tcp_checksum(p)
2356         except:
2357             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2358             raise
2359
2360     def tearDown(self):
2361         super(TestNAT44, self).tearDown()
2362         if not self.vpp_dead:
2363             self.logger.info(self.vapi.cli("show nat44 verbose"))
2364             self.clear_nat44()
2365
2366
2367 class TestDeterministicNAT(MethodHolder):
2368     """ Deterministic NAT Test Cases """
2369
2370     @classmethod
2371     def setUpConstants(cls):
2372         super(TestDeterministicNAT, cls).setUpConstants()
2373         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2374
2375     @classmethod
2376     def setUpClass(cls):
2377         super(TestDeterministicNAT, cls).setUpClass()
2378
2379         try:
2380             cls.tcp_port_in = 6303
2381             cls.tcp_external_port = 6303
2382             cls.udp_port_in = 6304
2383             cls.udp_external_port = 6304
2384             cls.icmp_id_in = 6305
2385             cls.nat_addr = '10.0.0.3'
2386
2387             cls.create_pg_interfaces(range(3))
2388             cls.interfaces = list(cls.pg_interfaces)
2389
2390             for i in cls.interfaces:
2391                 i.admin_up()
2392                 i.config_ip4()
2393                 i.resolve_arp()
2394
2395             cls.pg0.generate_remote_hosts(2)
2396             cls.pg0.configure_ipv4_neighbors()
2397
2398         except Exception:
2399             super(TestDeterministicNAT, cls).tearDownClass()
2400             raise
2401
2402     def create_stream_in(self, in_if, out_if, ttl=64):
2403         """
2404         Create packet stream for inside network
2405
2406         :param in_if: Inside interface
2407         :param out_if: Outside interface
2408         :param ttl: TTL of generated packets
2409         """
2410         pkts = []
2411         # TCP
2412         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2413              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2414              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2415         pkts.append(p)
2416
2417         # UDP
2418         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2419              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2420              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2421         pkts.append(p)
2422
2423         # ICMP
2424         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2425              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2426              ICMP(id=self.icmp_id_in, type='echo-request'))
2427         pkts.append(p)
2428
2429         return pkts
2430
2431     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2432         """
2433         Create packet stream for outside network
2434
2435         :param out_if: Outside interface
2436         :param dst_ip: Destination IP address (Default use global NAT address)
2437         :param ttl: TTL of generated packets
2438         """
2439         if dst_ip is None:
2440             dst_ip = self.nat_addr
2441         pkts = []
2442         # TCP
2443         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2444              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2445              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2446         pkts.append(p)
2447
2448         # UDP
2449         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2450              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2451              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2452         pkts.append(p)
2453
2454         # ICMP
2455         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2456              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2457              ICMP(id=self.icmp_external_id, type='echo-reply'))
2458         pkts.append(p)
2459
2460         return pkts
2461
2462     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2463         """
2464         Verify captured packets on outside network
2465
2466         :param capture: Captured packets
2467         :param nat_ip: Translated IP address (Default use global NAT address)
2468         :param same_port: Sorce port number is not translated (Default False)
2469         :param packet_num: Expected number of packets (Default 3)
2470         """
2471         if nat_ip is None:
2472             nat_ip = self.nat_addr
2473         self.assertEqual(packet_num, len(capture))
2474         for packet in capture:
2475             try:
2476                 self.assertEqual(packet[IP].src, nat_ip)
2477                 if packet.haslayer(TCP):
2478                     self.tcp_port_out = packet[TCP].sport
2479                 elif packet.haslayer(UDP):
2480                     self.udp_port_out = packet[UDP].sport
2481                 else:
2482                     self.icmp_external_id = packet[ICMP].id
2483             except:
2484                 self.logger.error(ppp("Unexpected or invalid packet "
2485                                       "(outside network):", packet))
2486                 raise
2487
2488     def initiate_tcp_session(self, in_if, out_if):
2489         """
2490         Initiates TCP session
2491
2492         :param in_if: Inside interface
2493         :param out_if: Outside interface
2494         """
2495         try:
2496             # SYN packet in->out
2497             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2498                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2499                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2500                      flags="S"))
2501             in_if.add_stream(p)
2502             self.pg_enable_capture(self.pg_interfaces)
2503             self.pg_start()
2504             capture = out_if.get_capture(1)
2505             p = capture[0]
2506             self.tcp_port_out = p[TCP].sport
2507
2508             # SYN + ACK packet out->in
2509             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2510                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2511                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2512                      flags="SA"))
2513             out_if.add_stream(p)
2514             self.pg_enable_capture(self.pg_interfaces)
2515             self.pg_start()
2516             in_if.get_capture(1)
2517
2518             # ACK packet in->out
2519             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2520                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2521                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2522                      flags="A"))
2523             in_if.add_stream(p)
2524             self.pg_enable_capture(self.pg_interfaces)
2525             self.pg_start()
2526             out_if.get_capture(1)
2527
2528         except:
2529             self.logger.error("TCP 3 way handshake failed")
2530             raise
2531
2532     def verify_ipfix_max_entries_per_user(self, data):
2533         """
2534         Verify IPFIX maximum entries per user exceeded event
2535
2536         :param data: Decoded IPFIX data records
2537         """
2538         self.assertEqual(1, len(data))
2539         record = data[0]
2540         # natEvent
2541         self.assertEqual(ord(record[230]), 13)
2542         # natQuotaExceededEvent
2543         self.assertEqual('\x03\x00\x00\x00', record[466])
2544         # sourceIPv4Address
2545         self.assertEqual(self.pg0.remote_ip4n, record[8])
2546
2547     def test_deterministic_mode(self):
2548         """ NAT plugin run deterministic mode """
2549         in_addr = '172.16.255.0'
2550         out_addr = '172.17.255.50'
2551         in_addr_t = '172.16.255.20'
2552         in_addr_n = socket.inet_aton(in_addr)
2553         out_addr_n = socket.inet_aton(out_addr)
2554         in_addr_t_n = socket.inet_aton(in_addr_t)
2555         in_plen = 24
2556         out_plen = 32
2557
2558         nat_config = self.vapi.nat_show_config()
2559         self.assertEqual(1, nat_config.deterministic)
2560
2561         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2562
2563         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2564         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2565         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2566         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2567
2568         deterministic_mappings = self.vapi.nat_det_map_dump()
2569         self.assertEqual(len(deterministic_mappings), 1)
2570         dsm = deterministic_mappings[0]
2571         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2572         self.assertEqual(in_plen, dsm.in_plen)
2573         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2574         self.assertEqual(out_plen, dsm.out_plen)
2575
2576         self.clear_nat_det()
2577         deterministic_mappings = self.vapi.nat_det_map_dump()
2578         self.assertEqual(len(deterministic_mappings), 0)
2579
2580     def test_set_timeouts(self):
2581         """ Set deterministic NAT timeouts """
2582         timeouts_before = self.vapi.nat_det_get_timeouts()
2583
2584         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2585                                        timeouts_before.tcp_established + 10,
2586                                        timeouts_before.tcp_transitory + 10,
2587                                        timeouts_before.icmp + 10)
2588
2589         timeouts_after = self.vapi.nat_det_get_timeouts()
2590
2591         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2592         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2593         self.assertNotEqual(timeouts_before.tcp_established,
2594                             timeouts_after.tcp_established)
2595         self.assertNotEqual(timeouts_before.tcp_transitory,
2596                             timeouts_after.tcp_transitory)
2597
2598     def test_det_in(self):
2599         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2600
2601         nat_ip = "10.0.0.10"
2602
2603         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2604                                       32,
2605                                       socket.inet_aton(nat_ip),
2606                                       32)
2607         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2608         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2609                                                   is_inside=0)
2610
2611         # in2out
2612         pkts = self.create_stream_in(self.pg0, self.pg1)
2613         self.pg0.add_stream(pkts)
2614         self.pg_enable_capture(self.pg_interfaces)
2615         self.pg_start()
2616         capture = self.pg1.get_capture(len(pkts))
2617         self.verify_capture_out(capture, nat_ip)
2618
2619         # out2in
2620         pkts = self.create_stream_out(self.pg1, nat_ip)
2621         self.pg1.add_stream(pkts)
2622         self.pg_enable_capture(self.pg_interfaces)
2623         self.pg_start()
2624         capture = self.pg0.get_capture(len(pkts))
2625         self.verify_capture_in(capture, self.pg0)
2626
2627         # session dump test
2628         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2629         self.assertEqual(len(sessions), 3)
2630
2631         # TCP session
2632         s = sessions[0]
2633         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2634         self.assertEqual(s.in_port, self.tcp_port_in)
2635         self.assertEqual(s.out_port, self.tcp_port_out)
2636         self.assertEqual(s.ext_port, self.tcp_external_port)
2637
2638         # UDP session
2639         s = sessions[1]
2640         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2641         self.assertEqual(s.in_port, self.udp_port_in)
2642         self.assertEqual(s.out_port, self.udp_port_out)
2643         self.assertEqual(s.ext_port, self.udp_external_port)
2644
2645         # ICMP session
2646         s = sessions[2]
2647         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2648         self.assertEqual(s.in_port, self.icmp_id_in)
2649         self.assertEqual(s.out_port, self.icmp_external_id)
2650
2651     def test_multiple_users(self):
2652         """ Deterministic NAT multiple users """
2653
2654         nat_ip = "10.0.0.10"
2655         port_in = 80
2656         external_port = 6303
2657
2658         host0 = self.pg0.remote_hosts[0]
2659         host1 = self.pg0.remote_hosts[1]
2660
2661         self.vapi.nat_det_add_del_map(host0.ip4n,
2662                                       24,
2663                                       socket.inet_aton(nat_ip),
2664                                       32)
2665         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2666         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2667                                                   is_inside=0)
2668
2669         # host0 to out
2670         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2671              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2672              TCP(sport=port_in, dport=external_port))
2673         self.pg0.add_stream(p)
2674         self.pg_enable_capture(self.pg_interfaces)
2675         self.pg_start()
2676         capture = self.pg1.get_capture(1)
2677         p = capture[0]
2678         try:
2679             ip = p[IP]
2680             tcp = p[TCP]
2681             self.assertEqual(ip.src, nat_ip)
2682             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2683             self.assertEqual(tcp.dport, external_port)
2684             port_out0 = tcp.sport
2685         except:
2686             self.logger.error(ppp("Unexpected or invalid packet:", p))
2687             raise
2688
2689         # host1 to out
2690         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2691              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2692              TCP(sport=port_in, dport=external_port))
2693         self.pg0.add_stream(p)
2694         self.pg_enable_capture(self.pg_interfaces)
2695         self.pg_start()
2696         capture = self.pg1.get_capture(1)
2697         p = capture[0]
2698         try:
2699             ip = p[IP]
2700             tcp = p[TCP]
2701             self.assertEqual(ip.src, nat_ip)
2702             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2703             self.assertEqual(tcp.dport, external_port)
2704             port_out1 = tcp.sport
2705         except:
2706             self.logger.error(ppp("Unexpected or invalid packet:", p))
2707             raise
2708
2709         dms = self.vapi.nat_det_map_dump()
2710         self.assertEqual(1, len(dms))
2711         self.assertEqual(2, dms[0].ses_num)
2712
2713         # out to host0
2714         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2715              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2716              TCP(sport=external_port, dport=port_out0))
2717         self.pg1.add_stream(p)
2718         self.pg_enable_capture(self.pg_interfaces)
2719         self.pg_start()
2720         capture = self.pg0.get_capture(1)
2721         p = capture[0]
2722         try:
2723             ip = p[IP]
2724             tcp = p[TCP]
2725             self.assertEqual(ip.src, self.pg1.remote_ip4)
2726             self.assertEqual(ip.dst, host0.ip4)
2727             self.assertEqual(tcp.dport, port_in)
2728             self.assertEqual(tcp.sport, external_port)
2729         except:
2730             self.logger.error(ppp("Unexpected or invalid packet:", p))
2731             raise
2732
2733         # out to host1
2734         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2735              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2736              TCP(sport=external_port, dport=port_out1))
2737         self.pg1.add_stream(p)
2738         self.pg_enable_capture(self.pg_interfaces)
2739         self.pg_start()
2740         capture = self.pg0.get_capture(1)
2741         p = capture[0]
2742         try:
2743             ip = p[IP]
2744             tcp = p[TCP]
2745             self.assertEqual(ip.src, self.pg1.remote_ip4)
2746             self.assertEqual(ip.dst, host1.ip4)
2747             self.assertEqual(tcp.dport, port_in)
2748             self.assertEqual(tcp.sport, external_port)
2749         except:
2750             self.logger.error(ppp("Unexpected or invalid packet", p))
2751             raise
2752
2753         # session close api test
2754         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2755                                             port_out1,
2756                                             self.pg1.remote_ip4n,
2757                                             external_port)
2758         dms = self.vapi.nat_det_map_dump()
2759         self.assertEqual(dms[0].ses_num, 1)
2760
2761         self.vapi.nat_det_close_session_in(host0.ip4n,
2762                                            port_in,
2763                                            self.pg1.remote_ip4n,
2764                                            external_port)
2765         dms = self.vapi.nat_det_map_dump()
2766         self.assertEqual(dms[0].ses_num, 0)
2767
2768     def test_tcp_session_close_detection_in(self):
2769         """ Deterministic NAT TCP session close from inside network """
2770         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2771                                       32,
2772                                       socket.inet_aton(self.nat_addr),
2773                                       32)
2774         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2775         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2776                                                   is_inside=0)
2777
2778         self.initiate_tcp_session(self.pg0, self.pg1)
2779
2780         # close the session from inside
2781         try:
2782             # FIN packet in -> out
2783             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2784                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2785                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2786                      flags="F"))
2787             self.pg0.add_stream(p)
2788             self.pg_enable_capture(self.pg_interfaces)
2789             self.pg_start()
2790             self.pg1.get_capture(1)
2791
2792             pkts = []
2793
2794             # ACK packet out -> in
2795             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2796                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2797                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2798                      flags="A"))
2799             pkts.append(p)
2800
2801             # FIN packet out -> in
2802             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2803                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2804                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2805                      flags="F"))
2806             pkts.append(p)
2807
2808             self.pg1.add_stream(pkts)
2809             self.pg_enable_capture(self.pg_interfaces)
2810             self.pg_start()
2811             self.pg0.get_capture(2)
2812
2813             # ACK packet in -> out
2814             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2815                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2816                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2817                      flags="A"))
2818             self.pg0.add_stream(p)
2819             self.pg_enable_capture(self.pg_interfaces)
2820             self.pg_start()
2821             self.pg1.get_capture(1)
2822
2823             # Check if deterministic NAT44 closed the session
2824             dms = self.vapi.nat_det_map_dump()
2825             self.assertEqual(0, dms[0].ses_num)
2826         except:
2827             self.logger.error("TCP session termination failed")
2828             raise
2829
2830     def test_tcp_session_close_detection_out(self):
2831         """ Deterministic NAT TCP session close from outside network """
2832         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2833                                       32,
2834                                       socket.inet_aton(self.nat_addr),
2835                                       32)
2836         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2837         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2838                                                   is_inside=0)
2839
2840         self.initiate_tcp_session(self.pg0, self.pg1)
2841
2842         # close the session from outside
2843         try:
2844             # FIN packet out -> in
2845             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2846                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2847                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2848                      flags="F"))
2849             self.pg1.add_stream(p)
2850             self.pg_enable_capture(self.pg_interfaces)
2851             self.pg_start()
2852             self.pg0.get_capture(1)
2853
2854             pkts = []
2855
2856             # ACK packet in -> out
2857             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2858                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2859                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2860                      flags="A"))
2861             pkts.append(p)
2862
2863             # ACK packet in -> out
2864             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2865                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2866                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2867                      flags="F"))
2868             pkts.append(p)
2869
2870             self.pg0.add_stream(pkts)
2871             self.pg_enable_capture(self.pg_interfaces)
2872             self.pg_start()
2873             self.pg1.get_capture(2)
2874
2875             # ACK packet out -> in
2876             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2877                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2878                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2879                      flags="A"))
2880             self.pg1.add_stream(p)
2881             self.pg_enable_capture(self.pg_interfaces)
2882             self.pg_start()
2883             self.pg0.get_capture(1)
2884
2885             # Check if deterministic NAT44 closed the session
2886             dms = self.vapi.nat_det_map_dump()
2887             self.assertEqual(0, dms[0].ses_num)
2888         except:
2889             self.logger.error("TCP session termination failed")
2890             raise
2891
2892     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2893     def test_session_timeout(self):
2894         """ Deterministic NAT session timeouts """
2895         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2896                                       32,
2897                                       socket.inet_aton(self.nat_addr),
2898                                       32)
2899         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2900         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2901                                                   is_inside=0)
2902
2903         self.initiate_tcp_session(self.pg0, self.pg1)
2904         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2905         pkts = self.create_stream_in(self.pg0, self.pg1)
2906         self.pg0.add_stream(pkts)
2907         self.pg_enable_capture(self.pg_interfaces)
2908         self.pg_start()
2909         capture = self.pg1.get_capture(len(pkts))
2910         sleep(15)
2911
2912         dms = self.vapi.nat_det_map_dump()
2913         self.assertEqual(0, dms[0].ses_num)
2914
2915     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2916     def test_session_limit_per_user(self):
2917         """ Deterministic NAT maximum sessions per user limit """
2918         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2919                                       32,
2920                                       socket.inet_aton(self.nat_addr),
2921                                       32)
2922         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2923         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2924                                                   is_inside=0)
2925         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2926                                      src_address=self.pg2.local_ip4n,
2927                                      path_mtu=512,
2928                                      template_interval=10)
2929         self.vapi.nat_ipfix()
2930
2931         pkts = []
2932         for port in range(1025, 2025):
2933             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2934                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2935                  UDP(sport=port, dport=port))
2936             pkts.append(p)
2937
2938         self.pg0.add_stream(pkts)
2939         self.pg_enable_capture(self.pg_interfaces)
2940         self.pg_start()
2941         capture = self.pg1.get_capture(len(pkts))
2942
2943         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2944              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2945              UDP(sport=3001, dport=3002))
2946         self.pg0.add_stream(p)
2947         self.pg_enable_capture(self.pg_interfaces)
2948         self.pg_start()
2949         capture = self.pg1.assert_nothing_captured()
2950
2951         # verify ICMP error packet
2952         capture = self.pg0.get_capture(1)
2953         p = capture[0]
2954         self.assertTrue(p.haslayer(ICMP))
2955         icmp = p[ICMP]
2956         self.assertEqual(icmp.type, 3)
2957         self.assertEqual(icmp.code, 1)
2958         self.assertTrue(icmp.haslayer(IPerror))
2959         inner_ip = icmp[IPerror]
2960         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2961         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2962
2963         dms = self.vapi.nat_det_map_dump()
2964
2965         self.assertEqual(1000, dms[0].ses_num)
2966
2967         # verify IPFIX logging
2968         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2969         sleep(1)
2970         capture = self.pg2.get_capture(2)
2971         ipfix = IPFIXDecoder()
2972         # first load template
2973         for p in capture:
2974             self.assertTrue(p.haslayer(IPFIX))
2975             if p.haslayer(Template):
2976                 ipfix.add_template(p.getlayer(Template))
2977         # verify events in data set
2978         for p in capture:
2979             if p.haslayer(Data):
2980                 data = ipfix.decode_data_set(p.getlayer(Set))
2981                 self.verify_ipfix_max_entries_per_user(data)
2982
2983     def clear_nat_det(self):
2984         """
2985         Clear deterministic NAT configuration.
2986         """
2987         self.vapi.nat_ipfix(enable=0)
2988         self.vapi.nat_det_set_timeouts()
2989         deterministic_mappings = self.vapi.nat_det_map_dump()
2990         for dsm in deterministic_mappings:
2991             self.vapi.nat_det_add_del_map(dsm.in_addr,
2992                                           dsm.in_plen,
2993                                           dsm.out_addr,
2994                                           dsm.out_plen,
2995                                           is_add=0)
2996
2997         interfaces = self.vapi.nat44_interface_dump()
2998         for intf in interfaces:
2999             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3000                                                       intf.is_inside,
3001                                                       is_add=0)
3002
3003     def tearDown(self):
3004         super(TestDeterministicNAT, self).tearDown()
3005         if not self.vpp_dead:
3006             self.logger.info(self.vapi.cli("show nat44 detail"))
3007             self.clear_nat_det()
3008
3009
3010 class TestNAT64(MethodHolder):
3011     """ NAT64 Test Cases """
3012
3013     @classmethod
3014     def setUpClass(cls):
3015         super(TestNAT64, cls).setUpClass()
3016
3017         try:
3018             cls.tcp_port_in = 6303
3019             cls.tcp_port_out = 6303
3020             cls.udp_port_in = 6304
3021             cls.udp_port_out = 6304
3022             cls.icmp_id_in = 6305
3023             cls.icmp_id_out = 6305
3024             cls.nat_addr = '10.0.0.3'
3025             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3026             cls.vrf1_id = 10
3027             cls.vrf1_nat_addr = '10.0.10.3'
3028             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3029                                                    cls.vrf1_nat_addr)
3030
3031             cls.create_pg_interfaces(range(3))
3032             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3033             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3034             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3035
3036             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3037
3038             cls.pg0.generate_remote_hosts(2)
3039
3040             for i in cls.ip6_interfaces:
3041                 i.admin_up()
3042                 i.config_ip6()
3043                 i.configure_ipv6_neighbors()
3044
3045             for i in cls.ip4_interfaces:
3046                 i.admin_up()
3047                 i.config_ip4()
3048                 i.resolve_arp()
3049
3050         except Exception:
3051             super(TestNAT64, cls).tearDownClass()
3052             raise
3053
3054     def test_pool(self):
3055         """ Add/delete address to NAT64 pool """
3056         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3057
3058         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3059
3060         addresses = self.vapi.nat64_pool_addr_dump()
3061         self.assertEqual(len(addresses), 1)
3062         self.assertEqual(addresses[0].address, nat_addr)
3063
3064         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3065
3066         addresses = self.vapi.nat64_pool_addr_dump()
3067         self.assertEqual(len(addresses), 0)
3068
3069     def test_interface(self):
3070         """ Enable/disable NAT64 feature on the interface """
3071         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3072         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3073
3074         interfaces = self.vapi.nat64_interface_dump()
3075         self.assertEqual(len(interfaces), 2)
3076         pg0_found = False
3077         pg1_found = False
3078         for intf in interfaces:
3079             if intf.sw_if_index == self.pg0.sw_if_index:
3080                 self.assertEqual(intf.is_inside, 1)
3081                 pg0_found = True
3082             elif intf.sw_if_index == self.pg1.sw_if_index:
3083                 self.assertEqual(intf.is_inside, 0)
3084                 pg1_found = True
3085         self.assertTrue(pg0_found)
3086         self.assertTrue(pg1_found)
3087
3088         features = self.vapi.cli("show interface features pg0")
3089         self.assertNotEqual(features.find('nat64-in2out'), -1)
3090         features = self.vapi.cli("show interface features pg1")
3091         self.assertNotEqual(features.find('nat64-out2in'), -1)
3092
3093         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3094         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3095
3096         interfaces = self.vapi.nat64_interface_dump()
3097         self.assertEqual(len(interfaces), 0)
3098
3099     def test_static_bib(self):
3100         """ Add/delete static BIB entry """
3101         in_addr = socket.inet_pton(socket.AF_INET6,
3102                                    '2001:db8:85a3::8a2e:370:7334')
3103         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3104         in_port = 1234
3105         out_port = 5678
3106         proto = IP_PROTOS.tcp
3107
3108         self.vapi.nat64_add_del_static_bib(in_addr,
3109                                            out_addr,
3110                                            in_port,
3111                                            out_port,
3112                                            proto)
3113         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3114         static_bib_num = 0
3115         for bibe in bib:
3116             if bibe.is_static:
3117                 static_bib_num += 1
3118                 self.assertEqual(bibe.i_addr, in_addr)
3119                 self.assertEqual(bibe.o_addr, out_addr)
3120                 self.assertEqual(bibe.i_port, in_port)
3121                 self.assertEqual(bibe.o_port, out_port)
3122         self.assertEqual(static_bib_num, 1)
3123
3124         self.vapi.nat64_add_del_static_bib(in_addr,
3125                                            out_addr,
3126                                            in_port,
3127                                            out_port,
3128                                            proto,
3129                                            is_add=0)
3130         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3131         static_bib_num = 0
3132         for bibe in bib:
3133             if bibe.is_static:
3134                 static_bib_num += 1
3135         self.assertEqual(static_bib_num, 0)
3136
3137     def test_set_timeouts(self):
3138         """ Set NAT64 timeouts """
3139         # verify default values
3140         timeouts = self.vapi.nat64_get_timeouts()
3141         self.assertEqual(timeouts.udp, 300)
3142         self.assertEqual(timeouts.icmp, 60)
3143         self.assertEqual(timeouts.tcp_trans, 240)
3144         self.assertEqual(timeouts.tcp_est, 7440)
3145         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3146
3147         # set and verify custom values
3148         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3149                                      tcp_est=7450, tcp_incoming_syn=10)
3150         timeouts = self.vapi.nat64_get_timeouts()
3151         self.assertEqual(timeouts.udp, 200)
3152         self.assertEqual(timeouts.icmp, 30)
3153         self.assertEqual(timeouts.tcp_trans, 250)
3154         self.assertEqual(timeouts.tcp_est, 7450)
3155         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3156
3157     def test_dynamic(self):
3158         """ NAT64 dynamic translation test """
3159         self.tcp_port_in = 6303
3160         self.udp_port_in = 6304
3161         self.icmp_id_in = 6305
3162
3163         ses_num_start = self.nat64_get_ses_num()
3164
3165         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3166                                                 self.nat_addr_n)
3167         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3168         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3169
3170         # in2out
3171         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3172         self.pg0.add_stream(pkts)
3173         self.pg_enable_capture(self.pg_interfaces)
3174         self.pg_start()
3175         capture = self.pg1.get_capture(len(pkts))
3176         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3177                                 dst_ip=self.pg1.remote_ip4)
3178
3179         # out2in
3180         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3181         self.pg1.add_stream(pkts)
3182         self.pg_enable_capture(self.pg_interfaces)
3183         self.pg_start()
3184         capture = self.pg0.get_capture(len(pkts))
3185         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3186         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3187
3188         # in2out
3189         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3190         self.pg0.add_stream(pkts)
3191         self.pg_enable_capture(self.pg_interfaces)
3192         self.pg_start()
3193         capture = self.pg1.get_capture(len(pkts))
3194         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3195                                 dst_ip=self.pg1.remote_ip4)
3196
3197         # out2in
3198         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3199         self.pg1.add_stream(pkts)
3200         self.pg_enable_capture(self.pg_interfaces)
3201         self.pg_start()
3202         capture = self.pg0.get_capture(len(pkts))
3203         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3204
3205         ses_num_end = self.nat64_get_ses_num()
3206
3207         self.assertEqual(ses_num_end - ses_num_start, 3)
3208
3209         # tenant with specific VRF
3210         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3211                                                 self.vrf1_nat_addr_n,
3212                                                 vrf_id=self.vrf1_id)
3213         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3214
3215         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3216         self.pg2.add_stream(pkts)
3217         self.pg_enable_capture(self.pg_interfaces)
3218         self.pg_start()
3219         capture = self.pg1.get_capture(len(pkts))
3220         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3221                                 dst_ip=self.pg1.remote_ip4)
3222
3223         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3224         self.pg1.add_stream(pkts)
3225         self.pg_enable_capture(self.pg_interfaces)
3226         self.pg_start()
3227         capture = self.pg2.get_capture(len(pkts))
3228         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3229
3230     def test_static(self):
3231         """ NAT64 static translation test """
3232         self.tcp_port_in = 60303
3233         self.udp_port_in = 60304
3234         self.icmp_id_in = 60305
3235         self.tcp_port_out = 60303
3236         self.udp_port_out = 60304
3237         self.icmp_id_out = 60305
3238
3239         ses_num_start = self.nat64_get_ses_num()
3240
3241         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3242                                                 self.nat_addr_n)
3243         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3244         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3245
3246         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3247                                            self.nat_addr_n,
3248                                            self.tcp_port_in,
3249                                            self.tcp_port_out,
3250                                            IP_PROTOS.tcp)
3251         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3252                                            self.nat_addr_n,
3253                                            self.udp_port_in,
3254                                            self.udp_port_out,
3255                                            IP_PROTOS.udp)
3256         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3257                                            self.nat_addr_n,
3258                                            self.icmp_id_in,
3259                                            self.icmp_id_out,
3260                                            IP_PROTOS.icmp)
3261
3262         # in2out
3263         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3264         self.pg0.add_stream(pkts)
3265         self.pg_enable_capture(self.pg_interfaces)
3266         self.pg_start()
3267         capture = self.pg1.get_capture(len(pkts))
3268         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3269                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3270
3271         # out2in
3272         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3273         self.pg1.add_stream(pkts)
3274         self.pg_enable_capture(self.pg_interfaces)
3275         self.pg_start()
3276         capture = self.pg0.get_capture(len(pkts))
3277         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3278         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3279
3280         ses_num_end = self.nat64_get_ses_num()
3281
3282         self.assertEqual(ses_num_end - ses_num_start, 3)
3283
3284     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3285     def test_session_timeout(self):
3286         """ NAT64 session timeout """
3287         self.icmp_id_in = 1234
3288         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3289                                                 self.nat_addr_n)
3290         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3291         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3292         self.vapi.nat64_set_timeouts(icmp=5)
3293
3294         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3295         self.pg0.add_stream(pkts)
3296         self.pg_enable_capture(self.pg_interfaces)
3297         self.pg_start()
3298         capture = self.pg1.get_capture(len(pkts))
3299
3300         ses_num_before_timeout = self.nat64_get_ses_num()
3301
3302         sleep(15)
3303
3304         # ICMP session after timeout
3305         ses_num_after_timeout = self.nat64_get_ses_num()
3306         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3307
3308     def test_icmp_error(self):
3309         """ NAT64 ICMP Error message translation """
3310         self.tcp_port_in = 6303
3311         self.udp_port_in = 6304
3312         self.icmp_id_in = 6305
3313
3314         ses_num_start = self.nat64_get_ses_num()
3315
3316         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3317                                                 self.nat_addr_n)
3318         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3319         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3320
3321         # send some packets to create sessions
3322         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3323         self.pg0.add_stream(pkts)
3324         self.pg_enable_capture(self.pg_interfaces)
3325         self.pg_start()
3326         capture_ip4 = self.pg1.get_capture(len(pkts))
3327         self.verify_capture_out(capture_ip4,
3328                                 nat_ip=self.nat_addr,
3329                                 dst_ip=self.pg1.remote_ip4)
3330
3331         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3332         self.pg1.add_stream(pkts)
3333         self.pg_enable_capture(self.pg_interfaces)
3334         self.pg_start()
3335         capture_ip6 = self.pg0.get_capture(len(pkts))
3336         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3337         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3338                                    self.pg0.remote_ip6)
3339
3340         # in2out
3341         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3342                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3343                 ICMPv6DestUnreach(code=1) /
3344                 packet[IPv6] for packet in capture_ip6]
3345         self.pg0.add_stream(pkts)
3346         self.pg_enable_capture(self.pg_interfaces)
3347         self.pg_start()
3348         capture = self.pg1.get_capture(len(pkts))
3349         for packet in capture:
3350             try:
3351                 self.assertEqual(packet[IP].src, self.nat_addr)
3352                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3353                 self.assertEqual(packet[ICMP].type, 3)
3354                 self.assertEqual(packet[ICMP].code, 13)
3355                 inner = packet[IPerror]
3356                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3357                 self.assertEqual(inner.dst, self.nat_addr)
3358                 self.check_icmp_checksum(packet)
3359                 if inner.haslayer(TCPerror):
3360                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3361                 elif inner.haslayer(UDPerror):
3362                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3363                 else:
3364                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3365             except:
3366                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3367                 raise
3368
3369         # out2in
3370         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3371                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3372                 ICMP(type=3, code=13) /
3373                 packet[IP] for packet in capture_ip4]
3374         self.pg1.add_stream(pkts)
3375         self.pg_enable_capture(self.pg_interfaces)
3376         self.pg_start()
3377         capture = self.pg0.get_capture(len(pkts))
3378         for packet in capture:
3379             try:
3380                 self.assertEqual(packet[IPv6].src, ip.src)
3381                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3382                 icmp = packet[ICMPv6DestUnreach]
3383                 self.assertEqual(icmp.code, 1)
3384                 inner = icmp[IPerror6]
3385                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3386                 self.assertEqual(inner.dst, ip.src)
3387                 self.check_icmpv6_checksum(packet)
3388                 if inner.haslayer(TCPerror):
3389                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3390                 elif inner.haslayer(UDPerror):
3391                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3392                 else:
3393                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3394                                      self.icmp_id_in)
3395             except:
3396                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3397                 raise
3398
3399     def test_hairpinning(self):
3400         """ NAT64 hairpinning """
3401
3402         client = self.pg0.remote_hosts[0]
3403         server = self.pg0.remote_hosts[1]
3404         server_tcp_in_port = 22
3405         server_tcp_out_port = 4022
3406         server_udp_in_port = 23
3407         server_udp_out_port = 4023
3408         client_tcp_in_port = 1234
3409         client_udp_in_port = 1235
3410         client_tcp_out_port = 0
3411         client_udp_out_port = 0
3412         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3413         nat_addr_ip6 = ip.src
3414
3415         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3416                                                 self.nat_addr_n)
3417         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3418         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3419
3420         self.vapi.nat64_add_del_static_bib(server.ip6n,
3421                                            self.nat_addr_n,
3422                                            server_tcp_in_port,
3423                                            server_tcp_out_port,
3424                                            IP_PROTOS.tcp)
3425         self.vapi.nat64_add_del_static_bib(server.ip6n,
3426                                            self.nat_addr_n,
3427                                            server_udp_in_port,
3428                                            server_udp_out_port,
3429                                            IP_PROTOS.udp)
3430
3431         # client to server
3432         pkts = []
3433         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3434              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3435              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3436         pkts.append(p)
3437         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3438              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3439              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3440         pkts.append(p)
3441         self.pg0.add_stream(pkts)
3442         self.pg_enable_capture(self.pg_interfaces)
3443         self.pg_start()
3444         capture = self.pg0.get_capture(len(pkts))
3445         for packet in capture:
3446             try:
3447                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3448                 self.assertEqual(packet[IPv6].dst, server.ip6)
3449                 if packet.haslayer(TCP):
3450                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3451                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3452                     self.check_tcp_checksum(packet)
3453                     client_tcp_out_port = packet[TCP].sport
3454                 else:
3455                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3456                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3457                     self.check_udp_checksum(packet)
3458                     client_udp_out_port = packet[UDP].sport
3459             except:
3460                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3461                 raise
3462
3463         # server to client
3464         pkts = []
3465         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3466              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3467              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3468         pkts.append(p)
3469         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3470              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3471              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3472         pkts.append(p)
3473         self.pg0.add_stream(pkts)
3474         self.pg_enable_capture(self.pg_interfaces)
3475         self.pg_start()
3476         capture = self.pg0.get_capture(len(pkts))
3477         for packet in capture:
3478             try:
3479                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3480                 self.assertEqual(packet[IPv6].dst, client.ip6)
3481                 if packet.haslayer(TCP):
3482                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3483                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3484                     self.check_tcp_checksum(packet)
3485                 else:
3486                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3487                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3488                     self.check_udp_checksum(packet)
3489             except:
3490                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3491                 raise
3492
3493         # ICMP error
3494         pkts = []
3495         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3496                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3497                 ICMPv6DestUnreach(code=1) /
3498                 packet[IPv6] for packet in capture]
3499         self.pg0.add_stream(pkts)
3500         self.pg_enable_capture(self.pg_interfaces)
3501         self.pg_start()
3502         capture = self.pg0.get_capture(len(pkts))
3503         for packet in capture:
3504             try:
3505                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3506                 self.assertEqual(packet[IPv6].dst, server.ip6)
3507                 icmp = packet[ICMPv6DestUnreach]
3508                 self.assertEqual(icmp.code, 1)
3509                 inner = icmp[IPerror6]
3510                 self.assertEqual(inner.src, server.ip6)
3511                 self.assertEqual(inner.dst, nat_addr_ip6)
3512                 self.check_icmpv6_checksum(packet)
3513                 if inner.haslayer(TCPerror):
3514                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3515                     self.assertEqual(inner[TCPerror].dport,
3516                                      client_tcp_out_port)
3517                 else:
3518                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3519                     self.assertEqual(inner[UDPerror].dport,
3520                                      client_udp_out_port)
3521             except:
3522                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3523                 raise
3524
3525     def test_prefix(self):
3526         """ NAT64 Network-Specific Prefix """
3527
3528         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3529                                                 self.nat_addr_n)
3530         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3531         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3532         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3533                                                 self.vrf1_nat_addr_n,
3534                                                 vrf_id=self.vrf1_id)
3535         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3536
3537         # Add global prefix
3538         global_pref64 = "2001:db8::"
3539         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3540         global_pref64_len = 32
3541         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3542
3543         prefix = self.vapi.nat64_prefix_dump()
3544         self.assertEqual(len(prefix), 1)
3545         self.assertEqual(prefix[0].prefix, global_pref64_n)
3546         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3547         self.assertEqual(prefix[0].vrf_id, 0)
3548
3549         # Add tenant specific prefix
3550         vrf1_pref64 = "2001:db8:122:300::"
3551         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3552         vrf1_pref64_len = 56
3553         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3554                                        vrf1_pref64_len,
3555                                        vrf_id=self.vrf1_id)
3556         prefix = self.vapi.nat64_prefix_dump()
3557         self.assertEqual(len(prefix), 2)
3558
3559         # Global prefix
3560         pkts = self.create_stream_in_ip6(self.pg0,
3561                                          self.pg1,
3562                                          pref=global_pref64,
3563                                          plen=global_pref64_len)
3564         self.pg0.add_stream(pkts)
3565         self.pg_enable_capture(self.pg_interfaces)
3566         self.pg_start()
3567         capture = self.pg1.get_capture(len(pkts))
3568         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3569                                 dst_ip=self.pg1.remote_ip4)
3570
3571         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3572         self.pg1.add_stream(pkts)
3573         self.pg_enable_capture(self.pg_interfaces)
3574         self.pg_start()
3575         capture = self.pg0.get_capture(len(pkts))
3576         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3577                                   global_pref64,
3578                                   global_pref64_len)
3579         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3580
3581         # Tenant specific prefix
3582         pkts = self.create_stream_in_ip6(self.pg2,
3583                                          self.pg1,
3584                                          pref=vrf1_pref64,
3585                                          plen=vrf1_pref64_len)
3586         self.pg2.add_stream(pkts)
3587         self.pg_enable_capture(self.pg_interfaces)
3588         self.pg_start()
3589         capture = self.pg1.get_capture(len(pkts))
3590         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3591                                 dst_ip=self.pg1.remote_ip4)
3592
3593         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3594         self.pg1.add_stream(pkts)
3595         self.pg_enable_capture(self.pg_interfaces)
3596         self.pg_start()
3597         capture = self.pg2.get_capture(len(pkts))
3598         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3599                                   vrf1_pref64,
3600                                   vrf1_pref64_len)
3601         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3602
3603     def test_unknown_proto(self):
3604         """ NAT64 translate packet with unknown protocol """
3605
3606         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3607                                                 self.nat_addr_n)
3608         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3609         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3610         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3611
3612         # in2out
3613         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3614              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3615              TCP(sport=self.tcp_port_in, dport=20))
3616         self.pg0.add_stream(p)
3617         self.pg_enable_capture(self.pg_interfaces)
3618         self.pg_start()
3619         p = self.pg1.get_capture(1)
3620
3621         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3622              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3623              GRE() /
3624              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3625              TCP(sport=1234, dport=1234))
3626         self.pg0.add_stream(p)
3627         self.pg_enable_capture(self.pg_interfaces)
3628         self.pg_start()
3629         p = self.pg1.get_capture(1)
3630         packet = p[0]
3631         try:
3632             self.assertEqual(packet[IP].src, self.nat_addr)
3633             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3634             self.assertTrue(packet.haslayer(GRE))
3635             self.check_ip_checksum(packet)
3636         except:
3637             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3638             raise
3639
3640         # out2in
3641         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3642              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3643              GRE() /
3644              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3645              TCP(sport=1234, dport=1234))
3646         self.pg1.add_stream(p)
3647         self.pg_enable_capture(self.pg_interfaces)
3648         self.pg_start()
3649         p = self.pg0.get_capture(1)
3650         packet = p[0]
3651         try:
3652             self.assertEqual(packet[IPv6].src, remote_ip6)
3653             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3654             self.assertEqual(packet[IPv6].nh, 47)
3655         except:
3656             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3657             raise
3658
3659     def test_hairpinning_unknown_proto(self):
3660         """ NAT64 translate packet with unknown protocol - hairpinning """
3661
3662         client = self.pg0.remote_hosts[0]
3663         server = self.pg0.remote_hosts[1]
3664         server_tcp_in_port = 22
3665         server_tcp_out_port = 4022
3666         client_tcp_in_port = 1234
3667         client_tcp_out_port = 1235
3668         server_nat_ip = "10.0.0.100"
3669         client_nat_ip = "10.0.0.110"
3670         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3671         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3672         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3673         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3674
3675         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3676                                                 client_nat_ip_n)
3677         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3678         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3679
3680         self.vapi.nat64_add_del_static_bib(server.ip6n,
3681                                            server_nat_ip_n,
3682                                            server_tcp_in_port,
3683                                            server_tcp_out_port,
3684                                            IP_PROTOS.tcp)
3685
3686         self.vapi.nat64_add_del_static_bib(server.ip6n,
3687                                            server_nat_ip_n,
3688                                            0,
3689                                            0,
3690                                            IP_PROTOS.gre)
3691
3692         self.vapi.nat64_add_del_static_bib(client.ip6n,
3693                                            client_nat_ip_n,
3694                                            client_tcp_in_port,
3695                                            client_tcp_out_port,
3696                                            IP_PROTOS.tcp)
3697
3698         # client to server
3699         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3700              IPv6(src=client.ip6, dst=server_nat_ip6) /
3701              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3702         self.pg0.add_stream(p)
3703         self.pg_enable_capture(self.pg_interfaces)
3704         self.pg_start()
3705         p = self.pg0.get_capture(1)
3706
3707         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3708              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3709              GRE() /
3710              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3711              TCP(sport=1234, dport=1234))
3712         self.pg0.add_stream(p)
3713         self.pg_enable_capture(self.pg_interfaces)
3714         self.pg_start()
3715         p = self.pg0.get_capture(1)
3716         packet = p[0]
3717         try:
3718             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3719             self.assertEqual(packet[IPv6].dst, server.ip6)
3720             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3721         except:
3722             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3723             raise
3724
3725         # server to client
3726         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3727              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3728              GRE() /
3729              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3730              TCP(sport=1234, dport=1234))
3731         self.pg0.add_stream(p)
3732         self.pg_enable_capture(self.pg_interfaces)
3733         self.pg_start()
3734         p = self.pg0.get_capture(1)
3735         packet = p[0]
3736         try:
3737             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3738             self.assertEqual(packet[IPv6].dst, client.ip6)
3739             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3740         except:
3741             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3742             raise
3743
3744     def nat64_get_ses_num(self):
3745         """
3746         Return number of active NAT64 sessions.
3747         """
3748         st = self.vapi.nat64_st_dump()
3749         return len(st)
3750
3751     def clear_nat64(self):
3752         """
3753         Clear NAT64 configuration.
3754         """
3755         self.vapi.nat64_set_timeouts()
3756
3757         interfaces = self.vapi.nat64_interface_dump()
3758         for intf in interfaces:
3759             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3760                                               intf.is_inside,
3761                                               is_add=0)
3762
3763         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3764         for bibe in bib:
3765             if bibe.is_static:
3766                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3767                                                    bibe.o_addr,
3768                                                    bibe.i_port,
3769                                                    bibe.o_port,
3770                                                    bibe.proto,
3771                                                    bibe.vrf_id,
3772                                                    is_add=0)
3773
3774         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3775         for bibe in bib:
3776             if bibe.is_static:
3777                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3778                                                    bibe.o_addr,
3779                                                    bibe.i_port,
3780                                                    bibe.o_port,
3781                                                    bibe.proto,
3782                                                    bibe.vrf_id,
3783                                                    is_add=0)
3784
3785         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3786         for bibe in bib:
3787             if bibe.is_static:
3788                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3789                                                    bibe.o_addr,
3790                                                    bibe.i_port,
3791                                                    bibe.o_port,
3792                                                    bibe.proto,
3793                                                    bibe.vrf_id,
3794                                                    is_add=0)
3795
3796         adresses = self.vapi.nat64_pool_addr_dump()
3797         for addr in adresses:
3798             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3799                                                     addr.address,
3800                                                     vrf_id=addr.vrf_id,
3801                                                     is_add=0)
3802
3803         prefixes = self.vapi.nat64_prefix_dump()
3804         for prefix in prefixes:
3805             self.vapi.nat64_add_del_prefix(prefix.prefix,
3806                                            prefix.prefix_len,
3807                                            vrf_id=prefix.vrf_id,
3808                                            is_add=0)
3809
3810     def tearDown(self):
3811         super(TestNAT64, self).tearDown()
3812         if not self.vpp_dead:
3813             self.logger.info(self.vapi.cli("show nat64 pool"))
3814             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3815             self.logger.info(self.vapi.cli("show nat64 prefix"))
3816             self.logger.info(self.vapi.cli("show nat64 bib all"))
3817             self.logger.info(self.vapi.cli("show nat64 session table all"))
3818             self.clear_nat64()
3819
3820 if __name__ == '__main__':
3821     unittest.main(testRunner=VppTestRunner)