0708d440714e0fe14d41b57f6f1571e7601b9c95
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from util import ppp
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14
15
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    # Helper base class for SNAT tests.  The methods below read
    # tcp_port_in/out, udp_port_in/out, icmp_id_in/out and snat_addr from
    # the instance; subclasses are expected to provide them.

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def _create_stream(self, pg_if, dst_ip, ttl, l4_headers):
        """
        Build one Ethernet/IPv4 packet per given upper-layer header

        :param pg_if: Interface the packets are injected on; its remote
                      MAC/IPv4 are the source, its local MAC the L2 dst
        :param dst_ip: Destination IPv4 address
        :param ttl: TTL of generated packets
        :param l4_headers: Upper-layer headers, one packet is built for each
        :returns: List of packets in the order of l4_headers
        """
        return [Ether(dst=pg_if.local_mac, src=pg_if.remote_mac) /
                IP(src=pg_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4_header
                for l4_header in l4_headers]

    def _verify_each(self, capture, network, check):
        """
        Run check on every captured packet, logging the packet that fails

        :param capture: Captured packets
        :param network: Network name used in the log message
                        ("inside" or "outside")
        :param check: Callable taking one packet; raises on mismatch
        """
        for packet in capture:
            try:
                check(packet)
            except Exception:
                # dump the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(%s network):" % network, packet))
                raise

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List of three packets: TCP, UDP and ICMP echo request
        """
        return self._create_stream(
            in_if, out_if.remote_ip4, ttl,
            [TCP(sport=self.tcp_port_in),
             UDP(sport=self.udp_port_in),
             ICMP(id=self.icmp_id_in, type='echo-request')])

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List of three packets: TCP, UDP and ICMP echo reply
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        return self._create_stream(
            out_if, dst_ip, ttl,
            [TCP(dport=self.tcp_port_out),
             UDP(dport=self.udp_port_out),
             ICMP(id=self.icmp_id_out, type='echo-reply')])

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Verify captured packets on outside network

        The observed source port / ICMP id of each packet is stored in
        tcp_port_out, udp_port_out or icmp_id_out on the instance.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        # with same_port the outside value must equal the inside one,
        # otherwise it must differ
        check_port = self.assertEqual if same_port else self.assertNotEqual
        self.assertEqual(packet_num, len(capture))

        def check(packet):
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id

        self._verify_each(capture, "outside", check)

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))

        def check(packet):
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)

        self._verify_each(capture, "inside", check)

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        Unlike the other verifiers, the number of packets is not checked.

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        def check(packet):
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)

        self._verify_each(capture, "inside", check)

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))

        def check(packet):
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # the packet embedded in the ICMP error carries the outside
            # (translated) destination port / ICMP id
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)

        self._verify_each(capture, "outside", check)

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))

        def check(packet):
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # the packet embedded in the ICMP error carries the inside
            # source port / ICMP id
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)

        self._verify_each(capture, "inside", check)

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Expects six records: three with natEvent 4 and three with
        natEvent 5, one of each per protocol stream.

        :param data: Decoded IPFIX data records
        """
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))

        def expect_ports(record, port_in, port_out):
            # sourceTransportPort / postNAPTSourceTransportPort
            self.assertEqual(struct.pack("!H", port_in), record[7])
            self.assertEqual(struct.pack("!H", port_out), record[227])

        for record in data:
            # natEvent
            nat_event = ord(record[230])
            self.assertIn(nat_event, [4, 5])
            if nat_event == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier
            proto = ord(record[4])
            if proto == IP_PROTOS.icmp:
                expect_ports(record, self.icmp_id_in, self.icmp_id_out)
            elif proto == IP_PROTOS.tcp:
                expect_ports(record, self.tcp_port_in, self.tcp_port_out)
            elif proto == IP_PROTOS.udp:
                expect_ports(record, self.udp_port_in, self.udp_port_out)
            else:
                self.fail("Invalid protocol")
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        Expects exactly one record with natEvent 3 and natPoolID 0.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
297
298
299 class TestSNAT(MethodHolder):
300     """ SNAT Test Cases """
301
302     @classmethod
303     def setUpClass(cls):
304         super(TestSNAT, cls).setUpClass()
305
306         try:
307             cls.tcp_port_in = 6303
308             cls.tcp_port_out = 6303
309             cls.udp_port_in = 6304
310             cls.udp_port_out = 6304
311             cls.icmp_id_in = 6305
312             cls.icmp_id_out = 6305
313             cls.snat_addr = '10.0.0.3'
314
315             cls.create_pg_interfaces(range(8))
316             cls.interfaces = list(cls.pg_interfaces[0:4])
317
318             for i in cls.interfaces:
319                 i.admin_up()
320                 i.config_ip4()
321                 i.resolve_arp()
322
323             cls.pg0.generate_remote_hosts(2)
324             cls.pg0.configure_ipv4_neighbors()
325
326             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
327
328             cls.pg4._local_ip4 = "172.16.255.1"
329             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
330             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
331             cls.pg4.set_table_ip4(10)
332             cls.pg5._local_ip4 = "172.16.255.3"
333             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
334             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
335             cls.pg5.set_table_ip4(10)
336             cls.pg6._local_ip4 = "172.16.255.1"
337             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
338             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
339             cls.pg6.set_table_ip4(20)
340             for i in cls.overlapping_interfaces:
341                 i.config_ip4()
342                 i.admin_up()
343                 i.resolve_arp()
344
345             cls.pg7.admin_up()
346
347         except Exception:
348             super(TestSNAT, cls).tearDownClass()
349             raise
350
351     def clear_snat(self):
352         """
353         Clear SNAT configuration.
354         """
355         if self.pg7.has_ip4_config:
356             self.pg7.unconfig_ip4()
357
358         interfaces = self.vapi.snat_interface_addr_dump()
359         for intf in interfaces:
360             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
361
362         self.vapi.snat_ipfix(enable=0)
363
364         interfaces = self.vapi.snat_interface_dump()
365         for intf in interfaces:
366             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
367                                                      intf.is_inside,
368                                                      is_add=0)
369
370         static_mappings = self.vapi.snat_static_mapping_dump()
371         for sm in static_mappings:
372             self.vapi.snat_add_static_mapping(sm.local_ip_address,
373                                               sm.external_ip_address,
374                                               local_port=sm.local_port,
375                                               external_port=sm.external_port,
376                                               addr_only=sm.addr_only,
377                                               vrf_id=sm.vrf_id,
378                                               protocol=sm.protocol,
379                                               is_add=0)
380
381         adresses = self.vapi.snat_address_dump()
382         for addr in adresses:
383             self.vapi.snat_add_address_range(addr.ip_address,
384                                              addr.ip_address,
385                                              is_add=0)
386
387     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
388                                 local_port=0, external_port=0, vrf_id=0,
389                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
390                                 proto=0):
391         """
392         Add/delete S-NAT static mapping
393
394         :param local_ip: Local IP address
395         :param external_ip: External IP address
396         :param local_port: Local port number (Optional)
397         :param external_port: External port number (Optional)
398         :param vrf_id: VRF ID (Default 0)
399         :param is_add: 1 if add, 0 if delete (Default add)
400         :param external_sw_if_index: External interface instead of IP address
401         :param proto: IP protocol (Mandatory if port specified)
402         """
403         addr_only = 1
404         if local_port and external_port:
405             addr_only = 0
406         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
407         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
408         self.vapi.snat_add_static_mapping(
409             l_ip,
410             e_ip,
411             external_sw_if_index,
412             local_port,
413             external_port,
414             addr_only,
415             vrf_id,
416             proto,
417             is_add)
418
419     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
420         """
421         Add/delete S-NAT address
422
423         :param ip: IP address
424         :param is_add: 1 if add, 0 if delete (Default add)
425         """
426         snat_addr = socket.inet_pton(socket.AF_INET, ip)
427         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
428                                          vrf_id=vrf_id)
429
430     def test_dynamic(self):
431         """ SNAT dynamic translation test """
432
433         self.snat_add_address(self.snat_addr)
434         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
435         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
436                                                  is_inside=0)
437
438         # in2out
439         pkts = self.create_stream_in(self.pg0, self.pg1)
440         self.pg0.add_stream(pkts)
441         self.pg_enable_capture(self.pg_interfaces)
442         self.pg_start()
443         capture = self.pg1.get_capture(len(pkts))
444         self.verify_capture_out(capture)
445
446         # out2in
447         pkts = self.create_stream_out(self.pg1)
448         self.pg1.add_stream(pkts)
449         self.pg_enable_capture(self.pg_interfaces)
450         self.pg_start()
451         capture = self.pg0.get_capture(len(pkts))
452         self.verify_capture_in(capture, self.pg0)
453
454     def test_dynamic_icmp_errors_in2out_ttl_1(self):
455         """ SNAT handling of client packets with TTL=1 """
456
457         self.snat_add_address(self.snat_addr)
458         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
459         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
460                                                  is_inside=0)
461
462         # Client side - generate traffic
463         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
464         self.pg0.add_stream(pkts)
465         self.pg_enable_capture(self.pg_interfaces)
466         self.pg_start()
467
468         # Client side - verify ICMP type 11 packets
469         capture = self.pg0.get_capture(len(pkts))
470         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
471
472     def test_dynamic_icmp_errors_out2in_ttl_1(self):
473         """ SNAT handling of server packets with TTL=1 """
474
475         self.snat_add_address(self.snat_addr)
476         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
477         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
478                                                  is_inside=0)
479
480         # Client side - create sessions
481         pkts = self.create_stream_in(self.pg0, self.pg1)
482         self.pg0.add_stream(pkts)
483         self.pg_enable_capture(self.pg_interfaces)
484         self.pg_start()
485
486         # Server side - generate traffic
487         capture = self.pg1.get_capture(len(pkts))
488         self.verify_capture_out(capture)
489         pkts = self.create_stream_out(self.pg1, ttl=1)
490         self.pg1.add_stream(pkts)
491         self.pg_enable_capture(self.pg_interfaces)
492         self.pg_start()
493
494         # Server side - verify ICMP type 11 packets
495         capture = self.pg1.get_capture(len(pkts))
496         self.verify_capture_out_with_icmp_errors(capture,
497                                                  src_ip=self.pg1.local_ip4)
498
499     def test_dynamic_icmp_errors_in2out_ttl_2(self):
500         """ SNAT handling of error responses to client packets with TTL=2 """
501
502         self.snat_add_address(self.snat_addr)
503         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
504         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
505                                                  is_inside=0)
506
507         # Client side - generate traffic
508         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
509         self.pg0.add_stream(pkts)
510         self.pg_enable_capture(self.pg_interfaces)
511         self.pg_start()
512
513         # Server side - simulate ICMP type 11 response
514         capture = self.pg1.get_capture(len(pkts))
515         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
516                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
517                 ICMP(type=11) / packet[IP] for packet in capture]
518         self.pg1.add_stream(pkts)
519         self.pg_enable_capture(self.pg_interfaces)
520         self.pg_start()
521
522         # Client side - verify ICMP type 11 packets
523         capture = self.pg0.get_capture(len(pkts))
524         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
525
526     def test_dynamic_icmp_errors_out2in_ttl_2(self):
527         """ SNAT handling of error responses to server packets with TTL=2 """
528
529         self.snat_add_address(self.snat_addr)
530         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
531         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
532                                                  is_inside=0)
533
534         # Client side - create sessions
535         pkts = self.create_stream_in(self.pg0, self.pg1)
536         self.pg0.add_stream(pkts)
537         self.pg_enable_capture(self.pg_interfaces)
538         self.pg_start()
539
540         # Server side - generate traffic
541         capture = self.pg1.get_capture(len(pkts))
542         self.verify_capture_out(capture)
543         pkts = self.create_stream_out(self.pg1, ttl=2)
544         self.pg1.add_stream(pkts)
545         self.pg_enable_capture(self.pg_interfaces)
546         self.pg_start()
547
548         # Client side - simulate ICMP type 11 response
549         capture = self.pg0.get_capture(len(pkts))
550         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
551                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
552                 ICMP(type=11) / packet[IP] for packet in capture]
553         self.pg0.add_stream(pkts)
554         self.pg_enable_capture(self.pg_interfaces)
555         self.pg_start()
556
557         # Server side - verify ICMP type 11 packets
558         capture = self.pg1.get_capture(len(pkts))
559         self.verify_capture_out_with_icmp_errors(capture)
560
561     def test_ping_out_interface_from_outside(self):
562         """ Ping SNAT out interface from outside """
563
564         self.snat_add_address(self.snat_addr)
565         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
566         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
567                                                  is_inside=0)
568
569         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
570              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
571              ICMP(id=self.icmp_id_out, type='echo-request'))
572         pkts = [p]
573         self.pg1.add_stream(pkts)
574         self.pg_enable_capture(self.pg_interfaces)
575         self.pg_start()
576         capture = self.pg1.get_capture(len(pkts))
577         self.assertEqual(1, len(capture))
578         packet = capture[0]
579         try:
580             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
581             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
582             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
583             self.assertEqual(packet[ICMP].type, 0)  # echo reply
584         except:
585             self.logger.error(ppp("Unexpected or invalid packet "
586                                   "(outside network):", packet))
587             raise
588
589     def test_static_in(self):
590         """ SNAT 1:1 NAT initialized from inside network """
591
592         nat_ip = "10.0.0.10"
593         self.tcp_port_out = 6303
594         self.udp_port_out = 6304
595         self.icmp_id_out = 6305
596
597         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
598         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
599         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
600                                                  is_inside=0)
601
602         # in2out
603         pkts = self.create_stream_in(self.pg0, self.pg1)
604         self.pg0.add_stream(pkts)
605         self.pg_enable_capture(self.pg_interfaces)
606         self.pg_start()
607         capture = self.pg1.get_capture(len(pkts))
608         self.verify_capture_out(capture, nat_ip, True)
609
610         # out2in
611         pkts = self.create_stream_out(self.pg1, nat_ip)
612         self.pg1.add_stream(pkts)
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615         capture = self.pg0.get_capture(len(pkts))
616         self.verify_capture_in(capture, self.pg0)
617
618     def test_static_out(self):
619         """ SNAT 1:1 NAT initialized from outside network """
620
621         nat_ip = "10.0.0.20"
622         self.tcp_port_out = 6303
623         self.udp_port_out = 6304
624         self.icmp_id_out = 6305
625
626         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
627         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
628         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
629                                                  is_inside=0)
630
631         # out2in
632         pkts = self.create_stream_out(self.pg1, nat_ip)
633         self.pg1.add_stream(pkts)
634         self.pg_enable_capture(self.pg_interfaces)
635         self.pg_start()
636         capture = self.pg0.get_capture(len(pkts))
637         self.verify_capture_in(capture, self.pg0)
638
639         # in2out
640         pkts = self.create_stream_in(self.pg0, self.pg1)
641         self.pg0.add_stream(pkts)
642         self.pg_enable_capture(self.pg_interfaces)
643         self.pg_start()
644         capture = self.pg1.get_capture(len(pkts))
645         self.verify_capture_out(capture, nat_ip, True)
646
647     def test_static_with_port_in(self):
648         """ SNAT 1:1 NAT with port initialized from inside network """
649
650         self.tcp_port_out = 3606
651         self.udp_port_out = 3607
652         self.icmp_id_out = 3608
653
654         self.snat_add_address(self.snat_addr)
655         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
656                                      self.tcp_port_in, self.tcp_port_out,
657                                      proto=IP_PROTOS.tcp)
658         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
659                                      self.udp_port_in, self.udp_port_out,
660                                      proto=IP_PROTOS.udp)
661         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
662                                      self.icmp_id_in, self.icmp_id_out,
663                                      proto=IP_PROTOS.icmp)
664         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
665         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
666                                                  is_inside=0)
667
668         # in2out
669         pkts = self.create_stream_in(self.pg0, self.pg1)
670         self.pg0.add_stream(pkts)
671         self.pg_enable_capture(self.pg_interfaces)
672         self.pg_start()
673         capture = self.pg1.get_capture(len(pkts))
674         self.verify_capture_out(capture)
675
676         # out2in
677         pkts = self.create_stream_out(self.pg1)
678         self.pg1.add_stream(pkts)
679         self.pg_enable_capture(self.pg_interfaces)
680         self.pg_start()
681         capture = self.pg0.get_capture(len(pkts))
682         self.verify_capture_in(capture, self.pg0)
683
684     def test_static_with_port_out(self):
685         """ SNAT 1:1 NAT with port initialized from outside network """
686
687         self.tcp_port_out = 30606
688         self.udp_port_out = 30607
689         self.icmp_id_out = 30608
690
691         self.snat_add_address(self.snat_addr)
692         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
693                                      self.tcp_port_in, self.tcp_port_out,
694                                      proto=IP_PROTOS.tcp)
695         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
696                                      self.udp_port_in, self.udp_port_out,
697                                      proto=IP_PROTOS.udp)
698         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
699                                      self.icmp_id_in, self.icmp_id_out,
700                                      proto=IP_PROTOS.icmp)
701         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
702         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
703                                                  is_inside=0)
704
705         # out2in
706         pkts = self.create_stream_out(self.pg1)
707         self.pg1.add_stream(pkts)
708         self.pg_enable_capture(self.pg_interfaces)
709         self.pg_start()
710         capture = self.pg0.get_capture(len(pkts))
711         self.verify_capture_in(capture, self.pg0)
712
713         # in2out
714         pkts = self.create_stream_in(self.pg0, self.pg1)
715         self.pg0.add_stream(pkts)
716         self.pg_enable_capture(self.pg_interfaces)
717         self.pg_start()
718         capture = self.pg1.get_capture(len(pkts))
719         self.verify_capture_out(capture)
720
721     def test_static_vrf_aware(self):
722         """ SNAT 1:1 NAT VRF awareness """
723
724         nat_ip1 = "10.0.0.30"
725         nat_ip2 = "10.0.0.40"
726         self.tcp_port_out = 6303
727         self.udp_port_out = 6304
728         self.icmp_id_out = 6305
729
730         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
731                                      vrf_id=10)
732         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
733                                      vrf_id=10)
734         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
735                                                  is_inside=0)
736         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
737         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
738
739         # inside interface VRF match SNAT static mapping VRF
740         pkts = self.create_stream_in(self.pg4, self.pg3)
741         self.pg4.add_stream(pkts)
742         self.pg_enable_capture(self.pg_interfaces)
743         self.pg_start()
744         capture = self.pg3.get_capture(len(pkts))
745         self.verify_capture_out(capture, nat_ip1, True)
746
747         # inside interface VRF don't match SNAT static mapping VRF (packets
748         # are dropped)
749         pkts = self.create_stream_in(self.pg0, self.pg3)
750         self.pg0.add_stream(pkts)
751         self.pg_enable_capture(self.pg_interfaces)
752         self.pg_start()
753         self.pg3.assert_nothing_captured()
754
755     def test_multiple_inside_interfaces(self):
756         """ SNAT multiple inside interfaces (non-overlapping address space) """
757
758         self.snat_add_address(self.snat_addr)
759         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
760         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
761         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
762                                                  is_inside=0)
763
764         # between two S-NAT inside interfaces (no translation)
765         pkts = self.create_stream_in(self.pg0, self.pg1)
766         self.pg0.add_stream(pkts)
767         self.pg_enable_capture(self.pg_interfaces)
768         self.pg_start()
769         capture = self.pg1.get_capture(len(pkts))
770         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
771
772         # from S-NAT inside to interface without S-NAT feature (no translation)
773         pkts = self.create_stream_in(self.pg0, self.pg2)
774         self.pg0.add_stream(pkts)
775         self.pg_enable_capture(self.pg_interfaces)
776         self.pg_start()
777         capture = self.pg2.get_capture(len(pkts))
778         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
779
780         # in2out 1st interface
781         pkts = self.create_stream_in(self.pg0, self.pg3)
782         self.pg0.add_stream(pkts)
783         self.pg_enable_capture(self.pg_interfaces)
784         self.pg_start()
785         capture = self.pg3.get_capture(len(pkts))
786         self.verify_capture_out(capture)
787
788         # out2in 1st interface
789         pkts = self.create_stream_out(self.pg3)
790         self.pg3.add_stream(pkts)
791         self.pg_enable_capture(self.pg_interfaces)
792         self.pg_start()
793         capture = self.pg0.get_capture(len(pkts))
794         self.verify_capture_in(capture, self.pg0)
795
796         # in2out 2nd interface
797         pkts = self.create_stream_in(self.pg1, self.pg3)
798         self.pg1.add_stream(pkts)
799         self.pg_enable_capture(self.pg_interfaces)
800         self.pg_start()
801         capture = self.pg3.get_capture(len(pkts))
802         self.verify_capture_out(capture)
803
804         # out2in 2nd interface
805         pkts = self.create_stream_out(self.pg3)
806         self.pg3.add_stream(pkts)
807         self.pg_enable_capture(self.pg_interfaces)
808         self.pg_start()
809         capture = self.pg1.get_capture(len(pkts))
810         self.verify_capture_in(capture, self.pg1)
811
812     def test_inside_overlapping_interfaces(self):
813         """ SNAT multiple inside interfaces with overlapping address space """
814
815         static_nat_ip = "10.0.0.10"
816         self.snat_add_address(self.snat_addr)
817         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
818                                                  is_inside=0)
819         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
820         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
821         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
822         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
823                                      vrf_id=20)
824
825         # between S-NAT inside interfaces with same VRF (no translation)
826         pkts = self.create_stream_in(self.pg4, self.pg5)
827         self.pg4.add_stream(pkts)
828         self.pg_enable_capture(self.pg_interfaces)
829         self.pg_start()
830         capture = self.pg5.get_capture(len(pkts))
831         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
832
833         # between S-NAT inside interfaces with different VRF (hairpinning)
834         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
835              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
836              TCP(sport=1234, dport=5678))
837         self.pg4.add_stream(p)
838         self.pg_enable_capture(self.pg_interfaces)
839         self.pg_start()
840         capture = self.pg6.get_capture(1)
841         p = capture[0]
842         try:
843             ip = p[IP]
844             tcp = p[TCP]
845             self.assertEqual(ip.src, self.snat_addr)
846             self.assertEqual(ip.dst, self.pg6.remote_ip4)
847             self.assertNotEqual(tcp.sport, 1234)
848             self.assertEqual(tcp.dport, 5678)
849         except:
850             self.logger.error(ppp("Unexpected or invalid packet:", p))
851             raise
852
853         # in2out 1st interface
854         pkts = self.create_stream_in(self.pg4, self.pg3)
855         self.pg4.add_stream(pkts)
856         self.pg_enable_capture(self.pg_interfaces)
857         self.pg_start()
858         capture = self.pg3.get_capture(len(pkts))
859         self.verify_capture_out(capture)
860
861         # out2in 1st interface
862         pkts = self.create_stream_out(self.pg3)
863         self.pg3.add_stream(pkts)
864         self.pg_enable_capture(self.pg_interfaces)
865         self.pg_start()
866         capture = self.pg4.get_capture(len(pkts))
867         self.verify_capture_in(capture, self.pg4)
868
869         # in2out 2nd interface
870         pkts = self.create_stream_in(self.pg5, self.pg3)
871         self.pg5.add_stream(pkts)
872         self.pg_enable_capture(self.pg_interfaces)
873         self.pg_start()
874         capture = self.pg3.get_capture(len(pkts))
875         self.verify_capture_out(capture)
876
877         # out2in 2nd interface
878         pkts = self.create_stream_out(self.pg3)
879         self.pg3.add_stream(pkts)
880         self.pg_enable_capture(self.pg_interfaces)
881         self.pg_start()
882         capture = self.pg5.get_capture(len(pkts))
883         self.verify_capture_in(capture, self.pg5)
884
885         # pg5 session dump
886         addresses = self.vapi.snat_address_dump()
887         self.assertEqual(len(addresses), 1)
888         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
889         self.assertEqual(len(sessions), 3)
890         for session in sessions:
891             self.assertFalse(session.is_static)
892             self.assertEqual(session.inside_ip_address[0:4],
893                              self.pg5.remote_ip4n)
894             self.assertEqual(session.outside_ip_address,
895                              addresses[0].ip_address)
896         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
897         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
898         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
899         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
900         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
901         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
902         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
903         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
904         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
905
906         # in2out 3rd interface
907         pkts = self.create_stream_in(self.pg6, self.pg3)
908         self.pg6.add_stream(pkts)
909         self.pg_enable_capture(self.pg_interfaces)
910         self.pg_start()
911         capture = self.pg3.get_capture(len(pkts))
912         self.verify_capture_out(capture, static_nat_ip, True)
913
914         # out2in 3rd interface
915         pkts = self.create_stream_out(self.pg3, static_nat_ip)
916         self.pg3.add_stream(pkts)
917         self.pg_enable_capture(self.pg_interfaces)
918         self.pg_start()
919         capture = self.pg6.get_capture(len(pkts))
920         self.verify_capture_in(capture, self.pg6)
921
922         # general user and session dump verifications
923         users = self.vapi.snat_user_dump()
924         self.assertTrue(len(users) >= 3)
925         addresses = self.vapi.snat_address_dump()
926         self.assertEqual(len(addresses), 1)
927         for user in users:
928             sessions = self.vapi.snat_user_session_dump(user.ip_address,
929                                                         user.vrf_id)
930             for session in sessions:
931                 self.assertEqual(user.ip_address, session.inside_ip_address)
932                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
933                 self.assertTrue(session.protocol in
934                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
935                                  IP_PROTOS.icmp])
936
937         # pg4 session dump
938         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
939         self.assertTrue(len(sessions) >= 4)
940         for session in sessions:
941             self.assertFalse(session.is_static)
942             self.assertEqual(session.inside_ip_address[0:4],
943                              self.pg4.remote_ip4n)
944             self.assertEqual(session.outside_ip_address,
945                              addresses[0].ip_address)
946
947         # pg6 session dump
948         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
949         self.assertTrue(len(sessions) >= 3)
950         for session in sessions:
951             self.assertTrue(session.is_static)
952             self.assertEqual(session.inside_ip_address[0:4],
953                              self.pg6.remote_ip4n)
954             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
955                              map(int, static_nat_ip.split('.')))
956             self.assertTrue(session.inside_port in
957                             [self.tcp_port_in, self.udp_port_in,
958                              self.icmp_id_in])
959
960     def test_hairpinning(self):
961         """ SNAT hairpinning """
962
963         host = self.pg0.remote_hosts[0]
964         server = self.pg0.remote_hosts[1]
965         host_in_port = 1234
966         host_out_port = 0
967         server_in_port = 5678
968         server_out_port = 8765
969
970         self.snat_add_address(self.snat_addr)
971         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
972         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
973                                                  is_inside=0)
974         # add static mapping for server
975         self.snat_add_static_mapping(server.ip4, self.snat_addr,
976                                      server_in_port, server_out_port,
977                                      proto=IP_PROTOS.tcp)
978
979         # send packet from host to server
980         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
981              IP(src=host.ip4, dst=self.snat_addr) /
982              TCP(sport=host_in_port, dport=server_out_port))
983         self.pg0.add_stream(p)
984         self.pg_enable_capture(self.pg_interfaces)
985         self.pg_start()
986         capture = self.pg0.get_capture(1)
987         p = capture[0]
988         try:
989             ip = p[IP]
990             tcp = p[TCP]
991             self.assertEqual(ip.src, self.snat_addr)
992             self.assertEqual(ip.dst, server.ip4)
993             self.assertNotEqual(tcp.sport, host_in_port)
994             self.assertEqual(tcp.dport, server_in_port)
995             host_out_port = tcp.sport
996         except:
997             self.logger.error(ppp("Unexpected or invalid packet:", p))
998             raise
999
1000         # send reply from server to host
1001         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1002              IP(src=server.ip4, dst=self.snat_addr) /
1003              TCP(sport=server_in_port, dport=host_out_port))
1004         self.pg0.add_stream(p)
1005         self.pg_enable_capture(self.pg_interfaces)
1006         self.pg_start()
1007         capture = self.pg0.get_capture(1)
1008         p = capture[0]
1009         try:
1010             ip = p[IP]
1011             tcp = p[TCP]
1012             self.assertEqual(ip.src, self.snat_addr)
1013             self.assertEqual(ip.dst, host.ip4)
1014             self.assertEqual(tcp.sport, server_out_port)
1015             self.assertEqual(tcp.dport, host_in_port)
1016         except:
1017             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1018             raise
1019
1020     def test_max_translations_per_user(self):
1021         """ MAX translations per user - recycle the least recently used """
1022
1023         self.snat_add_address(self.snat_addr)
1024         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1025         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1026                                                  is_inside=0)
1027
1028         # get maximum number of translations per user
1029         snat_config = self.vapi.snat_show_config()
1030
1031         # send more than maximum number of translations per user packets
1032         pkts_num = snat_config.max_translations_per_user + 5
1033         pkts = []
1034         for port in range(0, pkts_num):
1035             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1036                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1037                  TCP(sport=1025 + port))
1038             pkts.append(p)
1039         self.pg0.add_stream(pkts)
1040         self.pg_enable_capture(self.pg_interfaces)
1041         self.pg_start()
1042
1043         # verify number of translated packet
1044         self.pg1.get_capture(pkts_num)
1045
1046     def test_interface_addr(self):
1047         """ Acquire SNAT addresses from interface """
1048         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1049
1050         # no address in NAT pool
1051         adresses = self.vapi.snat_address_dump()
1052         self.assertEqual(0, len(adresses))
1053
1054         # configure interface address and check NAT address pool
1055         self.pg7.config_ip4()
1056         adresses = self.vapi.snat_address_dump()
1057         self.assertEqual(1, len(adresses))
1058         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1059
1060         # remove interface address and check NAT address pool
1061         self.pg7.unconfig_ip4()
1062         adresses = self.vapi.snat_address_dump()
1063         self.assertEqual(0, len(adresses))
1064
1065     def test_interface_addr_static_mapping(self):
1066         """ Static mapping with addresses from interface """
1067         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1068         self.snat_add_static_mapping('1.2.3.4',
1069                                      external_sw_if_index=self.pg7.sw_if_index)
1070
1071         # static mappings with external interface
1072         static_mappings = self.vapi.snat_static_mapping_dump()
1073         self.assertEqual(1, len(static_mappings))
1074         self.assertEqual(self.pg7.sw_if_index,
1075                          static_mappings[0].external_sw_if_index)
1076
1077         # configure interface address and check static mappings
1078         self.pg7.config_ip4()
1079         static_mappings = self.vapi.snat_static_mapping_dump()
1080         self.assertEqual(1, len(static_mappings))
1081         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1082                          self.pg7.local_ip4n)
1083         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1084
1085         # remove interface address and check static mappings
1086         self.pg7.unconfig_ip4()
1087         static_mappings = self.vapi.snat_static_mapping_dump()
1088         self.assertEqual(0, len(static_mappings))
1089
1090     def test_ipfix_nat44_sess(self):
1091         """ S-NAT IPFIX logging NAT44 session created/delted """
1092         self.snat_add_address(self.snat_addr)
1093         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1094         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1095                                                  is_inside=0)
1096         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1097                                      src_address=self.pg3.local_ip4n,
1098                                      path_mtu=512,
1099                                      template_interval=10)
1100         self.vapi.snat_ipfix()
1101
1102         pkts = self.create_stream_in(self.pg0, self.pg1)
1103         self.pg0.add_stream(pkts)
1104         self.pg_enable_capture(self.pg_interfaces)
1105         self.pg_start()
1106         capture = self.pg1.get_capture(len(pkts))
1107         self.verify_capture_out(capture)
1108         self.snat_add_address(self.snat_addr, is_add=0)
1109         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1110         capture = self.pg3.get_capture(3)
1111         ipfix = IPFIXDecoder()
1112         # first load template
1113         for p in capture:
1114             self.assertTrue(p.haslayer(IPFIX))
1115             if p.haslayer(Template):
1116                 ipfix.add_template(p.getlayer(Template))
1117         # verify events in data set
1118         for p in capture:
1119             if p.haslayer(Data):
1120                 data = ipfix.decode_data_set(p.getlayer(Set))
1121                 self.verify_ipfix_nat44_ses(data)
1122
1123     def test_ipfix_addr_exhausted(self):
1124         """ S-NAT IPFIX logging NAT addresses exhausted """
1125         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1126         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1127                                                  is_inside=0)
1128         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1129                                      src_address=self.pg3.local_ip4n,
1130                                      path_mtu=512,
1131                                      template_interval=10)
1132         self.vapi.snat_ipfix()
1133
1134         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1135              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1136              TCP(sport=3025))
1137         self.pg0.add_stream(p)
1138         self.pg_enable_capture(self.pg_interfaces)
1139         self.pg_start()
1140         capture = self.pg1.get_capture(0)
1141         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1142         capture = self.pg3.get_capture(3)
1143         ipfix = IPFIXDecoder()
1144         # first load template
1145         for p in capture:
1146             self.assertTrue(p.haslayer(IPFIX))
1147             if p.haslayer(Template):
1148                 ipfix.add_template(p.getlayer(Template))
1149         # verify events in data set
1150         for p in capture:
1151             if p.haslayer(Data):
1152                 data = ipfix.decode_data_set(p.getlayer(Set))
1153                 self.verify_ipfix_addr_exhausted(data)
1154
1155     def test_pool_addr_fib(self):
1156         """ S-NAT add pool addresses to FIB """
1157         static_addr = '10.0.0.10'
1158         self.snat_add_address(self.snat_addr)
1159         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1160         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1161                                                  is_inside=0)
1162         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1163
1164         # SNAT address
1165         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1166              ARP(op=ARP.who_has, pdst=self.snat_addr,
1167                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1168         self.pg1.add_stream(p)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171         capture = self.pg1.get_capture(1)
1172         self.assertTrue(capture[0].haslayer(ARP))
1173         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1174
1175         # 1:1 NAT address
1176         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1177              ARP(op=ARP.who_has, pdst=static_addr,
1178                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1179         self.pg1.add_stream(p)
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pg_start()
1182         capture = self.pg1.get_capture(1)
1183         self.assertTrue(capture[0].haslayer(ARP))
1184         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1185
1186         # send ARP to non-SNAT interface
1187         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1188              ARP(op=ARP.who_has, pdst=self.snat_addr,
1189                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1190         self.pg2.add_stream(p)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193         capture = self.pg1.get_capture(0)
1194
1195         # remove addresses and verify
1196         self.snat_add_address(self.snat_addr, is_add=0)
1197         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1198                                      is_add=0)
1199
1200         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1201              ARP(op=ARP.who_has, pdst=self.snat_addr,
1202                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1203         self.pg1.add_stream(p)
1204         self.pg_enable_capture(self.pg_interfaces)
1205         self.pg_start()
1206         capture = self.pg1.get_capture(0)
1207
1208         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1209              ARP(op=ARP.who_has, pdst=static_addr,
1210                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1211         self.pg1.add_stream(p)
1212         self.pg_enable_capture(self.pg_interfaces)
1213         self.pg_start()
1214         capture = self.pg1.get_capture(0)
1215
1216     def test_vrf_mode(self):
1217         """ S-NAT tenant VRF aware address pool mode """
1218
1219         vrf_id1 = 1
1220         vrf_id2 = 2
1221         nat_ip1 = "10.0.0.10"
1222         nat_ip2 = "10.0.0.11"
1223
1224         self.pg0.unconfig_ip4()
1225         self.pg1.unconfig_ip4()
1226         self.pg0.set_table_ip4(vrf_id1)
1227         self.pg1.set_table_ip4(vrf_id2)
1228         self.pg0.config_ip4()
1229         self.pg1.config_ip4()
1230
1231         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1232         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1233         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1234         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1235         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1236                                                  is_inside=0)
1237
1238         # first VRF
1239         pkts = self.create_stream_in(self.pg0, self.pg2)
1240         self.pg0.add_stream(pkts)
1241         self.pg_enable_capture(self.pg_interfaces)
1242         self.pg_start()
1243         capture = self.pg2.get_capture(len(pkts))
1244         self.verify_capture_out(capture, nat_ip1)
1245
1246         # second VRF
1247         pkts = self.create_stream_in(self.pg1, self.pg2)
1248         self.pg1.add_stream(pkts)
1249         self.pg_enable_capture(self.pg_interfaces)
1250         self.pg_start()
1251         capture = self.pg2.get_capture(len(pkts))
1252         self.verify_capture_out(capture, nat_ip2)
1253
1254     def test_vrf_feature_independent(self):
1255         """ S-NAT tenant VRF independent address pool mode """
1256
1257         nat_ip1 = "10.0.0.10"
1258         nat_ip2 = "10.0.0.11"
1259
1260         self.snat_add_address(nat_ip1)
1261         self.snat_add_address(nat_ip2)
1262         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1263         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1264         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1265                                                  is_inside=0)
1266
1267         # first VRF
1268         pkts = self.create_stream_in(self.pg0, self.pg2)
1269         self.pg0.add_stream(pkts)
1270         self.pg_enable_capture(self.pg_interfaces)
1271         self.pg_start()
1272         capture = self.pg2.get_capture(len(pkts))
1273         self.verify_capture_out(capture, nat_ip1)
1274
1275         # second VRF
1276         pkts = self.create_stream_in(self.pg1, self.pg2)
1277         self.pg1.add_stream(pkts)
1278         self.pg_enable_capture(self.pg_interfaces)
1279         self.pg_start()
1280         capture = self.pg2.get_capture(len(pkts))
1281         self.verify_capture_out(capture, nat_ip1)
1282
1283     def tearDown(self):
1284         super(TestSNAT, self).tearDown()
1285         if not self.vpp_dead:
1286             self.logger.info(self.vapi.cli("show snat verbose"))
1287             self.clear_snat()
1288
1289
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    @classmethod
    def setUpConstants(cls):
        """
        Append the ``snat { deterministic }`` stanza to the VPP command line.
        """
        super(TestDeterministicNAT, cls).setUpConstants()
        deterministic_stanza = ["snat", "{", "deterministic", "}"]
        cls.vpp_cmdline.extend(deterministic_stanza)

    @classmethod
    def setUpClass(cls):
        """
        Create two pg interfaces, bring them up, configure IPv4 and
        resolve ARP on each.

        If any of that fails, the parent tearDownClass is run before the
        exception is re-raised.
        """
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for pg_if in cls.interfaces:
                pg_if.admin_up()
                pg_if.config_ip4()
                pg_if.resolve_arp()

        except Exception:
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def test_deterministic_mode(self):
        """ S-NAT run deterministic mode

        Adds one deterministic mapping (172.16.255.0/24 to
        172.17.255.50/32), checks the forward and reverse lookups for one
        inside host, checks the mapping dump, and finally checks that
        clear_snat() leaves the dump empty.
        """
        in_plen = 24
        out_plen = 32
        # addresses in packed binary form
        in_net = socket.inet_aton('172.16.255.0')
        out_net = socket.inet_aton('172.17.255.50')
        in_host = socket.inet_aton('172.16.255.20')

        config = self.vapi.snat_show_config()
        self.assertEqual(1, config.deterministic)

        self.vapi.snat_add_det_map(in_net, in_plen, out_net, out_plen)

        # forward lookup for the inside host, then reverse lookup using
        # the high outside port reported by the forward lookup
        forward = self.vapi.snat_det_forward(in_host)
        self.assertEqual(forward.out_addr[:4], out_net)
        reverse = self.vapi.snat_det_reverse(out_net, forward.out_port_hi)
        self.assertEqual(reverse.in_addr[:4], in_host)

        mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(mappings), 1)
        only_map = mappings[0]
        self.assertEqual(in_net, only_map.in_addr[:4])
        self.assertEqual(in_plen, only_map.in_plen)
        self.assertEqual(out_net, only_map.out_addr[:4])
        self.assertEqual(out_plen, only_map.out_plen)

        self.clear_snat()
        mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(mappings), 0)

    def clear_snat(self):
        """
        Clear SNAT configuration.

        Every mapping returned by the deterministic map dump is deleted
        again using the values from its dump entry.
        """
        for entry in self.vapi.snat_det_map_dump():
            self.vapi.snat_add_det_map(entry.in_addr,
                                       entry.in_plen,
                                       entry.out_addr,
                                       entry.out_plen,
                                       is_add=0)

    def tearDown(self):
        """
        Run the parent tearDown, then log and clear the SNAT state.

        Nothing beyond the parent tearDown is done when VPP is flagged
        as dead.
        """
        super(TestDeterministicNAT, self).tearDown()
        if self.vpp_dead:
            return
        snat_state = self.vapi.cli("show snat detail")
        self.logger.info(snat_state)
        self.clear_snat()
1365
# Script entry point: run this module's test cases with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)