SNAT: Additional tests for SNAT interfaces without a configured ip address
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from util import ppp
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from time import sleep
15
16
17 class MethodHolder(VppTestCase):
18     """ SNAT create capture and verify method holder """
19
20     @classmethod
21     def setUpClass(cls):
22         super(MethodHolder, cls).setUpClass()
23
24     def tearDown(self):
25         super(MethodHolder, self).tearDown()
26
27     def create_stream_in(self, in_if, out_if, ttl=64):
28         """
29         Create packet stream for inside network
30
31         :param in_if: Inside interface
32         :param out_if: Outside interface
33         :param ttl: TTL of generated packets
34         """
35         pkts = []
36         # TCP
37         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
38              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
39              TCP(sport=self.tcp_port_in))
40         pkts.append(p)
41
42         # UDP
43         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
44              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
45              UDP(sport=self.udp_port_in))
46         pkts.append(p)
47
48         # ICMP
49         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
50              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
51              ICMP(id=self.icmp_id_in, type='echo-request'))
52         pkts.append(p)
53
54         return pkts
55
56     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
57         """
58         Create packet stream for outside network
59
60         :param out_if: Outside interface
61         :param dst_ip: Destination IP address (Default use global SNAT address)
62         :param ttl: TTL of generated packets
63         """
64         if dst_ip is None:
65             dst_ip = self.snat_addr
66         pkts = []
67         # TCP
68         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
69              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
70              TCP(dport=self.tcp_port_out))
71         pkts.append(p)
72
73         # UDP
74         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
75              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
76              UDP(dport=self.udp_port_out))
77         pkts.append(p)
78
79         # ICMP
80         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
81              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
82              ICMP(id=self.icmp_id_out, type='echo-reply'))
83         pkts.append(p)
84
85         return pkts
86
87     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
88                            packet_num=3):
89         """
90         Verify captured packets on outside network
91
92         :param capture: Captured packets
93         :param nat_ip: Translated IP address (Default use global SNAT address)
94         :param same_port: Sorce port number is not translated (Default False)
95         :param packet_num: Expected number of packets (Default 3)
96         """
97         if nat_ip is None:
98             nat_ip = self.snat_addr
99         self.assertEqual(packet_num, len(capture))
100         for packet in capture:
101             try:
102                 self.assertEqual(packet[IP].src, nat_ip)
103                 if packet.haslayer(TCP):
104                     if same_port:
105                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
106                     else:
107                         self.assertNotEqual(
108                             packet[TCP].sport, self.tcp_port_in)
109                     self.tcp_port_out = packet[TCP].sport
110                 elif packet.haslayer(UDP):
111                     if same_port:
112                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
113                     else:
114                         self.assertNotEqual(
115                             packet[UDP].sport, self.udp_port_in)
116                     self.udp_port_out = packet[UDP].sport
117                 else:
118                     if same_port:
119                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
120                     else:
121                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
122                     self.icmp_id_out = packet[ICMP].id
123             except:
124                 self.logger.error(ppp("Unexpected or invalid packet "
125                                       "(outside network):", packet))
126                 raise
127
128     def verify_capture_in(self, capture, in_if, packet_num=3):
129         """
130         Verify captured packets on inside network
131
132         :param capture: Captured packets
133         :param in_if: Inside interface
134         :param packet_num: Expected number of packets (Default 3)
135         """
136         self.assertEqual(packet_num, len(capture))
137         for packet in capture:
138             try:
139                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
140                 if packet.haslayer(TCP):
141                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
142                 elif packet.haslayer(UDP):
143                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
144                 else:
145                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
146             except:
147                 self.logger.error(ppp("Unexpected or invalid packet "
148                                       "(inside network):", packet))
149                 raise
150
151     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
152         """
153         Verify captured packet that don't have to be translated
154
155         :param capture: Captured packets
156         :param ingress_if: Ingress interface
157         :param egress_if: Egress interface
158         """
159         for packet in capture:
160             try:
161                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
162                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
163                 if packet.haslayer(TCP):
164                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
165                 elif packet.haslayer(UDP):
166                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
167                 else:
168                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
169             except:
170                 self.logger.error(ppp("Unexpected or invalid packet "
171                                       "(inside network):", packet))
172                 raise
173
174     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
175                                             packet_num=3, icmp_type=11):
176         """
177         Verify captured packets with ICMP errors on outside network
178
179         :param capture: Captured packets
180         :param src_ip: Translated IP address or IP address of VPP
181                        (Default use global SNAT address)
182         :param packet_num: Expected number of packets (Default 3)
183         :param icmp_type: Type of error ICMP packet
184                           we are expecting (Default 11)
185         """
186         if src_ip is None:
187             src_ip = self.snat_addr
188         self.assertEqual(packet_num, len(capture))
189         for packet in capture:
190             try:
191                 self.assertEqual(packet[IP].src, src_ip)
192                 self.assertTrue(packet.haslayer(ICMP))
193                 icmp = packet[ICMP]
194                 self.assertEqual(icmp.type, icmp_type)
195                 self.assertTrue(icmp.haslayer(IPerror))
196                 inner_ip = icmp[IPerror]
197                 if inner_ip.haslayer(TCPerror):
198                     self.assertEqual(inner_ip[TCPerror].dport,
199                                      self.tcp_port_out)
200                 elif inner_ip.haslayer(UDPerror):
201                     self.assertEqual(inner_ip[UDPerror].dport,
202                                      self.udp_port_out)
203                 else:
204                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
205             except:
206                 self.logger.error(ppp("Unexpected or invalid packet "
207                                       "(outside network):", packet))
208                 raise
209
210     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
211                                            icmp_type=11):
212         """
213         Verify captured packets with ICMP errors on inside network
214
215         :param capture: Captured packets
216         :param in_if: Inside interface
217         :param packet_num: Expected number of packets (Default 3)
218         :param icmp_type: Type of error ICMP packet
219                           we are expecting (Default 11)
220         """
221         self.assertEqual(packet_num, len(capture))
222         for packet in capture:
223             try:
224                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
225                 self.assertTrue(packet.haslayer(ICMP))
226                 icmp = packet[ICMP]
227                 self.assertEqual(icmp.type, icmp_type)
228                 self.assertTrue(icmp.haslayer(IPerror))
229                 inner_ip = icmp[IPerror]
230                 if inner_ip.haslayer(TCPerror):
231                     self.assertEqual(inner_ip[TCPerror].sport,
232                                      self.tcp_port_in)
233                 elif inner_ip.haslayer(UDPerror):
234                     self.assertEqual(inner_ip[UDPerror].sport,
235                                      self.udp_port_in)
236                 else:
237                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
238             except:
239                 self.logger.error(ppp("Unexpected or invalid packet "
240                                       "(inside network):", packet))
241                 raise
242
243     def verify_ipfix_nat44_ses(self, data):
244         """
245         Verify IPFIX NAT44 session create/delete event
246
247         :param data: Decoded IPFIX data records
248         """
249         nat44_ses_create_num = 0
250         nat44_ses_delete_num = 0
251         self.assertEqual(6, len(data))
252         for record in data:
253             # natEvent
254             self.assertIn(ord(record[230]), [4, 5])
255             if ord(record[230]) == 4:
256                 nat44_ses_create_num += 1
257             else:
258                 nat44_ses_delete_num += 1
259             # sourceIPv4Address
260             self.assertEqual(self.pg0.remote_ip4n, record[8])
261             # postNATSourceIPv4Address
262             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
263                              record[225])
264             # ingressVRFID
265             self.assertEqual(struct.pack("!I", 0), record[234])
266             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
267             if IP_PROTOS.icmp == ord(record[4]):
268                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
269                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
270                                  record[227])
271             elif IP_PROTOS.tcp == ord(record[4]):
272                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
273                                  record[7])
274                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
275                                  record[227])
276             elif IP_PROTOS.udp == ord(record[4]):
277                 self.assertEqual(struct.pack("!H", self.udp_port_in),
278                                  record[7])
279                 self.assertEqual(struct.pack("!H", self.udp_port_out),
280                                  record[227])
281             else:
282                 self.fail("Invalid protocol")
283         self.assertEqual(3, nat44_ses_create_num)
284         self.assertEqual(3, nat44_ses_delete_num)
285
286     def verify_ipfix_addr_exhausted(self, data):
287         """
288         Verify IPFIX NAT addresses event
289
290         :param data: Decoded IPFIX data records
291         """
292         self.assertEqual(1, len(data))
293         record = data[0]
294         # natEvent
295         self.assertEqual(ord(record[230]), 3)
296         # natPoolID
297         self.assertEqual(struct.pack("!I", 0), record[283])
298
299
300 class TestSNAT(MethodHolder):
301     """ SNAT Test Cases """
302
303     @classmethod
304     def setUpClass(cls):
305         super(TestSNAT, cls).setUpClass()
306
307         try:
308             cls.tcp_port_in = 6303
309             cls.tcp_port_out = 6303
310             cls.udp_port_in = 6304
311             cls.udp_port_out = 6304
312             cls.icmp_id_in = 6305
313             cls.icmp_id_out = 6305
314             cls.snat_addr = '10.0.0.3'
315
316             cls.create_pg_interfaces(range(9))
317             cls.interfaces = list(cls.pg_interfaces[0:4])
318
319             for i in cls.interfaces:
320                 i.admin_up()
321                 i.config_ip4()
322                 i.resolve_arp()
323
324             cls.pg0.generate_remote_hosts(2)
325             cls.pg0.configure_ipv4_neighbors()
326
327             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
328
329             cls.pg4._local_ip4 = "172.16.255.1"
330             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
331             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
332             cls.pg4.set_table_ip4(10)
333             cls.pg5._local_ip4 = "172.16.255.3"
334             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
335             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
336             cls.pg5.set_table_ip4(10)
337             cls.pg6._local_ip4 = "172.16.255.1"
338             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
339             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
340             cls.pg6.set_table_ip4(20)
341             for i in cls.overlapping_interfaces:
342                 i.config_ip4()
343                 i.admin_up()
344                 i.resolve_arp()
345
346             cls.pg7.admin_up()
347             cls.pg8.admin_up()
348
349         except Exception:
350             super(TestSNAT, cls).tearDownClass()
351             raise
352
353     def clear_snat(self):
354         """
355         Clear SNAT configuration.
356         """
357         # I found no elegant way to do this
358         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
359                                    dst_address_length=32,
360                                    next_hop_address=self.pg7.remote_ip4n,
361                                    next_hop_sw_if_index=self.pg7.sw_if_index,
362                                    is_add=0)
363         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
364                                    dst_address_length=32,
365                                    next_hop_address=self.pg8.remote_ip4n,
366                                    next_hop_sw_if_index=self.pg8.sw_if_index,
367                                    is_add=0)
368
369         for intf in [self.pg7, self.pg8]:
370             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
371             for n in neighbors:
372                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
373                                               n.mac_address,
374                                               n.ip_address,
375                                               is_add=0)
376
377         if self.pg7.has_ip4_config:
378             self.pg7.unconfig_ip4()
379
380         interfaces = self.vapi.snat_interface_addr_dump()
381         for intf in interfaces:
382             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
383
384         self.vapi.snat_ipfix(enable=0)
385
386         interfaces = self.vapi.snat_interface_dump()
387         for intf in interfaces:
388             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
389                                                      intf.is_inside,
390                                                      is_add=0)
391
392         static_mappings = self.vapi.snat_static_mapping_dump()
393         for sm in static_mappings:
394             self.vapi.snat_add_static_mapping(sm.local_ip_address,
395                                               sm.external_ip_address,
396                                               local_port=sm.local_port,
397                                               external_port=sm.external_port,
398                                               addr_only=sm.addr_only,
399                                               vrf_id=sm.vrf_id,
400                                               protocol=sm.protocol,
401                                               is_add=0)
402
403         adresses = self.vapi.snat_address_dump()
404         for addr in adresses:
405             self.vapi.snat_add_address_range(addr.ip_address,
406                                              addr.ip_address,
407                                              is_add=0)
408
409     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
410                                 local_port=0, external_port=0, vrf_id=0,
411                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
412                                 proto=0):
413         """
414         Add/delete S-NAT static mapping
415
416         :param local_ip: Local IP address
417         :param external_ip: External IP address
418         :param local_port: Local port number (Optional)
419         :param external_port: External port number (Optional)
420         :param vrf_id: VRF ID (Default 0)
421         :param is_add: 1 if add, 0 if delete (Default add)
422         :param external_sw_if_index: External interface instead of IP address
423         :param proto: IP protocol (Mandatory if port specified)
424         """
425         addr_only = 1
426         if local_port and external_port:
427             addr_only = 0
428         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
429         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
430         self.vapi.snat_add_static_mapping(
431             l_ip,
432             e_ip,
433             external_sw_if_index,
434             local_port,
435             external_port,
436             addr_only,
437             vrf_id,
438             proto,
439             is_add)
440
441     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
442         """
443         Add/delete S-NAT address
444
445         :param ip: IP address
446         :param is_add: 1 if add, 0 if delete (Default add)
447         """
448         snat_addr = socket.inet_pton(socket.AF_INET, ip)
449         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
450                                          vrf_id=vrf_id)
451
452     def test_dynamic(self):
453         """ SNAT dynamic translation test """
454
455         self.snat_add_address(self.snat_addr)
456         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
457         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
458                                                  is_inside=0)
459
460         # in2out
461         pkts = self.create_stream_in(self.pg0, self.pg1)
462         self.pg0.add_stream(pkts)
463         self.pg_enable_capture(self.pg_interfaces)
464         self.pg_start()
465         capture = self.pg1.get_capture(len(pkts))
466         self.verify_capture_out(capture)
467
468         # out2in
469         pkts = self.create_stream_out(self.pg1)
470         self.pg1.add_stream(pkts)
471         self.pg_enable_capture(self.pg_interfaces)
472         self.pg_start()
473         capture = self.pg0.get_capture(len(pkts))
474         self.verify_capture_in(capture, self.pg0)
475
476     def test_dynamic_icmp_errors_in2out_ttl_1(self):
477         """ SNAT handling of client packets with TTL=1 """
478
479         self.snat_add_address(self.snat_addr)
480         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
481         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
482                                                  is_inside=0)
483
484         # Client side - generate traffic
485         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
486         self.pg0.add_stream(pkts)
487         self.pg_enable_capture(self.pg_interfaces)
488         self.pg_start()
489
490         # Client side - verify ICMP type 11 packets
491         capture = self.pg0.get_capture(len(pkts))
492         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
493
494     def test_dynamic_icmp_errors_out2in_ttl_1(self):
495         """ SNAT handling of server packets with TTL=1 """
496
497         self.snat_add_address(self.snat_addr)
498         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
499         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
500                                                  is_inside=0)
501
502         # Client side - create sessions
503         pkts = self.create_stream_in(self.pg0, self.pg1)
504         self.pg0.add_stream(pkts)
505         self.pg_enable_capture(self.pg_interfaces)
506         self.pg_start()
507
508         # Server side - generate traffic
509         capture = self.pg1.get_capture(len(pkts))
510         self.verify_capture_out(capture)
511         pkts = self.create_stream_out(self.pg1, ttl=1)
512         self.pg1.add_stream(pkts)
513         self.pg_enable_capture(self.pg_interfaces)
514         self.pg_start()
515
516         # Server side - verify ICMP type 11 packets
517         capture = self.pg1.get_capture(len(pkts))
518         self.verify_capture_out_with_icmp_errors(capture,
519                                                  src_ip=self.pg1.local_ip4)
520
521     def test_dynamic_icmp_errors_in2out_ttl_2(self):
522         """ SNAT handling of error responses to client packets with TTL=2 """
523
524         self.snat_add_address(self.snat_addr)
525         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
526         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
527                                                  is_inside=0)
528
529         # Client side - generate traffic
530         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
531         self.pg0.add_stream(pkts)
532         self.pg_enable_capture(self.pg_interfaces)
533         self.pg_start()
534
535         # Server side - simulate ICMP type 11 response
536         capture = self.pg1.get_capture(len(pkts))
537         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
538                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
539                 ICMP(type=11) / packet[IP] for packet in capture]
540         self.pg1.add_stream(pkts)
541         self.pg_enable_capture(self.pg_interfaces)
542         self.pg_start()
543
544         # Client side - verify ICMP type 11 packets
545         capture = self.pg0.get_capture(len(pkts))
546         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
547
548     def test_dynamic_icmp_errors_out2in_ttl_2(self):
549         """ SNAT handling of error responses to server packets with TTL=2 """
550
551         self.snat_add_address(self.snat_addr)
552         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
553         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
554                                                  is_inside=0)
555
556         # Client side - create sessions
557         pkts = self.create_stream_in(self.pg0, self.pg1)
558         self.pg0.add_stream(pkts)
559         self.pg_enable_capture(self.pg_interfaces)
560         self.pg_start()
561
562         # Server side - generate traffic
563         capture = self.pg1.get_capture(len(pkts))
564         self.verify_capture_out(capture)
565         pkts = self.create_stream_out(self.pg1, ttl=2)
566         self.pg1.add_stream(pkts)
567         self.pg_enable_capture(self.pg_interfaces)
568         self.pg_start()
569
570         # Client side - simulate ICMP type 11 response
571         capture = self.pg0.get_capture(len(pkts))
572         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
573                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
574                 ICMP(type=11) / packet[IP] for packet in capture]
575         self.pg0.add_stream(pkts)
576         self.pg_enable_capture(self.pg_interfaces)
577         self.pg_start()
578
579         # Server side - verify ICMP type 11 packets
580         capture = self.pg1.get_capture(len(pkts))
581         self.verify_capture_out_with_icmp_errors(capture)
582
583     def test_ping_out_interface_from_outside(self):
584         """ Ping SNAT out interface from outside network """
585
586         self.snat_add_address(self.snat_addr)
587         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
588         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
589                                                  is_inside=0)
590
591         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
592              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
593              ICMP(id=self.icmp_id_out, type='echo-request'))
594         pkts = [p]
595         self.pg1.add_stream(pkts)
596         self.pg_enable_capture(self.pg_interfaces)
597         self.pg_start()
598         capture = self.pg1.get_capture(len(pkts))
599         self.assertEqual(1, len(capture))
600         packet = capture[0]
601         try:
602             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
603             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
604             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
605             self.assertEqual(packet[ICMP].type, 0)  # echo reply
606         except:
607             self.logger.error(ppp("Unexpected or invalid packet "
608                                   "(outside network):", packet))
609             raise
610
611     def test_ping_internal_host_from_outside(self):
612         """ Ping internal host from outside network """
613
614         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
615         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
616         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
617                                                  is_inside=0)
618
619         # out2in
620         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
621                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
622                ICMP(id=self.icmp_id_out, type='echo-request'))
623         self.pg1.add_stream(pkt)
624         self.pg_enable_capture(self.pg_interfaces)
625         self.pg_start()
626         capture = self.pg0.get_capture(1)
627         self.verify_capture_in(capture, self.pg0, packet_num=1)
628         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
629
630         # in2out
631         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
632                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
633                ICMP(id=self.icmp_id_in, type='echo-reply'))
634         self.pg0.add_stream(pkt)
635         self.pg_enable_capture(self.pg_interfaces)
636         self.pg_start()
637         capture = self.pg1.get_capture(1)
638         self.verify_capture_out(capture, same_port=True, packet_num=1)
639         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
640
641     def test_static_in(self):
642         """ SNAT 1:1 NAT initialized from inside network """
643
644         nat_ip = "10.0.0.10"
645         self.tcp_port_out = 6303
646         self.udp_port_out = 6304
647         self.icmp_id_out = 6305
648
649         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
650         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
651         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
652                                                  is_inside=0)
653
654         # in2out
655         pkts = self.create_stream_in(self.pg0, self.pg1)
656         self.pg0.add_stream(pkts)
657         self.pg_enable_capture(self.pg_interfaces)
658         self.pg_start()
659         capture = self.pg1.get_capture(len(pkts))
660         self.verify_capture_out(capture, nat_ip, True)
661
662         # out2in
663         pkts = self.create_stream_out(self.pg1, nat_ip)
664         self.pg1.add_stream(pkts)
665         self.pg_enable_capture(self.pg_interfaces)
666         self.pg_start()
667         capture = self.pg0.get_capture(len(pkts))
668         self.verify_capture_in(capture, self.pg0)
669
670     def test_static_out(self):
671         """ SNAT 1:1 NAT initialized from outside network """
672
673         nat_ip = "10.0.0.20"
674         self.tcp_port_out = 6303
675         self.udp_port_out = 6304
676         self.icmp_id_out = 6305
677
678         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
679         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
680         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
681                                                  is_inside=0)
682
683         # out2in
684         pkts = self.create_stream_out(self.pg1, nat_ip)
685         self.pg1.add_stream(pkts)
686         self.pg_enable_capture(self.pg_interfaces)
687         self.pg_start()
688         capture = self.pg0.get_capture(len(pkts))
689         self.verify_capture_in(capture, self.pg0)
690
691         # in2out
692         pkts = self.create_stream_in(self.pg0, self.pg1)
693         self.pg0.add_stream(pkts)
694         self.pg_enable_capture(self.pg_interfaces)
695         self.pg_start()
696         capture = self.pg1.get_capture(len(pkts))
697         self.verify_capture_out(capture, nat_ip, True)
698
699     def test_static_with_port_in(self):
700         """ SNAT 1:1 NAT with port initialized from inside network """
701
702         self.tcp_port_out = 3606
703         self.udp_port_out = 3607
704         self.icmp_id_out = 3608
705
706         self.snat_add_address(self.snat_addr)
707         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
708                                      self.tcp_port_in, self.tcp_port_out,
709                                      proto=IP_PROTOS.tcp)
710         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
711                                      self.udp_port_in, self.udp_port_out,
712                                      proto=IP_PROTOS.udp)
713         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
714                                      self.icmp_id_in, self.icmp_id_out,
715                                      proto=IP_PROTOS.icmp)
716         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
717         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
718                                                  is_inside=0)
719
720         # in2out
721         pkts = self.create_stream_in(self.pg0, self.pg1)
722         self.pg0.add_stream(pkts)
723         self.pg_enable_capture(self.pg_interfaces)
724         self.pg_start()
725         capture = self.pg1.get_capture(len(pkts))
726         self.verify_capture_out(capture)
727
728         # out2in
729         pkts = self.create_stream_out(self.pg1)
730         self.pg1.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733         capture = self.pg0.get_capture(len(pkts))
734         self.verify_capture_in(capture, self.pg0)
735
736     def test_static_with_port_out(self):
737         """ SNAT 1:1 NAT with port initialized from outside network """
738
739         self.tcp_port_out = 30606
740         self.udp_port_out = 30607
741         self.icmp_id_out = 30608
742
743         self.snat_add_address(self.snat_addr)
744         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
745                                      self.tcp_port_in, self.tcp_port_out,
746                                      proto=IP_PROTOS.tcp)
747         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
748                                      self.udp_port_in, self.udp_port_out,
749                                      proto=IP_PROTOS.udp)
750         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
751                                      self.icmp_id_in, self.icmp_id_out,
752                                      proto=IP_PROTOS.icmp)
753         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
754         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
755                                                  is_inside=0)
756
757         # out2in
758         pkts = self.create_stream_out(self.pg1)
759         self.pg1.add_stream(pkts)
760         self.pg_enable_capture(self.pg_interfaces)
761         self.pg_start()
762         capture = self.pg0.get_capture(len(pkts))
763         self.verify_capture_in(capture, self.pg0)
764
765         # in2out
766         pkts = self.create_stream_in(self.pg0, self.pg1)
767         self.pg0.add_stream(pkts)
768         self.pg_enable_capture(self.pg_interfaces)
769         self.pg_start()
770         capture = self.pg1.get_capture(len(pkts))
771         self.verify_capture_out(capture)
772
773     def test_static_vrf_aware(self):
774         """ SNAT 1:1 NAT VRF awareness """
775
776         nat_ip1 = "10.0.0.30"
777         nat_ip2 = "10.0.0.40"
778         self.tcp_port_out = 6303
779         self.udp_port_out = 6304
780         self.icmp_id_out = 6305
781
782         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
783                                      vrf_id=10)
784         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
785                                      vrf_id=10)
786         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
787                                                  is_inside=0)
788         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
789         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
790
791         # inside interface VRF match SNAT static mapping VRF
792         pkts = self.create_stream_in(self.pg4, self.pg3)
793         self.pg4.add_stream(pkts)
794         self.pg_enable_capture(self.pg_interfaces)
795         self.pg_start()
796         capture = self.pg3.get_capture(len(pkts))
797         self.verify_capture_out(capture, nat_ip1, True)
798
799         # inside interface VRF don't match SNAT static mapping VRF (packets
800         # are dropped)
801         pkts = self.create_stream_in(self.pg0, self.pg3)
802         self.pg0.add_stream(pkts)
803         self.pg_enable_capture(self.pg_interfaces)
804         self.pg_start()
805         self.pg3.assert_nothing_captured()
806
807     def test_multiple_inside_interfaces(self):
808         """ SNAT multiple inside interfaces (non-overlapping address space) """
809
810         self.snat_add_address(self.snat_addr)
811         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
812         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
813         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
814                                                  is_inside=0)
815
816         # between two S-NAT inside interfaces (no translation)
817         pkts = self.create_stream_in(self.pg0, self.pg1)
818         self.pg0.add_stream(pkts)
819         self.pg_enable_capture(self.pg_interfaces)
820         self.pg_start()
821         capture = self.pg1.get_capture(len(pkts))
822         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
823
824         # from S-NAT inside to interface without S-NAT feature (no translation)
825         pkts = self.create_stream_in(self.pg0, self.pg2)
826         self.pg0.add_stream(pkts)
827         self.pg_enable_capture(self.pg_interfaces)
828         self.pg_start()
829         capture = self.pg2.get_capture(len(pkts))
830         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
831
832         # in2out 1st interface
833         pkts = self.create_stream_in(self.pg0, self.pg3)
834         self.pg0.add_stream(pkts)
835         self.pg_enable_capture(self.pg_interfaces)
836         self.pg_start()
837         capture = self.pg3.get_capture(len(pkts))
838         self.verify_capture_out(capture)
839
840         # out2in 1st interface
841         pkts = self.create_stream_out(self.pg3)
842         self.pg3.add_stream(pkts)
843         self.pg_enable_capture(self.pg_interfaces)
844         self.pg_start()
845         capture = self.pg0.get_capture(len(pkts))
846         self.verify_capture_in(capture, self.pg0)
847
848         # in2out 2nd interface
849         pkts = self.create_stream_in(self.pg1, self.pg3)
850         self.pg1.add_stream(pkts)
851         self.pg_enable_capture(self.pg_interfaces)
852         self.pg_start()
853         capture = self.pg3.get_capture(len(pkts))
854         self.verify_capture_out(capture)
855
856         # out2in 2nd interface
857         pkts = self.create_stream_out(self.pg3)
858         self.pg3.add_stream(pkts)
859         self.pg_enable_capture(self.pg_interfaces)
860         self.pg_start()
861         capture = self.pg1.get_capture(len(pkts))
862         self.verify_capture_in(capture, self.pg1)
863
864     def test_inside_overlapping_interfaces(self):
865         """ SNAT multiple inside interfaces with overlapping address space """
866
867         static_nat_ip = "10.0.0.10"
868         self.snat_add_address(self.snat_addr)
869         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
870                                                  is_inside=0)
871         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
872         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
873         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
874         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
875                                      vrf_id=20)
876
877         # between S-NAT inside interfaces with same VRF (no translation)
878         pkts = self.create_stream_in(self.pg4, self.pg5)
879         self.pg4.add_stream(pkts)
880         self.pg_enable_capture(self.pg_interfaces)
881         self.pg_start()
882         capture = self.pg5.get_capture(len(pkts))
883         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
884
885         # between S-NAT inside interfaces with different VRF (hairpinning)
886         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
887              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
888              TCP(sport=1234, dport=5678))
889         self.pg4.add_stream(p)
890         self.pg_enable_capture(self.pg_interfaces)
891         self.pg_start()
892         capture = self.pg6.get_capture(1)
893         p = capture[0]
894         try:
895             ip = p[IP]
896             tcp = p[TCP]
897             self.assertEqual(ip.src, self.snat_addr)
898             self.assertEqual(ip.dst, self.pg6.remote_ip4)
899             self.assertNotEqual(tcp.sport, 1234)
900             self.assertEqual(tcp.dport, 5678)
901         except:
902             self.logger.error(ppp("Unexpected or invalid packet:", p))
903             raise
904
905         # in2out 1st interface
906         pkts = self.create_stream_in(self.pg4, self.pg3)
907         self.pg4.add_stream(pkts)
908         self.pg_enable_capture(self.pg_interfaces)
909         self.pg_start()
910         capture = self.pg3.get_capture(len(pkts))
911         self.verify_capture_out(capture)
912
913         # out2in 1st interface
914         pkts = self.create_stream_out(self.pg3)
915         self.pg3.add_stream(pkts)
916         self.pg_enable_capture(self.pg_interfaces)
917         self.pg_start()
918         capture = self.pg4.get_capture(len(pkts))
919         self.verify_capture_in(capture, self.pg4)
920
921         # in2out 2nd interface
922         pkts = self.create_stream_in(self.pg5, self.pg3)
923         self.pg5.add_stream(pkts)
924         self.pg_enable_capture(self.pg_interfaces)
925         self.pg_start()
926         capture = self.pg3.get_capture(len(pkts))
927         self.verify_capture_out(capture)
928
929         # out2in 2nd interface
930         pkts = self.create_stream_out(self.pg3)
931         self.pg3.add_stream(pkts)
932         self.pg_enable_capture(self.pg_interfaces)
933         self.pg_start()
934         capture = self.pg5.get_capture(len(pkts))
935         self.verify_capture_in(capture, self.pg5)
936
937         # pg5 session dump
938         addresses = self.vapi.snat_address_dump()
939         self.assertEqual(len(addresses), 1)
940         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
941         self.assertEqual(len(sessions), 3)
942         for session in sessions:
943             self.assertFalse(session.is_static)
944             self.assertEqual(session.inside_ip_address[0:4],
945                              self.pg5.remote_ip4n)
946             self.assertEqual(session.outside_ip_address,
947                              addresses[0].ip_address)
948         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
949         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
950         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
951         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
952         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
953         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
954         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
955         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
956         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
957
958         # in2out 3rd interface
959         pkts = self.create_stream_in(self.pg6, self.pg3)
960         self.pg6.add_stream(pkts)
961         self.pg_enable_capture(self.pg_interfaces)
962         self.pg_start()
963         capture = self.pg3.get_capture(len(pkts))
964         self.verify_capture_out(capture, static_nat_ip, True)
965
966         # out2in 3rd interface
967         pkts = self.create_stream_out(self.pg3, static_nat_ip)
968         self.pg3.add_stream(pkts)
969         self.pg_enable_capture(self.pg_interfaces)
970         self.pg_start()
971         capture = self.pg6.get_capture(len(pkts))
972         self.verify_capture_in(capture, self.pg6)
973
974         # general user and session dump verifications
975         users = self.vapi.snat_user_dump()
976         self.assertTrue(len(users) >= 3)
977         addresses = self.vapi.snat_address_dump()
978         self.assertEqual(len(addresses), 1)
979         for user in users:
980             sessions = self.vapi.snat_user_session_dump(user.ip_address,
981                                                         user.vrf_id)
982             for session in sessions:
983                 self.assertEqual(user.ip_address, session.inside_ip_address)
984                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
985                 self.assertTrue(session.protocol in
986                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
987                                  IP_PROTOS.icmp])
988
989         # pg4 session dump
990         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
991         self.assertTrue(len(sessions) >= 4)
992         for session in sessions:
993             self.assertFalse(session.is_static)
994             self.assertEqual(session.inside_ip_address[0:4],
995                              self.pg4.remote_ip4n)
996             self.assertEqual(session.outside_ip_address,
997                              addresses[0].ip_address)
998
999         # pg6 session dump
1000         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1001         self.assertTrue(len(sessions) >= 3)
1002         for session in sessions:
1003             self.assertTrue(session.is_static)
1004             self.assertEqual(session.inside_ip_address[0:4],
1005                              self.pg6.remote_ip4n)
1006             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1007                              map(int, static_nat_ip.split('.')))
1008             self.assertTrue(session.inside_port in
1009                             [self.tcp_port_in, self.udp_port_in,
1010                              self.icmp_id_in])
1011
1012     def test_hairpinning(self):
1013         """ SNAT hairpinning """
1014
1015         host = self.pg0.remote_hosts[0]
1016         server = self.pg0.remote_hosts[1]
1017         host_in_port = 1234
1018         host_out_port = 0
1019         server_in_port = 5678
1020         server_out_port = 8765
1021
1022         self.snat_add_address(self.snat_addr)
1023         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1024         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1025                                                  is_inside=0)
1026         # add static mapping for server
1027         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1028                                      server_in_port, server_out_port,
1029                                      proto=IP_PROTOS.tcp)
1030
1031         # send packet from host to server
1032         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1033              IP(src=host.ip4, dst=self.snat_addr) /
1034              TCP(sport=host_in_port, dport=server_out_port))
1035         self.pg0.add_stream(p)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg0.get_capture(1)
1039         p = capture[0]
1040         try:
1041             ip = p[IP]
1042             tcp = p[TCP]
1043             self.assertEqual(ip.src, self.snat_addr)
1044             self.assertEqual(ip.dst, server.ip4)
1045             self.assertNotEqual(tcp.sport, host_in_port)
1046             self.assertEqual(tcp.dport, server_in_port)
1047             host_out_port = tcp.sport
1048         except:
1049             self.logger.error(ppp("Unexpected or invalid packet:", p))
1050             raise
1051
1052         # send reply from server to host
1053         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1054              IP(src=server.ip4, dst=self.snat_addr) /
1055              TCP(sport=server_in_port, dport=host_out_port))
1056         self.pg0.add_stream(p)
1057         self.pg_enable_capture(self.pg_interfaces)
1058         self.pg_start()
1059         capture = self.pg0.get_capture(1)
1060         p = capture[0]
1061         try:
1062             ip = p[IP]
1063             tcp = p[TCP]
1064             self.assertEqual(ip.src, self.snat_addr)
1065             self.assertEqual(ip.dst, host.ip4)
1066             self.assertEqual(tcp.sport, server_out_port)
1067             self.assertEqual(tcp.dport, host_in_port)
1068         except:
1069             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1070             raise
1071
1072     def test_max_translations_per_user(self):
1073         """ MAX translations per user - recycle the least recently used """
1074
1075         self.snat_add_address(self.snat_addr)
1076         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1077         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1078                                                  is_inside=0)
1079
1080         # get maximum number of translations per user
1081         snat_config = self.vapi.snat_show_config()
1082
1083         # send more than maximum number of translations per user packets
1084         pkts_num = snat_config.max_translations_per_user + 5
1085         pkts = []
1086         for port in range(0, pkts_num):
1087             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1088                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1089                  TCP(sport=1025 + port))
1090             pkts.append(p)
1091         self.pg0.add_stream(pkts)
1092         self.pg_enable_capture(self.pg_interfaces)
1093         self.pg_start()
1094
1095         # verify number of translated packet
1096         self.pg1.get_capture(pkts_num)
1097
1098     def test_interface_addr(self):
1099         """ Acquire SNAT addresses from interface """
1100         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1101
1102         # no address in NAT pool
1103         adresses = self.vapi.snat_address_dump()
1104         self.assertEqual(0, len(adresses))
1105
1106         # configure interface address and check NAT address pool
1107         self.pg7.config_ip4()
1108         adresses = self.vapi.snat_address_dump()
1109         self.assertEqual(1, len(adresses))
1110         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1111
1112         # remove interface address and check NAT address pool
1113         self.pg7.unconfig_ip4()
1114         adresses = self.vapi.snat_address_dump()
1115         self.assertEqual(0, len(adresses))
1116
1117     def test_interface_addr_static_mapping(self):
1118         """ Static mapping with addresses from interface """
1119         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1120         self.snat_add_static_mapping('1.2.3.4',
1121                                      external_sw_if_index=self.pg7.sw_if_index)
1122
1123         # static mappings with external interface
1124         static_mappings = self.vapi.snat_static_mapping_dump()
1125         self.assertEqual(1, len(static_mappings))
1126         self.assertEqual(self.pg7.sw_if_index,
1127                          static_mappings[0].external_sw_if_index)
1128
1129         # configure interface address and check static mappings
1130         self.pg7.config_ip4()
1131         static_mappings = self.vapi.snat_static_mapping_dump()
1132         self.assertEqual(1, len(static_mappings))
1133         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1134                          self.pg7.local_ip4n)
1135         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1136
1137         # remove interface address and check static mappings
1138         self.pg7.unconfig_ip4()
1139         static_mappings = self.vapi.snat_static_mapping_dump()
1140         self.assertEqual(0, len(static_mappings))
1141
1142     def test_ipfix_nat44_sess(self):
1143         """ S-NAT IPFIX logging NAT44 session created/delted """
1144         self.snat_add_address(self.snat_addr)
1145         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1146         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1147                                                  is_inside=0)
1148         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1149                                      src_address=self.pg3.local_ip4n,
1150                                      path_mtu=512,
1151                                      template_interval=10)
1152         self.vapi.snat_ipfix()
1153
1154         pkts = self.create_stream_in(self.pg0, self.pg1)
1155         self.pg0.add_stream(pkts)
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158         capture = self.pg1.get_capture(len(pkts))
1159         self.verify_capture_out(capture)
1160         self.snat_add_address(self.snat_addr, is_add=0)
1161         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1162         capture = self.pg3.get_capture(3)
1163         ipfix = IPFIXDecoder()
1164         # first load template
1165         for p in capture:
1166             self.assertTrue(p.haslayer(IPFIX))
1167             if p.haslayer(Template):
1168                 ipfix.add_template(p.getlayer(Template))
1169         # verify events in data set
1170         for p in capture:
1171             if p.haslayer(Data):
1172                 data = ipfix.decode_data_set(p.getlayer(Set))
1173                 self.verify_ipfix_nat44_ses(data)
1174
1175     def test_ipfix_addr_exhausted(self):
1176         """ S-NAT IPFIX logging NAT addresses exhausted """
1177         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1178         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1179                                                  is_inside=0)
1180         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1181                                      src_address=self.pg3.local_ip4n,
1182                                      path_mtu=512,
1183                                      template_interval=10)
1184         self.vapi.snat_ipfix()
1185
1186         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1187              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1188              TCP(sport=3025))
1189         self.pg0.add_stream(p)
1190         self.pg_enable_capture(self.pg_interfaces)
1191         self.pg_start()
1192         capture = self.pg1.get_capture(0)
1193         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1194         capture = self.pg3.get_capture(3)
1195         ipfix = IPFIXDecoder()
1196         # first load template
1197         for p in capture:
1198             self.assertTrue(p.haslayer(IPFIX))
1199             if p.haslayer(Template):
1200                 ipfix.add_template(p.getlayer(Template))
1201         # verify events in data set
1202         for p in capture:
1203             if p.haslayer(Data):
1204                 data = ipfix.decode_data_set(p.getlayer(Set))
1205                 self.verify_ipfix_addr_exhausted(data)
1206
1207     def test_pool_addr_fib(self):
1208         """ S-NAT add pool addresses to FIB """
1209         static_addr = '10.0.0.10'
1210         self.snat_add_address(self.snat_addr)
1211         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1212         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1213                                                  is_inside=0)
1214         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1215
1216         # SNAT address
1217         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1218              ARP(op=ARP.who_has, pdst=self.snat_addr,
1219                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1220         self.pg1.add_stream(p)
1221         self.pg_enable_capture(self.pg_interfaces)
1222         self.pg_start()
1223         capture = self.pg1.get_capture(1)
1224         self.assertTrue(capture[0].haslayer(ARP))
1225         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1226
1227         # 1:1 NAT address
1228         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1229              ARP(op=ARP.who_has, pdst=static_addr,
1230                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1231         self.pg1.add_stream(p)
1232         self.pg_enable_capture(self.pg_interfaces)
1233         self.pg_start()
1234         capture = self.pg1.get_capture(1)
1235         self.assertTrue(capture[0].haslayer(ARP))
1236         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1237
1238         # send ARP to non-SNAT interface
1239         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1240              ARP(op=ARP.who_has, pdst=self.snat_addr,
1241                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1242         self.pg2.add_stream(p)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245         capture = self.pg1.get_capture(0)
1246
1247         # remove addresses and verify
1248         self.snat_add_address(self.snat_addr, is_add=0)
1249         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1250                                      is_add=0)
1251
1252         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1253              ARP(op=ARP.who_has, pdst=self.snat_addr,
1254                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1255         self.pg1.add_stream(p)
1256         self.pg_enable_capture(self.pg_interfaces)
1257         self.pg_start()
1258         capture = self.pg1.get_capture(0)
1259
1260         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1261              ARP(op=ARP.who_has, pdst=static_addr,
1262                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1263         self.pg1.add_stream(p)
1264         self.pg_enable_capture(self.pg_interfaces)
1265         self.pg_start()
1266         capture = self.pg1.get_capture(0)
1267
1268     def test_vrf_mode(self):
1269         """ S-NAT tenant VRF aware address pool mode """
1270
1271         vrf_id1 = 1
1272         vrf_id2 = 2
1273         nat_ip1 = "10.0.0.10"
1274         nat_ip2 = "10.0.0.11"
1275
1276         self.pg0.unconfig_ip4()
1277         self.pg1.unconfig_ip4()
1278         self.pg0.set_table_ip4(vrf_id1)
1279         self.pg1.set_table_ip4(vrf_id2)
1280         self.pg0.config_ip4()
1281         self.pg1.config_ip4()
1282
1283         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1284         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1285         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1286         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1287         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1288                                                  is_inside=0)
1289
1290         # first VRF
1291         pkts = self.create_stream_in(self.pg0, self.pg2)
1292         self.pg0.add_stream(pkts)
1293         self.pg_enable_capture(self.pg_interfaces)
1294         self.pg_start()
1295         capture = self.pg2.get_capture(len(pkts))
1296         self.verify_capture_out(capture, nat_ip1)
1297
1298         # second VRF
1299         pkts = self.create_stream_in(self.pg1, self.pg2)
1300         self.pg1.add_stream(pkts)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg2.get_capture(len(pkts))
1304         self.verify_capture_out(capture, nat_ip2)
1305
1306     def test_vrf_feature_independent(self):
1307         """ S-NAT tenant VRF independent address pool mode """
1308
1309         nat_ip1 = "10.0.0.10"
1310         nat_ip2 = "10.0.0.11"
1311
1312         self.snat_add_address(nat_ip1)
1313         self.snat_add_address(nat_ip2)
1314         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1315         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1316         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1317                                                  is_inside=0)
1318
1319         # first VRF
1320         pkts = self.create_stream_in(self.pg0, self.pg2)
1321         self.pg0.add_stream(pkts)
1322         self.pg_enable_capture(self.pg_interfaces)
1323         self.pg_start()
1324         capture = self.pg2.get_capture(len(pkts))
1325         self.verify_capture_out(capture, nat_ip1)
1326
1327         # second VRF
1328         pkts = self.create_stream_in(self.pg1, self.pg2)
1329         self.pg1.add_stream(pkts)
1330         self.pg_enable_capture(self.pg_interfaces)
1331         self.pg_start()
1332         capture = self.pg2.get_capture(len(pkts))
1333         self.verify_capture_out(capture, nat_ip1)
1334
1335     def test_dynamic_ipless_interfaces(self):
1336         """ SNAT interfaces without configured ip dynamic map """
1337
1338         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1339                                       self.pg7.remote_mac,
1340                                       self.pg7.remote_ip4n,
1341                                       is_static=1)
1342         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1343                                       self.pg8.remote_mac,
1344                                       self.pg8.remote_ip4n,
1345                                       is_static=1)
1346
1347         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1348                                    dst_address_length=32,
1349                                    next_hop_address=self.pg7.remote_ip4n,
1350                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1351         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1352                                    dst_address_length=32,
1353                                    next_hop_address=self.pg8.remote_ip4n,
1354                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1355
1356         self.snat_add_address(self.snat_addr)
1357         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1358         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1359                                                  is_inside=0)
1360
1361         # in2out
1362         pkts = self.create_stream_in(self.pg7, self.pg8)
1363         self.pg7.add_stream(pkts)
1364         self.pg_enable_capture(self.pg_interfaces)
1365         self.pg_start()
1366         capture = self.pg8.get_capture(len(pkts))
1367         self.verify_capture_out(capture)
1368
1369         # out2in
1370         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1371         self.pg8.add_stream(pkts)
1372         self.pg_enable_capture(self.pg_interfaces)
1373         self.pg_start()
1374         capture = self.pg7.get_capture(len(pkts))
1375         self.verify_capture_in(capture, self.pg7)
1376
1377     def test_static_ipless_interfaces(self):
1378         """ SNAT 1:1 NAT interfaces without configured ip """
1379
1380         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1381                                       self.pg7.remote_mac,
1382                                       self.pg7.remote_ip4n,
1383                                       is_static=1)
1384         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1385                                       self.pg8.remote_mac,
1386                                       self.pg8.remote_ip4n,
1387                                       is_static=1)
1388
1389         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1390                                    dst_address_length=32,
1391                                    next_hop_address=self.pg7.remote_ip4n,
1392                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1393         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1394                                    dst_address_length=32,
1395                                    next_hop_address=self.pg8.remote_ip4n,
1396                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1397
1398         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1399         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1400         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1401                                                  is_inside=0)
1402
1403         # out2in
1404         pkts = self.create_stream_out(self.pg8)
1405         self.pg8.add_stream(pkts)
1406         self.pg_enable_capture(self.pg_interfaces)
1407         self.pg_start()
1408         capture = self.pg7.get_capture(len(pkts))
1409         self.verify_capture_in(capture, self.pg7)
1410
1411         # in2out
1412         pkts = self.create_stream_in(self.pg7, self.pg8)
1413         self.pg7.add_stream(pkts)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg8.get_capture(len(pkts))
1417         self.verify_capture_out(capture, self.snat_addr, True)
1418
1419     def test_static_with_port_ipless_interfaces(self):
1420         """ SNAT 1:1 NAT with port interfaces without configured ip """
1421
1422         self.tcp_port_out = 30606
1423         self.udp_port_out = 30607
1424         self.icmp_id_out = 30608
1425
1426         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1427                                       self.pg7.remote_mac,
1428                                       self.pg7.remote_ip4n,
1429                                       is_static=1)
1430         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1431                                       self.pg8.remote_mac,
1432                                       self.pg8.remote_ip4n,
1433                                       is_static=1)
1434
1435         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1436                                    dst_address_length=32,
1437                                    next_hop_address=self.pg7.remote_ip4n,
1438                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1439         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1440                                    dst_address_length=32,
1441                                    next_hop_address=self.pg8.remote_ip4n,
1442                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1443
1444         self.snat_add_address(self.snat_addr)
1445         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1446                                      self.tcp_port_in, self.tcp_port_out,
1447                                      proto=IP_PROTOS.tcp)
1448         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1449                                      self.udp_port_in, self.udp_port_out,
1450                                      proto=IP_PROTOS.udp)
1451         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1452                                      self.icmp_id_in, self.icmp_id_out,
1453                                      proto=IP_PROTOS.icmp)
1454         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1455         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1456                                                  is_inside=0)
1457
1458         # out2in
1459         pkts = self.create_stream_out(self.pg8)
1460         self.pg8.add_stream(pkts)
1461         self.pg_enable_capture(self.pg_interfaces)
1462         self.pg_start()
1463         capture = self.pg7.get_capture(len(pkts))
1464         self.verify_capture_in(capture, self.pg7)
1465
1466         # in2out
1467         pkts = self.create_stream_in(self.pg7, self.pg8)
1468         self.pg7.add_stream(pkts)
1469         self.pg_enable_capture(self.pg_interfaces)
1470         self.pg_start()
1471         capture = self.pg8.get_capture(len(pkts))
1472         self.verify_capture_out(capture)
1473
1474     def tearDown(self):
1475         super(TestSNAT, self).tearDown()
1476         if not self.vpp_dead:
1477             self.logger.info(self.vapi.cli("show snat verbose"))
1478             self.clear_snat()
1479
1480
1481 class TestDeterministicNAT(MethodHolder):
1482     """ Deterministic NAT Test Cases """
1483
1484     @classmethod
1485     def setUpConstants(cls):
1486         super(TestDeterministicNAT, cls).setUpConstants()
1487         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1488
1489     @classmethod
1490     def setUpClass(cls):
1491         super(TestDeterministicNAT, cls).setUpClass()
1492
1493         try:
1494             cls.tcp_port_in = 6303
1495             cls.tcp_external_port = 6303
1496             cls.udp_port_in = 6304
1497             cls.udp_external_port = 6304
1498             cls.icmp_id_in = 6305
1499             cls.snat_addr = '10.0.0.3'
1500
1501             cls.create_pg_interfaces(range(3))
1502             cls.interfaces = list(cls.pg_interfaces)
1503
1504             for i in cls.interfaces:
1505                 i.admin_up()
1506                 i.config_ip4()
1507                 i.resolve_arp()
1508
1509             cls.pg0.generate_remote_hosts(2)
1510             cls.pg0.configure_ipv4_neighbors()
1511
1512         except Exception:
1513             super(TestDeterministicNAT, cls).tearDownClass()
1514             raise
1515
1516     def create_stream_in(self, in_if, out_if, ttl=64):
1517         """
1518         Create packet stream for inside network
1519
1520         :param in_if: Inside interface
1521         :param out_if: Outside interface
1522         :param ttl: TTL of generated packets
1523         """
1524         pkts = []
1525         # TCP
1526         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1527              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1528              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1529         pkts.append(p)
1530
1531         # UDP
1532         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1533              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1534              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1535         pkts.append(p)
1536
1537         # ICMP
1538         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1539              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1540              ICMP(id=self.icmp_id_in, type='echo-request'))
1541         pkts.append(p)
1542
1543         return pkts
1544
1545     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1546         """
1547         Create packet stream for outside network
1548
1549         :param out_if: Outside interface
1550         :param dst_ip: Destination IP address (Default use global SNAT address)
1551         :param ttl: TTL of generated packets
1552         """
1553         if dst_ip is None:
1554             dst_ip = self.snat_addr
1555         pkts = []
1556         # TCP
1557         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1558              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1559              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1560         pkts.append(p)
1561
1562         # UDP
1563         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1564              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1565              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1566         pkts.append(p)
1567
1568         # ICMP
1569         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1570              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1571              ICMP(id=self.icmp_external_id, type='echo-reply'))
1572         pkts.append(p)
1573
1574         return pkts
1575
1576     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1577         """
1578         Verify captured packets on outside network
1579
1580         :param capture: Captured packets
1581         :param nat_ip: Translated IP address (Default use global SNAT address)
1582         :param same_port: Sorce port number is not translated (Default False)
1583         :param packet_num: Expected number of packets (Default 3)
1584         """
1585         if nat_ip is None:
1586             nat_ip = self.snat_addr
1587         self.assertEqual(packet_num, len(capture))
1588         for packet in capture:
1589             try:
1590                 self.assertEqual(packet[IP].src, nat_ip)
1591                 if packet.haslayer(TCP):
1592                     self.tcp_port_out = packet[TCP].sport
1593                 elif packet.haslayer(UDP):
1594                     self.udp_port_out = packet[UDP].sport
1595                 else:
1596                     self.icmp_external_id = packet[ICMP].id
1597             except:
1598                 self.logger.error(ppp("Unexpected or invalid packet "
1599                                       "(outside network):", packet))
1600                 raise
1601
1602     def initiate_tcp_session(self, in_if, out_if):
1603         """
1604         Initiates TCP session
1605
1606         :param in_if: Inside interface
1607         :param out_if: Outside interface
1608         """
1609         try:
1610             # SYN packet in->out
1611             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1612                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1613                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1614                      flags="S"))
1615             in_if.add_stream(p)
1616             self.pg_enable_capture(self.pg_interfaces)
1617             self.pg_start()
1618             capture = out_if.get_capture(1)
1619             p = capture[0]
1620             self.tcp_port_out = p[TCP].sport
1621
1622             # SYN + ACK packet out->in
1623             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1624                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1625                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1626                      flags="SA"))
1627             out_if.add_stream(p)
1628             self.pg_enable_capture(self.pg_interfaces)
1629             self.pg_start()
1630             in_if.get_capture(1)
1631
1632             # ACK packet in->out
1633             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1634                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1635                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1636                      flags="A"))
1637             in_if.add_stream(p)
1638             self.pg_enable_capture(self.pg_interfaces)
1639             self.pg_start()
1640             out_if.get_capture(1)
1641
1642         except:
1643             self.logger.error("TCP 3 way handshake failed")
1644             raise
1645
1646     def verify_ipfix_max_entries_per_user(self, data):
1647         """
1648         Verify IPFIX maximum entries per user exceeded event
1649
1650         :param data: Decoded IPFIX data records
1651         """
1652         self.assertEqual(1, len(data))
1653         record = data[0]
1654         # natEvent
1655         self.assertEqual(ord(record[230]), 13)
1656         # natQuotaExceededEvent
1657         self.assertEqual('\x03\x00\x00\x00', record[466])
1658         # sourceIPv4Address
1659         self.assertEqual(self.pg0.remote_ip4n, record[8])
1660
1661     def test_deterministic_mode(self):
1662         """ S-NAT run deterministic mode """
1663         in_addr = '172.16.255.0'
1664         out_addr = '172.17.255.50'
1665         in_addr_t = '172.16.255.20'
1666         in_addr_n = socket.inet_aton(in_addr)
1667         out_addr_n = socket.inet_aton(out_addr)
1668         in_addr_t_n = socket.inet_aton(in_addr_t)
1669         in_plen = 24
1670         out_plen = 32
1671
1672         snat_config = self.vapi.snat_show_config()
1673         self.assertEqual(1, snat_config.deterministic)
1674
1675         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1676
1677         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1678         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1679         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1680         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1681
1682         deterministic_mappings = self.vapi.snat_det_map_dump()
1683         self.assertEqual(len(deterministic_mappings), 1)
1684         dsm = deterministic_mappings[0]
1685         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1686         self.assertEqual(in_plen, dsm.in_plen)
1687         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1688         self.assertEqual(out_plen, dsm.out_plen)
1689
1690         self.clear_snat()
1691         deterministic_mappings = self.vapi.snat_det_map_dump()
1692         self.assertEqual(len(deterministic_mappings), 0)
1693
1694     def test_set_timeouts(self):
1695         """ Set deterministic NAT timeouts """
1696         timeouts_before = self.vapi.snat_det_get_timeouts()
1697
1698         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1699                                         timeouts_before.tcp_established + 10,
1700                                         timeouts_before.tcp_transitory + 10,
1701                                         timeouts_before.icmp + 10)
1702
1703         timeouts_after = self.vapi.snat_det_get_timeouts()
1704
1705         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1706         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1707         self.assertNotEqual(timeouts_before.tcp_established,
1708                             timeouts_after.tcp_established)
1709         self.assertNotEqual(timeouts_before.tcp_transitory,
1710                             timeouts_after.tcp_transitory)
1711
1712     def test_det_in(self):
1713         """ CGNAT translation test (TCP, UDP, ICMP) """
1714
1715         nat_ip = "10.0.0.10"
1716
1717         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1718                                    32,
1719                                    socket.inet_aton(nat_ip),
1720                                    32)
1721         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1722         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1723                                                  is_inside=0)
1724
1725         # in2out
1726         pkts = self.create_stream_in(self.pg0, self.pg1)
1727         self.pg0.add_stream(pkts)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg1.get_capture(len(pkts))
1731         self.verify_capture_out(capture, nat_ip)
1732
1733         # out2in
1734         pkts = self.create_stream_out(self.pg1, nat_ip)
1735         self.pg1.add_stream(pkts)
1736         self.pg_enable_capture(self.pg_interfaces)
1737         self.pg_start()
1738         capture = self.pg0.get_capture(len(pkts))
1739         self.verify_capture_in(capture, self.pg0)
1740
1741         # session dump test
1742         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1743         self.assertEqual(len(sessions), 3)
1744
1745         # TCP session
1746         s = sessions[0]
1747         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1748         self.assertEqual(s.in_port, self.tcp_port_in)
1749         self.assertEqual(s.out_port, self.tcp_port_out)
1750         self.assertEqual(s.ext_port, self.tcp_external_port)
1751
1752         # UDP session
1753         s = sessions[1]
1754         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1755         self.assertEqual(s.in_port, self.udp_port_in)
1756         self.assertEqual(s.out_port, self.udp_port_out)
1757         self.assertEqual(s.ext_port, self.udp_external_port)
1758
1759         # ICMP session
1760         s = sessions[2]
1761         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1762         self.assertEqual(s.in_port, self.icmp_id_in)
1763         self.assertEqual(s.out_port, self.icmp_external_id)
1764
1765     def test_multiple_users(self):
1766         """ CGNAT multiple users """
1767
1768         nat_ip = "10.0.0.10"
1769         port_in = 80
1770         external_port = 6303
1771
1772         host0 = self.pg0.remote_hosts[0]
1773         host1 = self.pg0.remote_hosts[1]
1774
1775         self.vapi.snat_add_det_map(host0.ip4n,
1776                                    24,
1777                                    socket.inet_aton(nat_ip),
1778                                    32)
1779         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1780         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1781                                                  is_inside=0)
1782
1783         # host0 to out
1784         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1785              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1786              TCP(sport=port_in, dport=external_port))
1787         self.pg0.add_stream(p)
1788         self.pg_enable_capture(self.pg_interfaces)
1789         self.pg_start()
1790         capture = self.pg1.get_capture(1)
1791         p = capture[0]
1792         try:
1793             ip = p[IP]
1794             tcp = p[TCP]
1795             self.assertEqual(ip.src, nat_ip)
1796             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1797             self.assertEqual(tcp.dport, external_port)
1798             port_out0 = tcp.sport
1799         except:
1800             self.logger.error(ppp("Unexpected or invalid packet:", p))
1801             raise
1802
1803         # host1 to out
1804         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1805              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1806              TCP(sport=port_in, dport=external_port))
1807         self.pg0.add_stream(p)
1808         self.pg_enable_capture(self.pg_interfaces)
1809         self.pg_start()
1810         capture = self.pg1.get_capture(1)
1811         p = capture[0]
1812         try:
1813             ip = p[IP]
1814             tcp = p[TCP]
1815             self.assertEqual(ip.src, nat_ip)
1816             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1817             self.assertEqual(tcp.dport, external_port)
1818             port_out1 = tcp.sport
1819         except:
1820             self.logger.error(ppp("Unexpected or invalid packet:", p))
1821             raise
1822
1823         dms = self.vapi.snat_det_map_dump()
1824         self.assertEqual(1, len(dms))
1825         self.assertEqual(2, dms[0].ses_num)
1826
1827         # out to host0
1828         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1829              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1830              TCP(sport=external_port, dport=port_out0))
1831         self.pg1.add_stream(p)
1832         self.pg_enable_capture(self.pg_interfaces)
1833         self.pg_start()
1834         capture = self.pg0.get_capture(1)
1835         p = capture[0]
1836         try:
1837             ip = p[IP]
1838             tcp = p[TCP]
1839             self.assertEqual(ip.src, self.pg1.remote_ip4)
1840             self.assertEqual(ip.dst, host0.ip4)
1841             self.assertEqual(tcp.dport, port_in)
1842             self.assertEqual(tcp.sport, external_port)
1843         except:
1844             self.logger.error(ppp("Unexpected or invalid packet:", p))
1845             raise
1846
1847         # out to host1
1848         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1849              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1850              TCP(sport=external_port, dport=port_out1))
1851         self.pg1.add_stream(p)
1852         self.pg_enable_capture(self.pg_interfaces)
1853         self.pg_start()
1854         capture = self.pg0.get_capture(1)
1855         p = capture[0]
1856         try:
1857             ip = p[IP]
1858             tcp = p[TCP]
1859             self.assertEqual(ip.src, self.pg1.remote_ip4)
1860             self.assertEqual(ip.dst, host1.ip4)
1861             self.assertEqual(tcp.dport, port_in)
1862             self.assertEqual(tcp.sport, external_port)
1863         except:
1864             self.logger.error(ppp("Unexpected or invalid packet", p))
1865             raise
1866
1867         # session close api test
1868         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
1869                                              port_out1,
1870                                              self.pg1.remote_ip4n,
1871                                              external_port)
1872         dms = self.vapi.snat_det_map_dump()
1873         self.assertEqual(dms[0].ses_num, 1)
1874
1875         self.vapi.snat_det_close_session_in(host0.ip4n,
1876                                             port_in,
1877                                             self.pg1.remote_ip4n,
1878                                             external_port)
1879         dms = self.vapi.snat_det_map_dump()
1880         self.assertEqual(dms[0].ses_num, 0)
1881
1882     def test_tcp_session_close_detection_in(self):
1883         """ CGNAT TCP session close initiated from inside network """
1884         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1885                                    32,
1886                                    socket.inet_aton(self.snat_addr),
1887                                    32)
1888         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1889         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1890                                                  is_inside=0)
1891
1892         self.initiate_tcp_session(self.pg0, self.pg1)
1893
1894         # close the session from inside
1895         try:
1896             # FIN packet in -> out
1897             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1898                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1899                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1900                      flags="F"))
1901             self.pg0.add_stream(p)
1902             self.pg_enable_capture(self.pg_interfaces)
1903             self.pg_start()
1904             self.pg1.get_capture(1)
1905
1906             pkts = []
1907
1908             # ACK packet out -> in
1909             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1910                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1911                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1912                      flags="A"))
1913             pkts.append(p)
1914
1915             # FIN packet out -> in
1916             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1917                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1918                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1919                      flags="F"))
1920             pkts.append(p)
1921
1922             self.pg1.add_stream(pkts)
1923             self.pg_enable_capture(self.pg_interfaces)
1924             self.pg_start()
1925             self.pg0.get_capture(2)
1926
1927             # ACK packet in -> out
1928             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1929                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1930                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1931                      flags="A"))
1932             self.pg0.add_stream(p)
1933             self.pg_enable_capture(self.pg_interfaces)
1934             self.pg_start()
1935             self.pg1.get_capture(1)
1936
1937             # Check if snat closed the session
1938             dms = self.vapi.snat_det_map_dump()
1939             self.assertEqual(0, dms[0].ses_num)
1940         except:
1941             self.logger.error("TCP session termination failed")
1942             raise
1943
1944     def test_tcp_session_close_detection_out(self):
1945         """ CGNAT TCP session close initiated from outside network """
1946         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1947                                    32,
1948                                    socket.inet_aton(self.snat_addr),
1949                                    32)
1950         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1951         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1952                                                  is_inside=0)
1953
1954         self.initiate_tcp_session(self.pg0, self.pg1)
1955
1956         # close the session from outside
1957         try:
1958             # FIN packet out -> in
1959             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1960                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1961                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1962                      flags="F"))
1963             self.pg1.add_stream(p)
1964             self.pg_enable_capture(self.pg_interfaces)
1965             self.pg_start()
1966             self.pg0.get_capture(1)
1967
1968             pkts = []
1969
1970             # ACK packet in -> out
1971             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1972                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1973                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1974                      flags="A"))
1975             pkts.append(p)
1976
1977             # ACK packet in -> out
1978             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1979                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1980                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1981                      flags="F"))
1982             pkts.append(p)
1983
1984             self.pg0.add_stream(pkts)
1985             self.pg_enable_capture(self.pg_interfaces)
1986             self.pg_start()
1987             self.pg1.get_capture(2)
1988
1989             # ACK packet out -> in
1990             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1991                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1992                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1993                      flags="A"))
1994             self.pg1.add_stream(p)
1995             self.pg_enable_capture(self.pg_interfaces)
1996             self.pg_start()
1997             self.pg0.get_capture(1)
1998
1999             # Check if snat closed the session
2000             dms = self.vapi.snat_det_map_dump()
2001             self.assertEqual(0, dms[0].ses_num)
2002         except:
2003             self.logger.error("TCP session termination failed")
2004             raise
2005
2006     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2007     def test_session_timeout(self):
2008         """ CGNAT session timeouts """
2009         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2010                                    32,
2011                                    socket.inet_aton(self.snat_addr),
2012                                    32)
2013         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2014         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2015                                                  is_inside=0)
2016
2017         self.initiate_tcp_session(self.pg0, self.pg1)
2018         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2019         pkts = self.create_stream_in(self.pg0, self.pg1)
2020         self.pg0.add_stream(pkts)
2021         self.pg_enable_capture(self.pg_interfaces)
2022         self.pg_start()
2023         capture = self.pg1.get_capture(len(pkts))
2024         sleep(15)
2025
2026         dms = self.vapi.snat_det_map_dump()
2027         self.assertEqual(0, dms[0].ses_num)
2028
2029     def test_session_limit_per_user(self):
2030         """ CGNAT maximum 1000 sessions per user should be created """
2031         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2032                                    32,
2033                                    socket.inet_aton(self.snat_addr),
2034                                    32)
2035         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2036         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2037                                                  is_inside=0)
2038         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2039                                      src_address=self.pg2.local_ip4n,
2040                                      path_mtu=512,
2041                                      template_interval=10)
2042         self.vapi.snat_ipfix()
2043
2044         pkts = []
2045         for port in range(1025, 2025):
2046             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2047                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2048                  UDP(sport=port, dport=port))
2049             pkts.append(p)
2050
2051         self.pg0.add_stream(pkts)
2052         self.pg_enable_capture(self.pg_interfaces)
2053         self.pg_start()
2054         capture = self.pg1.get_capture(len(pkts))
2055
2056         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2057              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2058              UDP(sport=3001, dport=3002))
2059         self.pg0.add_stream(p)
2060         self.pg_enable_capture(self.pg_interfaces)
2061         self.pg_start()
2062         capture = self.pg1.assert_nothing_captured()
2063
2064         # verify ICMP error packet
2065         capture = self.pg0.get_capture(1)
2066         p = capture[0]
2067         self.assertTrue(p.haslayer(ICMP))
2068         icmp = p[ICMP]
2069         self.assertEqual(icmp.type, 3)
2070         self.assertEqual(icmp.code, 1)
2071         self.assertTrue(icmp.haslayer(IPerror))
2072         inner_ip = icmp[IPerror]
2073         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2074         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2075
2076         dms = self.vapi.snat_det_map_dump()
2077
2078         self.assertEqual(1000, dms[0].ses_num)
2079
2080         # verify IPFIX logging
2081         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2082         capture = self.pg2.get_capture(2)
2083         ipfix = IPFIXDecoder()
2084         # first load template
2085         for p in capture:
2086             self.assertTrue(p.haslayer(IPFIX))
2087             if p.haslayer(Template):
2088                 ipfix.add_template(p.getlayer(Template))
2089         # verify events in data set
2090         for p in capture:
2091             if p.haslayer(Data):
2092                 data = ipfix.decode_data_set(p.getlayer(Set))
2093                 self.verify_ipfix_max_entries_per_user(data)
2094
2095     def clear_snat(self):
2096         """
2097         Clear SNAT configuration.
2098         """
2099         self.vapi.snat_ipfix(enable=0)
2100         self.vapi.snat_det_set_timeouts()
2101         deterministic_mappings = self.vapi.snat_det_map_dump()
2102         for dsm in deterministic_mappings:
2103             self.vapi.snat_add_det_map(dsm.in_addr,
2104                                        dsm.in_plen,
2105                                        dsm.out_addr,
2106                                        dsm.out_plen,
2107                                        is_add=0)
2108
2109         interfaces = self.vapi.snat_interface_dump()
2110         for intf in interfaces:
2111             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2112                                                      intf.is_inside,
2113                                                      is_add=0)
2114
2115     def tearDown(self):
2116         super(TestDeterministicNAT, self).tearDown()
2117         if not self.vpp_dead:
2118             self.logger.info(self.vapi.cli("show snat detail"))
2119             self.clear_snat()
2120
2121 if __name__ == '__main__':
2122     unittest.main(testRunner=VppTestRunner)