http_static: misc bug fixes
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
import base64
import datetime
import os
import struct
7
8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
15     Wireguard,
16     WireguardResponse,
17     WireguardInitiation,
18     WireguardTransport,
19     WireguardCookieReply,
20 )
21 from cryptography.hazmat.primitives.asymmetric.x25519 import (
22     X25519PrivateKey,
23     X25519PublicKey,
24 )
25 from cryptography.hazmat.primitives.serialization import (
26     Encoding,
27     PrivateFormat,
28     PublicFormat,
29     NoEncryption,
30 )
31 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
32 from cryptography.hazmat.primitives.hmac import HMAC
33 from cryptography.hazmat.backends import default_backend
34 from noise.connection import NoiseConnection, Keypair
35
36 from Crypto.Cipher import ChaCha20_Poly1305
37 from Crypto.Random import get_random_bytes
38
39 from vpp_ipip_tun_interface import VppIpIpTunInterface
40 from vpp_interface import VppInterface
41 from vpp_pg_interface import is_ipv6_misc
42 from vpp_ip_route import VppIpRoute, VppRoutePath
43 from vpp_object import VppObject
44 from vpp_papi import VppEnum
45 from framework import is_distro_ubuntu2204, is_distro_debian11, tag_fixme_vpp_debug
46 from framework import VppTestCase
47 from re import compile
48 import unittest
49
50 """ TestWg is a subclass of  VPPTestCase classes.
51
52 Wg test.
53
54 """
55
56
def private_key_bytes(k):
    """Return the raw, unencrypted byte serialization of private key *k*."""
    raw_format = (Encoding.Raw, PrivateFormat.Raw, NoEncryption())
    return k.private_bytes(*raw_format)
59
60
def public_key_bytes(k):
    """Return the raw byte serialization of public key *k*."""
    raw_format = (Encoding.Raw, PublicFormat.Raw)
    return k.public_bytes(*raw_format)
63
64
def get_field_bytes(pkt, name):
    """Return field *name* of *pkt* after conversion by the field's ``i2m``.

    The field descriptor and its current value are looked up with
    ``pkt.getfield_and_val(name)``; the descriptor then converts the value.
    """
    field_desc, current_value = pkt.getfield_and_val(name)
    converted = field_desc.i2m(pkt, current_value)
    return converted
68
69
class VppWgInterface(VppInterface):
    """
    VPP WireGuard interface

    Holds the listening port, source address and a freshly generated X25519
    key pair, and creates/deletes/queries the interface through the test's
    ``vapi`` object.
    """

    def __init__(self, test, src, port):
        super().__init__(test)

        self.port = port
        self.src = src

        # local key pair of the interface
        key = X25519PrivateKey.generate()
        self.private_key = key
        self.public_key = key.public_key()

        # cookie related params: key derived from the interface's public key
        cookie_seed = b"cookie--" + self.public_key_bytes()
        self.cookie_key = blake2s(cookie_seed).digest()

    def public_key_bytes(self):
        """Raw bytes of the interface's public key."""
        return public_key_bytes(self.public_key)

    def private_key_bytes(self):
        """Raw bytes of the interface's private key."""
        return private_key_bytes(self.private_key)

    def add_vpp_config(self):
        """Create the interface in VPP, register it for cleanup, return self."""
        interface_cfg = {
            "user_instance": 0xFFFFFFFF,
            "port": self.port,
            "src_ip": self.src,
            "private_key": private_key_bytes(self.private_key),
            "generate_key": False,
        }
        reply = self.test.vapi.wireguard_interface_create(interface=interface_cfg)
        self.set_sw_if_index(reply.sw_if_index)
        self.test.registry.register(self, self.test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the interface from VPP."""
        self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        """Return True when the interface dump contains a matching entry."""
        dump = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
        expected_key = private_key_bytes(self.private_key)
        return any(
            entry.interface.sw_if_index == self._sw_if_index
            and str(entry.interface.src_ip) == self.src
            and entry.interface.port == self.port
            and entry.interface.private_key == expected_key
            for entry in dump
        )

    def want_events(self, peer_index=0xFFFFFFFF):
        """Subscribe this process to peer events of the interface."""
        self.test.vapi.want_wireguard_peer_events(
            enable_disable=1,
            pid=os.getpid(),
            sw_if_index=self._sw_if_index,
            peer_index=peer_index,
        )

    def wait_events(self, expect, peers, timeout=5):
        """Wait for one peer event per entry of *peers*, in the given order.

        Each event must carry the listed peer index and flags equal to *expect*.
        """
        for expected_peer in peers:
            event = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
            self.test.assertEqual(event.peer_index, expected_peer)
            self.test.assertEqual(event.flags, expect)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Identifier string built from the software interface index."""
        return "wireguard-%d" % self._sw_if_index
140
141
# Protocol name passed to NoiseConnection.from_name() when a peer's noise
# session is created or reset.
NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
# Byte string installed as the noise prologue in VppWgPeer.noise_init().
NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"

# Timing values and thresholds used by the cookie / under-load tests.
# NOTE(review): presumably these mirror the constants of the VPP wireguard
# plugin, with the intervals in seconds — confirm against the plugin.
HANDSHAKE_COUNTING_INTERVAL = 0.5
UNDER_LOAD_INTERVAL = 1.0
# Number of identical handshake messages the tests send to push VPP into
# the "under load" state.
HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD = 40
HANDSHAKE_NUM_BEFORE_RATELIMITING = 5
149
150
class VppWgPeer(VppObject):
    """
    Test-side model of a WireGuard peer configured on a VPP wg interface.

    The object adds/removes the peer through the test's ``vapi`` and also acts
    as the remote end of the tunnel: it owns the peer's X25519 key pair and a
    noise session, and builds or verifies handshake, cookie and transport
    messages exchanged with VPP.
    """

    def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
        self._test = test
        self.itf = itf
        self.endpoint = endpoint
        self.port = port
        self.allowed_ips = allowed_ips
        self.persistent_keepalive = persistent_keepalive

        # remote peer's key pair
        self.private_key = X25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

        # cookie related params
        self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
        self.last_sent_cookie = None
        self.last_mac1 = None
        self.last_received_cookie = None

        self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)

    def change_endpoint(self, endpoint, port):
        """Update the locally stored endpoint address and port."""
        self.endpoint = endpoint
        self.port = port

    def add_vpp_config(self):
        """Add the peer to VPP, register it for cleanup and return self."""
        rv = self._test.vapi.wireguard_peer_add(
            peer={
                "public_key": self.public_key_bytes(),
                "port": self.port,
                "endpoint": self.endpoint,
                "n_allowed_ips": len(self.allowed_ips),
                "allowed_ips": self.allowed_ips,
                "sw_if_index": self.itf.sw_if_index,
                "persistent_keepalive": self.persistent_keepalive,
            }
        )
        self.index = rv.peer_index
        # index this peer uses as its own sender index in handshakes and
        # expects back as receiver index
        self.receiver_index = self.index + 1
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Remove the peer from VPP."""
        self._test.vapi.wireguard_peer_remove(peer_index=self.index)

    def object_id(self):
        """Identifier string built from the peer index."""
        return "wireguard-peer-%s" % self.index

    def public_key_bytes(self):
        """Raw bytes of the peer's public key."""
        return public_key_bytes(self.public_key)

    def query_vpp_config(self):
        """Return True when the peers dump holds an entry matching this peer.

        The first entry whose key, port, endpoint, interface and number of
        allowed prefixes match decides the result: its allowed prefixes must
        equal ours when both sides are compared as strings.
        """
        peers = self._test.vapi.wireguard_peers_dump()

        # "::" endpoint will be returned as "0.0.0.0" in peer's details
        endpoint = "0.0.0.0" if self.endpoint == "::" else self.endpoint

        # Both sides are ordered by their string form: ordering each list with
        # its native element type can yield different orders for the same set
        # of prefixes, and sorting in place would reorder the caller's list.
        expected_ips = sorted(str(ip) for ip in self.allowed_ips)

        for p in peers:
            if (
                p.peer.public_key == self.public_key_bytes()
                and p.peer.port == self.port
                and str(p.peer.endpoint) == endpoint
                and p.peer.sw_if_index == self.itf.sw_if_index
                and len(self.allowed_ips) == p.peer.n_allowed_ips
            ):
                return expected_ips == sorted(str(ip) for ip in p.peer.allowed_ips)
        return False

    def mk_tunnel_header(self, tx_itf, is_ip6=False):
        """Build Ether/IP(v6)/UDP headers for a packet sent from the peer to VPP."""
        # only an explicit False selects IPv4, as in the other helpers
        ip_layer = IP if is_ip6 is False else IPv6
        return (
            Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
            / ip_layer(src=self.endpoint, dst=self.itf.src)
            / UDP(sport=self.port, dport=self.itf.port)
        )

    def noise_reset(self):
        """Replace the noise session with a fresh one."""
        self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)

    def noise_init(self, public_key=None):
        """Configure prologue, PSK and key pairs, then start the handshake.

        :param public_key: remote static public key; the interface's public
            key is used when omitted
        """
        self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
        # all-zero 32-byte pre-shared key
        self.noise.set_psks(psk=bytes(bytearray(32)))

        if not public_key:
            public_key = self.itf.public_key

        # local/this private
        self.noise.set_keypair_from_private_bytes(
            Keypair.STATIC, private_key_bytes(self.private_key)
        )
        # remote's public
        self.noise.set_keypair_from_public_bytes(
            Keypair.REMOTE_STATIC, public_key_bytes(public_key)
        )

        self.noise.start_handshake()

    def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
        """Build a cookie reply answering handshake message *p* received from VPP.

        :param p: captured initiation (or response when *is_resp*) from VPP
        :param tx_itf: pg interface the reply will be sent on
        :returns: the cookie reply with tunnel headers; the plain cookie is
            remembered in ``last_sent_cookie``
        """
        self.verify_header(p, is_ip6)

        wg_pkt = Wireguard(p[Raw])

        # response is message type 2, initiation is type 1; mac2 must be empty
        if is_resp:
            msg_layer, msg_type = WireguardResponse, 2
        else:
            msg_layer, msg_type = WireguardInitiation, 1
        self._test.assertEqual(wg_pkt[Wireguard].message_type, msg_type)
        self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
        self._test.assertEqual(wg_pkt[msg_layer].mac2, bytes([0] * 16))

        # collect info from wg packet (initiation or response)
        src = get_field_bytes(p[IPv6 if is_ip6 else IP], "src")
        sport = p[UDP].sport.to_bytes(2, byteorder="big")
        mac1 = wg_pkt[msg_layer].mac1
        sender_index = wg_pkt[msg_layer].sender_index

        # make cookie reply
        cookie_reply = Wireguard() / WireguardCookieReply()
        cookie_reply[Wireguard].message_type = 3
        cookie_reply[Wireguard].reserved_zero = 0
        cookie_reply[WireguardCookieReply].receiver_index = sender_index
        nonce = get_random_bytes(24)
        cookie_reply[WireguardCookieReply].nonce = nonce

        # generate cookie data: keyed hash of VPP's source address and port
        changing_secret = get_random_bytes(32)
        self.last_sent_cookie = blake2s(
            src + sport, digest_size=16, key=changing_secret
        ).digest()

        # encrypt cookie data, authenticating the received mac1
        cipher = ChaCha20_Poly1305.new(key=self.cookie_key, nonce=nonce)
        cipher.update(mac1)
        ciphertext, tag = cipher.encrypt_and_digest(self.last_sent_cookie)
        cookie_reply[WireguardCookieReply].encrypted_cookie = ciphertext + tag

        # prepare cookie reply to be sent
        cookie_reply = self.mk_tunnel_header(tx_itf, is_ip6) / cookie_reply

        return cookie_reply

    def consume_cookie(self, p, is_ip6=False):
        """Verify cookie reply *p* from VPP and store the decrypted cookie.

        The cookie is kept in ``last_received_cookie`` for the next call of
        mk_handshake().
        """
        self.verify_header(p, is_ip6)

        cookie_reply = Wireguard(p[Raw])

        self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
        self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            cookie_reply[WireguardCookieReply].receiver_index, self.receiver_index
        )

        # collect info from cookie reply
        nonce = cookie_reply[WireguardCookieReply].nonce
        encrypted_cookie = cookie_reply[WireguardCookieReply].encrypted_cookie
        # first 16 bytes are the ciphertext, the rest is the tag
        ciphertext, tag = encrypted_cookie[:16], encrypted_cookie[16:]

        # decrypt cookie data, authenticated with the mac1 we sent last
        cipher = ChaCha20_Poly1305.new(key=self.itf.cookie_key, nonce=nonce)
        cipher.update(self.last_mac1)
        self.last_received_cookie = cipher.decrypt_and_verify(ciphertext, tag)

    def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
        """Build a handshake initiation from the peer to VPP.

        :param tx_itf: pg interface the message will be sent on
        :param public_key: remote static key for the noise session; defaults
            to the interface's public key (see noise_init)
        :returns: the initiation with tunnel headers; mac2 is computed from a
            previously received cookie when one is stored, else left zeroed
        """
        self.noise.set_as_initiator()
        self.noise_init(public_key)

        p = Wireguard() / WireguardInitiation()

        p[Wireguard].message_type = 1
        p[Wireguard].reserved_zero = 0
        p[WireguardInitiation].sender_index = self.receiver_index

        # some random data for the message
        #  lifted from the noise protocol's wireguard example
        # (the constant is 2**62 + 10; presumably a TAI64N timestamp —
        #  verify against the WireGuard specification)
        now = datetime.datetime.now()
        tai = struct.pack(
            "!qi",
            4611686018427387914 + int(now.timestamp()),
            int(now.microsecond * 1e3),
        )
        b = self.noise.write_message(payload=tai)

        # load noise into init message
        p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
        p[WireguardInitiation].encrypted_static = b[32:80]
        p[WireguardInitiation].encrypted_timestamp = b[80:108]

        # generate the mac1 hash over the first 116 bytes of the message
        mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
        mac1 = blake2s(bytes(p)[0:116], digest_size=16, key=mac_key).digest()
        p[WireguardInitiation].mac1 = mac1
        self.last_mac1 = mac1

        # generate the mac2 hash; a stored cookie is used only once
        if self.last_received_cookie:
            mac2 = blake2s(
                bytes(p)[0:132], digest_size=16, key=self.last_received_cookie
            ).digest()
            p[WireguardInitiation].mac2 = mac2
            self.last_received_cookie = None
        else:
            p[WireguardInitiation].mac2 = bytearray(16)

        p = self.mk_tunnel_header(tx_itf, is_ip6) / p

        return p

    def verify_header(self, p, is_ip6=False):
        """Assert that *p* was sent by the wg interface to this peer's endpoint."""
        if is_ip6 is False:
            self._test.assertEqual(p[IP].src, self.itf.src)
            self._test.assertEqual(p[IP].dst, self.endpoint)
            self._test.assert_packet_checksums_valid(p)
        else:
            self._test.assertEqual(p[IPv6].src, self.itf.src)
            self._test.assertEqual(p[IPv6].dst, self.endpoint)
            self._test.assert_packet_checksums_valid(p, False)
        self._test.assertEqual(p[UDP].sport, self.itf.port)
        self._test.assertEqual(p[UDP].dport, self.port)

    def consume_init(self, p, tx_itf, is_ip6=False, is_mac2=False):
        """Verify handshake initiation *p* from VPP and build the response.

        :param is_mac2: when True, mac2 must be non-zero and match the cookie
            last sent with mk_cookie(); otherwise mac2 must be all zeros
        :returns: the handshake response with tunnel headers
        """
        self.noise.set_as_responder()
        self.noise_init(self.itf.public_key)
        self.verify_header(p, is_ip6)

        init = Wireguard(p[Raw])

        self._test.assertEqual(init[Wireguard].message_type, 1)
        self._test.assertEqual(init[Wireguard].reserved_zero, 0)

        self.sender = init[WireguardInitiation].sender_index

        # validate the mac1 hash (covers everything except the two macs)
        mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
        mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
        self._test.assertEqual(init[WireguardInitiation].mac1, mac1)

        # validate the mac2 hash
        if is_mac2:
            self._test.assertNotEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
            self._test.assertNotEqual(self.last_sent_cookie, None)
            mac2 = blake2s(
                bytes(init)[0:-16], digest_size=16, key=self.last_sent_cookie
            ).digest()
            self._test.assertEqual(init[WireguardInitiation].mac2, mac2)
            self.last_sent_cookie = None
        else:
            self._test.assertEqual(init[WireguardInitiation].mac2, bytes([0] * 16))

        # this passes only unencrypted_ephemeral, encrypted_static,
        # encrypted_timestamp fields of the init; the call is needed to
        # advance the noise handshake, its return value is not used
        self.noise.read_message(bytes(init)[8:-32])

        # build the response
        b = self.noise.write_message()
        mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
        resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
            sender_index=self.receiver_index,
            receiver_index=self.sender,
            unencrypted_ephemeral=b[0:32],
            encrypted_nothing=b[32:],
        )
        mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
        resp[WireguardResponse].mac1 = mac1
        self.last_mac1 = mac1

        resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
        self._test.assertTrue(self.noise.handshake_finished)

        return resp

    def consume_response(self, p, is_ip6=False):
        """Verify handshake response *p* from VPP and finish the noise handshake."""
        self.verify_header(p, is_ip6)

        resp = Wireguard(p[Raw])

        self._test.assertEqual(resp[Wireguard].message_type, 2)
        self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            resp[WireguardResponse].receiver_index, self.receiver_index
        )

        # sender_index is a field of the response layer (see consume_init)
        self.sender = resp[WireguardResponse].sender_index

        # bytes 12..60 are handed to the noise session; the response must
        # carry an empty payload
        payload = self.noise.read_message(bytes(resp)[12:60])
        self._test.assertEqual(payload, b"")
        self._test.assertTrue(self.noise.handshake_finished)

    def decrypt_transport(self, p, is_ip6=False):
        """Verify transport message *p* from VPP and return its decrypted payload."""
        self.verify_header(p, is_ip6)

        p = Wireguard(p[Raw])
        self._test.assertEqual(p[Wireguard].message_type, 4)
        self._test.assertEqual(p[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            p[WireguardTransport].receiver_index, self.receiver_index
        )

        d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
        return d

    def encrypt_transport(self, p):
        """Encrypt the bytes of packet *p* with the established noise session."""
        return self.noise.encrypt(bytes(p))

    def validate_encapped(self, rxs, tx, is_tunnel_ip6=False, is_transport_ip6=False):
        """Decrypt every packet of *rxs* and compare the inner packet with *tx*.

        The inner destination must match and the TTL / hop limit must be one
        lower than in *tx*.
        """
        for rx in rxs:
            rx = self.decrypt_transport(rx, is_tunnel_ip6)
            if is_transport_ip6 is False:
                rx = IP(rx)
                # check the original packet is present
                self._test.assertEqual(rx[IP].dst, tx[IP].dst)
                self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
            else:
                rx = IPv6(rx)
                # check the original packet is present
                self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
                self._test.assertEqual(rx[IPv6].hlim, tx[IPv6].hlim - 1)

    def want_events(self):
        """Subscribe this process to events of this peer."""
        self._test.vapi.want_wireguard_peer_events(
            enable_disable=1,
            pid=os.getpid(),
            peer_index=self.index,
            sw_if_index=self.itf.sw_if_index,
        )

    def wait_event(self, expect, timeout=5):
        """Wait for one peer event and assert its flags and peer index."""
        rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
        self._test.assertEqual(rv.flags, expect)
        self._test.assertEqual(rv.peer_index, self.index)
496
497
def is_handshake_init(p):
    """Return True when the Raw payload of *p* parses as a message of type 1.

    Type 1 is the message type the peer helpers use for handshake initiations.
    """
    initiation_type = 1
    parsed = Wireguard(p[Raw])
    return parsed[Wireguard].message_type == initiation_type
502
503
504 class TestWg(VppTestCase):
505     """Wireguard Test Case"""
506
    # compiled pattern matching the literal text "Error"
    error_str = compile(r"Error")

    # Prefixes of the per-node error counter names, for the IPv4 and IPv6
    # wireguard input and output nodes.
    wg4_output_node_name = "/err/wg4-output-tun/"
    wg4_input_node_name = "/err/wg4-input/"
    wg6_output_node_name = "/err/wg6-output-tun/"
    wg6_input_node_name = "/err/wg6-input/"
    # Full counter names (node prefix + error description); setUp() reads
    # each of them through self.statistics.get_err_counter() as a baseline.
    kp4_error = wg4_output_node_name + "Keypair error"
    mac4_error = wg4_input_node_name + "Invalid MAC handshake"
    peer4_in_err = wg4_input_node_name + "Peer error"
    peer4_out_err = wg4_output_node_name + "Peer error"
    kp6_error = wg6_output_node_name + "Keypair error"
    mac6_error = wg6_input_node_name + "Invalid MAC handshake"
    peer6_in_err = wg6_input_node_name + "Peer error"
    peer6_out_err = wg6_output_node_name + "Peer error"
    cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
    cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
    ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
    ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
525
526     @classmethod
527     def setUpClass(cls):
528         super(TestWg, cls).setUpClass()
529         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
530             cls, "vpp"
531         ):
532             return
533         try:
534             cls.create_pg_interfaces(range(3))
535             for i in cls.pg_interfaces:
536                 i.admin_up()
537                 i.config_ip4()
538                 i.config_ip6()
539                 i.resolve_arp()
540                 i.resolve_ndp()
541
542         except Exception:
543             super(TestWg, cls).tearDownClass()
544             raise
545
546     @classmethod
547     def tearDownClass(cls):
548         super(TestWg, cls).tearDownClass()
549
550     def setUp(self):
551         super(VppTestCase, self).setUp()
552         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
553         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
554         self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
555         self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
556         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
557         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
558         self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
559         self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
560         self.base_cookie_dec4_err = self.statistics.get_err_counter(
561             self.cookie_dec4_err
562         )
563         self.base_cookie_dec6_err = self.statistics.get_err_counter(
564             self.cookie_dec6_err
565         )
566         self.base_ratelimited4_err = self.statistics.get_err_counter(
567             self.ratelimited4_err
568         )
569         self.base_ratelimited6_err = self.statistics.get_err_counter(
570             self.ratelimited6_err
571         )
572
573     def send_and_assert_no_replies_ignoring_init(
574         self, intf, pkts, remark="", timeout=None
575     ):
576         self.pg_send(intf, pkts)
577
578         def _filter_out_fn(p):
579             return is_ipv6_misc(p) or is_handshake_init(p)
580
581         try:
582             if not timeout:
583                 timeout = 1
584             for i in self.pg_interfaces:
585                 i.assert_nothing_captured(
586                     timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
587                 )
588                 timeout = 0.1
589         finally:
590             pass
591
592     def test_wg_interface(self):
593         """Simple interface creation"""
594         port = 12312
595
596         # Create interface
597         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
598
599         self.logger.info(self.vapi.cli("sh int"))
600
601         # delete interface
602         wg0.remove_vpp_config()
603
604     def test_handshake_hash(self):
605         """test hashing an init message"""
606         # a init packet generated by linux given the key below
607         h = (
608             "0100000098b9032b"
609             "55cc4b39e73c3d24"
610             "a2a1ab884b524a81"
611             "1808bb86640fb70d"
612             "e93154fec1879125"
613             "ab012624a27f0b75"
614             "c0a2582f438ddb5f"
615             "8e768af40b4ab444"
616             "02f9ff473e1b797e"
617             "80d39d93c5480c82"
618             "a3d4510f70396976"
619             "586fb67300a5167b"
620             "ae6ca3ff3dfd00eb"
621             "59be198810f5aa03"
622             "6abc243d2155ee4f"
623             "2336483900aef801"
624             "08752cd700000000"
625             "0000000000000000"
626             "00000000"
627         )
628
629         b = bytearray.fromhex(h)
630         tgt = Wireguard(b)
631
632         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
633         pub = X25519PublicKey.from_public_bytes(pubb)
634
635         self.assertEqual(pubb, public_key_bytes(pub))
636
637         # strip the macs and build a new packet
638         init = b[0:-32]
639         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
640         init += blake2s(init, digest_size=16, key=mac_key).digest()
641         init += b"\x00" * 16
642
643         act = Wireguard(init)
644
645         self.assertEqual(tgt, act)
646
647     def _test_wg_send_cookie_tmpl(self, is_resp, is_ip6):
648         port = 12323
649
650         # create wg interface
651         if is_ip6:
652             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
653             wg0.admin_up()
654             wg0.config_ip6()
655         else:
656             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
657             wg0.admin_up()
658             wg0.config_ip4()
659
660         self.pg_enable_capture(self.pg_interfaces)
661         self.pg_start()
662
663         # create a peer
664         if is_ip6:
665             peer_1 = VppWgPeer(
666                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
667             ).add_vpp_config()
668         else:
669             peer_1 = VppWgPeer(
670                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
671             ).add_vpp_config()
672         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
673
674         if is_resp:
675             # prepare and send a handshake initiation
676             # expect the peer to send a handshake response
677             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
678             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
679         else:
680             # wait for the peer to send a handshake initiation
681             rxs = self.pg1.get_capture(1, timeout=2)
682
683         # prepare and send a wrong cookie reply
684         # expect no replies and the cookie error incremented
685         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
686         cookie.nonce = b"1234567890"
687         self.send_and_assert_no_replies(self.pg1, [cookie], timeout=0.1)
688         if is_ip6:
689             self.assertEqual(
690                 self.base_cookie_dec6_err + 1,
691                 self.statistics.get_err_counter(self.cookie_dec6_err),
692             )
693         else:
694             self.assertEqual(
695                 self.base_cookie_dec4_err + 1,
696                 self.statistics.get_err_counter(self.cookie_dec4_err),
697             )
698
699         # prepare and send a correct cookie reply
700         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
701         self.pg_send(self.pg1, [cookie])
702
703         # wait for the peer to send a handshake initiation with mac2 set
704         rxs = self.pg1.get_capture(1, timeout=6)
705
706         # verify the initiation and its mac2
707         peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6, is_mac2=True)
708
709         # remove configs
710         peer_1.remove_vpp_config()
711         wg0.remove_vpp_config()
712
    def test_wg_send_cookie_on_init_v4(self):
        """Send cookie on handshake initiation (v4)"""
        # shared send-cookie scenario: the cookie answers an initiation
        # captured from VPP, tunnel over IPv4
        self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=False)
716
    def test_wg_send_cookie_on_init_v6(self):
        """Send cookie on handshake initiation (v6)"""
        # shared send-cookie scenario: the cookie answers an initiation
        # captured from VPP, tunnel over IPv6
        self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=True)
720
    def test_wg_send_cookie_on_resp_v4(self):
        """Send cookie on handshake response (v4)"""
        # shared send-cookie scenario: the cookie answers the response VPP
        # sends to our initiation, tunnel over IPv4
        self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=False)
724
    def test_wg_send_cookie_on_resp_v6(self):
        """Send cookie on handshake response (v6)"""
        # shared send-cookie scenario: the cookie answers the response VPP
        # sends to our initiation, tunnel over IPv6
        self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=True)
728
729     def _test_wg_receive_cookie_tmpl(self, is_resp, is_ip6):
730         port = 12323
731
732         # create wg interface
733         if is_ip6:
734             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
735             wg0.admin_up()
736             wg0.config_ip6()
737         else:
738             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
739             wg0.admin_up()
740             wg0.config_ip4()
741
742         self.pg_enable_capture(self.pg_interfaces)
743         self.pg_start()
744
745         # create a peer
746         if is_ip6:
747             peer_1 = VppWgPeer(
748                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
749             ).add_vpp_config()
750         else:
751             peer_1 = VppWgPeer(
752                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
753             ).add_vpp_config()
754         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
755
756         if is_resp:
757             # wait for the peer to send a handshake initiation
758             rxs = self.pg1.get_capture(1, timeout=2)
759             # prepare and send a bunch of handshake responses
760             # expect to switch to under load state
761             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
762             txs = [resp] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
763             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
764             # reset noise to be able to turn into initiator later
765             peer_1.noise_reset()
766         else:
767             # prepare and send a bunch of handshake initiations
768             # expect to switch to under load state
769             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
770             txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
771             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
772
773         # expect the peer to send a cookie reply
774         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
775
776         # prepare and send a handshake initiation with wrong mac2
777         # expect a cookie reply
778         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
779         init.mac2 = b"1234567890"
780         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
781         peer_1.consume_cookie(rxs[0], is_ip6=is_ip6)
782
783         # prepare and send a handshake initiation with correct mac2
784         # expect a handshake response
785         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
786         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
787
788         # verify the response
789         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
790
791         # clear up under load state
792         self.sleep(UNDER_LOAD_INTERVAL)
793
794         # remove configs
795         peer_1.remove_vpp_config()
796         wg0.remove_vpp_config()
797
    def test_wg_receive_cookie_on_init_v4(self):
        """Receive cookie on handshake initiation (v4)"""
        # shared receive-cookie scenario: VPP is flooded with handshake
        # initiations, tunnel over IPv4
        self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=False)
801
    def test_wg_receive_cookie_on_init_v6(self):
        """Receive cookie on handshake initiation (v6)"""
        # shared receive-cookie scenario: VPP is flooded with handshake
        # initiations, tunnel over IPv6
        self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=True)
805
806     def test_wg_receive_cookie_on_resp_v4(self):
807         """Receive cookie on handshake response (v4)"""
808         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=False)
809
810     def test_wg_receive_cookie_on_resp_v6(self):
811         """Receive cookie on handshake response (v6)"""
812         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=True)
813
814     def test_wg_under_load_interval(self):
815         """Under load interval"""
816         port = 12323
817
818         # create wg interface
819         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
820         wg0.admin_up()
821         wg0.config_ip4()
822
823         self.pg_enable_capture(self.pg_interfaces)
824         self.pg_start()
825
826         # create a peer
827         peer_1 = VppWgPeer(
828             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
829         ).add_vpp_config()
830         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
831
832         # prepare and send a bunch of handshake initiations
833         # expect to switch to under load state
834         init = peer_1.mk_handshake(self.pg1)
835         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
836         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
837
838         # expect the peer to send a cookie reply
839         peer_1.consume_cookie(rxs[-1])
840
841         # sleep till the next counting interval
842         # expect under load state is still active
843         self.sleep(HANDSHAKE_COUNTING_INTERVAL)
844
845         # prepare and send a handshake initiation with wrong mac2
846         # expect a cookie reply
847         init = peer_1.mk_handshake(self.pg1)
848         init.mac2 = b"1234567890"
849         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
850         peer_1.consume_cookie(rxs[0])
851
852         # sleep till the end of being under load
853         # expect under load state is over
854         self.sleep(UNDER_LOAD_INTERVAL - HANDSHAKE_COUNTING_INTERVAL)
855
856         # prepare and send a handshake initiation with wrong mac2
857         # expect a handshake response
858         init = peer_1.mk_handshake(self.pg1)
859         init.mac2 = b"1234567890"
860         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
861
862         # verify the response
863         peer_1.consume_response(rxs[0])
864
865         # remove configs
866         peer_1.remove_vpp_config()
867         wg0.remove_vpp_config()
868
869     def _test_wg_handshake_ratelimiting_tmpl(self, is_ip6):
870         port = 12323
871
872         # create wg interface
873         if is_ip6:
874             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
875             wg0.admin_up()
876             wg0.config_ip6()
877         else:
878             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
879             wg0.admin_up()
880             wg0.config_ip4()
881
882         self.pg_enable_capture(self.pg_interfaces)
883         self.pg_start()
884
885         # create a peer
886         if is_ip6:
887             peer_1 = VppWgPeer(
888                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
889             ).add_vpp_config()
890         else:
891             peer_1 = VppWgPeer(
892                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
893             ).add_vpp_config()
894         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
895
896         # prepare and send a bunch of handshake initiations
897         # expect to switch to under load state
898         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
899         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
900         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
901
902         # expect the peer to send a cookie reply
903         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
904
905         # prepare and send a bunch of handshake initiations with correct mac2
906         # expect a handshake response and then ratelimiting
907         NUM_TO_REJECT = 10
908         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
909         txs = [init] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + NUM_TO_REJECT)
910         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
911
912         if is_ip6:
913             self.assertEqual(
914                 self.base_ratelimited6_err + NUM_TO_REJECT,
915                 self.statistics.get_err_counter(self.ratelimited6_err),
916             )
917         else:
918             self.assertEqual(
919                 self.base_ratelimited4_err + NUM_TO_REJECT,
920                 self.statistics.get_err_counter(self.ratelimited4_err),
921             )
922
923         # verify the response
924         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
925
926         # clear up under load state
927         self.sleep(UNDER_LOAD_INTERVAL)
928
929         # remove configs
930         peer_1.remove_vpp_config()
931         wg0.remove_vpp_config()
932
933     def test_wg_handshake_ratelimiting_v4(self):
934         """Handshake ratelimiting (v4)"""
935         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=False)
936
937     def test_wg_handshake_ratelimiting_v6(self):
938         """Handshake ratelimiting (v6)"""
939         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=True)
940
941     def test_wg_handshake_ratelimiting_multi_peer(self):
942         """Handshake ratelimiting (multiple peer)"""
943         port = 12323
944
945         # create wg interface
946         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
947         wg0.admin_up()
948         wg0.config_ip4()
949
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952
953         # create two peers
954         NUM_PEERS = 2
955         self.pg1.generate_remote_hosts(NUM_PEERS)
956         self.pg1.configure_ipv4_neighbors()
957
958         peer_1 = VppWgPeer(
959             self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
960         ).add_vpp_config()
961         peer_2 = VppWgPeer(
962             self, wg0, self.pg1.remote_hosts[1].ip4, port + 1, ["10.11.4.0/24"]
963         ).add_vpp_config()
964         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 2)
965
966         # (peer_1) prepare and send a bunch of handshake initiations
967         # expect not to switch to under load state
968         init_1 = peer_1.mk_handshake(self.pg1)
969         txs = [init_1] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
970         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
971
972         # (peer_1) expect the peer to send a handshake response
973         peer_1.consume_response(rxs[0])
974         peer_1.noise_reset()
975
976         # (peer_1) send another bunch of handshake initiations
977         # expect to switch to under load state
978         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
979
980         # (peer_1) expect the peer to send a cookie reply
981         peer_1.consume_cookie(rxs[-1])
982
983         # (peer_2) prepare and send a handshake initiation
984         # expect a cookie reply
985         init_2 = peer_2.mk_handshake(self.pg1)
986         rxs = self.send_and_expect(self.pg1, [init_2], self.pg1)
987         peer_2.consume_cookie(rxs[0])
988
989         # (peer_1) (peer_2) prepare and send a bunch of handshake initiations with correct mac2
990         # expect a handshake response and then ratelimiting
991         PEER_1_NUM_TO_REJECT = 2
992         PEER_2_NUM_TO_REJECT = 5
993         init_1 = peer_1.mk_handshake(self.pg1)
994         txs = [init_1] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_1_NUM_TO_REJECT)
995         init_2 = peer_2.mk_handshake(self.pg1)
996         txs += [init_2] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_2_NUM_TO_REJECT)
997         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
998
999         self.assertTrue(
1000             self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT
1001             < self.statistics.get_err_counter(self.ratelimited4_err)
1002             <= self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT + PEER_2_NUM_TO_REJECT
1003         )
1004
1005         # (peer_1) (peer_2) verify the response
1006         peer_1.consume_response(rxs[0])
1007         peer_2.consume_response(rxs[1])
1008
1009         # clear up under load state
1010         self.sleep(UNDER_LOAD_INTERVAL)
1011
1012         # remove configs
1013         peer_1.remove_vpp_config()
1014         peer_2.remove_vpp_config()
1015         wg0.remove_vpp_config()
1016
1017     def _test_wg_peer_roaming_on_handshake_tmpl(self, is_endpoint_set, is_resp, is_ip6):
1018         port = 12323
1019
1020         # create wg interface
1021         if is_ip6:
1022             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1023             wg0.admin_up()
1024             wg0.config_ip6()
1025         else:
1026             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1027             wg0.admin_up()
1028             wg0.config_ip4()
1029
1030         self.pg_enable_capture(self.pg_interfaces)
1031         self.pg_start()
1032
1033         # create more remote hosts
1034         NUM_REMOTE_HOSTS = 2
1035         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1036         if is_ip6:
1037             self.pg1.configure_ipv6_neighbors()
1038         else:
1039             self.pg1.configure_ipv4_neighbors()
1040
1041         # create a peer
1042         if is_ip6:
1043             peer_1 = VppWgPeer(
1044                 test=self,
1045                 itf=wg0,
1046                 endpoint=self.pg1.remote_hosts[0].ip6 if is_endpoint_set else "::",
1047                 port=port + 1 if is_endpoint_set else 0,
1048                 allowed_ips=["1::3:0/112"],
1049             ).add_vpp_config()
1050         else:
1051             peer_1 = VppWgPeer(
1052                 test=self,
1053                 itf=wg0,
1054                 endpoint=self.pg1.remote_hosts[0].ip4 if is_endpoint_set else "0.0.0.0",
1055                 port=port + 1 if is_endpoint_set else 0,
1056                 allowed_ips=["10.11.3.0/24"],
1057             ).add_vpp_config()
1058         self.assertTrue(peer_1.query_vpp_config())
1059
1060         if is_resp:
1061             # wait for the peer to send a handshake initiation
1062             rxs = self.pg1.get_capture(1, timeout=2)
1063             # prepare a handshake response
1064             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1065             # change endpoint
1066             if is_ip6:
1067                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1068                 resp[IPv6].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1069             else:
1070                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1071                 resp[IP].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1072             # send the handshake response
1073             # expect a keepalive message sent to the new endpoint
1074             rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1075             # verify the keepalive message
1076             b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1077             self.assertEqual(0, len(b))
1078         else:
1079             # change endpoint
1080             if is_ip6:
1081                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1082             else:
1083                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1084             # prepare and send a handshake initiation
1085             # expect a handshake response sent to the new endpoint
1086             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
1087             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
1088             # verify the response
1089             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
1090         self.assertTrue(peer_1.query_vpp_config())
1091
1092         # remove configs
1093         peer_1.remove_vpp_config()
1094         wg0.remove_vpp_config()
1095
1096     def test_wg_peer_roaming_on_init_v4(self):
1097         """Peer roaming on handshake initiation (v4)"""
1098         self._test_wg_peer_roaming_on_handshake_tmpl(
1099             is_endpoint_set=False, is_resp=False, is_ip6=False
1100         )
1101
1102     def test_wg_peer_roaming_on_init_v6(self):
1103         """Peer roaming on handshake initiation (v6)"""
1104         self._test_wg_peer_roaming_on_handshake_tmpl(
1105             is_endpoint_set=False, is_resp=False, is_ip6=True
1106         )
1107
1108     def test_wg_peer_roaming_on_resp_v4(self):
1109         """Peer roaming on handshake response (v4)"""
1110         self._test_wg_peer_roaming_on_handshake_tmpl(
1111             is_endpoint_set=True, is_resp=True, is_ip6=False
1112         )
1113
1114     def test_wg_peer_roaming_on_resp_v6(self):
1115         """Peer roaming on handshake response (v6)"""
1116         self._test_wg_peer_roaming_on_handshake_tmpl(
1117             is_endpoint_set=True, is_resp=True, is_ip6=True
1118         )
1119
1120     def _test_wg_peer_roaming_on_data_tmpl(self, is_async, is_ip6):
1121         self.vapi.wg_set_async_mode(is_async)
1122         port = 12323
1123
1124         # create wg interface
1125         if is_ip6:
1126             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1127             wg0.admin_up()
1128             wg0.config_ip6()
1129         else:
1130             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1131             wg0.admin_up()
1132             wg0.config_ip4()
1133
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136
1137         # create more remote hosts
1138         NUM_REMOTE_HOSTS = 2
1139         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1140         if is_ip6:
1141             self.pg1.configure_ipv6_neighbors()
1142         else:
1143             self.pg1.configure_ipv4_neighbors()
1144
1145         # create a peer
1146         if is_ip6:
1147             peer_1 = VppWgPeer(
1148                 self, wg0, self.pg1.remote_hosts[0].ip6, port + 1, ["1::3:0/112"]
1149             ).add_vpp_config()
1150         else:
1151             peer_1 = VppWgPeer(
1152                 self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
1153             ).add_vpp_config()
1154         self.assertTrue(peer_1.query_vpp_config())
1155
1156         # create a route to rewrite traffic into the wg interface
1157         if is_ip6:
1158             r1 = VppIpRoute(
1159                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1160             ).add_vpp_config()
1161         else:
1162             r1 = VppIpRoute(
1163                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1164             ).add_vpp_config()
1165
1166         # wait for the peer to send a handshake initiation
1167         rxs = self.pg1.get_capture(1, timeout=2)
1168
1169         # prepare and send a handshake response
1170         # expect a keepalive message
1171         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1172         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1173
1174         # verify the keepalive message
1175         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1176         self.assertEqual(0, len(b))
1177
1178         # change endpoint
1179         if is_ip6:
1180             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1181         else:
1182             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1183
1184         # prepare and send a data packet
1185         # expect endpoint change
1186         if is_ip6:
1187             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1188         else:
1189             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1190         data = (
1191             peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
1192             / Wireguard(message_type=4, reserved_zero=0)
1193             / WireguardTransport(
1194                 receiver_index=peer_1.sender,
1195                 counter=0,
1196                 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1197                     ip_header / UDP(sport=222, dport=223) / Raw()
1198                 ),
1199             )
1200         )
1201         rxs = self.send_and_expect(self.pg1, [data], self.pg0)
1202         if is_ip6:
1203             self.assertEqual(rxs[0][IPv6].dst, self.pg0.remote_ip6)
1204             self.assertEqual(rxs[0][IPv6].hlim, 19)
1205         else:
1206             self.assertEqual(rxs[0][IP].dst, self.pg0.remote_ip4)
1207             self.assertEqual(rxs[0][IP].ttl, 19)
1208         self.assertTrue(peer_1.query_vpp_config())
1209
1210         # prepare and send a packet that will be rewritten into the wg interface
1211         # expect a data packet sent to the new endpoint
1212         if is_ip6:
1213             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1214         else:
1215             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1216         p = (
1217             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1218             / ip_header
1219             / UDP(sport=555, dport=556)
1220             / Raw()
1221         )
1222         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
1223
1224         # verify the data packet
1225         peer_1.validate_encapped(rxs, p, is_tunnel_ip6=is_ip6, is_transport_ip6=is_ip6)
1226
1227         # remove configs
1228         r1.remove_vpp_config()
1229         peer_1.remove_vpp_config()
1230         wg0.remove_vpp_config()
1231
1232     def test_wg_peer_roaming_on_data_v4_sync(self):
1233         """Peer roaming on data packet (v4, sync)"""
1234         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=False)
1235
1236     def test_wg_peer_roaming_on_data_v6_sync(self):
1237         """Peer roaming on data packet (v6, sync)"""
1238         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=True)
1239
1240     def test_wg_peer_roaming_on_data_v4_async(self):
1241         """Peer roaming on data packet (v4, async)"""
1242         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=False)
1243
1244     def test_wg_peer_roaming_on_data_v6_async(self):
1245         """Peer roaming on data packet (v6, async)"""
1246         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=True)
1247
1248     def test_wg_peer_resp(self):
1249         """Send handshake response IPv4 tunnel"""
1250         port = 12323
1251
1252         # Create interfaces
1253         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1254         wg0.admin_up()
1255         wg0.config_ip4()
1256
1257         self.pg_enable_capture(self.pg_interfaces)
1258         self.pg_start()
1259
1260         peer_1 = VppWgPeer(
1261             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1262         ).add_vpp_config()
1263         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1264
1265         r1 = VppIpRoute(
1266             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1267         ).add_vpp_config()
1268
1269         # wait for the peer to send a handshake
1270         rx = self.pg1.get_capture(1, timeout=2)
1271
1272         # consume the handshake in the noise protocol and
1273         # generate the response
1274         resp = peer_1.consume_init(rx[0], self.pg1)
1275
1276         # send the response, get keepalive
1277         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1278
1279         for rx in rxs:
1280             b = peer_1.decrypt_transport(rx)
1281             self.assertEqual(0, len(b))
1282
1283         # send a packets that are routed into the tunnel
1284         p = (
1285             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1286             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1287             / UDP(sport=555, dport=556)
1288             / Raw(b"\x00" * 80)
1289         )
1290
1291         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1292
1293         peer_1.validate_encapped(rxs, p)
1294
1295         # send packets into the tunnel, expect to receive them on
1296         # the other side
1297         p = [
1298             (
1299                 peer_1.mk_tunnel_header(self.pg1)
1300                 / Wireguard(message_type=4, reserved_zero=0)
1301                 / WireguardTransport(
1302                     receiver_index=peer_1.sender,
1303                     counter=ii,
1304                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1305                         (
1306                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1307                             / UDP(sport=222, dport=223)
1308                             / Raw()
1309                         )
1310                     ),
1311                 )
1312             )
1313             for ii in range(255)
1314         ]
1315
1316         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1317
1318         for rx in rxs:
1319             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1320             self.assertEqual(rx[IP].ttl, 19)
1321
1322         r1.remove_vpp_config()
1323         peer_1.remove_vpp_config()
1324         wg0.remove_vpp_config()
1325
1326     def test_wg_peer_resp_ipv6(self):
1327         """Send handshake response IPv6 tunnel"""
1328         port = 12323
1329
1330         # Create interfaces
1331         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1332         wg0.admin_up()
1333         wg0.config_ip4()
1334
1335         self.pg_enable_capture(self.pg_interfaces)
1336         self.pg_start()
1337
1338         peer_1 = VppWgPeer(
1339             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1340         ).add_vpp_config()
1341         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1342
1343         r1 = VppIpRoute(
1344             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1345         ).add_vpp_config()
1346
1347         # wait for the peer to send a handshake
1348         rx = self.pg1.get_capture(1, timeout=2)
1349
1350         # consume the handshake in the noise protocol and
1351         # generate the response
1352         resp = peer_1.consume_init(rx[0], self.pg1, is_ip6=True)
1353
1354         # send the response, get keepalive
1355         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1356
1357         for rx in rxs:
1358             b = peer_1.decrypt_transport(rx, True)
1359             self.assertEqual(0, len(b))
1360
1361         # send a packets that are routed into the tunnel
1362         p = (
1363             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1364             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1365             / UDP(sport=555, dport=556)
1366             / Raw(b"\x00" * 80)
1367         )
1368
1369         rxs = self.send_and_expect(self.pg0, p * 2, self.pg1)
1370         peer_1.validate_encapped(rxs, p, True)
1371
1372         # send packets into the tunnel, expect to receive them on
1373         # the other side
1374         p = [
1375             (
1376                 peer_1.mk_tunnel_header(self.pg1, True)
1377                 / Wireguard(message_type=4, reserved_zero=0)
1378                 / WireguardTransport(
1379                     receiver_index=peer_1.sender,
1380                     counter=ii,
1381                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1382                         (
1383                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1384                             / UDP(sport=222, dport=223)
1385                             / Raw()
1386                         )
1387                     ),
1388                 )
1389             )
1390             for ii in range(255)
1391         ]
1392
1393         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1394
1395         for rx in rxs:
1396             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1397             self.assertEqual(rx[IP].ttl, 19)
1398
1399         r1.remove_vpp_config()
1400         peer_1.remove_vpp_config()
1401         wg0.remove_vpp_config()
1402
    def test_wg_peer_v4o4(self):
        """Test v4o4

        IPv4 payload over an IPv4 tunnel. The scenario first checks four drop
        paths through their error counters (no keypair, no matching peer,
        invalid mac1, wrong key), then performs a handshake initiated from
        the test side and passes traffic through the tunnel in both
        directions.
        """

        port = 12333

        # Create interfaces
        wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
        wg0.admin_up()
        wg0.config_ip4()

        # a single peer whose allowed-ip prefix is 10.11.3.0/24
        peer_1 = VppWgPeer(
            self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
        ).add_vpp_config()
        self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)

        # r1 covers the peer's allowed-ip prefix, r2 a prefix that no peer
        # allows; both point into the wg interface
        r1 = VppIpRoute(
            self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
        ).add_vpp_config()
        r2 = VppIpRoute(
            self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
        ).add_vpp_config()

        # route a packet into the wg interface
        #  use the allowed-ip prefix
        #  this is dropped because the peer is not initiated
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw()
        )
        self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
        # first hit on the kp4 error counter
        self.assertEqual(
            self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
        )

        # route a packet into the wg interface
        #  use a not allowed-ip prefix
        #  this is dropped because there is no matching peer
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
            / UDP(sport=555, dport=556)
            / Raw()
        )
        self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
        self.assertEqual(
            self.base_peer4_out_err + 1,
            self.statistics.get_err_counter(self.peer4_out_err),
        )

        # send a handshake from the peer with an invalid MAC
        p = peer_1.mk_handshake(self.pg1)
        p[WireguardInitiation].mac1 = b"foobar"
        self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
        self.assertEqual(
            self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
        )

        # send a handshake from the peer but signed by the wrong key.
        # the third argument is the public key of a freshly generated key pair;
        # the second (False) is presumably is_ip6 - confirm against mk_handshake
        p = peer_1.mk_handshake(
            self.pg1, False, X25519PrivateKey.generate().public_key()
        )
        self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
        self.assertEqual(
            self.base_peer4_in_err + 1,
            self.statistics.get_err_counter(self.peer4_in_err),
        )

        # send a valid handshake init for which we expect a response
        p = peer_1.mk_handshake(self.pg1)

        rx = self.send_and_expect(self.pg1, [p], self.pg1)

        peer_1.consume_response(rx[0])

        # route a packet into the wg interface
        #  this is dropped because the peer is still not initiated
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw()
        )
        self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
        # second hit on the kp4 error counter
        self.assertEqual(
            self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
        )

        # send a data packet from the peer through the tunnel
        # this completes the handshake
        p = (
            IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
            / UDP(sport=222, dport=223)
            / Raw()
        )
        d = peer_1.encrypt_transport(p)
        # wrap the ciphertext in a transport message with counter 0
        p = peer_1.mk_tunnel_header(self.pg1) / (
            Wireguard(message_type=4, reserved_zero=0)
            / WireguardTransport(
                receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
            )
        )
        rxs = self.send_and_expect(self.pg1, [p], self.pg0)

        # the inner packet was sent with ttl=20 and is expected with ttl 19
        for rx in rxs:
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # send packets that are routed into the tunnel
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw(b"\x00" * 80)
        )

        rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)

        for rx in rxs:
            # parse the decrypted payload as an IPv4 packet
            rx = IP(peer_1.decrypt_transport(rx))

            # check the original packet is present
            self.assertEqual(rx[IP].dst, p[IP].dst)
            self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)

        # send packets into the tunnel, expect to receive them on
        # the other side
        # counters start at ii + 1 because counter 0 was already used by the
        # data packet sent above
        p = [
            (
                peer_1.mk_tunnel_header(self.pg1)
                / Wireguard(message_type=4, reserved_zero=0)
                / WireguardTransport(
                    receiver_index=peer_1.sender,
                    counter=ii + 1,
                    encrypted_encapsulated_packet=peer_1.encrypt_transport(
                        (
                            IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
                            / UDP(sport=222, dport=223)
                            / Raw()
                        )
                    ),
                )
            )
            for ii in range(255)
        ]

        rxs = self.send_and_expect(self.pg1, p, self.pg0)

        for rx in rxs:
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # remove configs
        r1.remove_vpp_config()
        r2.remove_vpp_config()
        peer_1.remove_vpp_config()
        wg0.remove_vpp_config()
1560
1561     def test_wg_peer_v6o6(self):
1562         """Test v6o6"""
1563
1564         port = 12343
1565
1566         # Create interfaces
1567         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1568         wg0.admin_up()
1569         wg0.config_ip6()
1570
1571         peer_1 = VppWgPeer(
1572             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
1573         ).add_vpp_config()
1574         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1575
1576         r1 = VppIpRoute(
1577             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1578         ).add_vpp_config()
1579         r2 = VppIpRoute(
1580             self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
1581         ).add_vpp_config()
1582
1583         # route a packet into the wg interface
1584         #  use the allowed-ip prefix
1585         #  this is dropped because the peer is not initiated
1586
1587         p = (
1588             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1589             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1590             / UDP(sport=555, dport=556)
1591             / Raw()
1592         )
1593         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1594
1595         self.assertEqual(
1596             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1597         )
1598
1599         # route a packet into the wg interface
1600         #  use a not allowed-ip prefix
1601         #  this is dropped because there is no matching peer
1602         p = (
1603             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1604             / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
1605             / UDP(sport=555, dport=556)
1606             / Raw()
1607         )
1608         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1609         self.assertEqual(
1610             self.base_peer6_out_err + 1,
1611             self.statistics.get_err_counter(self.peer6_out_err),
1612         )
1613
1614         # send a handsake from the peer with an invalid MAC
1615         p = peer_1.mk_handshake(self.pg1, True)
1616         p[WireguardInitiation].mac1 = b"foobar"
1617         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1618
1619         self.assertEqual(
1620             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1621         )
1622
1623         # send a handsake from the peer but signed by the wrong key.
1624         p = peer_1.mk_handshake(
1625             self.pg1, True, X25519PrivateKey.generate().public_key()
1626         )
1627         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1628         self.assertEqual(
1629             self.base_peer6_in_err + 1,
1630             self.statistics.get_err_counter(self.peer6_in_err),
1631         )
1632
1633         # send a valid handsake init for which we expect a response
1634         p = peer_1.mk_handshake(self.pg1, True)
1635
1636         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1637
1638         peer_1.consume_response(rx[0], True)
1639
1640         # route a packet into the wg interface
1641         #  this is dropped because the peer is still not initiated
1642         p = (
1643             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1644             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1645             / UDP(sport=555, dport=556)
1646             / Raw()
1647         )
1648         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1649         self.assertEqual(
1650             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1651         )
1652
1653         # send a data packet from the peer through the tunnel
1654         # this completes the handshake
1655         p = (
1656             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1657             / UDP(sport=222, dport=223)
1658             / Raw()
1659         )
1660         d = peer_1.encrypt_transport(p)
1661         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1662             Wireguard(message_type=4, reserved_zero=0)
1663             / WireguardTransport(
1664                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1665             )
1666         )
1667         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1668
1669         for rx in rxs:
1670             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1671             self.assertEqual(rx[IPv6].hlim, 19)
1672
1673         # send a packets that are routed into the tunnel
1674         p = (
1675             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1676             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1677             / UDP(sport=555, dport=556)
1678             / Raw(b"\x00" * 80)
1679         )
1680
1681         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1682
1683         for rx in rxs:
1684             rx = IPv6(peer_1.decrypt_transport(rx, True))
1685
1686             # check the original packet is present
1687             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1688             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1689
1690         # send packets into the tunnel, expect to receive them on
1691         # the other side
1692         p = [
1693             (
1694                 peer_1.mk_tunnel_header(self.pg1, True)
1695                 / Wireguard(message_type=4, reserved_zero=0)
1696                 / WireguardTransport(
1697                     receiver_index=peer_1.sender,
1698                     counter=ii + 1,
1699                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1700                         (
1701                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1702                             / UDP(sport=222, dport=223)
1703                             / Raw()
1704                         )
1705                     ),
1706                 )
1707             )
1708             for ii in range(255)
1709         ]
1710
1711         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1712
1713         for rx in rxs:
1714             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1715             self.assertEqual(rx[IPv6].hlim, 19)
1716
1717         r1.remove_vpp_config()
1718         r2.remove_vpp_config()
1719         peer_1.remove_vpp_config()
1720         wg0.remove_vpp_config()
1721
1722     def test_wg_peer_v6o4(self):
1723         """Test v6o4"""
1724
1725         port = 12353
1726
1727         # Create interfaces
1728         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1729         wg0.admin_up()
1730         wg0.config_ip6()
1731
1732         peer_1 = VppWgPeer(
1733             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
1734         ).add_vpp_config()
1735         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1736
1737         r1 = VppIpRoute(
1738             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1739         ).add_vpp_config()
1740
1741         # route a packet into the wg interface
1742         #  use the allowed-ip prefix
1743         #  this is dropped because the peer is not initiated
1744         p = (
1745             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1746             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1747             / UDP(sport=555, dport=556)
1748             / Raw()
1749         )
1750         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1751         self.assertEqual(
1752             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1753         )
1754
1755         # send a handsake from the peer with an invalid MAC
1756         p = peer_1.mk_handshake(self.pg1)
1757         p[WireguardInitiation].mac1 = b"foobar"
1758         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1759
1760         self.assertEqual(
1761             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1762         )
1763
1764         # send a handsake from the peer but signed by the wrong key.
1765         p = peer_1.mk_handshake(
1766             self.pg1, False, X25519PrivateKey.generate().public_key()
1767         )
1768         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1769         self.assertEqual(
1770             self.base_peer4_in_err + 1,
1771             self.statistics.get_err_counter(self.peer4_in_err),
1772         )
1773
1774         # send a valid handsake init for which we expect a response
1775         p = peer_1.mk_handshake(self.pg1)
1776
1777         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1778
1779         peer_1.consume_response(rx[0])
1780
1781         # route a packet into the wg interface
1782         #  this is dropped because the peer is still not initiated
1783         p = (
1784             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1785             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1786             / UDP(sport=555, dport=556)
1787             / Raw()
1788         )
1789         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1790         self.assertEqual(
1791             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1792         )
1793
1794         # send a data packet from the peer through the tunnel
1795         # this completes the handshake
1796         p = (
1797             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1798             / UDP(sport=222, dport=223)
1799             / Raw()
1800         )
1801         d = peer_1.encrypt_transport(p)
1802         p = peer_1.mk_tunnel_header(self.pg1) / (
1803             Wireguard(message_type=4, reserved_zero=0)
1804             / WireguardTransport(
1805                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1806             )
1807         )
1808         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1809
1810         for rx in rxs:
1811             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1812             self.assertEqual(rx[IPv6].hlim, 19)
1813
1814         # send a packets that are routed into the tunnel
1815         p = (
1816             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1817             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1818             / UDP(sport=555, dport=556)
1819             / Raw(b"\x00" * 80)
1820         )
1821
1822         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1823
1824         for rx in rxs:
1825             rx = IPv6(peer_1.decrypt_transport(rx))
1826
1827             # check the original packet is present
1828             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1829             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1830
1831         # send packets into the tunnel, expect to receive them on
1832         # the other side
1833         p = [
1834             (
1835                 peer_1.mk_tunnel_header(self.pg1)
1836                 / Wireguard(message_type=4, reserved_zero=0)
1837                 / WireguardTransport(
1838                     receiver_index=peer_1.sender,
1839                     counter=ii + 1,
1840                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1841                         (
1842                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1843                             / UDP(sport=222, dport=223)
1844                             / Raw()
1845                         )
1846                     ),
1847                 )
1848             )
1849             for ii in range(255)
1850         ]
1851
1852         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1853
1854         for rx in rxs:
1855             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1856             self.assertEqual(rx[IPv6].hlim, 19)
1857
1858         r1.remove_vpp_config()
1859         peer_1.remove_vpp_config()
1860         wg0.remove_vpp_config()
1861
1862     def test_wg_peer_v4o6(self):
1863         """Test v4o6"""
1864
1865         port = 12363
1866
1867         # Create interfaces
1868         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1869         wg0.admin_up()
1870         wg0.config_ip4()
1871
1872         peer_1 = VppWgPeer(
1873             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1874         ).add_vpp_config()
1875         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1876
1877         r1 = VppIpRoute(
1878             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1879         ).add_vpp_config()
1880
1881         # route a packet into the wg interface
1882         #  use the allowed-ip prefix
1883         #  this is dropped because the peer is not initiated
1884         p = (
1885             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1886             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1887             / UDP(sport=555, dport=556)
1888             / Raw()
1889         )
1890         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1891         self.assertEqual(
1892             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1893         )
1894
1895         # send a handsake from the peer with an invalid MAC
1896         p = peer_1.mk_handshake(self.pg1, True)
1897         p[WireguardInitiation].mac1 = b"foobar"
1898         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1899         self.assertEqual(
1900             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1901         )
1902
1903         # send a handsake from the peer but signed by the wrong key.
1904         p = peer_1.mk_handshake(
1905             self.pg1, True, X25519PrivateKey.generate().public_key()
1906         )
1907         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1908         self.assertEqual(
1909             self.base_peer6_in_err + 1,
1910             self.statistics.get_err_counter(self.peer6_in_err),
1911         )
1912
1913         # send a valid handsake init for which we expect a response
1914         p = peer_1.mk_handshake(self.pg1, True)
1915
1916         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1917
1918         peer_1.consume_response(rx[0], True)
1919
1920         # route a packet into the wg interface
1921         #  this is dropped because the peer is still not initiated
1922         p = (
1923             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1924             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1925             / UDP(sport=555, dport=556)
1926             / Raw()
1927         )
1928         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1929         self.assertEqual(
1930             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1931         )
1932
1933         # send a data packet from the peer through the tunnel
1934         # this completes the handshake
1935         p = (
1936             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1937             / UDP(sport=222, dport=223)
1938             / Raw()
1939         )
1940         d = peer_1.encrypt_transport(p)
1941         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1942             Wireguard(message_type=4, reserved_zero=0)
1943             / WireguardTransport(
1944                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1945             )
1946         )
1947         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1948
1949         for rx in rxs:
1950             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1951             self.assertEqual(rx[IP].ttl, 19)
1952
1953         # send a packets that are routed into the tunnel
1954         p = (
1955             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1956             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1957             / UDP(sport=555, dport=556)
1958             / Raw(b"\x00" * 80)
1959         )
1960
1961         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1962
1963         for rx in rxs:
1964             rx = IP(peer_1.decrypt_transport(rx, True))
1965
1966             # check the original packet is present
1967             self.assertEqual(rx[IP].dst, p[IP].dst)
1968             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1969
1970         # send packets into the tunnel, expect to receive them on
1971         # the other side
1972         p = [
1973             (
1974                 peer_1.mk_tunnel_header(self.pg1, True)
1975                 / Wireguard(message_type=4, reserved_zero=0)
1976                 / WireguardTransport(
1977                     receiver_index=peer_1.sender,
1978                     counter=ii + 1,
1979                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1980                         (
1981                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1982                             / UDP(sport=222, dport=223)
1983                             / Raw()
1984                         )
1985                     ),
1986                 )
1987             )
1988             for ii in range(255)
1989         ]
1990
1991         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1992
1993         for rx in rxs:
1994             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1995             self.assertEqual(rx[IP].ttl, 19)
1996
1997         r1.remove_vpp_config()
1998         peer_1.remove_vpp_config()
1999         wg0.remove_vpp_config()
2000
2001     def test_wg_multi_peer(self):
2002         """multiple peer setup"""
2003         port = 12373
2004
2005         # Create interfaces
2006         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2007         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2008         wg0.admin_up()
2009         wg1.admin_up()
2010
2011         # Check peer counter
2012         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2013
2014         self.pg_enable_capture(self.pg_interfaces)
2015         self.pg_start()
2016
2017         # Create many peers on sencond interface
2018         NUM_PEERS = 16
2019         self.pg2.generate_remote_hosts(NUM_PEERS)
2020         self.pg2.configure_ipv4_neighbors()
2021         self.pg1.generate_remote_hosts(NUM_PEERS)
2022         self.pg1.configure_ipv4_neighbors()
2023
2024         peers_1 = []
2025         peers_2 = []
2026         routes_1 = []
2027         routes_2 = []
2028         for i in range(NUM_PEERS):
2029             peers_1.append(
2030                 VppWgPeer(
2031                     self,
2032                     wg0,
2033                     self.pg1.remote_hosts[i].ip4,
2034                     port + 1 + i,
2035                     ["10.0.%d.4/32" % i],
2036                 ).add_vpp_config()
2037             )
2038             routes_1.append(
2039                 VppIpRoute(
2040                     self,
2041                     "10.0.%d.4" % i,
2042                     32,
2043                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2044                 ).add_vpp_config()
2045             )
2046
2047             peers_2.append(
2048                 VppWgPeer(
2049                     self,
2050                     wg1,
2051                     self.pg2.remote_hosts[i].ip4,
2052                     port + 100 + i,
2053                     ["10.100.%d.4/32" % i],
2054                 ).add_vpp_config()
2055             )
2056             routes_2.append(
2057                 VppIpRoute(
2058                     self,
2059                     "10.100.%d.4" % i,
2060                     32,
2061                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2062                 ).add_vpp_config()
2063             )
2064
2065         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2066
2067         self.logger.info(self.vapi.cli("show wireguard peer"))
2068         self.logger.info(self.vapi.cli("show wireguard interface"))
2069         self.logger.info(self.vapi.cli("show adj 37"))
2070         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
2071         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
2072
2073         # remove routes
2074         for r in routes_1:
2075             r.remove_vpp_config()
2076         for r in routes_2:
2077             r.remove_vpp_config()
2078
2079         # remove peers
2080         for p in peers_1:
2081             self.assertTrue(p.query_vpp_config())
2082             p.remove_vpp_config()
2083         for p in peers_2:
2084             self.assertTrue(p.query_vpp_config())
2085             p.remove_vpp_config()
2086
2087         wg0.remove_vpp_config()
2088         wg1.remove_vpp_config()
2089
2090     def test_wg_multi_interface(self):
2091         """Multi-tunnel on the same port"""
2092         port = 12500
2093
2094         # Create many wireguard interfaces
2095         NUM_IFS = 4
2096         self.pg1.generate_remote_hosts(NUM_IFS)
2097         self.pg1.configure_ipv4_neighbors()
2098         self.pg0.generate_remote_hosts(NUM_IFS)
2099         self.pg0.configure_ipv4_neighbors()
2100
2101         # Create interfaces with a peer on each
2102         peers = []
2103         routes = []
2104         wg_ifs = []
2105         for i in range(NUM_IFS):
2106             # Use the same port for each interface
2107             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2108             wg0.admin_up()
2109             wg0.config_ip4()
2110             wg_ifs.append(wg0)
2111             peers.append(
2112                 VppWgPeer(
2113                     self,
2114                     wg0,
2115                     self.pg1.remote_hosts[i].ip4,
2116                     port + 1 + i,
2117                     ["10.0.%d.0/24" % i],
2118                 ).add_vpp_config()
2119             )
2120
2121             routes.append(
2122                 VppIpRoute(
2123                     self,
2124                     "10.0.%d.0" % i,
2125                     24,
2126                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
2127                 ).add_vpp_config()
2128             )
2129
2130         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
2131
2132         for i in range(NUM_IFS):
2133             # send a valid handsake init for which we expect a response
2134             p = peers[i].mk_handshake(self.pg1)
2135             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2136             peers[i].consume_response(rx[0])
2137
2138             # send a data packet from the peer through the tunnel
2139             # this completes the handshake
2140             p = (
2141                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
2142                 / UDP(sport=222, dport=223)
2143                 / Raw()
2144             )
2145             d = peers[i].encrypt_transport(p)
2146             p = peers[i].mk_tunnel_header(self.pg1) / (
2147                 Wireguard(message_type=4, reserved_zero=0)
2148                 / WireguardTransport(
2149                     receiver_index=peers[i].sender,
2150                     counter=0,
2151                     encrypted_encapsulated_packet=d,
2152                 )
2153             )
2154             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
2155             for rx in rxs:
2156                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2157                 self.assertEqual(rx[IP].ttl, 19)
2158
2159         # send a packets that are routed into the tunnel
2160         for i in range(NUM_IFS):
2161             p = (
2162                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2163                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
2164                 / UDP(sport=555, dport=556)
2165                 / Raw(b"\x00" * 80)
2166             )
2167
2168             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
2169
2170             for rx in rxs:
2171                 rx = IP(peers[i].decrypt_transport(rx))
2172
2173                 # check the oringial packet is present
2174                 self.assertEqual(rx[IP].dst, p[IP].dst)
2175                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2176
2177         # send packets into the tunnel
2178         for i in range(NUM_IFS):
2179             p = [
2180                 (
2181                     peers[i].mk_tunnel_header(self.pg1)
2182                     / Wireguard(message_type=4, reserved_zero=0)
2183                     / WireguardTransport(
2184                         receiver_index=peers[i].sender,
2185                         counter=ii + 1,
2186                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
2187                             (
2188                                 IP(
2189                                     src="10.0.%d.4" % i,
2190                                     dst=self.pg0.remote_hosts[i].ip4,
2191                                     ttl=20,
2192                                 )
2193                                 / UDP(sport=222, dport=223)
2194                                 / Raw()
2195                             )
2196                         ),
2197                     )
2198                 )
2199                 for ii in range(64)
2200             ]
2201
2202             rxs = self.send_and_expect(self.pg1, p, self.pg0)
2203
2204             for rx in rxs:
2205                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2206                 self.assertEqual(rx[IP].ttl, 19)
2207
2208         for r in routes:
2209             r.remove_vpp_config()
2210         for p in peers:
2211             p.remove_vpp_config()
2212         for i in wg_ifs:
2213             i.remove_vpp_config()
2214
2215     def test_wg_event(self):
2216         """Test events"""
2217         port = 12600
2218         ESTABLISHED_FLAG = (
2219             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
2220         )
2221         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
2222
2223         # Create interfaces
2224         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2225         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2226         wg0.admin_up()
2227         wg1.admin_up()
2228
2229         # Check peer counter
2230         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2231
2232         self.pg_enable_capture(self.pg_interfaces)
2233         self.pg_start()
2234
2235         # Create peers
2236         NUM_PEERS = 2
2237         self.pg2.generate_remote_hosts(NUM_PEERS)
2238         self.pg2.configure_ipv4_neighbors()
2239         self.pg1.generate_remote_hosts(NUM_PEERS)
2240         self.pg1.configure_ipv4_neighbors()
2241
2242         peers_0 = []
2243         peers_1 = []
2244         routes_0 = []
2245         routes_1 = []
2246         for i in range(NUM_PEERS):
2247             peers_0.append(
2248                 VppWgPeer(
2249                     self,
2250                     wg0,
2251                     self.pg1.remote_hosts[i].ip4,
2252                     port + 1 + i,
2253                     ["10.0.%d.4/32" % i],
2254                 ).add_vpp_config()
2255             )
2256             routes_0.append(
2257                 VppIpRoute(
2258                     self,
2259                     "10.0.%d.4" % i,
2260                     32,
2261                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2262                 ).add_vpp_config()
2263             )
2264
2265             peers_1.append(
2266                 VppWgPeer(
2267                     self,
2268                     wg1,
2269                     self.pg2.remote_hosts[i].ip4,
2270                     port + 100 + i,
2271                     ["10.100.%d.4/32" % i],
2272                 ).add_vpp_config()
2273             )
2274             routes_1.append(
2275                 VppIpRoute(
2276                     self,
2277                     "10.100.%d.4" % i,
2278                     32,
2279                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2280                 ).add_vpp_config()
2281             )
2282
2283         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2284
2285         # Want events from the first perr of wg0
2286         # and from all wg1 peers
2287         peers_0[0].want_events()
2288         wg1.want_events()
2289
2290         for i in range(NUM_PEERS):
2291             # send a valid handsake init for which we expect a response
2292             p = peers_0[i].mk_handshake(self.pg1)
2293             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2294             peers_0[i].consume_response(rx[0])
2295             if i == 0:
2296                 peers_0[0].wait_event(ESTABLISHED_FLAG)
2297
2298             p = peers_1[i].mk_handshake(self.pg2)
2299             rx = self.send_and_expect(self.pg2, [p], self.pg2)
2300             peers_1[i].consume_response(rx[0])
2301
2302         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
2303
2304         # remove routes
2305         for r in routes_0:
2306             r.remove_vpp_config()
2307         for r in routes_1:
2308             r.remove_vpp_config()
2309
2310         # remove peers
2311         for i in range(NUM_PEERS):
2312             self.assertTrue(peers_0[i].query_vpp_config())
2313             peers_0[i].remove_vpp_config()
2314             if i == 0:
2315                 peers_0[i].wait_event(0)
2316                 peers_0[i].wait_event(DEAD_FLAG)
2317         for p in peers_1:
2318             self.assertTrue(p.query_vpp_config())
2319             p.remove_vpp_config()
2320             p.wait_event(0)
2321             p.wait_event(DEAD_FLAG)
2322
2323         wg0.remove_vpp_config()
2324         wg1.remove_vpp_config()
2325
2326     def test_wg_sending_handshake_when_admin_down(self):
2327         """Sending handshake when admin down"""
2328         port = 12323
2329
2330         # create wg interface
2331         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2332         wg0.config_ip4()
2333
2334         # create a peer
2335         peer_1 = VppWgPeer(
2336             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2337         ).add_vpp_config()
2338         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2339
2340         self.pg_enable_capture(self.pg_interfaces)
2341         self.pg_start()
2342
2343         # wait for the peer to send a handshake initiation
2344         # expect no handshakes
2345         for i in range(2):
2346             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2347
2348         self.pg_enable_capture(self.pg_interfaces)
2349         self.pg_start()
2350
2351         # administratively enable the wg interface
2352         # expect the peer to send a handshake initiation
2353         wg0.admin_up()
2354         rxs = self.pg1.get_capture(1, timeout=2)
2355         peer_1.consume_init(rxs[0], self.pg1)
2356
2357         self.pg_enable_capture(self.pg_interfaces)
2358         self.pg_start()
2359
2360         # administratively disable the wg interface
2361         # expect no handshakes
2362         wg0.admin_down()
2363         for i in range(6):
2364             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2365
2366         # remove configs
2367         peer_1.remove_vpp_config()
2368         wg0.remove_vpp_config()
2369
2370     def test_wg_sending_data_when_admin_down(self):
2371         """Sending data when admin down"""
2372         port = 12323
2373
2374         # create wg interface
2375         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2376         wg0.admin_up()
2377         wg0.config_ip4()
2378
2379         self.pg_enable_capture(self.pg_interfaces)
2380         self.pg_start()
2381
2382         # create a peer
2383         peer_1 = VppWgPeer(
2384             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2385         ).add_vpp_config()
2386         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2387
2388         # create a route to rewrite traffic into the wg interface
2389         r1 = VppIpRoute(
2390             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2391         ).add_vpp_config()
2392
2393         # wait for the peer to send a handshake initiation
2394         rxs = self.pg1.get_capture(1, timeout=2)
2395
2396         # prepare and send a handshake response
2397         # expect a keepalive message
2398         resp = peer_1.consume_init(rxs[0], self.pg1)
2399         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2400
2401         # verify the keepalive message
2402         b = peer_1.decrypt_transport(rxs[0])
2403         self.assertEqual(0, len(b))
2404
2405         # prepare and send a packet that will be rewritten into the wg interface
2406         # expect a data packet sent
2407         p = (
2408             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2409             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2410             / UDP(sport=555, dport=556)
2411             / Raw()
2412         )
2413         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2414
2415         # verify the data packet
2416         peer_1.validate_encapped(rxs, p)
2417
2418         # administratively disable the wg interface
2419         wg0.admin_down()
2420
2421         # send a packet that will be rewritten into the wg interface
2422         # expect no data packets sent
2423         self.send_and_assert_no_replies(self.pg0, [p])
2424
2425         # administratively enable the wg interface
2426         # expect the peer to send a handshake initiation
2427         wg0.admin_up()
2428         peer_1.noise_reset()
2429         rxs = self.pg1.get_capture(1, timeout=2)
2430         resp = peer_1.consume_init(rxs[0], self.pg1)
2431
2432         # send a packet that will be rewritten into the wg interface
2433         # expect no data packets sent because the peer is not initiated
2434         self.send_and_assert_no_replies(self.pg0, [p])
2435         self.assertEqual(
2436             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
2437         )
2438
2439         # send a handshake response and expect a keepalive message
2440         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2441
2442         # verify the keepalive message
2443         b = peer_1.decrypt_transport(rxs[0])
2444         self.assertEqual(0, len(b))
2445
2446         # send a packet that will be rewritten into the wg interface
2447         # expect a data packet sent
2448         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2449
2450         # verify the data packet
2451         peer_1.validate_encapped(rxs, p)
2452
2453         # remove configs
2454         r1.remove_vpp_config()
2455         peer_1.remove_vpp_config()
2456         wg0.remove_vpp_config()
2457
2458
@tag_fixme_vpp_debug
class WireguardHandoffTests(TestWg):
    """Wireguard Tests in multi worker setup"""

    # two workers are requested; the test below picks the worker a burst is
    # injected on through the worker= argument of send_and_expect
    vpp_worker_count = 2

    def test_wg_peer_init(self):
        """Handoff"""
        udp_port = 12383
        burst_size = 255

        # wg interface on pg1's local address
        tunnel = VppWgInterface(self, self.pg1.local_ip4, udp_port).add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # one peer reachable through pg1, created with two prefixes
        peer_prefixes = ["10.11.2.0/24", "10.11.3.0/24"]
        peer = VppWgPeer(
            self, tunnel, self.pg1.remote_ip4, udp_port + 1, peer_prefixes
        ).add_vpp_config()
        self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)

        # route 10.11.3.0/24 through the wg interface
        via_tunnel = VppRoutePath("10.11.3.1", tunnel.sw_if_index)
        route = VppIpRoute(self, "10.11.3.0", 24, [via_tunnel]).add_vpp_config()

        # send a valid handshake initiation, for which a response is expected
        initiation = peer.mk_handshake(self.pg1)
        replies = self.send_and_expect(self.pg1, [initiation], self.pg1)
        peer.consume_response(replies[0])

        # send a data packet from the peer through the tunnel;
        # this completes the handshake and pins the peer to worker 0
        inner = (
            IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
            / UDP(sport=222, dport=223)
            / Raw()
        )
        ciphertext = peer.encrypt_transport(inner)
        header = peer.mk_tunnel_header(self.pg1)
        wg_message = Wireguard(message_type=4, reserved_zero=0) / WireguardTransport(
            receiver_index=peer.sender,
            counter=0,
            encrypted_encapsulated_packet=ciphertext,
        )
        first_data = header / wg_message
        forwarded = self.send_and_expect(self.pg1, [first_data], self.pg0, worker=0)

        # the decapsulated packet reaches pg0 with its TTL lowered from 20 to 19
        for rx in forwarded:
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # send packets that are routed into the tunnel;
        # this pins the peer to worker 1
        outbound = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw(b"\x00" * 80)
        )
        encapped = self.send_and_expect(
            self.pg0, outbound * burst_size, self.pg1, worker=1
        )
        peer.validate_encapped(encapped, outbound)

        # send packets into the tunnel from the other worker; the transport
        # counters continue at 1 because 0 was used by the first data packet
        inbound_burst = []
        for counter in range(1, burst_size + 1):
            header = peer.mk_tunnel_header(self.pg1)
            wg_type = Wireguard(message_type=4, reserved_zero=0)
            inner = (
                IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
                / UDP(sport=222, dport=223)
                / Raw()
            )
            transport = WireguardTransport(
                receiver_index=peer.sender,
                counter=counter,
                encrypted_encapsulated_packet=peer.encrypt_transport(inner),
            )
            inbound_burst.append(header / wg_type / transport)

        forwarded = self.send_and_expect(self.pg1, inbound_burst, self.pg0, worker=1)

        for rx in forwarded:
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # send packets that are routed into the tunnel, this time from worker 0
        encapped = self.send_and_expect(
            self.pg0, outbound * burst_size, self.pg1, worker=0
        )
        peer.validate_encapped(encapped, outbound)

        # tear down: route, peer, then the interface
        route.remove_vpp_config()
        peer.remove_vpp_config()
        tunnel.remove_vpp_config()

    # NOTE(review): presumably shadows a test of the same name inherited from
    # TestWg so that it is not run in the multi-worker setup - confirm
    @unittest.skip("test disabled")
    def test_wg_multi_interface(self):
        """Multi-tunnel on the same port"""
2561
2562
class TestWgFIB(VppTestCase):
    """Wireguard FIB Test Case"""

    @classmethod
    def setUpClass(cls):
        # nothing class-specific; defer to the framework
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # nothing class-specific; defer to the framework
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two pg interfaces (pg0, pg1), each brought up with an IPv4 address
        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()

    def tearDown(self):
        # undo setUp before handing over to the framework
        for intf in self.pg_interfaces:
            intf.unconfig_ip4()
            intf.admin_down()
        super().tearDown()

    def test_wg_fib_tracking(self):
        """FIB tracking"""
        udp_port = 12323

        # wg interface on pg1's local address
        tunnel = VppWgInterface(self, self.pg1.local_ip4, udp_port).add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # start capturing before the peer is added
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # single peer reachable through pg1
        peer_prefixes = ["10.11.3.0/24"]
        peer = VppWgPeer(
            self, tunnel, self.pg1.remote_ip4, udp_port + 1, peer_prefixes
        ).add_vpp_config()
        peer_count = len(self.vapi.wireguard_peers_dump())
        self.assertEqual(peer_count, 1)

        # route that rewrites 10.11.3.0/24 traffic into the wg interface
        via_tunnel = VppRoutePath("10.11.3.1", tunnel.sw_if_index)
        route = VppIpRoute(self, "10.11.3.0", 24, [via_tunnel]).add_vpp_config()

        # resolve ARP on pg1 only now and expect the adjacency to update
        self.pg1.resolve_arp()

        # two packets are awaited on pg1 and the second one is consumed as
        # the handshake initiation
        # NOTE(review): what the first packet is cannot be told from this
        # test alone - confirm against the capture
        captured = self.pg1.get_capture(2, timeout=6)
        initiation = captured[1]

        # answer with a handshake response; a keepalive should come back
        response = peer.consume_init(initiation, self.pg1)
        captured = self.send_and_expect(self.pg1, [response], self.pg1)

        # the keepalive must decrypt to an empty payload
        keepalive = peer.decrypt_transport(captured[0])
        self.assertEqual(0, len(keepalive))

        # plain packet from pg0 towards the routed prefix; it has to leave
        # pg1 as a wg data packet
        eth = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        ip = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
        pkt = eth / ip / UDP(sport=555, dport=556) / Raw()
        captured = self.send_and_expect(self.pg0, [pkt], self.pg1)
        peer.validate_encapped(captured, pkt)

        # tear down: route, peer, then the interface
        route.remove_vpp_config()
        peer.remove_vpp_config()
        tunnel.remove_vpp_config()