nat: configurable handoff frame queue size
[vpp.git] / src / plugins / nat / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16     PacketListField
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
21 from scapy.layers.l2 import Ether, ARP, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppp
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_neighbor import VppNeighbor
28 from vpp_papi import VppEnum
29
30
31 # NAT HA protocol event data
32 class Event(Packet):
33     name = "Event"
34     fields_desc = [ByteEnumField("event_type", None,
35                                  {1: "add", 2: "del", 3: "refresh"}),
36                    ByteEnumField("protocol", None,
37                                  {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
38                    ShortField("flags", 0),
39                    IPField("in_addr", None),
40                    IPField("out_addr", None),
41                    ShortField("in_port", None),
42                    ShortField("out_port", None),
43                    IPField("eh_addr", None),
44                    IPField("ehn_addr", None),
45                    ShortField("eh_port", None),
46                    ShortField("ehn_port", None),
47                    IntField("fib_index", None),
48                    IntField("total_pkts", 0),
49                    LongField("total_bytes", 0)]
50
51     def extract_padding(self, s):
52         return "", s
53
54
55 # NAT HA protocol header
56 class HANATStateSync(Packet):
57     name = "HA NAT state sync"
58     fields_desc = [XByteField("version", 1),
59                    FlagsField("flags", 0, 8, ['ACK']),
60                    FieldLenField("count", None, count_of="events"),
61                    IntField("sequence_number", 1),
62                    IntField("thread_index", 0),
63                    PacketListField("events", [], Event,
64                                    count_from=lambda pkt: pkt.count)]
65
66
67 class MethodHolder(VppTestCase):
68     """ NAT create capture and verify method holder """
69
70     @property
71     def config_flags(self):
72         return VppEnum.vl_api_nat_config_flags_t
73
74     @property
75     def nat44_config_flags(self):
76         return VppEnum.vl_api_nat44_config_flags_t
77
78     @property
79     def SYSLOG_SEVERITY(self):
80         return VppEnum.vl_api_syslog_severity_t
81
82     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
83                                  local_port=0, external_port=0, vrf_id=0,
84                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
85                                  proto=0, tag="", flags=0):
86         """
87         Add/delete NAT44EI static mapping
88
89         :param local_ip: Local IP address
90         :param external_ip: External IP address
91         :param local_port: Local port number (Optional)
92         :param external_port: External port number (Optional)
93         :param vrf_id: VRF ID (Default 0)
94         :param is_add: 1 if add, 0 if delete (Default add)
95         :param external_sw_if_index: External interface instead of IP address
96         :param proto: IP protocol (Mandatory if port specified)
97         :param tag: Opaque string tag
98         :param flags: NAT configuration flags
99         """
100
101         if not (local_port and external_port):
102             flags |= self.config_flags.NAT_IS_ADDR_ONLY
103
104         self.vapi.nat44_add_del_static_mapping(
105             is_add=is_add,
106             local_ip_address=local_ip,
107             external_ip_address=external_ip,
108             external_sw_if_index=external_sw_if_index,
109             local_port=local_port,
110             external_port=external_port,
111             vrf_id=vrf_id, protocol=proto,
112             flags=flags,
113             tag=tag)
114
115     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
116         """
117         Add/delete NAT44EI address
118
119         :param ip: IP address
120         :param is_add: 1 if add, 0 if delete (Default add)
121         :param twice_nat: twice NAT address for external hosts
122         """
123         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
124         self.vapi.nat44_add_del_address_range(first_ip_address=ip,
125                                               last_ip_address=ip,
126                                               vrf_id=vrf_id,
127                                               is_add=is_add,
128                                               flags=flags)
129
130     def create_routes_and_neigbors(self):
131         r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
132                         [VppRoutePath(self.pg7.remote_ip4,
133                                       self.pg7.sw_if_index)])
134         r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
135                         [VppRoutePath(self.pg8.remote_ip4,
136                                       self.pg8.sw_if_index)])
137         r1.add_vpp_config()
138         r2.add_vpp_config()
139
140         n1 = VppNeighbor(self,
141                          self.pg7.sw_if_index,
142                          self.pg7.remote_mac,
143                          self.pg7.remote_ip4,
144                          is_static=1)
145         n2 = VppNeighbor(self,
146                          self.pg8.sw_if_index,
147                          self.pg8.remote_mac,
148                          self.pg8.remote_ip4,
149                          is_static=1)
150         n1.add_vpp_config()
151         n2.add_vpp_config()
152
153     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
154         """
155         Create packet stream for inside network
156
157         :param in_if: Inside interface
158         :param out_if: Outside interface
159         :param dst_ip: Destination address
160         :param ttl: TTL of generated packets
161         """
162         if dst_ip is None:
163             dst_ip = out_if.remote_ip4
164
165         pkts = []
166         # TCP
167         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
168              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
169              TCP(sport=self.tcp_port_in, dport=20))
170         pkts.extend([p, p])
171
172         # UDP
173         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
174              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
175              UDP(sport=self.udp_port_in, dport=20))
176         pkts.append(p)
177
178         # ICMP
179         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
180              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
181              ICMP(id=self.icmp_id_in, type='echo-request'))
182         pkts.append(p)
183
184         return pkts
185
186     def compose_ip6(self, ip4, pref, plen):
187         """
188         Compose IPv4-embedded IPv6 addresses
189
190         :param ip4: IPv4 address
191         :param pref: IPv6 prefix
192         :param plen: IPv6 prefix length
193         :returns: IPv4-embedded IPv6 addresses
194         """
195         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
196         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
197         if plen == 32:
198             pref_n[4] = ip4_n[0]
199             pref_n[5] = ip4_n[1]
200             pref_n[6] = ip4_n[2]
201             pref_n[7] = ip4_n[3]
202         elif plen == 40:
203             pref_n[5] = ip4_n[0]
204             pref_n[6] = ip4_n[1]
205             pref_n[7] = ip4_n[2]
206             pref_n[9] = ip4_n[3]
207         elif plen == 48:
208             pref_n[6] = ip4_n[0]
209             pref_n[7] = ip4_n[1]
210             pref_n[9] = ip4_n[2]
211             pref_n[10] = ip4_n[3]
212         elif plen == 56:
213             pref_n[7] = ip4_n[0]
214             pref_n[9] = ip4_n[1]
215             pref_n[10] = ip4_n[2]
216             pref_n[11] = ip4_n[3]
217         elif plen == 64:
218             pref_n[9] = ip4_n[0]
219             pref_n[10] = ip4_n[1]
220             pref_n[11] = ip4_n[2]
221             pref_n[12] = ip4_n[3]
222         elif plen == 96:
223             pref_n[12] = ip4_n[0]
224             pref_n[13] = ip4_n[1]
225             pref_n[14] = ip4_n[2]
226             pref_n[15] = ip4_n[3]
227         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
228         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
229
230     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
231                           use_inside_ports=False):
232         """
233         Create packet stream for outside network
234
235         :param out_if: Outside interface
236         :param dst_ip: Destination IP address (Default use global NAT address)
237         :param ttl: TTL of generated packets
238         :param use_inside_ports: Use inside NAT ports as destination ports
239                instead of outside ports
240         """
241         if dst_ip is None:
242             dst_ip = self.nat_addr
243         if not use_inside_ports:
244             tcp_port = self.tcp_port_out
245             udp_port = self.udp_port_out
246             icmp_id = self.icmp_id_out
247         else:
248             tcp_port = self.tcp_port_in
249             udp_port = self.udp_port_in
250             icmp_id = self.icmp_id_in
251         pkts = []
252         # TCP
253         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255              TCP(dport=tcp_port, sport=20))
256         pkts.extend([p, p])
257
258         # UDP
259         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261              UDP(dport=udp_port, sport=20))
262         pkts.append(p)
263
264         # ICMP
265         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267              ICMP(id=icmp_id, type='echo-reply'))
268         pkts.append(p)
269
270         return pkts
271
272     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
273         """
274         Create packet stream for outside network
275
276         :param out_if: Outside interface
277         :param dst_ip: Destination IP address (Default use global NAT address)
278         :param hl: HL of generated packets
279         """
280         pkts = []
281         # TCP
282         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
283              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
284              TCP(dport=self.tcp_port_out, sport=20))
285         pkts.append(p)
286
287         # UDP
288         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
289              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
290              UDP(dport=self.udp_port_out, sport=20))
291         pkts.append(p)
292
293         # ICMP
294         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
295              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
296              ICMPv6EchoReply(id=self.icmp_id_out))
297         pkts.append(p)
298
299         return pkts
300
301     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
302                            dst_ip=None, is_ip6=False, ignore_port=False):
303         """
304         Verify captured packets on outside network
305
306         :param capture: Captured packets
307         :param nat_ip: Translated IP address (Default use global NAT address)
308         :param same_port: Source port number is not translated (Default False)
309         :param dst_ip: Destination IP address (Default do not verify)
310         :param is_ip6: If L3 protocol is IPv6 (Default False)
311         """
312         if is_ip6:
313             IP46 = IPv6
314             ICMP46 = ICMPv6EchoRequest
315         else:
316             IP46 = IP
317             ICMP46 = ICMP
318         if nat_ip is None:
319             nat_ip = self.nat_addr
320         for packet in capture:
321             try:
322                 if not is_ip6:
323                     self.assert_packet_checksums_valid(packet)
324                 self.assertEqual(packet[IP46].src, nat_ip)
325                 if dst_ip is not None:
326                     self.assertEqual(packet[IP46].dst, dst_ip)
327                 if packet.haslayer(TCP):
328                     if not ignore_port:
329                         if same_port:
330                             self.assertEqual(
331                                 packet[TCP].sport, self.tcp_port_in)
332                         else:
333                             self.assertNotEqual(
334                                 packet[TCP].sport, self.tcp_port_in)
335                     self.tcp_port_out = packet[TCP].sport
336                     self.assert_packet_checksums_valid(packet)
337                 elif packet.haslayer(UDP):
338                     if not ignore_port:
339                         if same_port:
340                             self.assertEqual(
341                                 packet[UDP].sport, self.udp_port_in)
342                         else:
343                             self.assertNotEqual(
344                                 packet[UDP].sport, self.udp_port_in)
345                     self.udp_port_out = packet[UDP].sport
346                 else:
347                     if not ignore_port:
348                         if same_port:
349                             self.assertEqual(
350                                 packet[ICMP46].id, self.icmp_id_in)
351                         else:
352                             self.assertNotEqual(
353                                 packet[ICMP46].id, self.icmp_id_in)
354                     self.icmp_id_out = packet[ICMP46].id
355                     self.assert_packet_checksums_valid(packet)
356             except:
357                 self.logger.error(ppp("Unexpected or invalid packet "
358                                       "(outside network):", packet))
359                 raise
360
361     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
362                                dst_ip=None):
363         """
364         Verify captured packets on outside network
365
366         :param capture: Captured packets
367         :param nat_ip: Translated IP address
368         :param same_port: Source port number is not translated (Default False)
369         :param dst_ip: Destination IP address (Default do not verify)
370         """
371         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
372                                        True)
373
374     def verify_capture_in(self, capture, in_if):
375         """
376         Verify captured packets on inside network
377
378         :param capture: Captured packets
379         :param in_if: Inside interface
380         """
381         for packet in capture:
382             try:
383                 self.assert_packet_checksums_valid(packet)
384                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
397         """
398         Verify captured packet that don't have to be translated
399
400         :param capture: Captured packets
401         :param ingress_if: Ingress interface
402         :param egress_if: Egress interface
403         """
404         for packet in capture:
405             try:
406                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
407                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
408                 if packet.haslayer(TCP):
409                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
410                 elif packet.haslayer(UDP):
411                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
412                 else:
413                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
414             except:
415                 self.logger.error(ppp("Unexpected or invalid packet "
416                                       "(inside network):", packet))
417                 raise
418
419     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
420                                             icmp_type=11):
421         """
422         Verify captured packets with ICMP errors on outside network
423
424         :param capture: Captured packets
425         :param src_ip: Translated IP address or IP address of VPP
426                        (Default use global NAT address)
427         :param icmp_type: Type of error ICMP packet
428                           we are expecting (Default 11)
429         """
430         if src_ip is None:
431             src_ip = self.nat_addr
432         for packet in capture:
433             try:
434                 self.assertEqual(packet[IP].src, src_ip)
435                 self.assertEqual(packet.haslayer(ICMP), 1)
436                 icmp = packet[ICMP]
437                 self.assertEqual(icmp.type, icmp_type)
438                 self.assertTrue(icmp.haslayer(IPerror))
439                 inner_ip = icmp[IPerror]
440                 if inner_ip.haslayer(TCPerror):
441                     self.assertEqual(inner_ip[TCPerror].dport,
442                                      self.tcp_port_out)
443                 elif inner_ip.haslayer(UDPerror):
444                     self.assertEqual(inner_ip[UDPerror].dport,
445                                      self.udp_port_out)
446                 else:
447                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
448             except:
449                 self.logger.error(ppp("Unexpected or invalid packet "
450                                       "(outside network):", packet))
451                 raise
452
453     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
454         """
455         Verify captured packets with ICMP errors on inside network
456
457         :param capture: Captured packets
458         :param in_if: Inside interface
459         :param icmp_type: Type of error ICMP packet
460                           we are expecting (Default 11)
461         """
462         for packet in capture:
463             try:
464                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
465                 self.assertEqual(packet.haslayer(ICMP), 1)
466                 icmp = packet[ICMP]
467                 self.assertEqual(icmp.type, icmp_type)
468                 self.assertTrue(icmp.haslayer(IPerror))
469                 inner_ip = icmp[IPerror]
470                 if inner_ip.haslayer(TCPerror):
471                     self.assertEqual(inner_ip[TCPerror].sport,
472                                      self.tcp_port_in)
473                 elif inner_ip.haslayer(UDPerror):
474                     self.assertEqual(inner_ip[UDPerror].sport,
475                                      self.udp_port_in)
476                 else:
477                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
478             except:
479                 self.logger.error(ppp("Unexpected or invalid packet "
480                                       "(inside network):", packet))
481                 raise
482
483     def create_stream_frag(self, src_if, dst, sport, dport, data,
484                            proto=IP_PROTOS.tcp, echo_reply=False):
485         """
486         Create fragmented packet stream
487
488         :param src_if: Source interface
489         :param dst: Destination IPv4 address
490         :param sport: Source port
491         :param dport: Destination port
492         :param data: Payload data
493         :param proto: protocol (TCP, UDP, ICMP)
494         :param echo_reply: use echo_reply if protocol is ICMP
495         :returns: Fragments
496         """
497         if proto == IP_PROTOS.tcp:
498             p = (IP(src=src_if.remote_ip4, dst=dst) /
499                  TCP(sport=sport, dport=dport) /
500                  Raw(data))
501             p = p.__class__(scapy.compat.raw(p))
502             chksum = p[TCP].chksum
503             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
504         elif proto == IP_PROTOS.udp:
505             proto_header = UDP(sport=sport, dport=dport)
506         elif proto == IP_PROTOS.icmp:
507             if not echo_reply:
508                 proto_header = ICMP(id=sport, type='echo-request')
509             else:
510                 proto_header = ICMP(id=sport, type='echo-reply')
511         else:
512             raise Exception("Unsupported protocol")
513         id = random.randint(0, 65535)
514         pkts = []
515         if proto == IP_PROTOS.tcp:
516             raw = Raw(data[0:4])
517         else:
518             raw = Raw(data[0:16])
519         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
520              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
521              proto_header /
522              raw)
523         pkts.append(p)
524         if proto == IP_PROTOS.tcp:
525             raw = Raw(data[4:20])
526         else:
527             raw = Raw(data[16:32])
528         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
529              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
530                 proto=proto) /
531              raw)
532         pkts.append(p)
533         if proto == IP_PROTOS.tcp:
534             raw = Raw(data[20:])
535         else:
536             raw = Raw(data[32:])
537         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
538              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
539                 id=id) /
540              raw)
541         pkts.append(p)
542         return pkts
543
544     def reass_frags_and_verify(self, frags, src, dst):
545         """
546         Reassemble and verify fragmented packet
547
548         :param frags: Captured fragments
549         :param src: Source IPv4 address to verify
550         :param dst: Destination IPv4 address to verify
551
552         :returns: Reassembled IPv4 packet
553         """
554         buffer = BytesIO()
555         for p in frags:
556             self.assertEqual(p[IP].src, src)
557             self.assertEqual(p[IP].dst, dst)
558             self.assert_ip_checksum_valid(p)
559             buffer.seek(p[IP].frag * 8)
560             buffer.write(bytes(p[IP].payload))
561         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
562                 proto=frags[0][IP].proto)
563         if ip.proto == IP_PROTOS.tcp:
564             p = (ip / TCP(buffer.getvalue()))
565             self.logger.debug(ppp("Reassembled:", p))
566             self.assert_tcp_checksum_valid(p)
567         elif ip.proto == IP_PROTOS.udp:
568             p = (ip / UDP(buffer.getvalue()[:8]) /
569                  Raw(buffer.getvalue()[8:]))
570         elif ip.proto == IP_PROTOS.icmp:
571             p = (ip / ICMP(buffer.getvalue()))
572         return p
573
574     def verify_ipfix_nat44_ses(self, data):
575         """
576         Verify IPFIX NAT44EI session create/delete event
577
578         :param data: Decoded IPFIX data records
579         """
580         nat44_ses_create_num = 0
581         nat44_ses_delete_num = 0
582         self.assertEqual(6, len(data))
583         for record in data:
584             # natEvent
585             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
586             if scapy.compat.orb(record[230]) == 4:
587                 nat44_ses_create_num += 1
588             else:
589                 nat44_ses_delete_num += 1
590             # sourceIPv4Address
591             self.assertEqual(self.pg0.remote_ip4,
592                              str(ipaddress.IPv4Address(record[8])))
593             # postNATSourceIPv4Address
594             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
595                              record[225])
596             # ingressVRFID
597             self.assertEqual(struct.pack("!I", 0), record[234])
598             # protocolIdentifier/sourceTransportPort
599             # /postNAPTSourceTransportPort
600             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
601                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
602                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
603                                  record[227])
604             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
605                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
606                                  record[7])
607                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
608                                  record[227])
609             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
610                 self.assertEqual(struct.pack("!H", self.udp_port_in),
611                                  record[7])
612                 self.assertEqual(struct.pack("!H", self.udp_port_out),
613                                  record[227])
614             else:
615                 self.fail("Invalid protocol")
616         self.assertEqual(3, nat44_ses_create_num)
617         self.assertEqual(3, nat44_ses_delete_num)
618
619     def verify_ipfix_addr_exhausted(self, data):
620         self.assertEqual(1, len(data))
621         record = data[0]
622         # natEvent
623         self.assertEqual(scapy.compat.orb(record[230]), 3)
624         # natPoolID
625         self.assertEqual(struct.pack("!I", 0), record[283])
626
627     def verify_ipfix_max_sessions(self, data, limit):
628         self.assertEqual(1, len(data))
629         record = data[0]
630         # natEvent
631         self.assertEqual(scapy.compat.orb(record[230]), 13)
632         # natQuotaExceededEvent
633         self.assertEqual(struct.pack("I", 1), record[466])
634         # maxSessionEntries
635         self.assertEqual(struct.pack("I", limit), record[471])
636
637     def verify_no_nat44_user(self):
638         """ Verify that there is no NAT44EI user """
639         users = self.vapi.nat44_user_dump()
640         self.assertEqual(len(users), 0)
641         users = self.statistics.get_counter('/nat44/total-users')
642         self.assertEqual(users[0][0], 0)
643         sessions = self.statistics.get_counter('/nat44/total-sessions')
644         self.assertEqual(sessions[0][0], 0)
645
646     def verify_syslog_apmap(self, data, is_add=True):
647         message = data.decode('utf-8')
648         try:
649             message = SyslogMessage.parse(message)
650         except ParseError as e:
651             self.logger.error(e)
652             raise
653         else:
654             self.assertEqual(message.severity, SyslogSeverity.info)
655             self.assertEqual(message.appname, 'NAT')
656             self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
657             sd_params = message.sd.get('napmap')
658             self.assertTrue(sd_params is not None)
659             self.assertEqual(sd_params.get('IATYP'), 'IPv4')
660             self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
661             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
662             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
663             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
664             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
665             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
666             self.assertTrue(sd_params.get('SSUBIX') is not None)
667             self.assertEqual(sd_params.get('SVLAN'), '0')
668
669     def verify_mss_value(self, pkt, mss):
670         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
671             raise TypeError("Not a TCP/IP packet")
672
673         for option in pkt[TCP].options:
674             if option[0] == 'MSS':
675                 self.assertEqual(option[1], mss)
676                 self.assert_tcp_checksum_valid(pkt)
677
678     @staticmethod
679     def proto2layer(proto):
680         if proto == IP_PROTOS.tcp:
681             return TCP
682         elif proto == IP_PROTOS.udp:
683             return UDP
684         elif proto == IP_PROTOS.icmp:
685             return ICMP
686         else:
687             raise Exception("Unsupported protocol")
688
689     def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
690                       ignore_port=False):
691         layer = self.proto2layer(proto)
692
693         if proto == IP_PROTOS.tcp:
694             data = b"A" * 4 + b"B" * 16 + b"C" * 3
695         else:
696             data = b"A" * 16 + b"B" * 16 + b"C" * 3
697         self.port_in = random.randint(1025, 65535)
698
699         # in2out
700         pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
701                                        self.port_in, 20, data, proto)
702         self.pg0.add_stream(pkts)
703         self.pg_enable_capture(self.pg_interfaces)
704         self.pg_start()
705         frags = self.pg1.get_capture(len(pkts))
706         if not dont_translate:
707             p = self.reass_frags_and_verify(frags,
708                                             self.nat_addr,
709                                             self.pg1.remote_ip4)
710         else:
711             p = self.reass_frags_and_verify(frags,
712                                             self.pg0.remote_ip4,
713                                             self.pg1.remote_ip4)
714         if proto != IP_PROTOS.icmp:
715             if not dont_translate:
716                 self.assertEqual(p[layer].dport, 20)
717                 if not ignore_port:
718                     self.assertNotEqual(p[layer].sport, self.port_in)
719             else:
720                 self.assertEqual(p[layer].sport, self.port_in)
721         else:
722             if not ignore_port:
723                 if not dont_translate:
724                     self.assertNotEqual(p[layer].id, self.port_in)
725                 else:
726                     self.assertEqual(p[layer].id, self.port_in)
727         self.assertEqual(data, p[Raw].load)
728
729         # out2in
730         if not dont_translate:
731             dst_addr = self.nat_addr
732         else:
733             dst_addr = self.pg0.remote_ip4
734         if proto != IP_PROTOS.icmp:
735             sport = 20
736             dport = p[layer].sport
737         else:
738             sport = p[layer].id
739             dport = 0
740         pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
741                                        proto, echo_reply=True)
742         self.pg1.add_stream(pkts)
743         self.pg_enable_capture(self.pg_interfaces)
744         self.pg_start()
745         frags = self.pg0.get_capture(len(pkts))
746         p = self.reass_frags_and_verify(frags,
747                                         self.pg1.remote_ip4,
748                                         self.pg0.remote_ip4)
749         if proto != IP_PROTOS.icmp:
750             self.assertEqual(p[layer].sport, 20)
751             self.assertEqual(p[layer].dport, self.port_in)
752         else:
753             self.assertEqual(p[layer].id, self.port_in)
754         self.assertEqual(data, p[Raw].load)
755
756     def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
757                           host_in_port, proto=IP_PROTOS.tcp,
758                           ignore_port=False):
759
760         layer = self.proto2layer(proto)
761
762         if proto == IP_PROTOS.tcp:
763             data = b"A" * 4 + b"B" * 16 + b"C" * 3
764         else:
765             data = b"A" * 16 + b"B" * 16 + b"C" * 3
766
767         # send packet from host to server
768         pkts = self.create_stream_frag(self.pg0,
769                                        self.nat_addr,
770                                        host_in_port,
771                                        server_out_port,
772                                        data,
773                                        proto)
774         self.pg0.add_stream(pkts)
775         self.pg_enable_capture(self.pg_interfaces)
776         self.pg_start()
777         frags = self.pg0.get_capture(len(pkts))
778         p = self.reass_frags_and_verify(frags,
779                                         self.nat_addr,
780                                         server_addr)
781         if proto != IP_PROTOS.icmp:
782             if not ignore_port:
783                 self.assertNotEqual(p[layer].sport, host_in_port)
784             self.assertEqual(p[layer].dport, server_in_port)
785         else:
786             if not ignore_port:
787                 self.assertNotEqual(p[layer].id, host_in_port)
788         self.assertEqual(data, p[Raw].load)
789
790     def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
791                           ignore_port=False):
792         layer = self.proto2layer(proto)
793
794         if proto == IP_PROTOS.tcp:
795             data = b"A" * 4 + b"B" * 16 + b"C" * 3
796         else:
797             data = b"A" * 16 + b"B" * 16 + b"C" * 3
798         self.port_in = random.randint(1025, 65535)
799
800         for i in range(2):
801             # in2out
802             pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
803                                            self.port_in, 20, data, proto)
804             pkts.reverse()
805             self.pg0.add_stream(pkts)
806             self.pg_enable_capture(self.pg_interfaces)
807             self.pg_start()
808             frags = self.pg1.get_capture(len(pkts))
809             if not dont_translate:
810                 p = self.reass_frags_and_verify(frags,
811                                                 self.nat_addr,
812                                                 self.pg1.remote_ip4)
813             else:
814                 p = self.reass_frags_and_verify(frags,
815                                                 self.pg0.remote_ip4,
816                                                 self.pg1.remote_ip4)
817             if proto != IP_PROTOS.icmp:
818                 if not dont_translate:
819                     self.assertEqual(p[layer].dport, 20)
820                     if not ignore_port:
821                         self.assertNotEqual(p[layer].sport, self.port_in)
822                 else:
823                     self.assertEqual(p[layer].sport, self.port_in)
824             else:
825                 if not ignore_port:
826                     if not dont_translate:
827                         self.assertNotEqual(p[layer].id, self.port_in)
828                     else:
829                         self.assertEqual(p[layer].id, self.port_in)
830             self.assertEqual(data, p[Raw].load)
831
832             # out2in
833             if not dont_translate:
834                 dst_addr = self.nat_addr
835             else:
836                 dst_addr = self.pg0.remote_ip4
837             if proto != IP_PROTOS.icmp:
838                 sport = 20
839                 dport = p[layer].sport
840             else:
841                 sport = p[layer].id
842                 dport = 0
843             pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
844                                            data, proto, echo_reply=True)
845             pkts.reverse()
846             self.pg1.add_stream(pkts)
847             self.pg_enable_capture(self.pg_interfaces)
848             self.pg_start()
849             frags = self.pg0.get_capture(len(pkts))
850             p = self.reass_frags_and_verify(frags,
851                                             self.pg1.remote_ip4,
852                                             self.pg0.remote_ip4)
853             if proto != IP_PROTOS.icmp:
854                 self.assertEqual(p[layer].sport, 20)
855                 self.assertEqual(p[layer].dport, self.port_in)
856             else:
857                 self.assertEqual(p[layer].id, self.port_in)
858             self.assertEqual(data, p[Raw].load)
859
860
861 class TestNAT44EIAPI(MethodHolder):
862     """ NAT44EI API Test Cases """
863
864     fq_nelts = 512
865
866     def setUp(self):
867         super(TestNAT44EIAPI, self).setUp()
868         self.vapi.nat_set_fq_options(frame_queue_nelts=self.fq_nelts)
869         self.vapi.nat44_plugin_enable_disable(enable=1)
870
871     def tearDown(self):
872         super(TestNAT44EIAPI, self).tearDown()
873         if not self.vpp_dead:
874             self.vapi.nat44_plugin_enable_disable(enable=0)
875             self.vapi.cli("clear logging")
876
877     def test_show_frame_queue_nelts(self):
878         """ API test - worker handoff frame queue elements """
879         nat_config = self.vapi.nat_show_fq_options()
880         self.assertEqual(self.fq_nelts, nat_config.frame_queue_nelts)
881         self.vapi.nat44_plugin_enable_disable(enable=0)
882         self.vapi.cli("set nat frame-queue-nelts 256")
883         self.vapi.nat44_plugin_enable_disable(enable=1)
884         nat_config = self.vapi.nat_show_fq_options()
885         self.assertEqual(256, nat_config.frame_queue_nelts)
886
887
888 class TestNAT44EI(MethodHolder):
889     """ NAT44EI Test Cases """
890
891     max_translations = 10240
892     max_users = 10240
893
894     @classmethod
895     def setUpClass(cls):
896         super(TestNAT44EI, cls).setUpClass()
897         cls.vapi.cli("set log class nat level debug")
898
899         cls.tcp_port_in = 6303
900         cls.tcp_port_out = 6303
901         cls.udp_port_in = 6304
902         cls.udp_port_out = 6304
903         cls.icmp_id_in = 6305
904         cls.icmp_id_out = 6305
905         cls.nat_addr = '10.0.0.3'
906         cls.ipfix_src_port = 4739
907         cls.ipfix_domain_id = 1
908         cls.tcp_external_port = 80
909         cls.udp_external_port = 69
910
911         cls.create_pg_interfaces(range(10))
912         cls.interfaces = list(cls.pg_interfaces[0:4])
913
914         for i in cls.interfaces:
915             i.admin_up()
916             i.config_ip4()
917             i.resolve_arp()
918
919         cls.pg0.generate_remote_hosts(3)
920         cls.pg0.configure_ipv4_neighbors()
921
922         cls.pg1.generate_remote_hosts(1)
923         cls.pg1.configure_ipv4_neighbors()
924
925         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
927         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
928
929         cls.pg4._local_ip4 = "172.16.255.1"
930         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
931         cls.pg4.set_table_ip4(10)
932         cls.pg5._local_ip4 = "172.17.255.3"
933         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
934         cls.pg5.set_table_ip4(10)
935         cls.pg6._local_ip4 = "172.16.255.1"
936         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
937         cls.pg6.set_table_ip4(20)
938         for i in cls.overlapping_interfaces:
939             i.config_ip4()
940             i.admin_up()
941             i.resolve_arp()
942
943         cls.pg7.admin_up()
944         cls.pg8.admin_up()
945
946         cls.pg9.generate_remote_hosts(2)
947         cls.pg9.config_ip4()
948         cls.vapi.sw_interface_add_del_address(
949             sw_if_index=cls.pg9.sw_if_index,
950             prefix="10.0.0.1/24")
951
952         cls.pg9.admin_up()
953         cls.pg9.resolve_arp()
954         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
955         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
956         cls.pg9.resolve_arp()
957
958     def setUp(self):
959         super(TestNAT44EI, self).setUp()
960         self.vapi.nat44_plugin_enable_disable(
961             sessions=self.max_translations,
962             users=self.max_users, enable=1)
963
964     def tearDown(self):
965         super(TestNAT44EI, self).tearDown()
966         if not self.vpp_dead:
967             self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
968                                                src_port=self.ipfix_src_port,
969                                                enable=0)
970             self.ipfix_src_port = 4739
971             self.ipfix_domain_id = 1
972
973             self.vapi.nat44_plugin_enable_disable(enable=0)
974             self.vapi.cli("clear logging")
975
976     def test_clear_sessions(self):
977         """ NAT44EI session clearing test """
978
979         self.nat44_add_address(self.nat_addr)
980         flags = self.config_flags.NAT_IS_INSIDE
981         self.vapi.nat44_interface_add_del_feature(
982             sw_if_index=self.pg0.sw_if_index,
983             flags=flags, is_add=1)
984         self.vapi.nat44_interface_add_del_feature(
985             sw_if_index=self.pg1.sw_if_index,
986             is_add=1)
987
988         nat_config = self.vapi.nat_show_config()
989         self.assertEqual(0, nat_config.endpoint_dependent)
990
991         pkts = self.create_stream_in(self.pg0, self.pg1)
992         self.pg0.add_stream(pkts)
993         self.pg_enable_capture(self.pg_interfaces)
994         self.pg_start()
995         capture = self.pg1.get_capture(len(pkts))
996         self.verify_capture_out(capture)
997
998         sessions = self.statistics.get_counter('/nat44/total-sessions')
999         self.assertTrue(sessions[0][0] > 0)
1000         self.logger.info("sessions before clearing: %s" % sessions[0][0])
1001
1002         self.vapi.cli("clear nat44 sessions")
1003
1004         sessions = self.statistics.get_counter('/nat44/total-sessions')
1005         self.assertEqual(sessions[0][0], 0)
1006         self.logger.info("sessions after clearing: %s" % sessions[0][0])
1007
1008     def test_dynamic(self):
1009         """ NAT44EI dynamic translation test """
1010         self.nat44_add_address(self.nat_addr)
1011         flags = self.config_flags.NAT_IS_INSIDE
1012         self.vapi.nat44_interface_add_del_feature(
1013             sw_if_index=self.pg0.sw_if_index,
1014             flags=flags, is_add=1)
1015         self.vapi.nat44_interface_add_del_feature(
1016             sw_if_index=self.pg1.sw_if_index,
1017             is_add=1)
1018
1019         # in2out
1020         tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1021         udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1022         icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1023         drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1024
1025         pkts = self.create_stream_in(self.pg0, self.pg1)
1026         self.pg0.add_stream(pkts)
1027         self.pg_enable_capture(self.pg_interfaces)
1028         self.pg_start()
1029         capture = self.pg1.get_capture(len(pkts))
1030         self.verify_capture_out(capture)
1031
1032         if_idx = self.pg0.sw_if_index
1033         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1034         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1035         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1036         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1037         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1038         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1039         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1040         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1041
1042         # out2in
1043         tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1044         udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1045         icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1046         drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1047
1048         pkts = self.create_stream_out(self.pg1)
1049         self.pg1.add_stream(pkts)
1050         self.pg_enable_capture(self.pg_interfaces)
1051         self.pg_start()
1052         capture = self.pg0.get_capture(len(pkts))
1053         self.verify_capture_in(capture, self.pg0)
1054
1055         if_idx = self.pg1.sw_if_index
1056         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1057         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1058         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1059         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1060         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1061         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1062         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1063         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1064
1065         users = self.statistics.get_counter('/nat44/total-users')
1066         self.assertEqual(users[0][0], 1)
1067         sessions = self.statistics.get_counter('/nat44/total-sessions')
1068         self.assertEqual(sessions[0][0], 3)
1069
1070     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1071         """ NAT44EI handling of client packets with TTL=1 """
1072
1073         self.nat44_add_address(self.nat_addr)
1074         flags = self.config_flags.NAT_IS_INSIDE
1075         self.vapi.nat44_interface_add_del_feature(
1076             sw_if_index=self.pg0.sw_if_index,
1077             flags=flags, is_add=1)
1078         self.vapi.nat44_interface_add_del_feature(
1079             sw_if_index=self.pg1.sw_if_index,
1080             is_add=1)
1081
1082         # Client side - generate traffic
1083         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1084         self.pg0.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087
1088         # Client side - verify ICMP type 11 packets
1089         capture = self.pg0.get_capture(len(pkts))
1090         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1091
1092     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1093         """ NAT44EI handling of server packets with TTL=1 """
1094
1095         self.nat44_add_address(self.nat_addr)
1096         flags = self.config_flags.NAT_IS_INSIDE
1097         self.vapi.nat44_interface_add_del_feature(
1098             sw_if_index=self.pg0.sw_if_index,
1099             flags=flags, is_add=1)
1100         self.vapi.nat44_interface_add_del_feature(
1101             sw_if_index=self.pg1.sw_if_index,
1102             is_add=1)
1103
1104         # Client side - create sessions
1105         pkts = self.create_stream_in(self.pg0, self.pg1)
1106         self.pg0.add_stream(pkts)
1107         self.pg_enable_capture(self.pg_interfaces)
1108         self.pg_start()
1109
1110         # Server side - generate traffic
1111         capture = self.pg1.get_capture(len(pkts))
1112         self.verify_capture_out(capture)
1113         pkts = self.create_stream_out(self.pg1, ttl=1)
1114         self.pg1.add_stream(pkts)
1115         self.pg_enable_capture(self.pg_interfaces)
1116         self.pg_start()
1117
1118         # Server side - verify ICMP type 11 packets
1119         capture = self.pg1.get_capture(len(pkts))
1120         self.verify_capture_out_with_icmp_errors(capture,
1121                                                  src_ip=self.pg1.local_ip4)
1122
1123     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1124         """ NAT44EI handling of error responses to client packets with TTL=2
1125         """
1126
1127         self.nat44_add_address(self.nat_addr)
1128         flags = self.config_flags.NAT_IS_INSIDE
1129         self.vapi.nat44_interface_add_del_feature(
1130             sw_if_index=self.pg0.sw_if_index,
1131             flags=flags, is_add=1)
1132         self.vapi.nat44_interface_add_del_feature(
1133             sw_if_index=self.pg1.sw_if_index,
1134             is_add=1)
1135
1136         # Client side - generate traffic
1137         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1138         self.pg0.add_stream(pkts)
1139         self.pg_enable_capture(self.pg_interfaces)
1140         self.pg_start()
1141
1142         # Server side - simulate ICMP type 11 response
1143         capture = self.pg1.get_capture(len(pkts))
1144         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1145                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1146                 ICMP(type=11) / packet[IP] for packet in capture]
1147         self.pg1.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150
1151         # Client side - verify ICMP type 11 packets
1152         capture = self.pg0.get_capture(len(pkts))
1153         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1154
1155     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1156         """ NAT44EI handling of error responses to server packets with TTL=2
1157         """
1158
1159         self.nat44_add_address(self.nat_addr)
1160         flags = self.config_flags.NAT_IS_INSIDE
1161         self.vapi.nat44_interface_add_del_feature(
1162             sw_if_index=self.pg0.sw_if_index,
1163             flags=flags, is_add=1)
1164         self.vapi.nat44_interface_add_del_feature(
1165             sw_if_index=self.pg1.sw_if_index,
1166             is_add=1)
1167
1168         # Client side - create sessions
1169         pkts = self.create_stream_in(self.pg0, self.pg1)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173
1174         # Server side - generate traffic
1175         capture = self.pg1.get_capture(len(pkts))
1176         self.verify_capture_out(capture)
1177         pkts = self.create_stream_out(self.pg1, ttl=2)
1178         self.pg1.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181
1182         # Client side - simulate ICMP type 11 response
1183         capture = self.pg0.get_capture(len(pkts))
1184         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1185                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1186                 ICMP(type=11) / packet[IP] for packet in capture]
1187         self.pg0.add_stream(pkts)
1188         self.pg_enable_capture(self.pg_interfaces)
1189         self.pg_start()
1190
1191         # Server side - verify ICMP type 11 packets
1192         capture = self.pg1.get_capture(len(pkts))
1193         self.verify_capture_out_with_icmp_errors(capture)
1194
1195     def test_ping_out_interface_from_outside(self):
1196         """ NAT44EI ping out interface from outside network """
1197
1198         self.nat44_add_address(self.nat_addr)
1199         flags = self.config_flags.NAT_IS_INSIDE
1200         self.vapi.nat44_interface_add_del_feature(
1201             sw_if_index=self.pg0.sw_if_index,
1202             flags=flags, is_add=1)
1203         self.vapi.nat44_interface_add_del_feature(
1204             sw_if_index=self.pg1.sw_if_index,
1205             is_add=1)
1206
1207         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1209              ICMP(id=self.icmp_id_out, type='echo-request'))
1210         pkts = [p]
1211         self.pg1.add_stream(pkts)
1212         self.pg_enable_capture(self.pg_interfaces)
1213         self.pg_start()
1214         capture = self.pg1.get_capture(len(pkts))
1215         packet = capture[0]
1216         try:
1217             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1218             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1219             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1220             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1221         except:
1222             self.logger.error(ppp("Unexpected or invalid packet "
1223                                   "(outside network):", packet))
1224             raise
1225
1226     def test_ping_internal_host_from_outside(self):
1227         """ NAT44EI ping internal host from outside network """
1228
1229         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1230         flags = self.config_flags.NAT_IS_INSIDE
1231         self.vapi.nat44_interface_add_del_feature(
1232             sw_if_index=self.pg0.sw_if_index,
1233             flags=flags, is_add=1)
1234         self.vapi.nat44_interface_add_del_feature(
1235             sw_if_index=self.pg1.sw_if_index,
1236             is_add=1)
1237
1238         # out2in
1239         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1240                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1241                ICMP(id=self.icmp_id_out, type='echo-request'))
1242         self.pg1.add_stream(pkt)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245         capture = self.pg0.get_capture(1)
1246         self.verify_capture_in(capture, self.pg0)
1247         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1248
1249         # in2out
1250         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1251                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1252                ICMP(id=self.icmp_id_in, type='echo-reply'))
1253         self.pg0.add_stream(pkt)
1254         self.pg_enable_capture(self.pg_interfaces)
1255         self.pg_start()
1256         capture = self.pg1.get_capture(1)
1257         self.verify_capture_out(capture, same_port=True)
1258         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1259
1260     def test_forwarding(self):
1261         """ NAT44EI forwarding test """
1262
1263         flags = self.config_flags.NAT_IS_INSIDE
1264         self.vapi.nat44_interface_add_del_feature(
1265             sw_if_index=self.pg0.sw_if_index,
1266             flags=flags, is_add=1)
1267         self.vapi.nat44_interface_add_del_feature(
1268             sw_if_index=self.pg1.sw_if_index,
1269             is_add=1)
1270         self.vapi.nat44_forwarding_enable_disable(enable=1)
1271
1272         real_ip = self.pg0.remote_ip4
1273         alias_ip = self.nat_addr
1274         flags = self.config_flags.NAT_IS_ADDR_ONLY
1275         self.vapi.nat44_add_del_static_mapping(is_add=1,
1276                                                local_ip_address=real_ip,
1277                                                external_ip_address=alias_ip,
1278                                                external_sw_if_index=0xFFFFFFFF,
1279                                                flags=flags)
1280
1281         try:
1282             # static mapping match
1283
1284             pkts = self.create_stream_out(self.pg1)
1285             self.pg1.add_stream(pkts)
1286             self.pg_enable_capture(self.pg_interfaces)
1287             self.pg_start()
1288             capture = self.pg0.get_capture(len(pkts))
1289             self.verify_capture_in(capture, self.pg0)
1290
1291             pkts = self.create_stream_in(self.pg0, self.pg1)
1292             self.pg0.add_stream(pkts)
1293             self.pg_enable_capture(self.pg_interfaces)
1294             self.pg_start()
1295             capture = self.pg1.get_capture(len(pkts))
1296             self.verify_capture_out(capture, same_port=True)
1297
1298             # no static mapping match
1299
1300             host0 = self.pg0.remote_hosts[0]
1301             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1302             try:
1303                 pkts = self.create_stream_out(self.pg1,
1304                                               dst_ip=self.pg0.remote_ip4,
1305                                               use_inside_ports=True)
1306                 self.pg1.add_stream(pkts)
1307                 self.pg_enable_capture(self.pg_interfaces)
1308                 self.pg_start()
1309                 capture = self.pg0.get_capture(len(pkts))
1310                 self.verify_capture_in(capture, self.pg0)
1311
1312                 pkts = self.create_stream_in(self.pg0, self.pg1)
1313                 self.pg0.add_stream(pkts)
1314                 self.pg_enable_capture(self.pg_interfaces)
1315                 self.pg_start()
1316                 capture = self.pg1.get_capture(len(pkts))
1317                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1318                                         same_port=True)
1319             finally:
1320                 self.pg0.remote_hosts[0] = host0
1321
1322         finally:
1323             self.vapi.nat44_forwarding_enable_disable(enable=0)
1324             flags = self.config_flags.NAT_IS_ADDR_ONLY
1325             self.vapi.nat44_add_del_static_mapping(
1326                 is_add=0,
1327                 local_ip_address=real_ip,
1328                 external_ip_address=alias_ip,
1329                 external_sw_if_index=0xFFFFFFFF,
1330                 flags=flags)
1331
1332     def test_static_in(self):
1333         """ NAT44EI 1:1 NAT initialized from inside network """
1334
1335         nat_ip = "10.0.0.10"
1336         self.tcp_port_out = 6303
1337         self.udp_port_out = 6304
1338         self.icmp_id_out = 6305
1339
1340         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1341         flags = self.config_flags.NAT_IS_INSIDE
1342         self.vapi.nat44_interface_add_del_feature(
1343             sw_if_index=self.pg0.sw_if_index,
1344             flags=flags, is_add=1)
1345         self.vapi.nat44_interface_add_del_feature(
1346             sw_if_index=self.pg1.sw_if_index,
1347             is_add=1)
1348         sm = self.vapi.nat44_static_mapping_dump()
1349         self.assertEqual(len(sm), 1)
1350         self.assertEqual(sm[0].tag, '')
1351         self.assertEqual(sm[0].protocol, 0)
1352         self.assertEqual(sm[0].local_port, 0)
1353         self.assertEqual(sm[0].external_port, 0)
1354
1355         # in2out
1356         pkts = self.create_stream_in(self.pg0, self.pg1)
1357         self.pg0.add_stream(pkts)
1358         self.pg_enable_capture(self.pg_interfaces)
1359         self.pg_start()
1360         capture = self.pg1.get_capture(len(pkts))
1361         self.verify_capture_out(capture, nat_ip, True)
1362
1363         # out2in
1364         pkts = self.create_stream_out(self.pg1, nat_ip)
1365         self.pg1.add_stream(pkts)
1366         self.pg_enable_capture(self.pg_interfaces)
1367         self.pg_start()
1368         capture = self.pg0.get_capture(len(pkts))
1369         self.verify_capture_in(capture, self.pg0)
1370
1371     def test_static_out(self):
1372         """ NAT44EI 1:1 NAT initialized from outside network """
1373
1374         nat_ip = "10.0.0.20"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378         tag = "testTAG"
1379
1380         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1381         flags = self.config_flags.NAT_IS_INSIDE
1382         self.vapi.nat44_interface_add_del_feature(
1383             sw_if_index=self.pg0.sw_if_index,
1384             flags=flags, is_add=1)
1385         self.vapi.nat44_interface_add_del_feature(
1386             sw_if_index=self.pg1.sw_if_index,
1387             is_add=1)
1388         sm = self.vapi.nat44_static_mapping_dump()
1389         self.assertEqual(len(sm), 1)
1390         self.assertEqual(sm[0].tag, tag)
1391
1392         # out2in
1393         pkts = self.create_stream_out(self.pg1, nat_ip)
1394         self.pg1.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg0.get_capture(len(pkts))
1398         self.verify_capture_in(capture, self.pg0)
1399
1400         # in2out
1401         pkts = self.create_stream_in(self.pg0, self.pg1)
1402         self.pg0.add_stream(pkts)
1403         self.pg_enable_capture(self.pg_interfaces)
1404         self.pg_start()
1405         capture = self.pg1.get_capture(len(pkts))
1406         self.verify_capture_out(capture, nat_ip, True)
1407
1408     def test_static_with_port_in(self):
1409         """ NAT44EI 1:1 NAPT initialized from inside network """
1410
1411         self.tcp_port_out = 3606
1412         self.udp_port_out = 3607
1413         self.icmp_id_out = 3608
1414
1415         self.nat44_add_address(self.nat_addr)
1416         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1417                                       self.tcp_port_in, self.tcp_port_out,
1418                                       proto=IP_PROTOS.tcp)
1419         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1420                                       self.udp_port_in, self.udp_port_out,
1421                                       proto=IP_PROTOS.udp)
1422         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1423                                       self.icmp_id_in, self.icmp_id_out,
1424                                       proto=IP_PROTOS.icmp)
1425         flags = self.config_flags.NAT_IS_INSIDE
1426         self.vapi.nat44_interface_add_del_feature(
1427             sw_if_index=self.pg0.sw_if_index,
1428             flags=flags, is_add=1)
1429         self.vapi.nat44_interface_add_del_feature(
1430             sw_if_index=self.pg1.sw_if_index,
1431             is_add=1)
1432
1433         # in2out
1434         pkts = self.create_stream_in(self.pg0, self.pg1)
1435         self.pg0.add_stream(pkts)
1436         self.pg_enable_capture(self.pg_interfaces)
1437         self.pg_start()
1438         capture = self.pg1.get_capture(len(pkts))
1439         self.verify_capture_out(capture)
1440
1441         # out2in
1442         pkts = self.create_stream_out(self.pg1)
1443         self.pg1.add_stream(pkts)
1444         self.pg_enable_capture(self.pg_interfaces)
1445         self.pg_start()
1446         capture = self.pg0.get_capture(len(pkts))
1447         self.verify_capture_in(capture, self.pg0)
1448
1449     def test_static_with_port_out(self):
1450         """ NAT44EI 1:1 NAPT initialized from outside network """
1451
1452         self.tcp_port_out = 30606
1453         self.udp_port_out = 30607
1454         self.icmp_id_out = 30608
1455
1456         self.nat44_add_address(self.nat_addr)
1457         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1458                                       self.tcp_port_in, self.tcp_port_out,
1459                                       proto=IP_PROTOS.tcp)
1460         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1461                                       self.udp_port_in, self.udp_port_out,
1462                                       proto=IP_PROTOS.udp)
1463         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1464                                       self.icmp_id_in, self.icmp_id_out,
1465                                       proto=IP_PROTOS.icmp)
1466         flags = self.config_flags.NAT_IS_INSIDE
1467         self.vapi.nat44_interface_add_del_feature(
1468             sw_if_index=self.pg0.sw_if_index,
1469             flags=flags, is_add=1)
1470         self.vapi.nat44_interface_add_del_feature(
1471             sw_if_index=self.pg1.sw_if_index,
1472             is_add=1)
1473
1474         # out2in
1475         pkts = self.create_stream_out(self.pg1)
1476         self.pg1.add_stream(pkts)
1477         self.pg_enable_capture(self.pg_interfaces)
1478         self.pg_start()
1479         capture = self.pg0.get_capture(len(pkts))
1480         self.verify_capture_in(capture, self.pg0)
1481
1482         # in2out
1483         pkts = self.create_stream_in(self.pg0, self.pg1)
1484         self.pg0.add_stream(pkts)
1485         self.pg_enable_capture(self.pg_interfaces)
1486         self.pg_start()
1487         capture = self.pg1.get_capture(len(pkts))
1488         self.verify_capture_out(capture)
1489
1490     def test_static_vrf_aware(self):
1491         """ NAT44EI 1:1 NAT VRF awareness """
1492
1493         nat_ip1 = "10.0.0.30"
1494         nat_ip2 = "10.0.0.40"
1495         self.tcp_port_out = 6303
1496         self.udp_port_out = 6304
1497         self.icmp_id_out = 6305
1498
1499         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1500                                       vrf_id=10)
1501         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1502                                       vrf_id=10)
1503         flags = self.config_flags.NAT_IS_INSIDE
1504         self.vapi.nat44_interface_add_del_feature(
1505             sw_if_index=self.pg3.sw_if_index,
1506             is_add=1)
1507         self.vapi.nat44_interface_add_del_feature(
1508             sw_if_index=self.pg0.sw_if_index,
1509             flags=flags, is_add=1)
1510         self.vapi.nat44_interface_add_del_feature(
1511             sw_if_index=self.pg4.sw_if_index,
1512             flags=flags, is_add=1)
1513
1514         # inside interface VRF match NAT44EI static mapping VRF
1515         pkts = self.create_stream_in(self.pg4, self.pg3)
1516         self.pg4.add_stream(pkts)
1517         self.pg_enable_capture(self.pg_interfaces)
1518         self.pg_start()
1519         capture = self.pg3.get_capture(len(pkts))
1520         self.verify_capture_out(capture, nat_ip1, True)
1521
1522         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1523         # are dropped)
1524         pkts = self.create_stream_in(self.pg0, self.pg3)
1525         self.pg0.add_stream(pkts)
1526         self.pg_enable_capture(self.pg_interfaces)
1527         self.pg_start()
1528         self.pg3.assert_nothing_captured()
1529
1530     def test_dynamic_to_static(self):
1531         """ NAT44EI Switch from dynamic translation to 1:1NAT """
1532         nat_ip = "10.0.0.10"
1533         self.tcp_port_out = 6303
1534         self.udp_port_out = 6304
1535         self.icmp_id_out = 6305
1536
1537         self.nat44_add_address(self.nat_addr)
1538         flags = self.config_flags.NAT_IS_INSIDE
1539         self.vapi.nat44_interface_add_del_feature(
1540             sw_if_index=self.pg0.sw_if_index,
1541             flags=flags, is_add=1)
1542         self.vapi.nat44_interface_add_del_feature(
1543             sw_if_index=self.pg1.sw_if_index,
1544             is_add=1)
1545
1546         # dynamic
1547         pkts = self.create_stream_in(self.pg0, self.pg1)
1548         self.pg0.add_stream(pkts)
1549         self.pg_enable_capture(self.pg_interfaces)
1550         self.pg_start()
1551         capture = self.pg1.get_capture(len(pkts))
1552         self.verify_capture_out(capture)
1553
1554         # 1:1NAT
1555         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1556         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1557         self.assertEqual(len(sessions), 0)
1558         pkts = self.create_stream_in(self.pg0, self.pg1)
1559         self.pg0.add_stream(pkts)
1560         self.pg_enable_capture(self.pg_interfaces)
1561         self.pg_start()
1562         capture = self.pg1.get_capture(len(pkts))
1563         self.verify_capture_out(capture, nat_ip, True)
1564
1565     def test_identity_nat(self):
1566         """ NAT44EI Identity NAT """
1567         flags = self.config_flags.NAT_IS_ADDR_ONLY
1568         self.vapi.nat44_add_del_identity_mapping(
1569             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1570             flags=flags, is_add=1)
1571         flags = self.config_flags.NAT_IS_INSIDE
1572         self.vapi.nat44_interface_add_del_feature(
1573             sw_if_index=self.pg0.sw_if_index,
1574             flags=flags, is_add=1)
1575         self.vapi.nat44_interface_add_del_feature(
1576             sw_if_index=self.pg1.sw_if_index,
1577             is_add=1)
1578
1579         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1580              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1581              TCP(sport=12345, dport=56789))
1582         self.pg1.add_stream(p)
1583         self.pg_enable_capture(self.pg_interfaces)
1584         self.pg_start()
1585         capture = self.pg0.get_capture(1)
1586         p = capture[0]
1587         try:
1588             ip = p[IP]
1589             tcp = p[TCP]
1590             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1591             self.assertEqual(ip.src, self.pg1.remote_ip4)
1592             self.assertEqual(tcp.dport, 56789)
1593             self.assertEqual(tcp.sport, 12345)
1594             self.assert_packet_checksums_valid(p)
1595         except:
1596             self.logger.error(ppp("Unexpected or invalid packet:", p))
1597             raise
1598
1599         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1600         self.assertEqual(len(sessions), 0)
1601         flags = self.config_flags.NAT_IS_ADDR_ONLY
1602         self.vapi.nat44_add_del_identity_mapping(
1603             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1604             flags=flags, vrf_id=1, is_add=1)
1605         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1606         self.assertEqual(len(identity_mappings), 2)
1607
1608     def test_multiple_inside_interfaces(self):
1609         """ NAT44EI multiple non-overlapping address space inside interfaces
1610         """
1611
1612         self.nat44_add_address(self.nat_addr)
1613         flags = self.config_flags.NAT_IS_INSIDE
1614         self.vapi.nat44_interface_add_del_feature(
1615             sw_if_index=self.pg0.sw_if_index,
1616             flags=flags, is_add=1)
1617         self.vapi.nat44_interface_add_del_feature(
1618             sw_if_index=self.pg1.sw_if_index,
1619             flags=flags, is_add=1)
1620         self.vapi.nat44_interface_add_del_feature(
1621             sw_if_index=self.pg3.sw_if_index,
1622             is_add=1)
1623
1624         # between two NAT44EI inside interfaces (no translation)
1625         pkts = self.create_stream_in(self.pg0, self.pg1)
1626         self.pg0.add_stream(pkts)
1627         self.pg_enable_capture(self.pg_interfaces)
1628         self.pg_start()
1629         capture = self.pg1.get_capture(len(pkts))
1630         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1631
1632         # from inside to interface without translation
1633         pkts = self.create_stream_in(self.pg0, self.pg2)
1634         self.pg0.add_stream(pkts)
1635         self.pg_enable_capture(self.pg_interfaces)
1636         self.pg_start()
1637         capture = self.pg2.get_capture(len(pkts))
1638         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1639
1640         # in2out 1st interface
1641         pkts = self.create_stream_in(self.pg0, self.pg3)
1642         self.pg0.add_stream(pkts)
1643         self.pg_enable_capture(self.pg_interfaces)
1644         self.pg_start()
1645         capture = self.pg3.get_capture(len(pkts))
1646         self.verify_capture_out(capture)
1647
1648         # out2in 1st interface
1649         pkts = self.create_stream_out(self.pg3)
1650         self.pg3.add_stream(pkts)
1651         self.pg_enable_capture(self.pg_interfaces)
1652         self.pg_start()
1653         capture = self.pg0.get_capture(len(pkts))
1654         self.verify_capture_in(capture, self.pg0)
1655
1656         # in2out 2nd interface
1657         pkts = self.create_stream_in(self.pg1, self.pg3)
1658         self.pg1.add_stream(pkts)
1659         self.pg_enable_capture(self.pg_interfaces)
1660         self.pg_start()
1661         capture = self.pg3.get_capture(len(pkts))
1662         self.verify_capture_out(capture)
1663
1664         # out2in 2nd interface
1665         pkts = self.create_stream_out(self.pg3)
1666         self.pg3.add_stream(pkts)
1667         self.pg_enable_capture(self.pg_interfaces)
1668         self.pg_start()
1669         capture = self.pg1.get_capture(len(pkts))
1670         self.verify_capture_in(capture, self.pg1)
1671
1672     def test_inside_overlapping_interfaces(self):
1673         """ NAT44EI multiple inside interfaces with overlapping address space
1674         """
1675
1676         static_nat_ip = "10.0.0.10"
1677         self.nat44_add_address(self.nat_addr)
1678         flags = self.config_flags.NAT_IS_INSIDE
1679         self.vapi.nat44_interface_add_del_feature(
1680             sw_if_index=self.pg3.sw_if_index,
1681             is_add=1)
1682         self.vapi.nat44_interface_add_del_feature(
1683             sw_if_index=self.pg4.sw_if_index,
1684             flags=flags, is_add=1)
1685         self.vapi.nat44_interface_add_del_feature(
1686             sw_if_index=self.pg5.sw_if_index,
1687             flags=flags, is_add=1)
1688         self.vapi.nat44_interface_add_del_feature(
1689             sw_if_index=self.pg6.sw_if_index,
1690             flags=flags, is_add=1)
1691         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1692                                       vrf_id=20)
1693
1694         # between NAT44EI inside interfaces with same VRF (no translation)
1695         pkts = self.create_stream_in(self.pg4, self.pg5)
1696         self.pg4.add_stream(pkts)
1697         self.pg_enable_capture(self.pg_interfaces)
1698         self.pg_start()
1699         capture = self.pg5.get_capture(len(pkts))
1700         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1701
1702         # between NAT44EI inside interfaces with different VRF (hairpinning)
1703         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1704              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1705              TCP(sport=1234, dport=5678))
1706         self.pg4.add_stream(p)
1707         self.pg_enable_capture(self.pg_interfaces)
1708         self.pg_start()
1709         capture = self.pg6.get_capture(1)
1710         p = capture[0]
1711         try:
1712             ip = p[IP]
1713             tcp = p[TCP]
1714             self.assertEqual(ip.src, self.nat_addr)
1715             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1716             self.assertNotEqual(tcp.sport, 1234)
1717             self.assertEqual(tcp.dport, 5678)
1718         except:
1719             self.logger.error(ppp("Unexpected or invalid packet:", p))
1720             raise
1721
1722         # in2out 1st interface
1723         pkts = self.create_stream_in(self.pg4, self.pg3)
1724         self.pg4.add_stream(pkts)
1725         self.pg_enable_capture(self.pg_interfaces)
1726         self.pg_start()
1727         capture = self.pg3.get_capture(len(pkts))
1728         self.verify_capture_out(capture)
1729
1730         # out2in 1st interface
1731         pkts = self.create_stream_out(self.pg3)
1732         self.pg3.add_stream(pkts)
1733         self.pg_enable_capture(self.pg_interfaces)
1734         self.pg_start()
1735         capture = self.pg4.get_capture(len(pkts))
1736         self.verify_capture_in(capture, self.pg4)
1737
1738         # in2out 2nd interface
1739         pkts = self.create_stream_in(self.pg5, self.pg3)
1740         self.pg5.add_stream(pkts)
1741         self.pg_enable_capture(self.pg_interfaces)
1742         self.pg_start()
1743         capture = self.pg3.get_capture(len(pkts))
1744         self.verify_capture_out(capture)
1745
1746         # out2in 2nd interface
1747         pkts = self.create_stream_out(self.pg3)
1748         self.pg3.add_stream(pkts)
1749         self.pg_enable_capture(self.pg_interfaces)
1750         self.pg_start()
1751         capture = self.pg5.get_capture(len(pkts))
1752         self.verify_capture_in(capture, self.pg5)
1753
1754         # pg5 session dump
1755         addresses = self.vapi.nat44_address_dump()
1756         self.assertEqual(len(addresses), 1)
1757         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
1758         self.assertEqual(len(sessions), 3)
1759         for session in sessions:
1760             self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1761             self.assertEqual(str(session.inside_ip_address),
1762                              self.pg5.remote_ip4)
1763             self.assertEqual(session.outside_ip_address,
1764                              addresses[0].ip_address)
1765         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1766         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1767         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1768         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1769         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1770         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1771         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1772         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1773         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1774
1775         # in2out 3rd interface
1776         pkts = self.create_stream_in(self.pg6, self.pg3)
1777         self.pg6.add_stream(pkts)
1778         self.pg_enable_capture(self.pg_interfaces)
1779         self.pg_start()
1780         capture = self.pg3.get_capture(len(pkts))
1781         self.verify_capture_out(capture, static_nat_ip, True)
1782
1783         # out2in 3rd interface
1784         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1785         self.pg3.add_stream(pkts)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg6.get_capture(len(pkts))
1789         self.verify_capture_in(capture, self.pg6)
1790
1791         # general user and session dump verifications
1792         users = self.vapi.nat44_user_dump()
1793         self.assertGreaterEqual(len(users), 3)
1794         addresses = self.vapi.nat44_address_dump()
1795         self.assertEqual(len(addresses), 1)
1796         for user in users:
1797             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1798                                                          user.vrf_id)
1799             for session in sessions:
1800                 self.assertEqual(user.ip_address, session.inside_ip_address)
1801                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1802                 self.assertTrue(session.protocol in
1803                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1804                                  IP_PROTOS.icmp])
1805                 self.assertFalse(session.flags &
1806                                  self.config_flags.NAT_IS_EXT_HOST_VALID)
1807
1808         # pg4 session dump
1809         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
1810         self.assertGreaterEqual(len(sessions), 4)
1811         for session in sessions:
1812             self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1813             self.assertEqual(str(session.inside_ip_address),
1814                              self.pg4.remote_ip4)
1815             self.assertEqual(session.outside_ip_address,
1816                              addresses[0].ip_address)
1817
1818         # pg6 session dump
1819         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
1820         self.assertGreaterEqual(len(sessions), 3)
1821         for session in sessions:
1822             self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
1823             self.assertEqual(str(session.inside_ip_address),
1824                              self.pg6.remote_ip4)
1825             self.assertEqual(str(session.outside_ip_address),
1826                              static_nat_ip)
1827             self.assertTrue(session.inside_port in
1828                             [self.tcp_port_in, self.udp_port_in,
1829                              self.icmp_id_in])
1830
1831     def test_hairpinning(self):
1832         """ NAT44EI hairpinning - 1:1 NAPT """
1833
1834         host = self.pg0.remote_hosts[0]
1835         server = self.pg0.remote_hosts[1]
1836         host_in_port = 1234
1837         host_out_port = 0
1838         server_in_port = 5678
1839         server_out_port = 8765
1840
1841         self.nat44_add_address(self.nat_addr)
1842         flags = self.config_flags.NAT_IS_INSIDE
1843         self.vapi.nat44_interface_add_del_feature(
1844             sw_if_index=self.pg0.sw_if_index,
1845             flags=flags, is_add=1)
1846         self.vapi.nat44_interface_add_del_feature(
1847             sw_if_index=self.pg1.sw_if_index,
1848             is_add=1)
1849
1850         # add static mapping for server
1851         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1852                                       server_in_port, server_out_port,
1853                                       proto=IP_PROTOS.tcp)
1854
1855         cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
1856         # send packet from host to server
1857         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1858              IP(src=host.ip4, dst=self.nat_addr) /
1859              TCP(sport=host_in_port, dport=server_out_port))
1860         self.pg0.add_stream(p)
1861         self.pg_enable_capture(self.pg_interfaces)
1862         self.pg_start()
1863         capture = self.pg0.get_capture(1)
1864         p = capture[0]
1865         try:
1866             ip = p[IP]
1867             tcp = p[TCP]
1868             self.assertEqual(ip.src, self.nat_addr)
1869             self.assertEqual(ip.dst, server.ip4)
1870             self.assertNotEqual(tcp.sport, host_in_port)
1871             self.assertEqual(tcp.dport, server_in_port)
1872             self.assert_packet_checksums_valid(p)
1873             host_out_port = tcp.sport
1874         except:
1875             self.logger.error(ppp("Unexpected or invalid packet:", p))
1876             raise
1877
1878         after = self.statistics.get_counter('/nat44/hairpinning')[0]
1879         if_idx = self.pg0.sw_if_index
1880         self.assertEqual(after[if_idx] - cnt[if_idx], 1)
1881
1882         # send reply from server to host
1883         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1884              IP(src=server.ip4, dst=self.nat_addr) /
1885              TCP(sport=server_in_port, dport=host_out_port))
1886         self.pg0.add_stream(p)
1887         self.pg_enable_capture(self.pg_interfaces)
1888         self.pg_start()
1889         capture = self.pg0.get_capture(1)
1890         p = capture[0]
1891         try:
1892             ip = p[IP]
1893             tcp = p[TCP]
1894             self.assertEqual(ip.src, self.nat_addr)
1895             self.assertEqual(ip.dst, host.ip4)
1896             self.assertEqual(tcp.sport, server_out_port)
1897             self.assertEqual(tcp.dport, host_in_port)
1898             self.assert_packet_checksums_valid(p)
1899         except:
1900             self.logger.error(ppp("Unexpected or invalid packet:", p))
1901             raise
1902
1903         after = self.statistics.get_counter('/nat44/hairpinning')[0]
1904         if_idx = self.pg0.sw_if_index
1905         self.assertEqual(after[if_idx] - cnt[if_idx], 2)
1906
1907     def test_hairpinning2(self):
1908         """ NAT44EI hairpinning - 1:1 NAT"""
1909
1910         server1_nat_ip = "10.0.0.10"
1911         server2_nat_ip = "10.0.0.11"
1912         host = self.pg0.remote_hosts[0]
1913         server1 = self.pg0.remote_hosts[1]
1914         server2 = self.pg0.remote_hosts[2]
1915         server_tcp_port = 22
1916         server_udp_port = 20
1917
1918         self.nat44_add_address(self.nat_addr)
1919         flags = self.config_flags.NAT_IS_INSIDE
1920         self.vapi.nat44_interface_add_del_feature(
1921             sw_if_index=self.pg0.sw_if_index,
1922             flags=flags, is_add=1)
1923         self.vapi.nat44_interface_add_del_feature(
1924             sw_if_index=self.pg1.sw_if_index,
1925             is_add=1)
1926
1927         # add static mapping for servers
1928         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1929         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1930
1931         # host to server1
1932         pkts = []
1933         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1934              IP(src=host.ip4, dst=server1_nat_ip) /
1935              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1936         pkts.append(p)
1937         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1938              IP(src=host.ip4, dst=server1_nat_ip) /
1939              UDP(sport=self.udp_port_in, dport=server_udp_port))
1940         pkts.append(p)
1941         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1942              IP(src=host.ip4, dst=server1_nat_ip) /
1943              ICMP(id=self.icmp_id_in, type='echo-request'))
1944         pkts.append(p)
1945         self.pg0.add_stream(pkts)
1946         self.pg_enable_capture(self.pg_interfaces)
1947         self.pg_start()
1948         capture = self.pg0.get_capture(len(pkts))
1949         for packet in capture:
1950             try:
1951                 self.assertEqual(packet[IP].src, self.nat_addr)
1952                 self.assertEqual(packet[IP].dst, server1.ip4)
1953                 if packet.haslayer(TCP):
1954                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1955                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1956                     self.tcp_port_out = packet[TCP].sport
1957                     self.assert_packet_checksums_valid(packet)
1958                 elif packet.haslayer(UDP):
1959                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1960                     self.assertEqual(packet[UDP].dport, server_udp_port)
1961                     self.udp_port_out = packet[UDP].sport
1962                 else:
1963                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1964                     self.icmp_id_out = packet[ICMP].id
1965             except:
1966                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1967                 raise
1968
1969         # server1 to host
1970         pkts = []
1971         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1972              IP(src=server1.ip4, dst=self.nat_addr) /
1973              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1974         pkts.append(p)
1975         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1976              IP(src=server1.ip4, dst=self.nat_addr) /
1977              UDP(sport=server_udp_port, dport=self.udp_port_out))
1978         pkts.append(p)
1979         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1980              IP(src=server1.ip4, dst=self.nat_addr) /
1981              ICMP(id=self.icmp_id_out, type='echo-reply'))
1982         pkts.append(p)
1983         self.pg0.add_stream(pkts)
1984         self.pg_enable_capture(self.pg_interfaces)
1985         self.pg_start()
1986         capture = self.pg0.get_capture(len(pkts))
1987         for packet in capture:
1988             try:
1989                 self.assertEqual(packet[IP].src, server1_nat_ip)
1990                 self.assertEqual(packet[IP].dst, host.ip4)
1991                 if packet.haslayer(TCP):
1992                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1993                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1994                     self.assert_packet_checksums_valid(packet)
1995                 elif packet.haslayer(UDP):
1996                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1997                     self.assertEqual(packet[UDP].sport, server_udp_port)
1998                 else:
1999                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2000             except:
2001                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2002                 raise
2003
2004         # server2 to server1
2005         pkts = []
2006         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2007              IP(src=server2.ip4, dst=server1_nat_ip) /
2008              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2009         pkts.append(p)
2010         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2011              IP(src=server2.ip4, dst=server1_nat_ip) /
2012              UDP(sport=self.udp_port_in, dport=server_udp_port))
2013         pkts.append(p)
2014         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2015              IP(src=server2.ip4, dst=server1_nat_ip) /
2016              ICMP(id=self.icmp_id_in, type='echo-request'))
2017         pkts.append(p)
2018         self.pg0.add_stream(pkts)
2019         self.pg_enable_capture(self.pg_interfaces)
2020         self.pg_start()
2021         capture = self.pg0.get_capture(len(pkts))
2022         for packet in capture:
2023             try:
2024                 self.assertEqual(packet[IP].src, server2_nat_ip)
2025                 self.assertEqual(packet[IP].dst, server1.ip4)
2026                 if packet.haslayer(TCP):
2027                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2028                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2029                     self.tcp_port_out = packet[TCP].sport
2030                     self.assert_packet_checksums_valid(packet)
2031                 elif packet.haslayer(UDP):
2032                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2033                     self.assertEqual(packet[UDP].dport, server_udp_port)
2034                     self.udp_port_out = packet[UDP].sport
2035                 else:
2036                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2037                     self.icmp_id_out = packet[ICMP].id
2038             except:
2039                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2040                 raise
2041
2042         # server1 to server2
2043         pkts = []
2044         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2045              IP(src=server1.ip4, dst=server2_nat_ip) /
2046              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2047         pkts.append(p)
2048         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2049              IP(src=server1.ip4, dst=server2_nat_ip) /
2050              UDP(sport=server_udp_port, dport=self.udp_port_out))
2051         pkts.append(p)
2052         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2053              IP(src=server1.ip4, dst=server2_nat_ip) /
2054              ICMP(id=self.icmp_id_out, type='echo-reply'))
2055         pkts.append(p)
2056         self.pg0.add_stream(pkts)
2057         self.pg_enable_capture(self.pg_interfaces)
2058         self.pg_start()
2059         capture = self.pg0.get_capture(len(pkts))
2060         for packet in capture:
2061             try:
2062                 self.assertEqual(packet[IP].src, server1_nat_ip)
2063                 self.assertEqual(packet[IP].dst, server2.ip4)
2064                 if packet.haslayer(TCP):
2065                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2066                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2067                     self.assert_packet_checksums_valid(packet)
2068                 elif packet.haslayer(UDP):
2069                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2070                     self.assertEqual(packet[UDP].sport, server_udp_port)
2071                 else:
2072                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2073             except:
2074                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2075                 raise
2076
2077     def test_hairpinning_avoid_inf_loop(self):
2078         """ NAT44 hairpinning - 1:1 NAPT avoid infinite loop """
2079
2080         host = self.pg0.remote_hosts[0]
2081         server = self.pg0.remote_hosts[1]
2082         host_in_port = 1234
2083         host_out_port = 0
2084         server_in_port = 5678
2085         server_out_port = 8765
2086
2087         self.nat44_add_address(self.nat_addr)
2088         flags = self.config_flags.NAT_IS_INSIDE
2089         self.vapi.nat44_interface_add_del_feature(
2090             sw_if_index=self.pg0.sw_if_index,
2091             flags=flags, is_add=1)
2092         self.vapi.nat44_interface_add_del_feature(
2093             sw_if_index=self.pg1.sw_if_index,
2094             is_add=1)
2095
2096         # add static mapping for server
2097         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2098                                       server_in_port, server_out_port,
2099                                       proto=IP_PROTOS.tcp)
2100
2101         # add another static mapping that maps pg0.local_ip4 address to itself
2102         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2103
2104         # send packet from host to VPP (the packet should get dropped)
2105         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2106              IP(src=host.ip4, dst=self.pg0.local_ip4) /
2107              TCP(sport=host_in_port, dport=server_out_port))
2108         self.pg0.add_stream(p)
2109         self.pg_enable_capture(self.pg_interfaces)
2110         self.pg_start()
2111         # Here VPP used to crash due to an infinite loop
2112
2113         cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
2114         # send packet from host to server
2115         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2116              IP(src=host.ip4, dst=self.nat_addr) /
2117              TCP(sport=host_in_port, dport=server_out_port))
2118         self.pg0.add_stream(p)
2119         self.pg_enable_capture(self.pg_interfaces)
2120         self.pg_start()
2121         capture = self.pg0.get_capture(1)
2122         p = capture[0]
2123         try:
2124             ip = p[IP]
2125             tcp = p[TCP]
2126             self.assertEqual(ip.src, self.nat_addr)
2127             self.assertEqual(ip.dst, server.ip4)
2128             self.assertNotEqual(tcp.sport, host_in_port)
2129             self.assertEqual(tcp.dport, server_in_port)
2130             self.assert_packet_checksums_valid(p)
2131             host_out_port = tcp.sport
2132         except:
2133             self.logger.error(ppp("Unexpected or invalid packet:", p))
2134             raise
2135
2136         after = self.statistics.get_counter('/nat44/hairpinning')[0]
2137         if_idx = self.pg0.sw_if_index
2138         self.assertEqual(after[if_idx] - cnt[if_idx], 1)
2139
2140         # send reply from server to host
2141         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2142              IP(src=server.ip4, dst=self.nat_addr) /
2143              TCP(sport=server_in_port, dport=host_out_port))
2144         self.pg0.add_stream(p)
2145         self.pg_enable_capture(self.pg_interfaces)
2146         self.pg_start()
2147         capture = self.pg0.get_capture(1)
2148         p = capture[0]
2149         try:
2150             ip = p[IP]
2151             tcp = p[TCP]
2152             self.assertEqual(ip.src, self.nat_addr)
2153             self.assertEqual(ip.dst, host.ip4)
2154             self.assertEqual(tcp.sport, server_out_port)
2155             self.assertEqual(tcp.dport, host_in_port)
2156             self.assert_packet_checksums_valid(p)
2157         except:
2158             self.logger.error(ppp("Unexpected or invalid packet:", p))
2159             raise
2160
2161         after = self.statistics.get_counter('/nat44/hairpinning')[0]
2162         if_idx = self.pg0.sw_if_index
2163         self.assertEqual(after[if_idx] - cnt[if_idx], 2)
2164
2165     def test_interface_addr(self):
2166         """ NAT44EI acquire addresses from interface """
2167         self.vapi.nat44_add_del_interface_addr(
2168             is_add=1,
2169             sw_if_index=self.pg7.sw_if_index)
2170
2171         # no address in NAT pool
2172         addresses = self.vapi.nat44_address_dump()
2173         self.assertEqual(0, len(addresses))
2174
2175         # configure interface address and check NAT address pool
2176         self.pg7.config_ip4()
2177         addresses = self.vapi.nat44_address_dump()
2178         self.assertEqual(1, len(addresses))
2179         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2180
2181         # remove interface address and check NAT address pool
2182         self.pg7.unconfig_ip4()
2183         addresses = self.vapi.nat44_address_dump()
2184         self.assertEqual(0, len(addresses))
2185
2186     def test_interface_addr_static_mapping(self):
2187         """ NAT44EI Static mapping with addresses from interface """
2188         tag = "testTAG"
2189
2190         self.vapi.nat44_add_del_interface_addr(
2191             is_add=1,
2192             sw_if_index=self.pg7.sw_if_index)
2193         self.nat44_add_static_mapping(
2194             '1.2.3.4',
2195             external_sw_if_index=self.pg7.sw_if_index,
2196             tag=tag)
2197
2198         # static mappings with external interface
2199         static_mappings = self.vapi.nat44_static_mapping_dump()
2200         self.assertEqual(1, len(static_mappings))
2201         self.assertEqual(self.pg7.sw_if_index,
2202                          static_mappings[0].external_sw_if_index)
2203         self.assertEqual(static_mappings[0].tag, tag)
2204
2205         # configure interface address and check static mappings
2206         self.pg7.config_ip4()
2207         static_mappings = self.vapi.nat44_static_mapping_dump()
2208         self.assertEqual(2, len(static_mappings))
2209         resolved = False
2210         for sm in static_mappings:
2211             if sm.external_sw_if_index == 0xFFFFFFFF:
2212                 self.assertEqual(str(sm.external_ip_address),
2213                                  self.pg7.local_ip4)
2214                 self.assertEqual(sm.tag, tag)
2215                 resolved = True
2216         self.assertTrue(resolved)
2217
2218         # remove interface address and check static mappings
2219         self.pg7.unconfig_ip4()
2220         static_mappings = self.vapi.nat44_static_mapping_dump()
2221         self.assertEqual(1, len(static_mappings))
2222         self.assertEqual(self.pg7.sw_if_index,
2223                          static_mappings[0].external_sw_if_index)
2224         self.assertEqual(static_mappings[0].tag, tag)
2225
2226         # configure interface address again and check static mappings
2227         self.pg7.config_ip4()
2228         static_mappings = self.vapi.nat44_static_mapping_dump()
2229         self.assertEqual(2, len(static_mappings))
2230         resolved = False
2231         for sm in static_mappings:
2232             if sm.external_sw_if_index == 0xFFFFFFFF:
2233                 self.assertEqual(str(sm.external_ip_address),
2234                                  self.pg7.local_ip4)
2235                 self.assertEqual(sm.tag, tag)
2236                 resolved = True
2237         self.assertTrue(resolved)
2238
2239         # remove static mapping
2240         self.nat44_add_static_mapping(
2241             '1.2.3.4',
2242             external_sw_if_index=self.pg7.sw_if_index,
2243             tag=tag,
2244             is_add=0)
2245         static_mappings = self.vapi.nat44_static_mapping_dump()
2246         self.assertEqual(0, len(static_mappings))
2247
2248     def test_interface_addr_identity_nat(self):
2249         """ NAT44EI Identity NAT with addresses from interface """
2250
2251         port = 53053
2252         self.vapi.nat44_add_del_interface_addr(
2253             is_add=1,
2254             sw_if_index=self.pg7.sw_if_index)
2255         self.vapi.nat44_add_del_identity_mapping(
2256             ip_address=b'0',
2257             sw_if_index=self.pg7.sw_if_index,
2258             port=port,
2259             protocol=IP_PROTOS.tcp,
2260             is_add=1)
2261
2262         # identity mappings with external interface
2263         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2264         self.assertEqual(1, len(identity_mappings))
2265         self.assertEqual(self.pg7.sw_if_index,
2266                          identity_mappings[0].sw_if_index)
2267
2268         # configure interface address and check identity mappings
2269         self.pg7.config_ip4()
2270         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2271         resolved = False
2272         self.assertEqual(2, len(identity_mappings))
2273         for sm in identity_mappings:
2274             if sm.sw_if_index == 0xFFFFFFFF:
2275                 self.assertEqual(str(identity_mappings[0].ip_address),
2276                                  self.pg7.local_ip4)
2277                 self.assertEqual(port, identity_mappings[0].port)
2278                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2279                 resolved = True
2280         self.assertTrue(resolved)
2281
2282         # remove interface address and check identity mappings
2283         self.pg7.unconfig_ip4()
2284         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2285         self.assertEqual(1, len(identity_mappings))
2286         self.assertEqual(self.pg7.sw_if_index,
2287                          identity_mappings[0].sw_if_index)
2288
2289     def test_ipfix_nat44_sess(self):
2290         """ NAT44EI IPFIX logging NAT44EI session created/deleted """
2291         self.ipfix_domain_id = 10
2292         self.ipfix_src_port = 20202
2293         collector_port = 30303
2294         bind_layers(UDP, IPFIX, dport=30303)
2295         self.nat44_add_address(self.nat_addr)
2296         flags = self.config_flags.NAT_IS_INSIDE
2297         self.vapi.nat44_interface_add_del_feature(
2298             sw_if_index=self.pg0.sw_if_index,
2299             flags=flags, is_add=1)
2300         self.vapi.nat44_interface_add_del_feature(
2301             sw_if_index=self.pg1.sw_if_index,
2302             is_add=1)
2303         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2304                                      src_address=self.pg3.local_ip4,
2305                                      path_mtu=512,
2306                                      template_interval=10,
2307                                      collector_port=collector_port)
2308         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2309                                            src_port=self.ipfix_src_port,
2310                                            enable=1)
2311
2312         pkts = self.create_stream_in(self.pg0, self.pg1)
2313         self.pg0.add_stream(pkts)
2314         self.pg_enable_capture(self.pg_interfaces)
2315         self.pg_start()
2316         capture = self.pg1.get_capture(len(pkts))
2317         self.verify_capture_out(capture)
2318         self.nat44_add_address(self.nat_addr, is_add=0)
2319         self.vapi.ipfix_flush()
2320         capture = self.pg3.get_capture(7)
2321         ipfix = IPFIXDecoder()
2322         # first load template
2323         for p in capture:
2324             self.assertTrue(p.haslayer(IPFIX))
2325             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2326             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2327             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2328             self.assertEqual(p[UDP].dport, collector_port)
2329             self.assertEqual(p[IPFIX].observationDomainID,
2330                              self.ipfix_domain_id)
2331             if p.haslayer(Template):
2332                 ipfix.add_template(p.getlayer(Template))
2333         # verify events in data set
2334         for p in capture:
2335             if p.haslayer(Data):
2336                 data = ipfix.decode_data_set(p.getlayer(Set))
2337                 self.verify_ipfix_nat44_ses(data)
2338
2339     def test_ipfix_addr_exhausted(self):
2340         """ NAT44EI IPFIX logging NAT addresses exhausted """
2341         flags = self.config_flags.NAT_IS_INSIDE
2342         self.vapi.nat44_interface_add_del_feature(
2343             sw_if_index=self.pg0.sw_if_index,
2344             flags=flags, is_add=1)
2345         self.vapi.nat44_interface_add_del_feature(
2346             sw_if_index=self.pg1.sw_if_index,
2347             is_add=1)
2348         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2349                                      src_address=self.pg3.local_ip4,
2350                                      path_mtu=512,
2351                                      template_interval=10)
2352         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2353                                            src_port=self.ipfix_src_port,
2354                                            enable=1)
2355
2356         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2357              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2358              TCP(sport=3025))
2359         self.pg0.add_stream(p)
2360         self.pg_enable_capture(self.pg_interfaces)
2361         self.pg_start()
2362         self.pg1.assert_nothing_captured()
2363         sleep(1)
2364         self.vapi.ipfix_flush()
2365         capture = self.pg3.get_capture(7)
2366         ipfix = IPFIXDecoder()
2367         # first load template
2368         for p in capture:
2369             self.assertTrue(p.haslayer(IPFIX))
2370             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2371             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2372             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2373             self.assertEqual(p[UDP].dport, 4739)
2374             self.assertEqual(p[IPFIX].observationDomainID,
2375                              self.ipfix_domain_id)
2376             if p.haslayer(Template):
2377                 ipfix.add_template(p.getlayer(Template))
2378         # verify events in data set
2379         for p in capture:
2380             if p.haslayer(Data):
2381                 data = ipfix.decode_data_set(p.getlayer(Set))
2382                 self.verify_ipfix_addr_exhausted(data)
2383
2384     def test_ipfix_max_sessions(self):
2385         """ NAT44EI IPFIX logging maximum session entries exceeded """
2386         self.nat44_add_address(self.nat_addr)
2387         flags = self.config_flags.NAT_IS_INSIDE
2388         self.vapi.nat44_interface_add_del_feature(
2389             sw_if_index=self.pg0.sw_if_index,
2390             flags=flags, is_add=1)
2391         self.vapi.nat44_interface_add_del_feature(
2392             sw_if_index=self.pg1.sw_if_index,
2393             is_add=1)
2394
2395         max_sessions = self.max_translations
2396
2397         pkts = []
2398         for i in range(0, max_sessions):
2399             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2400             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2401                  IP(src=src, dst=self.pg1.remote_ip4) /
2402                  TCP(sport=1025))
2403             pkts.append(p)
2404         self.pg0.add_stream(pkts)
2405         self.pg_enable_capture(self.pg_interfaces)
2406         self.pg_start()
2407
2408         self.pg1.get_capture(max_sessions)
2409         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2410                                      src_address=self.pg3.local_ip4,
2411                                      path_mtu=512,
2412                                      template_interval=10)
2413         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2414                                            src_port=self.ipfix_src_port,
2415                                            enable=1)
2416
2417         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2418              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2419              TCP(sport=1025))
2420         self.pg0.add_stream(p)
2421         self.pg_enable_capture(self.pg_interfaces)
2422         self.pg_start()
2423         self.pg1.assert_nothing_captured()
2424         sleep(1)
2425         self.vapi.ipfix_flush()
2426         capture = self.pg3.get_capture(7)
2427         ipfix = IPFIXDecoder()
2428         # first load template
2429         for p in capture:
2430             self.assertTrue(p.haslayer(IPFIX))
2431             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2432             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2433             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2434             self.assertEqual(p[UDP].dport, 4739)
2435             self.assertEqual(p[IPFIX].observationDomainID,
2436                              self.ipfix_domain_id)
2437             if p.haslayer(Template):
2438                 ipfix.add_template(p.getlayer(Template))
2439         # verify events in data set
2440         for p in capture:
2441             if p.haslayer(Data):
2442                 data = ipfix.decode_data_set(p.getlayer(Set))
2443                 self.verify_ipfix_max_sessions(data, max_sessions)
2444
2445     def test_syslog_apmap(self):
2446         """ NAT44EI syslog address and port mapping creation and deletion """
2447         self.vapi.syslog_set_filter(
2448             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2449         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2450         self.nat44_add_address(self.nat_addr)
2451         flags = self.config_flags.NAT_IS_INSIDE
2452         self.vapi.nat44_interface_add_del_feature(
2453             sw_if_index=self.pg0.sw_if_index,
2454             flags=flags, is_add=1)
2455         self.vapi.nat44_interface_add_del_feature(
2456             sw_if_index=self.pg1.sw_if_index,
2457             is_add=1)
2458
2459         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2460              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2461              TCP(sport=self.tcp_port_in, dport=20))
2462         self.pg0.add_stream(p)
2463         self.pg_enable_capture(self.pg_interfaces)
2464         self.pg_start()
2465         capture = self.pg1.get_capture(1)
2466         self.tcp_port_out = capture[0][TCP].sport
2467         capture = self.pg3.get_capture(1)
2468         self.verify_syslog_apmap(capture[0][Raw].load)
2469
2470         self.pg_enable_capture(self.pg_interfaces)
2471         self.pg_start()
2472         self.nat44_add_address(self.nat_addr, is_add=0)
2473         capture = self.pg3.get_capture(1)
2474         self.verify_syslog_apmap(capture[0][Raw].load, False)
2475
2476     def test_pool_addr_fib(self):
2477         """ NAT44EI add pool addresses to FIB """
2478         static_addr = '10.0.0.10'
2479         self.nat44_add_address(self.nat_addr)
2480         flags = self.config_flags.NAT_IS_INSIDE
2481         self.vapi.nat44_interface_add_del_feature(
2482             sw_if_index=self.pg0.sw_if_index,
2483             flags=flags, is_add=1)
2484         self.vapi.nat44_interface_add_del_feature(
2485             sw_if_index=self.pg1.sw_if_index,
2486             is_add=1)
2487         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2488
2489         # NAT44EI address
2490         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2491              ARP(op=ARP.who_has, pdst=self.nat_addr,
2492                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2493         self.pg1.add_stream(p)
2494         self.pg_enable_capture(self.pg_interfaces)
2495         self.pg_start()
2496         capture = self.pg1.get_capture(1)
2497         self.assertTrue(capture[0].haslayer(ARP))
2498         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2499
2500         # 1:1 NAT address
2501         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2502              ARP(op=ARP.who_has, pdst=static_addr,
2503                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2504         self.pg1.add_stream(p)
2505         self.pg_enable_capture(self.pg_interfaces)
2506         self.pg_start()
2507         capture = self.pg1.get_capture(1)
2508         self.assertTrue(capture[0].haslayer(ARP))
2509         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2510
2511         # send ARP to non-NAT44EI interface
2512         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2513              ARP(op=ARP.who_has, pdst=self.nat_addr,
2514                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2515         self.pg2.add_stream(p)
2516         self.pg_enable_capture(self.pg_interfaces)
2517         self.pg_start()
2518         self.pg1.assert_nothing_captured()
2519
2520         # remove addresses and verify
2521         self.nat44_add_address(self.nat_addr, is_add=0)
2522         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2523                                       is_add=0)
2524
2525         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2526              ARP(op=ARP.who_has, pdst=self.nat_addr,
2527                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2528         self.pg1.add_stream(p)
2529         self.pg_enable_capture(self.pg_interfaces)
2530         self.pg_start()
2531         self.pg1.assert_nothing_captured()
2532
2533         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2534              ARP(op=ARP.who_has, pdst=static_addr,
2535                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2536         self.pg1.add_stream(p)
2537         self.pg_enable_capture(self.pg_interfaces)
2538         self.pg_start()
2539         self.pg1.assert_nothing_captured()
2540
2541     def test_vrf_mode(self):
2542         """ NAT44EI tenant VRF aware address pool mode """
2543
2544         vrf_id1 = 1
2545         vrf_id2 = 2
2546         nat_ip1 = "10.0.0.10"
2547         nat_ip2 = "10.0.0.11"
2548
2549         self.pg0.unconfig_ip4()
2550         self.pg1.unconfig_ip4()
2551         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2552         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2553         self.pg0.set_table_ip4(vrf_id1)
2554         self.pg1.set_table_ip4(vrf_id2)
2555         self.pg0.config_ip4()
2556         self.pg1.config_ip4()
2557         self.pg0.resolve_arp()
2558         self.pg1.resolve_arp()
2559
2560         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2561         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2562         flags = self.config_flags.NAT_IS_INSIDE
2563         self.vapi.nat44_interface_add_del_feature(
2564             sw_if_index=self.pg0.sw_if_index,
2565             flags=flags, is_add=1)
2566         self.vapi.nat44_interface_add_del_feature(
2567             sw_if_index=self.pg1.sw_if_index,
2568             flags=flags, is_add=1)
2569         self.vapi.nat44_interface_add_del_feature(
2570             sw_if_index=self.pg2.sw_if_index,
2571             is_add=1)
2572
2573         try:
2574             # first VRF
2575             pkts = self.create_stream_in(self.pg0, self.pg2)
2576             self.pg0.add_stream(pkts)
2577             self.pg_enable_capture(self.pg_interfaces)
2578             self.pg_start()
2579             capture = self.pg2.get_capture(len(pkts))
2580             self.verify_capture_out(capture, nat_ip1)
2581
2582             # second VRF
2583             pkts = self.create_stream_in(self.pg1, self.pg2)
2584             self.pg1.add_stream(pkts)
2585             self.pg_enable_capture(self.pg_interfaces)
2586             self.pg_start()
2587             capture = self.pg2.get_capture(len(pkts))
2588             self.verify_capture_out(capture, nat_ip2)
2589
2590         finally:
2591             self.pg0.unconfig_ip4()
2592             self.pg1.unconfig_ip4()
2593             self.pg0.set_table_ip4(0)
2594             self.pg1.set_table_ip4(0)
2595             self.pg0.config_ip4()
2596             self.pg1.config_ip4()
2597             self.pg0.resolve_arp()
2598             self.pg1.resolve_arp()
2599             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
2600             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
2601
2602     def test_vrf_feature_independent(self):
2603         """ NAT44EI tenant VRF independent address pool mode """
2604
2605         nat_ip1 = "10.0.0.10"
2606         nat_ip2 = "10.0.0.11"
2607
2608         self.nat44_add_address(nat_ip1)
2609         self.nat44_add_address(nat_ip2, vrf_id=99)
2610         flags = self.config_flags.NAT_IS_INSIDE
2611         self.vapi.nat44_interface_add_del_feature(
2612             sw_if_index=self.pg0.sw_if_index,
2613             flags=flags, is_add=1)
2614         self.vapi.nat44_interface_add_del_feature(
2615             sw_if_index=self.pg1.sw_if_index,
2616             flags=flags, is_add=1)
2617         self.vapi.nat44_interface_add_del_feature(
2618             sw_if_index=self.pg2.sw_if_index,
2619             is_add=1)
2620
2621         # first VRF
2622         pkts = self.create_stream_in(self.pg0, self.pg2)
2623         self.pg0.add_stream(pkts)
2624         self.pg_enable_capture(self.pg_interfaces)
2625         self.pg_start()
2626         capture = self.pg2.get_capture(len(pkts))
2627         self.verify_capture_out(capture, nat_ip1)
2628
2629         # second VRF
2630         pkts = self.create_stream_in(self.pg1, self.pg2)
2631         self.pg1.add_stream(pkts)
2632         self.pg_enable_capture(self.pg_interfaces)
2633         self.pg_start()
2634         capture = self.pg2.get_capture(len(pkts))
2635         self.verify_capture_out(capture, nat_ip1)
2636
2637     def test_dynamic_ipless_interfaces(self):
2638         """ NAT44EI interfaces without configured IP address """
2639         self.create_routes_and_neigbors()
2640         self.nat44_add_address(self.nat_addr)
2641         flags = self.config_flags.NAT_IS_INSIDE
2642         self.vapi.nat44_interface_add_del_feature(
2643             sw_if_index=self.pg7.sw_if_index,
2644             flags=flags, is_add=1)
2645         self.vapi.nat44_interface_add_del_feature(
2646             sw_if_index=self.pg8.sw_if_index,
2647             is_add=1)
2648
2649         # in2out
2650         pkts = self.create_stream_in(self.pg7, self.pg8)
2651         self.pg7.add_stream(pkts)
2652         self.pg_enable_capture(self.pg_interfaces)
2653         self.pg_start()
2654         capture = self.pg8.get_capture(len(pkts))
2655         self.verify_capture_out(capture)
2656
2657         # out2in
2658         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2659         self.pg8.add_stream(pkts)
2660         self.pg_enable_capture(self.pg_interfaces)
2661         self.pg_start()
2662         capture = self.pg7.get_capture(len(pkts))
2663         self.verify_capture_in(capture, self.pg7)
2664
2665     def test_static_ipless_interfaces(self):
2666         """ NAT44EI interfaces without configured IP address - 1:1 NAT """
2667
2668         self.create_routes_and_neigbors()
2669         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2670         flags = self.config_flags.NAT_IS_INSIDE
2671         self.vapi.nat44_interface_add_del_feature(
2672             sw_if_index=self.pg7.sw_if_index,
2673             flags=flags, is_add=1)
2674         self.vapi.nat44_interface_add_del_feature(
2675             sw_if_index=self.pg8.sw_if_index,
2676             is_add=1)
2677
2678         # out2in
2679         pkts = self.create_stream_out(self.pg8)
2680         self.pg8.add_stream(pkts)
2681         self.pg_enable_capture(self.pg_interfaces)
2682         self.pg_start()
2683         capture = self.pg7.get_capture(len(pkts))
2684         self.verify_capture_in(capture, self.pg7)
2685
2686         # in2out
2687         pkts = self.create_stream_in(self.pg7, self.pg8)
2688         self.pg7.add_stream(pkts)
2689         self.pg_enable_capture(self.pg_interfaces)
2690         self.pg_start()
2691         capture = self.pg8.get_capture(len(pkts))
2692         self.verify_capture_out(capture, self.nat_addr, True)
2693
2694     def test_static_with_port_ipless_interfaces(self):
2695         """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
2696
2697         self.tcp_port_out = 30606
2698         self.udp_port_out = 30607
2699         self.icmp_id_out = 30608
2700
2701         self.create_routes_and_neigbors()
2702         self.nat44_add_address(self.nat_addr)
2703         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2704                                       self.tcp_port_in, self.tcp_port_out,
2705                                       proto=IP_PROTOS.tcp)
2706         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2707                                       self.udp_port_in, self.udp_port_out,
2708                                       proto=IP_PROTOS.udp)
2709         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2710                                       self.icmp_id_in, self.icmp_id_out,
2711                                       proto=IP_PROTOS.icmp)
2712         flags = self.config_flags.NAT_IS_INSIDE
2713         self.vapi.nat44_interface_add_del_feature(
2714             sw_if_index=self.pg7.sw_if_index,
2715             flags=flags, is_add=1)
2716         self.vapi.nat44_interface_add_del_feature(
2717             sw_if_index=self.pg8.sw_if_index,
2718             is_add=1)
2719
2720         # out2in
2721         pkts = self.create_stream_out(self.pg8)
2722         self.pg8.add_stream(pkts)
2723         self.pg_enable_capture(self.pg_interfaces)
2724         self.pg_start()
2725         capture = self.pg7.get_capture(len(pkts))
2726         self.verify_capture_in(capture, self.pg7)
2727
2728         # in2out
2729         pkts = self.create_stream_in(self.pg7, self.pg8)
2730         self.pg7.add_stream(pkts)
2731         self.pg_enable_capture(self.pg_interfaces)
2732         self.pg_start()
2733         capture = self.pg8.get_capture(len(pkts))
2734         self.verify_capture_out(capture)
2735
2736     def test_static_unknown_proto(self):
2737         """ NAT44EI 1:1 translate packet with unknown protocol """
2738         nat_ip = "10.0.0.10"
2739         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2740         flags = self.config_flags.NAT_IS_INSIDE
2741         self.vapi.nat44_interface_add_del_feature(
2742             sw_if_index=self.pg0.sw_if_index,
2743             flags=flags, is_add=1)
2744         self.vapi.nat44_interface_add_del_feature(
2745             sw_if_index=self.pg1.sw_if_index,
2746             is_add=1)
2747
2748         # in2out
2749         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2750              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2751              GRE() /
2752              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2753              TCP(sport=1234, dport=1234))
2754         self.pg0.add_stream(p)
2755         self.pg_enable_capture(self.pg_interfaces)
2756         self.pg_start()
2757         p = self.pg1.get_capture(1)
2758         packet = p[0]
2759         try:
2760             self.assertEqual(packet[IP].src, nat_ip)
2761             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2762             self.assertEqual(packet.haslayer(GRE), 1)
2763             self.assert_packet_checksums_valid(packet)
2764         except:
2765             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2766             raise
2767
2768         # out2in
2769         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2770              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2771              GRE() /
2772              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2773              TCP(sport=1234, dport=1234))
2774         self.pg1.add_stream(p)
2775         self.pg_enable_capture(self.pg_interfaces)
2776         self.pg_start()
2777         p = self.pg0.get_capture(1)
2778         packet = p[0]
2779         try:
2780             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2781             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2782             self.assertEqual(packet.haslayer(GRE), 1)
2783             self.assert_packet_checksums_valid(packet)
2784         except:
2785             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2786             raise
2787
2788     def test_hairpinning_static_unknown_proto(self):
2789         """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
2790         """
2791
2792         host = self.pg0.remote_hosts[0]
2793         server = self.pg0.remote_hosts[1]
2794
2795         host_nat_ip = "10.0.0.10"
2796         server_nat_ip = "10.0.0.11"
2797
2798         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2799         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2800         flags = self.config_flags.NAT_IS_INSIDE
2801         self.vapi.nat44_interface_add_del_feature(
2802             sw_if_index=self.pg0.sw_if_index,
2803             flags=flags, is_add=1)
2804         self.vapi.nat44_interface_add_del_feature(
2805             sw_if_index=self.pg1.sw_if_index,
2806             is_add=1)
2807
2808         # host to server
2809         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2810              IP(src=host.ip4, dst=server_nat_ip) /
2811              GRE() /
2812              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2813              TCP(sport=1234, dport=1234))
2814         self.pg0.add_stream(p)
2815         self.pg_enable_capture(self.pg_interfaces)
2816         self.pg_start()
2817         p = self.pg0.get_capture(1)
2818         packet = p[0]
2819         try:
2820             self.assertEqual(packet[IP].src, host_nat_ip)
2821             self.assertEqual(packet[IP].dst, server.ip4)
2822             self.assertEqual(packet.haslayer(GRE), 1)
2823             self.assert_packet_checksums_valid(packet)
2824         except:
2825             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2826             raise
2827
2828         # server to host
2829         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2830              IP(src=server.ip4, dst=host_nat_ip) /
2831              GRE() /
2832              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2833              TCP(sport=1234, dport=1234))
2834         self.pg0.add_stream(p)
2835         self.pg_enable_capture(self.pg_interfaces)
2836         self.pg_start()
2837         p = self.pg0.get_capture(1)
2838         packet = p[0]
2839         try:
2840             self.assertEqual(packet[IP].src, server_nat_ip)
2841             self.assertEqual(packet[IP].dst, host.ip4)
2842             self.assertEqual(packet.haslayer(GRE), 1)
2843             self.assert_packet_checksums_valid(packet)
2844         except:
2845             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2846             raise
2847
2848     def test_output_feature(self):
2849         """ NAT44EI output feature (in2out postrouting) """
2850         self.nat44_add_address(self.nat_addr)
2851         flags = self.config_flags.NAT_IS_INSIDE
2852         self.vapi.nat44_interface_add_del_output_feature(
2853             is_add=1, flags=flags,
2854             sw_if_index=self.pg0.sw_if_index)
2855         self.vapi.nat44_interface_add_del_output_feature(
2856             is_add=1, flags=flags,
2857             sw_if_index=self.pg1.sw_if_index)
2858         self.vapi.nat44_interface_add_del_output_feature(
2859             is_add=1,
2860             sw_if_index=self.pg3.sw_if_index)
2861
2862         # in2out
2863         pkts = self.create_stream_in(self.pg0, self.pg3)
2864         self.pg0.add_stream(pkts)
2865         self.pg_enable_capture(self.pg_interfaces)
2866         self.pg_start()
2867         capture = self.pg3.get_capture(len(pkts))
2868         self.verify_capture_out(capture)
2869
2870         # out2in
2871         pkts = self.create_stream_out(self.pg3)
2872         self.pg3.add_stream(pkts)
2873         self.pg_enable_capture(self.pg_interfaces)
2874         self.pg_start()
2875         capture = self.pg0.get_capture(len(pkts))
2876         self.verify_capture_in(capture, self.pg0)
2877
2878         # from non-NAT interface to NAT inside interface
2879         pkts = self.create_stream_in(self.pg2, self.pg0)
2880         self.pg2.add_stream(pkts)
2881         self.pg_enable_capture(self.pg_interfaces)
2882         self.pg_start()
2883         capture = self.pg0.get_capture(len(pkts))
2884         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2885
2886     def test_output_feature_vrf_aware(self):
2887         """ NAT44EI output feature VRF aware (in2out postrouting) """
2888         nat_ip_vrf10 = "10.0.0.10"
2889         nat_ip_vrf20 = "10.0.0.20"
2890
2891         r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2892                         [VppRoutePath(self.pg3.remote_ip4,
2893                                       self.pg3.sw_if_index)],
2894                         table_id=10)
2895         r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2896                         [VppRoutePath(self.pg3.remote_ip4,
2897                                       self.pg3.sw_if_index)],
2898                         table_id=20)
2899         r1.add_vpp_config()
2900         r2.add_vpp_config()
2901
2902         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2903         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2904         flags = self.config_flags.NAT_IS_INSIDE
2905         self.vapi.nat44_interface_add_del_output_feature(
2906             is_add=1, flags=flags,
2907             sw_if_index=self.pg4.sw_if_index)
2908         self.vapi.nat44_interface_add_del_output_feature(
2909             is_add=1, flags=flags,
2910             sw_if_index=self.pg6.sw_if_index)
2911         self.vapi.nat44_interface_add_del_output_feature(
2912             is_add=1,
2913             sw_if_index=self.pg3.sw_if_index)
2914
2915         # in2out VRF 10
2916         pkts = self.create_stream_in(self.pg4, self.pg3)
2917         self.pg4.add_stream(pkts)
2918         self.pg_enable_capture(self.pg_interfaces)
2919         self.pg_start()
2920         capture = self.pg3.get_capture(len(pkts))
2921         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2922
2923         # out2in VRF 10
2924         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2925         self.pg3.add_stream(pkts)
2926         self.pg_enable_capture(self.pg_interfaces)
2927         self.pg_start()
2928         capture = self.pg4.get_capture(len(pkts))
2929         self.verify_capture_in(capture, self.pg4)
2930
2931         # in2out VRF 20
2932         pkts = self.create_stream_in(self.pg6, self.pg3)
2933         self.pg6.add_stream(pkts)
2934         self.pg_enable_capture(self.pg_interfaces)
2935         self.pg_start()
2936         capture = self.pg3.get_capture(len(pkts))
2937         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2938
2939         # out2in VRF 20
2940         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2941         self.pg3.add_stream(pkts)
2942         self.pg_enable_capture(self.pg_interfaces)
2943         self.pg_start()
2944         capture = self.pg6.get_capture(len(pkts))
2945         self.verify_capture_in(capture, self.pg6)
2946
2947     def test_output_feature_hairpinning(self):
2948         """ NAT44EI output feature hairpinning (in2out postrouting) """
2949         host = self.pg0.remote_hosts[0]
2950         server = self.pg0.remote_hosts[1]
2951         host_in_port = 1234
2952         host_out_port = 0
2953         server_in_port = 5678
2954         server_out_port = 8765
2955
2956         self.nat44_add_address(self.nat_addr)
2957         flags = self.config_flags.NAT_IS_INSIDE
2958         self.vapi.nat44_interface_add_del_output_feature(
2959             is_add=1, flags=flags,
2960             sw_if_index=self.pg0.sw_if_index)
2961         self.vapi.nat44_interface_add_del_output_feature(
2962             is_add=1,
2963             sw_if_index=self.pg1.sw_if_index)
2964
2965         # add static mapping for server
2966         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2967                                       server_in_port, server_out_port,
2968                                       proto=IP_PROTOS.tcp)
2969
2970         # send packet from host to server
2971         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2972              IP(src=host.ip4, dst=self.nat_addr) /
2973              TCP(sport=host_in_port, dport=server_out_port))
2974         self.pg0.add_stream(p)
2975         self.pg_enable_capture(self.pg_interfaces)
2976         self.pg_start()
2977         capture = self.pg0.get_capture(1)
2978         p = capture[0]
2979         try:
2980             ip = p[IP]
2981             tcp = p[TCP]
2982             self.assertEqual(ip.src, self.nat_addr)
2983             self.assertEqual(ip.dst, server.ip4)
2984             self.assertNotEqual(tcp.sport, host_in_port)
2985             self.assertEqual(tcp.dport, server_in_port)
2986             self.assert_packet_checksums_valid(p)
2987             host_out_port = tcp.sport
2988         except:
2989             self.logger.error(ppp("Unexpected or invalid packet:", p))
2990             raise
2991
2992         # send reply from server to host
2993         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2994              IP(src=server.ip4, dst=self.nat_addr) /
2995              TCP(sport=server_in_port, dport=host_out_port))
2996         self.pg0.add_stream(p)
2997         self.pg_enable_capture(self.pg_interfaces)
2998         self.pg_start()
2999         capture = self.pg0.get_capture(1)
3000         p = capture[0]
3001         try:
3002             ip = p[IP]
3003             tcp = p[TCP]
3004             self.assertEqual(ip.src, self.nat_addr)
3005             self.assertEqual(ip.dst, host.ip4)
3006             self.assertEqual(tcp.sport, server_out_port)
3007             self.assertEqual(tcp.dport, host_in_port)
3008             self.assert_packet_checksums_valid(p)
3009         except:
3010             self.logger.error(ppp("Unexpected or invalid packet:", p))
3011             raise
3012
3013     def test_one_armed_nat44(self):
3014         """ NAT44EI One armed NAT """
3015         remote_host = self.pg9.remote_hosts[0]
3016         local_host = self.pg9.remote_hosts[1]
3017         external_port = 0
3018
3019         self.nat44_add_address(self.nat_addr)
3020         flags = self.config_flags.NAT_IS_INSIDE
3021         self.vapi.nat44_interface_add_del_feature(
3022             sw_if_index=self.pg9.sw_if_index,
3023             is_add=1)
3024         self.vapi.nat44_interface_add_del_feature(
3025             sw_if_index=self.pg9.sw_if_index,
3026             flags=flags, is_add=1)
3027
3028         # in2out
3029         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3030              IP(src=local_host.ip4, dst=remote_host.ip4) /
3031              TCP(sport=12345, dport=80))
3032         self.pg9.add_stream(p)
3033         self.pg_enable_capture(self.pg_interfaces)
3034         self.pg_start()
3035         capture = self.pg9.get_capture(1)
3036         p = capture[0]
3037         try:
3038             ip = p[IP]
3039             tcp = p[TCP]
3040             self.assertEqual(ip.src, self.nat_addr)
3041             self.assertEqual(ip.dst, remote_host.ip4)
3042             self.assertNotEqual(tcp.sport, 12345)
3043             external_port = tcp.sport
3044             self.assertEqual(tcp.dport, 80)
3045             self.assert_packet_checksums_valid(p)
3046         except:
3047             self.logger.error(ppp("Unexpected or invalid packet:", p))
3048             raise
3049
3050         # out2in
3051         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3052              IP(src=remote_host.ip4, dst=self.nat_addr) /
3053              TCP(sport=80, dport=external_port))
3054         self.pg9.add_stream(p)
3055         self.pg_enable_capture(self.pg_interfaces)
3056         self.pg_start()
3057         capture = self.pg9.get_capture(1)
3058         p = capture[0]
3059         try:
3060             ip = p[IP]
3061             tcp = p[TCP]
3062             self.assertEqual(ip.src, remote_host.ip4)
3063             self.assertEqual(ip.dst, local_host.ip4)
3064             self.assertEqual(tcp.sport, 80)
3065             self.assertEqual(tcp.dport, 12345)
3066             self.assert_packet_checksums_valid(p)
3067         except:
3068             self.logger.error(ppp("Unexpected or invalid packet:", p))
3069             raise
3070
3071         err = self.statistics.get_err_counter(
3072             '/err/nat44-classify/next in2out')
3073         self.assertEqual(err, 1)
3074         err = self.statistics.get_err_counter(
3075             '/err/nat44-classify/next out2in')
3076         self.assertEqual(err, 1)
3077
3078     def test_del_session(self):
3079         """ NAT44EI delete session """
3080         self.nat44_add_address(self.nat_addr)
3081         flags = self.config_flags.NAT_IS_INSIDE
3082         self.vapi.nat44_interface_add_del_feature(
3083             sw_if_index=self.pg0.sw_if_index,
3084             flags=flags, is_add=1)
3085         self.vapi.nat44_interface_add_del_feature(
3086             sw_if_index=self.pg1.sw_if_index,
3087             is_add=1)
3088
3089         pkts = self.create_stream_in(self.pg0, self.pg1)
3090         self.pg0.add_stream(pkts)
3091         self.pg_enable_capture(self.pg_interfaces)
3092         self.pg_start()
3093         self.pg1.get_capture(len(pkts))
3094
3095         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3096         nsessions = len(sessions)
3097
3098         self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3099                                     port=sessions[0].inside_port,
3100                                     protocol=sessions[0].protocol,
3101                                     flags=self.config_flags.NAT_IS_INSIDE)
3102         self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3103                                     port=sessions[1].outside_port,
3104                                     protocol=sessions[1].protocol)
3105
3106         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3107         self.assertEqual(nsessions - len(sessions), 2)
3108
3109         self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3110                                     port=sessions[0].inside_port,
3111                                     protocol=sessions[0].protocol,
3112                                     flags=self.config_flags.NAT_IS_INSIDE)
3113
3114         self.verify_no_nat44_user()
3115
3116     def test_frag_in_order(self):
3117         """ NAT44EI translate fragments arriving in order """
3118
3119         self.nat44_add_address(self.nat_addr)
3120         flags = self.config_flags.NAT_IS_INSIDE
3121         self.vapi.nat44_interface_add_del_feature(
3122             sw_if_index=self.pg0.sw_if_index,
3123             flags=flags, is_add=1)
3124         self.vapi.nat44_interface_add_del_feature(
3125             sw_if_index=self.pg1.sw_if_index,
3126             is_add=1)
3127
3128         self.frag_in_order(proto=IP_PROTOS.tcp)
3129         self.frag_in_order(proto=IP_PROTOS.udp)
3130         self.frag_in_order(proto=IP_PROTOS.icmp)
3131
3132     def test_frag_forwarding(self):
3133         """ NAT44EI forwarding fragment test """
3134         self.vapi.nat44_add_del_interface_addr(
3135             is_add=1,
3136             sw_if_index=self.pg1.sw_if_index)
3137         flags = self.config_flags.NAT_IS_INSIDE
3138         self.vapi.nat44_interface_add_del_feature(
3139             sw_if_index=self.pg0.sw_if_index,
3140             flags=flags, is_add=1)
3141         self.vapi.nat44_interface_add_del_feature(
3142             sw_if_index=self.pg1.sw_if_index,
3143             is_add=1)
3144         self.vapi.nat44_forwarding_enable_disable(enable=1)
3145
3146         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3147         pkts = self.create_stream_frag(self.pg1,
3148                                        self.pg0.remote_ip4,
3149                                        4789,
3150                                        4789,
3151                                        data,
3152                                        proto=IP_PROTOS.udp)
3153         self.pg1.add_stream(pkts)
3154         self.pg_enable_capture(self.pg_interfaces)
3155         self.pg_start()
3156         frags = self.pg0.get_capture(len(pkts))
3157         p = self.reass_frags_and_verify(frags,
3158                                         self.pg1.remote_ip4,
3159                                         self.pg0.remote_ip4)
3160         self.assertEqual(p[UDP].sport, 4789)
3161         self.assertEqual(p[UDP].dport, 4789)
3162         self.assertEqual(data, p[Raw].load)
3163
3164     def test_reass_hairpinning(self):
3165         """ NAT44EI fragments hairpinning """
3166
3167         server_addr = self.pg0.remote_hosts[1].ip4
3168         host_in_port = random.randint(1025, 65535)
3169         server_in_port = random.randint(1025, 65535)
3170         server_out_port = random.randint(1025, 65535)
3171
3172         self.nat44_add_address(self.nat_addr)
3173         flags = self.config_flags.NAT_IS_INSIDE
3174         self.vapi.nat44_interface_add_del_feature(
3175             sw_if_index=self.pg0.sw_if_index,
3176             flags=flags, is_add=1)
3177         self.vapi.nat44_interface_add_del_feature(
3178             sw_if_index=self.pg1.sw_if_index,
3179             is_add=1)
3180         # add static mapping for server
3181         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3182                                       server_in_port,
3183                                       server_out_port,
3184                                       proto=IP_PROTOS.tcp)
3185         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3186                                       server_in_port,
3187                                       server_out_port,
3188                                       proto=IP_PROTOS.udp)
3189         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3190
3191         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3192                                host_in_port, proto=IP_PROTOS.tcp)
3193         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3194                                host_in_port, proto=IP_PROTOS.udp)
3195         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3196                                host_in_port, proto=IP_PROTOS.icmp)
3197
3198     def test_frag_out_of_order(self):
3199         """ NAT44EI translate fragments arriving out of order """
3200
3201         self.nat44_add_address(self.nat_addr)
3202         flags = self.config_flags.NAT_IS_INSIDE
3203         self.vapi.nat44_interface_add_del_feature(
3204             sw_if_index=self.pg0.sw_if_index,
3205             flags=flags, is_add=1)
3206         self.vapi.nat44_interface_add_del_feature(
3207             sw_if_index=self.pg1.sw_if_index,
3208             is_add=1)
3209
3210         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3211         self.frag_out_of_order(proto=IP_PROTOS.udp)
3212         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3213
3214     def test_port_restricted(self):
3215         """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
3216         self.nat44_add_address(self.nat_addr)
3217         flags = self.config_flags.NAT_IS_INSIDE
3218         self.vapi.nat44_interface_add_del_feature(
3219             sw_if_index=self.pg0.sw_if_index,
3220             flags=flags, is_add=1)
3221         self.vapi.nat44_interface_add_del_feature(
3222             sw_if_index=self.pg1.sw_if_index,
3223             is_add=1)
3224         self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3225                                                   psid_offset=6,
3226                                                   psid_length=6,
3227                                                   psid=10)
3228
3229         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3230              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3231              TCP(sport=4567, dport=22))
3232         self.pg0.add_stream(p)
3233         self.pg_enable_capture(self.pg_interfaces)
3234         self.pg_start()
3235         capture = self.pg1.get_capture(1)
3236         p = capture[0]
3237         try:
3238             ip = p[IP]
3239             tcp = p[TCP]
3240             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3241             self.assertEqual(ip.src, self.nat_addr)
3242             self.assertEqual(tcp.dport, 22)
3243             self.assertNotEqual(tcp.sport, 4567)
3244             self.assertEqual((tcp.sport >> 6) & 63, 10)
3245             self.assert_packet_checksums_valid(p)
3246         except:
3247             self.logger.error(ppp("Unexpected or invalid packet:", p))
3248             raise
3249
3250     def test_port_range(self):
3251         """ NAT44EI External address port range """
3252         self.nat44_add_address(self.nat_addr)
3253         flags = self.config_flags.NAT_IS_INSIDE
3254         self.vapi.nat44_interface_add_del_feature(
3255             sw_if_index=self.pg0.sw_if_index,
3256             flags=flags, is_add=1)
3257         self.vapi.nat44_interface_add_del_feature(
3258             sw_if_index=self.pg1.sw_if_index,
3259             is_add=1)
3260         self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3261                                                   start_port=1025,
3262                                                   end_port=1027)
3263
3264         pkts = []
3265         for port in range(0, 5):
3266             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3267                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3268                  TCP(sport=1125 + port))
3269             pkts.append(p)
3270         self.pg0.add_stream(pkts)
3271         self.pg_enable_capture(self.pg_interfaces)
3272         self.pg_start()
3273         capture = self.pg1.get_capture(3)
3274         for p in capture:
3275             tcp = p[TCP]
3276             self.assertGreaterEqual(tcp.sport, 1025)
3277             self.assertLessEqual(tcp.sport, 1027)
3278
3279     def test_multiple_outside_vrf(self):
3280         """ NAT44EI Multiple outside VRF """
3281         vrf_id1 = 1
3282         vrf_id2 = 2
3283
3284         self.pg1.unconfig_ip4()
3285         self.pg2.unconfig_ip4()
3286         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3287         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3288         self.pg1.set_table_ip4(vrf_id1)
3289         self.pg2.set_table_ip4(vrf_id2)
3290         self.pg1.config_ip4()
3291         self.pg2.config_ip4()
3292         self.pg1.resolve_arp()
3293         self.pg2.resolve_arp()
3294
3295         self.nat44_add_address(self.nat_addr)
3296         flags = self.config_flags.NAT_IS_INSIDE
3297         self.vapi.nat44_interface_add_del_feature(
3298             sw_if_index=self.pg0.sw_if_index,
3299             flags=flags, is_add=1)
3300         self.vapi.nat44_interface_add_del_feature(
3301             sw_if_index=self.pg1.sw_if_index,
3302             is_add=1)
3303         self.vapi.nat44_interface_add_del_feature(
3304             sw_if_index=self.pg2.sw_if_index,
3305             is_add=1)
3306
3307         try:
3308             # first VRF
3309             pkts = self.create_stream_in(self.pg0, self.pg1)
3310             self.pg0.add_stream(pkts)
3311             self.pg_enable_capture(self.pg_interfaces)
3312             self.pg_start()
3313             capture = self.pg1.get_capture(len(pkts))
3314             self.verify_capture_out(capture, self.nat_addr)
3315
3316             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3317             self.pg1.add_stream(pkts)
3318             self.pg_enable_capture(self.pg_interfaces)
3319             self.pg_start()
3320             capture = self.pg0.get_capture(len(pkts))
3321             self.verify_capture_in(capture, self.pg0)
3322
3323             self.tcp_port_in = 60303
3324             self.udp_port_in = 60304
3325             self.icmp_id_in = 60305
3326
3327             # second VRF
3328             pkts = self.create_stream_in(self.pg0, self.pg2)
3329             self.pg0.add_stream(pkts)
3330             self.pg_enable_capture(self.pg_interfaces)
3331             self.pg_start()
3332             capture = self.pg2.get_capture(len(pkts))
3333             self.verify_capture_out(capture, self.nat_addr)
3334
3335             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3336             self.pg2.add_stream(pkts)
3337             self.pg_enable_capture(self.pg_interfaces)
3338             self.pg_start()
3339             capture = self.pg0.get_capture(len(pkts))
3340             self.verify_capture_in(capture, self.pg0)
3341
3342         finally:
3343             self.nat44_add_address(self.nat_addr, is_add=0)
3344             self.pg1.unconfig_ip4()
3345             self.pg2.unconfig_ip4()
3346             self.pg1.set_table_ip4(0)
3347             self.pg2.set_table_ip4(0)
3348             self.pg1.config_ip4()
3349             self.pg2.config_ip4()
3350             self.pg1.resolve_arp()
3351             self.pg2.resolve_arp()
3352
3353     def test_mss_clamping(self):
3354         """ NAT44EI TCP MSS clamping """
3355         self.nat44_add_address(self.nat_addr)
3356         flags = self.config_flags.NAT_IS_INSIDE
3357         self.vapi.nat44_interface_add_del_feature(
3358             sw_if_index=self.pg0.sw_if_index,
3359             flags=flags, is_add=1)
3360         self.vapi.nat44_interface_add_del_feature(
3361             sw_if_index=self.pg1.sw_if_index,
3362             is_add=1)
3363
3364         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3365              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3366              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3367                  flags="S", options=[('MSS', 1400)]))
3368
3369         self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3370         self.pg0.add_stream(p)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         capture = self.pg1.get_capture(1)
3374         # Negotiated MSS value greater than configured - changed
3375         self.verify_mss_value(capture[0], 1000)
3376
3377         self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3378         self.pg0.add_stream(p)
3379         self.pg_enable_capture(self.pg_interfaces)
3380         self.pg_start()
3381         capture = self.pg1.get_capture(1)
3382         # MSS clamping disabled - negotiated MSS unchanged
3383         self.verify_mss_value(capture[0], 1400)
3384
3385         self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3386         self.pg0.add_stream(p)
3387         self.pg_enable_capture(self.pg_interfaces)
3388         self.pg_start()
3389         capture = self.pg1.get_capture(1)
3390         # Negotiated MSS value smaller than configured - unchanged
3391         self.verify_mss_value(capture[0], 1400)
3392
3393     def test_ha_send(self):
3394         """ NAT44EI Send HA session synchronization events (active) """
3395         flags = self.config_flags.NAT_IS_INSIDE
3396         self.vapi.nat44_interface_add_del_feature(
3397             sw_if_index=self.pg0.sw_if_index,
3398             flags=flags, is_add=1)
3399         self.vapi.nat44_interface_add_del_feature(
3400             sw_if_index=self.pg1.sw_if_index,
3401             is_add=1)
3402         self.nat44_add_address(self.nat_addr)
3403
3404         self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3405                                       port=12345,
3406                                       path_mtu=512)
3407         self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3408                                       port=12346, session_refresh_interval=10)
3409         bind_layers(UDP, HANATStateSync, sport=12345)
3410
3411         # create sessions
3412         pkts = self.create_stream_in(self.pg0, self.pg1)
3413         self.pg0.add_stream(pkts)
3414         self.pg_enable_capture(self.pg_interfaces)
3415         self.pg_start()
3416         capture = self.pg1.get_capture(len(pkts))
3417         self.verify_capture_out(capture)
3418         # active send HA events
3419         self.vapi.nat_ha_flush()
3420         stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3421         self.assertEqual(stats[0][0], 3)
3422         capture = self.pg3.get_capture(1)
3423         p = capture[0]
3424         self.assert_packet_checksums_valid(p)
3425         try:
3426             ip = p[IP]
3427             udp = p[UDP]
3428             hanat = p[HANATStateSync]
3429         except IndexError:
3430             self.logger.error(ppp("Invalid packet:", p))
3431             raise
3432         else:
3433             self.assertEqual(ip.src, self.pg3.local_ip4)
3434             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3435             self.assertEqual(udp.sport, 12345)
3436             self.assertEqual(udp.dport, 12346)
3437             self.assertEqual(hanat.version, 1)
3438             self.assertEqual(hanat.thread_index, 0)
3439             self.assertEqual(hanat.count, 3)
3440             seq = hanat.sequence_number
3441             for event in hanat.events:
3442                 self.assertEqual(event.event_type, 1)
3443                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3444                 self.assertEqual(event.out_addr, self.nat_addr)
3445                 self.assertEqual(event.fib_index, 0)
3446
3447         # ACK received events
3448         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3449                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3450                UDP(sport=12346, dport=12345) /
3451                HANATStateSync(sequence_number=seq, flags='ACK'))
3452         self.pg3.add_stream(ack)
3453         self.pg_start()
3454         stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3455         self.assertEqual(stats[0][0], 1)
3456
3457         # delete one session
3458         self.pg_enable_capture(self.pg_interfaces)
3459         self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3460                                     port=self.tcp_port_in,
3461                                     protocol=IP_PROTOS.tcp,
3462                                     flags=self.config_flags.NAT_IS_INSIDE)
3463         self.vapi.nat_ha_flush()
3464         stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3465         self.assertEqual(stats[0][0], 1)
3466         capture = self.pg3.get_capture(1)
3467         p = capture[0]
3468         try:
3469             hanat = p[HANATStateSync]
3470         except IndexError:
3471             self.logger.error(ppp("Invalid packet:", p))
3472             raise
3473         else:
3474             self.assertGreater(hanat.sequence_number, seq)
3475
3476         # do not send ACK, active retry send HA event again
3477         self.pg_enable_capture(self.pg_interfaces)
3478         sleep(12)
3479         stats = self.statistics.get_counter('/nat44/ha/retry-count')
3480         self.assertEqual(stats[0][0], 3)
3481         stats = self.statistics.get_counter('/nat44/ha/missed-count')
3482         self.assertEqual(stats[0][0], 1)
3483         capture = self.pg3.get_capture(3)
3484         for packet in capture:
3485             self.assertEqual(packet, p)
3486
3487         # session counters refresh
3488         pkts = self.create_stream_out(self.pg1)
3489         self.pg1.add_stream(pkts)
3490         self.pg_enable_capture(self.pg_interfaces)
3491         self.pg_start()
3492         self.pg0.get_capture(2)
3493         self.vapi.nat_ha_flush()
3494         stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3495         self.assertEqual(stats[0][0], 2)
3496         capture = self.pg3.get_capture(1)
3497         p = capture[0]
3498         self.assert_packet_checksums_valid(p)
3499         try:
3500             ip = p[IP]
3501             udp = p[UDP]
3502             hanat = p[HANATStateSync]
3503         except IndexError:
3504             self.logger.error(ppp("Invalid packet:", p))
3505             raise
3506         else:
3507             self.assertEqual(ip.src, self.pg3.local_ip4)
3508             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3509             self.assertEqual(udp.sport, 12345)
3510             self.assertEqual(udp.dport, 12346)
3511             self.assertEqual(hanat.version, 1)
3512             self.assertEqual(hanat.count, 2)
3513             seq = hanat.sequence_number
3514             for event in hanat.events:
3515                 self.assertEqual(event.event_type, 3)
3516                 self.assertEqual(event.out_addr, self.nat_addr)
3517                 self.assertEqual(event.fib_index, 0)
3518                 self.assertEqual(event.total_pkts, 2)
3519                 self.assertGreater(event.total_bytes, 0)
3520
3521         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3522                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3523                UDP(sport=12346, dport=12345) /
3524                HANATStateSync(sequence_number=seq, flags='ACK'))
3525         self.pg3.add_stream(ack)
3526         self.pg_start()
3527         stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3528         self.assertEqual(stats[0][0], 2)
3529
3530     def test_ha_recv(self):
3531         """ NAT44EI Receive HA session synchronization events (passive) """
3532         self.nat44_add_address(self.nat_addr)
3533         flags = self.config_flags.NAT_IS_INSIDE
3534         self.vapi.nat44_interface_add_del_feature(
3535             sw_if_index=self.pg0.sw_if_index,
3536             flags=flags, is_add=1)
3537         self.vapi.nat44_interface_add_del_feature(
3538             sw_if_index=self.pg1.sw_if_index,
3539             is_add=1)
3540         self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3541                                       port=12345,
3542                                       path_mtu=512)
3543         bind_layers(UDP, HANATStateSync, sport=12345)
3544
3545         self.tcp_port_out = random.randint(1025, 65535)
3546         self.udp_port_out = random.randint(1025, 65535)
3547
3548         # send HA session add events to failover/passive
3549         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3550              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3551              UDP(sport=12346, dport=12345) /
3552              HANATStateSync(sequence_number=1, events=[
3553                  Event(event_type='add', protocol='tcp',
3554                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3555                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3556                        eh_addr=self.pg1.remote_ip4,
3557                        ehn_addr=self.pg1.remote_ip4,
3558                        eh_port=self.tcp_external_port,
3559                        ehn_port=self.tcp_external_port, fib_index=0),
3560                  Event(event_type='add', protocol='udp',
3561                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3562                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3563                        eh_addr=self.pg1.remote_ip4,
3564                        ehn_addr=self.pg1.remote_ip4,
3565                        eh_port=self.udp_external_port,
3566                        ehn_port=self.udp_external_port, fib_index=0)]))
3567
3568         self.pg3.add_stream(p)
3569         self.pg_enable_capture(self.pg_interfaces)
3570         self.pg_start()
3571         # receive ACK
3572         capture = self.pg3.get_capture(1)
3573         p = capture[0]
3574         try:
3575             hanat = p[HANATStateSync]
3576         except IndexError:
3577             self.logger.error(ppp("Invalid packet:", p))
3578             raise
3579         else:
3580             self.assertEqual(hanat.sequence_number, 1)
3581             self.assertEqual(hanat.flags, 'ACK')
3582             self.assertEqual(hanat.version, 1)
3583             self.assertEqual(hanat.thread_index, 0)
3584         stats = self.statistics.get_counter('/nat44/ha/ack-send')
3585         self.assertEqual(stats[0][0], 1)
3586         stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
3587         self.assertEqual(stats[0][0], 2)
3588         users = self.statistics.get_counter('/nat44/total-users')
3589         self.assertEqual(users[0][0], 1)
3590         sessions = self.statistics.get_counter('/nat44/total-sessions')
3591         self.assertEqual(sessions[0][0], 2)
3592         users = self.vapi.nat44_user_dump()
3593         self.assertEqual(len(users), 1)
3594         self.assertEqual(str(users[0].ip_address),
3595                          self.pg0.remote_ip4)
3596         # there should be 2 sessions created by HA
3597         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3598                                                      users[0].vrf_id)
3599         self.assertEqual(len(sessions), 2)
3600         for session in sessions:
3601             self.assertEqual(str(session.inside_ip_address),
3602                              self.pg0.remote_ip4)
3603             self.assertEqual(str(session.outside_ip_address),
3604                              self.nat_addr)
3605             self.assertIn(session.inside_port,
3606                           [self.tcp_port_in, self.udp_port_in])
3607             self.assertIn(session.outside_port,
3608                           [self.tcp_port_out, self.udp_port_out])
3609             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3610
3611         # send HA session delete event to failover/passive
3612         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3613              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3614              UDP(sport=12346, dport=12345) /
3615              HANATStateSync(sequence_number=2, events=[
3616                  Event(event_type='del', protocol='udp',
3617                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3618                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3619                        eh_addr=self.pg1.remote_ip4,
3620                        ehn_addr=self.pg1.remote_ip4,
3621                        eh_port=self.udp_external_port,
3622                        ehn_port=self.udp_external_port, fib_index=0)]))
3623
3624         self.pg3.add_stream(p)
3625         self.pg_enable_capture(self.pg_interfaces)
3626         self.pg_start()
3627         # receive ACK
3628         capture = self.pg3.get_capture(1)
3629         p = capture[0]
3630         try:
3631             hanat = p[HANATStateSync]
3632         except IndexError:
3633             self.logger.error(ppp("Invalid packet:", p))
3634             raise
3635         else:
3636             self.assertEqual(hanat.sequence_number, 2)
3637             self.assertEqual(hanat.flags, 'ACK')
3638             self.assertEqual(hanat.version, 1)
3639         users = self.vapi.nat44_user_dump()
3640         self.assertEqual(len(users), 1)
3641         self.assertEqual(str(users[0].ip_address),
3642                          self.pg0.remote_ip4)
3643         # now we should have only 1 session, 1 deleted by HA
3644         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3645                                                      users[0].vrf_id)
3646         self.assertEqual(len(sessions), 1)
3647         stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
3648         self.assertEqual(stats[0][0], 1)
3649
3650         stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3651         self.assertEqual(stats, 2)
3652
3653         # send HA session refresh event to failover/passive
3654         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3655              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3656              UDP(sport=12346, dport=12345) /
3657              HANATStateSync(sequence_number=3, events=[
3658                  Event(event_type='refresh', protocol='tcp',
3659                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3660                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3661                        eh_addr=self.pg1.remote_ip4,
3662                        ehn_addr=self.pg1.remote_ip4,
3663                        eh_port=self.tcp_external_port,
3664                        ehn_port=self.tcp_external_port, fib_index=0,
3665                        total_bytes=1024, total_pkts=2)]))
3666         self.pg3.add_stream(p)
3667         self.pg_enable_capture(self.pg_interfaces)
3668         self.pg_start()
3669         # receive ACK
3670         capture = self.pg3.get_capture(1)
3671         p = capture[0]
3672         try:
3673             hanat = p[HANATStateSync]
3674         except IndexError:
3675             self.logger.error(ppp("Invalid packet:", p))
3676             raise
3677         else:
3678             self.assertEqual(hanat.sequence_number, 3)
3679             self.assertEqual(hanat.flags, 'ACK')
3680             self.assertEqual(hanat.version, 1)
3681         users = self.vapi.nat44_user_dump()
3682         self.assertEqual(len(users), 1)
3683         self.assertEqual(str(users[0].ip_address),
3684                          self.pg0.remote_ip4)
3685         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3686                                                      users[0].vrf_id)
3687         self.assertEqual(len(sessions), 1)
3688         session = sessions[0]
3689         self.assertEqual(session.total_bytes, 1024)
3690         self.assertEqual(session.total_pkts, 2)
3691         stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
3692         self.assertEqual(stats[0][0], 1)
3693
3694         stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3695         self.assertEqual(stats, 3)
3696
3697         # send packet to test session created by HA
3698         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3699              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3700              TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3701         self.pg1.add_stream(p)
3702         self.pg_enable_capture(self.pg_interfaces)
3703         self.pg_start()
3704         capture = self.pg0.get_capture(1)
3705         p = capture[0]
3706         try:
3707             ip = p[IP]
3708             tcp = p[TCP]
3709         except IndexError:
3710             self.logger.error(ppp("Invalid packet:", p))
3711             raise
3712         else:
3713             self.assertEqual(ip.src, self.pg1.remote_ip4)
3714             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3715             self.assertEqual(tcp.sport, self.tcp_external_port)
3716             self.assertEqual(tcp.dport, self.tcp_port_in)
3717
3718     def show_commands_at_teardown(self):
3719         self.logger.info(self.vapi.cli("show nat44 addresses"))
3720         self.logger.info(self.vapi.cli("show nat44 interfaces"))
3721         self.logger.info(self.vapi.cli("show nat44 static mappings"))
3722         self.logger.info(self.vapi.cli("show nat44 interface address"))
3723         self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3724         self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3725         self.logger.info(self.vapi.cli("show nat timeouts"))
3726         self.logger.info(
3727             self.vapi.cli("show nat addr-port-assignment-alg"))
3728         self.logger.info(self.vapi.cli("show nat ha"))
3729
3730
3731 class TestNAT44Out2InDPO(MethodHolder):
3732     """ NAT44EI Test Cases using out2in DPO """
3733
3734     @classmethod
3735     def setUpClass(cls):
3736         super(TestNAT44Out2InDPO, cls).setUpClass()
3737         cls.vapi.cli("set log class nat level debug")
3738
3739         cls.tcp_port_in = 6303
3740         cls.tcp_port_out = 6303
3741         cls.udp_port_in = 6304
3742         cls.udp_port_out = 6304
3743         cls.icmp_id_in = 6305
3744         cls.icmp_id_out = 6305
3745         cls.nat_addr = '10.0.0.3'
3746         cls.dst_ip4 = '192.168.70.1'
3747
3748         cls.create_pg_interfaces(range(2))
3749
3750         cls.pg0.admin_up()
3751         cls.pg0.config_ip4()
3752         cls.pg0.resolve_arp()
3753
3754         cls.pg1.admin_up()
3755         cls.pg1.config_ip6()
3756         cls.pg1.resolve_ndp()
3757
3758         r1 = VppIpRoute(cls, "::", 0,
3759                         [VppRoutePath(cls.pg1.remote_ip6,
3760                                       cls.pg1.sw_if_index)],
3761                         register=False)
3762         r1.add_vpp_config()
3763
3764     def setUp(self):
3765         super(TestNAT44Out2InDPO, self).setUp()
3766         flags = self.nat44_config_flags.NAT44_API_IS_OUT2IN_DPO
3767         self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
3768
3769     def tearDown(self):
3770         super(TestNAT44Out2InDPO, self).tearDown()
3771         if not self.vpp_dead:
3772             self.vapi.nat44_plugin_enable_disable(enable=0)
3773             self.vapi.cli("clear logging")
3774
3775     def configure_xlat(self):
3776         self.dst_ip6_pfx = '1:2:3::'
3777         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3778                                               self.dst_ip6_pfx)
3779         self.dst_ip6_pfx_len = 96
3780         self.src_ip6_pfx = '4:5:6::'
3781         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3782                                               self.src_ip6_pfx)
3783         self.src_ip6_pfx_len = 96
3784         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3785                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3786                                  '\x00\x00\x00\x00', 0)
3787
3788     @unittest.skip('Temporary disabled')
3789     def test_464xlat_ce(self):
3790         """ Test 464XLAT CE with NAT44EI """
3791
3792         nat_config = self.vapi.nat_show_config()
3793         self.assertEqual(1, nat_config.out2in_dpo)
3794
3795         self.configure_xlat()
3796
3797         flags = self.config_flags.NAT_IS_INSIDE
3798         self.vapi.nat44_interface_add_del_feature(
3799             sw_if_index=self.pg0.sw_if_index,
3800             flags=flags, is_add=1)
3801         self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n,
3802                                               last_ip_address=self.nat_addr_n,
3803                                               vrf_id=0xFFFFFFFF, is_add=1)
3804
3805         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3806                                        self.dst_ip6_pfx_len)
3807         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3808                                        self.src_ip6_pfx_len)
3809
3810         try:
3811             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3812             self.pg0.add_stream(pkts)
3813             self.pg_enable_capture(self.pg_interfaces)
3814             self.pg_start()
3815             capture = self.pg1.get_capture(len(pkts))
3816             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3817                                         dst_ip=out_src_ip6)
3818
3819             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3820                                               out_dst_ip6)
3821             self.pg1.add_stream(pkts)
3822             self.pg_enable_capture(self.pg_interfaces)
3823             self.pg_start()
3824             capture = self.pg0.get_capture(len(pkts))
3825             self.verify_capture_in(capture, self.pg0)
3826         finally:
3827             self.vapi.nat44_interface_add_del_feature(
3828                 sw_if_index=self.pg0.sw_if_index,
3829                 flags=flags)
3830             self.vapi.nat44_add_del_address_range(
3831                 first_ip_address=self.nat_addr_n,
3832                 last_ip_address=self.nat_addr_n,
3833                 vrf_id=0xFFFFFFFF)
3834
3835     @unittest.skip('Temporary disabled')
3836     def test_464xlat_ce_no_nat(self):
3837         """ Test 464XLAT CE without NAT44EI """
3838
3839         self.configure_xlat()
3840
3841         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3842                                        self.dst_ip6_pfx_len)
3843         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3844                                        self.src_ip6_pfx_len)
3845
3846         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3847         self.pg0.add_stream(pkts)
3848         self.pg_enable_capture(self.pg_interfaces)
3849         self.pg_start()
3850         capture = self.pg1.get_capture(len(pkts))
3851         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3852                                     nat_ip=out_dst_ip6, same_port=True)
3853
3854         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3855         self.pg1.add_stream(pkts)
3856         self.pg_enable_capture(self.pg_interfaces)
3857         self.pg_start()
3858         capture = self.pg0.get_capture(len(pkts))
3859         self.verify_capture_in(capture, self.pg0)
3860
3861
3862 if __name__ == '__main__':
3863     unittest.main(testRunner=VppTestRunner)