nat: nat44-ed add session timing out indicator in api
[vpp.git] / test / test_linux_cp.py
1 #!/usr/bin/env python3
2
import socket
import unittest

from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw
from scapy.layers.ipsec import ESP
from scapy.layers.l2 import Ether, ARP, Dot1Q

from util import reassemble4
from util import ppp
from vpp_object import VppObject
from framework import VppTestCase, VppTestRunner
from vpp_ipip_tun_interface import VppIpIpTunInterface
from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
    IpsecTun4, mk_scapy_crypt_key, config_tun_params
from test_ipsec_tun_if_esp import TemplateIpsecItf4
from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
19
20
class VppLcpPair(VppObject):
    """A phy/host interface pair created through the 'test lcp' CLI."""

    def __init__(self, test, phy, host):
        # test: the owning test case; its vapi, registry and logger are used
        # phy / host: the two interfaces that make up the pair
        self._test = test
        self.phy = phy
        self.host = host

    def add_vpp_config(self):
        """Create the pair via the CLI, register it and return self."""
        command = "test lcp add phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the pair via the CLI."""
        command = "test lcp del phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)

    def object_id(self):
        """Return an identifier built from both sw_if_index values."""
        indices = (self.phy.sw_if_index, self.host.sw_if_index)
        return "lcp:%d:%d" % indices

    def query_vpp_config(self):
        """Return True if the lcp_itf_pair_get dump contains this pair."""
        vapi = self._test.vapi
        # Materialise the whole dump before searching it, so the details
        # iterator is always consumed to its end.
        dumped = list(vapi.vpp.details_iter(vapi.lcp_itf_pair_get))

        return any(
            entry.phy_sw_if_index == self.phy.sw_if_index and
            entry.host_sw_if_index == self.host.sw_if_index
            for entry in dumped)
50
51
class TestLinuxCP(VppTestCase):
    """ Linux Control Plane """

    # Enable both the linux-cp plugin and its unit-test companion plugin,
    # which provides the 'test lcp ...' CLI used by VppLcpPair.
    extra_vpp_plugin_config = ["plugin",
                               "linux_cp_plugin.so",
                               "{", "enable", "}",
                               "plugin",
                               "linux_cp_unittest_plugin.so",
                               "{", "enable", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestLinuxCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestLinuxCP, cls).tearDownClass()

    def setUp(self):
        super(TestLinuxCP, self).setUp()

        # create 4 pg interfaces so we can create two pairs
        self.create_pg_interfaces(range(4))

        # create one ip4 and one ip6 pg interface for the tunnel tests
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.admin_down()
        super(TestLinuxCP, self).tearDown()

    def _send_and_verify_unchanged(self, src, pkt, dst):
        """Send pkt on src, expect it on dst and check it is unmodified.

        The comparison is made on the scapy show2() dump of the sent and
        each received packet.
        """
        rxs = self.send_and_expect(src, [pkt], dst)

        for rx in rxs:
            self.assertEqual(pkt.show2(True), rx.show2(True))

    def test_linux_cp_tap(self):
        """ Linux CP TAP """

        #
        # Setup
        #

        # create two pairs, with a bunch of hosts on the phys
        hosts = [self.pg0, self.pg1]
        phys = [self.pg2, self.pg3]
        N_HOSTS = 4

        for phy in phys:
            phy.config_ip4()
            # generate exactly as many remote hosts as the traffic loops
            # below index into
            phy.generate_remote_hosts(N_HOSTS)
            phy.configure_ipv4_neighbors()

        VppLcpPair(self, phys[0], hosts[0]).add_vpp_config()
        VppLcpPair(self, phys[1], hosts[1]).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))

        #
        # Traffic Tests
        #

        # hosts to phys
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                p = (Ether(src=phy.local_mac,
                           dst=phy.remote_hosts[j].mac) /
                     IP(src=phy.local_ip4,
                        dst=phy.remote_hosts[j].ip4) /
                     UDP(sport=1234, dport=1234) /
                     Raw())

                # verify packet is unchanged
                self._send_and_verify_unchanged(host, p, phy)

                # ARPs x-connect to phy
                p = (Ether(dst="ff:ff:ff:ff:ff:ff",
                           src=phy.remote_hosts[j].mac) /
                     ARP(op="who-has",
                         hwdst=phy.remote_hosts[j].mac,
                         hwsrc=phy.local_mac,
                         psrc=phy.local_ip4,
                         pdst=phy.remote_hosts[j].ip4))

                # verify packet is unchanged
                self._send_and_verify_unchanged(host, p, phy)

        # phy to host
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                p = (Ether(dst=phy.local_mac,
                           src=phy.remote_hosts[j].mac) /
                     IP(dst=phy.local_ip4,
                        src=phy.remote_hosts[j].ip4) /
                     UDP(sport=1234, dport=1234) /
                     Raw())

                # verify packet is unchanged
                self._send_and_verify_unchanged(phy, p, host)

                # ARPs rx'd on the phy are sent to the host
                p = (Ether(dst="ff:ff:ff:ff:ff:ff",
                           src=phy.remote_hosts[j].mac) /
                     ARP(op="is-at",
                         hwsrc=phy.remote_hosts[j].mac,
                         hwdst=phy.local_mac,
                         pdst=phy.local_ip4,
                         psrc=phy.remote_hosts[j].ip4))

                # verify packet is unchanged
                self._send_and_verify_unchanged(phy, p, host)

        # cleanup
        for phy in phys:
            phy.unconfig_ip4()

    def test_linux_cp_tun(self):
        """ Linux CP TUN """

        #
        # Setup
        #
        N_PKTS = 31

        # one v4 and one v6 IPIP tunnel over the same phy, each paired
        # with its own host interface
        hosts = [self.pg4, self.pg5]
        phy = self.pg2

        phy.config_ip4()
        phy.config_ip6()
        phy.resolve_arp()
        phy.resolve_ndp()

        tun4 = VppIpIpTunInterface(
            self,
            phy,
            phy.local_ip4,
            phy.remote_ip4).add_vpp_config()
        tun6 = VppIpIpTunInterface(
            self,
            phy,
            phy.local_ip6,
            phy.remote_ip6).add_vpp_config()
        tuns = [tun4, tun6]

        tun4.admin_up()
        tun4.config_ip4()
        tun6.admin_up()
        tun6.config_ip6()

        VppLcpPair(self, tuns[0], hosts[0]).add_vpp_config()
        VppLcpPair(self, tuns[1], hosts[1]).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        p = (IP(src=tun4.local_ip4, dst="2.2.2.2") /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[Ether].dst, phy.remote_mac)
            self.assertEqual(rx[IP].dst, phy.remote_ip4)
            self.assertEqual(rx[IP].src, phy.local_ip4)
            inner = IP(rx[IP].payload)
            self.assertEqual(inner.src, tun4.local_ip4)
            self.assertEqual(inner.dst, "2.2.2.2")

        # host to phy for v6
        p = (IPv6(src=tun6.local_ip6, dst="2::2") /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
            self.assertEqual(rx[IPv6].src, phy.local_ip6)
            inner = IPv6(rx[IPv6].payload)
            self.assertEqual(inner.src, tun6.local_ip6)
            self.assertEqual(inner.dst, "2::2")

        # phy to host v4
        p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
             IP(dst=phy.local_ip4, src=phy.remote_ip4) /
             IP(dst=tun4.local_ip4, src=tun4.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
        for rx in rxs:
            # re-parse the capture as a bare IPv4 packet before checking it
            rx = IP(rx)
            self.assertEqual(rx[IP].dst, tun4.local_ip4)
            self.assertEqual(rx[IP].src, tun4.remote_ip4)

        # phy to host v6
        p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
             IPv6(dst=phy.local_ip6, src=phy.remote_ip6) /
             IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
        for rx in rxs:
            # re-parse the capture as a bare IPv6 packet before checking it
            rx = IPv6(rx)
            self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
            self.assertEqual(rx[IPv6].src, tun6.remote_ip6)

        # cleanup
        phy.unconfig_ip4()
        phy.unconfig_ip6()

        tun4.unconfig_ip4()
        tun6.unconfig_ip6()
291
292
class TestLinuxCPIpsec(TemplateIpsec,
                       TemplateIpsecItf4,
                       IpsecTun4):
    """ IPsec Interface IPv4 """

    # Enable both the linux-cp plugin and its unit-test companion plugin.
    extra_vpp_plugin_config = ["plugin",
                               "linux_cp_plugin.so",
                               "{", "enable", "}",
                               "plugin",
                               "linux_cp_unittest_plugin.so",
                               "{", "enable", "}"]

    def setUp(self):
        super(TestLinuxCPIpsec, self).setUp()

        # pg0 carries the encrypted tunnel traffic
        # NOTE(review): pg0 is presumably created by a parent setUp - confirm
        self.tun_if = self.pg0
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))

    def tearDown(self):
        super(TestLinuxCPIpsec, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check its inner addresses.

        :param p: IPsec parameters; nat_header, vpp_tun_sa and tun_if are
                  read from it
        :param sa: SA handed to verify_esp_padding for ESP packets
        :param rxs: packets captured on the encrypted side

        On any failure the offending packet (and the decrypted packet, when
        decryption got that far) is logged at debug level and the exception
        is re-raised.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so the failure path never logs a stale
            # packet left over from a previous iteration
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
                self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # best effort only: a failure while dumping the
                        # packet must not mask the original error
                        pass
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_decrypted(self, p, rxs):
        """Check each received packet's addresses and checksums.

        :param p: IPsec parameters; tun_if supplies the expected addresses
        :param rxs: packets captured on the host side
        """
        for rx in rxs:
            # re-parse the capture as a bare IPv4 packet before checking it
            rx = IP(rx)
            self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
            self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
            self.assert_packet_checksums_valid(rx)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Build count encrypted UDP packets addressed to sw_intf.

        :param p: unused here; kept for signature compatibility
        :param sa: object whose encrypt() wraps the inner IP/UDP packet
        :param sw_intf: interface supplying the Ethernet addresses
        :param src: inner IPv4 source address
        :param dst: inner IPv4 destination address
        :param count: number of packets to generate
        :param payload_size: number of b'X' payload bytes per packet
        :returns: list of Ether / encrypted packets
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           UDP(sport=1111, dport=2222) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def test_linux_cp_ipsec4_tun(self):
        """ Linux CP Ipsec TUN """

        #
        # Setup
        #
        N_PKTS = 31

        # the pg that pairs with the tunnel
        self.host = self.pg3

        # tunnel and protection setup
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()

        # routine diagnostics, logged at info like the other CLI dumps
        self.logger.info(self.vapi.cli("sh int addr"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        pkt = (IP(src=p.tun_if.local_ip4,
                  dst=p.tun_if.remote_ip4) /
               UDP(sport=1234, dport=1234) /
               Raw())

        rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
        self.verify_encrypted(p, p.vpp_tun_sa, rxs)

        # phy to host for v4
        pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                     src=p.tun_if.remote_ip4,
                                     dst=p.tun_if.local_ip4,
                                     count=N_PKTS)
        try:
            rxs = self.send_and_expect(self.tun_if, pkts, self.host)
            self.verify_decrypted(p, rxs)
        finally:
            # dump the trace whether or not the checks passed, so a failing
            # run still has it in the log
            self.logger.error(self.vapi.cli("sh trace"))

        # cleanup
        pair.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
413
414
# Allow running this module directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)