ede02f109fae0e9876dc1849845636c9d57acf39
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
4 import datetime
5 import base64
6 import os
7
8 from hashlib import blake2s
9 from config import config
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.layers.vxlan import VXLAN
15 from scapy.contrib.wireguard import (
16     Wireguard,
17     WireguardResponse,
18     WireguardInitiation,
19     WireguardTransport,
20     WireguardCookieReply,
21 )
22 from cryptography.hazmat.primitives.asymmetric.x25519 import (
23     X25519PrivateKey,
24     X25519PublicKey,
25 )
26 from cryptography.hazmat.primitives.serialization import (
27     Encoding,
28     PrivateFormat,
29     PublicFormat,
30     NoEncryption,
31 )
32 from noise.connection import NoiseConnection, Keypair
33
34 from Crypto.Cipher import ChaCha20_Poly1305
35 from Crypto.Random import get_random_bytes
36
37 from vpp_interface import VppInterface
38 from vpp_pg_interface import is_ipv6_misc
39 from vpp_ip_route import VppIpRoute, VppRoutePath
40 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
41 from vpp_vxlan_tunnel import VppVxlanTunnel
42 from vpp_object import VppObject
43 from vpp_papi import VppEnum
44 from asfframework import tag_run_solo, tag_fixme_vpp_debug
45 from framework import VppTestCase
46 from re import compile
47 import unittest
48
49 """ TestWg is a subclass of  VPPTestCase classes.
50
51 Wg test.
52
53 """
54
55
56 def private_key_bytes(k):
57     return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
58
59
60 def public_key_bytes(k):
61     return k.public_bytes(Encoding.Raw, PublicFormat.Raw)
62
63
64 def get_field_bytes(pkt, name):
65     fld, val = pkt.getfield_and_val(name)
66     return fld.i2m(pkt, val)
67
68
69 class VppWgInterface(VppInterface):
70     """
71     VPP WireGuard interface
72     """
73
74     def __init__(self, test, src, port):
75         super(VppWgInterface, self).__init__(test)
76
77         self.port = port
78         self.src = src
79         self.private_key = X25519PrivateKey.generate()
80         self.public_key = self.private_key.public_key()
81
82         # cookie related params
83         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
84
85     def public_key_bytes(self):
86         return public_key_bytes(self.public_key)
87
88     def private_key_bytes(self):
89         return private_key_bytes(self.private_key)
90
91     def add_vpp_config(self):
92         r = self.test.vapi.wireguard_interface_create(
93             interface={
94                 "user_instance": 0xFFFFFFFF,
95                 "port": self.port,
96                 "src_ip": self.src,
97                 "private_key": private_key_bytes(self.private_key),
98                 "generate_key": False,
99             }
100         )
101         self.set_sw_if_index(r.sw_if_index)
102         self.test.registry.register(self, self.test.logger)
103         return self
104
105     def remove_vpp_config(self):
106         self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
107
108     def query_vpp_config(self):
109         ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
110         for t in ts:
111             if (
112                 t.interface.sw_if_index == self._sw_if_index
113                 and str(t.interface.src_ip) == self.src
114                 and t.interface.port == self.port
115                 and t.interface.private_key == private_key_bytes(self.private_key)
116             ):
117                 return True
118         return False
119
120     def want_events(self, peer_index=0xFFFFFFFF):
121         self.test.vapi.want_wireguard_peer_events(
122             enable_disable=1,
123             pid=os.getpid(),
124             sw_if_index=self._sw_if_index,
125             peer_index=peer_index,
126         )
127
128     def wait_events(self, expect, peers, timeout=5):
129         for i in range(len(peers)):
130             rv = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
131             self.test.assertEqual(rv.peer_index, peers[i])
132             self.test.assertEqual(rv.flags, expect)
133
134     def __str__(self):
135         return self.object_id()
136
137     def object_id(self):
138         return "wireguard-%d" % self._sw_if_index
139
140
141 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
142 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
143
144 HANDSHAKE_COUNTING_INTERVAL = 0.5
145 UNDER_LOAD_INTERVAL = 1.0
146 HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD = 40
147 HANDSHAKE_NUM_BEFORE_RATELIMITING = 5
148
149 HANDSHAKE_JITTER = 0.5
150
151
152 class VppWgPeer(VppObject):
153     def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
154         self._test = test
155         self.itf = itf
156         self.endpoint = endpoint
157         self.port = port
158         self.allowed_ips = allowed_ips
159         self.persistent_keepalive = persistent_keepalive
160
161         # remote peer's public
162         self.private_key = X25519PrivateKey.generate()
163         self.public_key = self.private_key.public_key()
164
165         # cookie related params
166         self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
167         self.last_sent_cookie = None
168         self.last_mac1 = None
169         self.last_received_cookie = None
170
171         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
172
173     def change_endpoint(self, endpoint, port):
174         self.endpoint = endpoint
175         self.port = port
176
177     def add_vpp_config(self):
178         rv = self._test.vapi.wireguard_peer_add(
179             peer={
180                 "public_key": self.public_key_bytes(),
181                 "port": self.port,
182                 "endpoint": self.endpoint,
183                 "n_allowed_ips": len(self.allowed_ips),
184                 "allowed_ips": self.allowed_ips,
185                 "sw_if_index": self.itf.sw_if_index,
186                 "persistent_keepalive": self.persistent_keepalive,
187             }
188         )
189         self.index = rv.peer_index
190         self.receiver_index = self.index + 1
191         self._test.registry.register(self, self._test.logger)
192         return self
193
194     def remove_vpp_config(self):
195         self._test.vapi.wireguard_peer_remove(peer_index=self.index)
196
197     def object_id(self):
198         return "wireguard-peer-%s" % self.index
199
200     def public_key_bytes(self):
201         return public_key_bytes(self.public_key)
202
203     def query_vpp_config(self):
204         peers = self._test.vapi.wireguard_peers_dump()
205
206         for p in peers:
207             # "::" endpoint will be returned as "0.0.0.0" in peer's details
208             endpoint = "0.0.0.0" if self.endpoint == "::" else self.endpoint
209             if (
210                 p.peer.public_key == self.public_key_bytes()
211                 and p.peer.port == self.port
212                 and str(p.peer.endpoint) == endpoint
213                 and p.peer.sw_if_index == self.itf.sw_if_index
214                 and len(self.allowed_ips) == p.peer.n_allowed_ips
215             ):
216                 self.allowed_ips.sort()
217                 p.peer.allowed_ips.sort()
218
219                 for a1, a2 in zip(self.allowed_ips, p.peer.allowed_ips):
220                     if str(a1) != str(a2):
221                         return False
222                 return True
223         return False
224
225     def mk_tunnel_header(self, tx_itf, is_ip6=False):
226         if is_ip6 is False:
227             return (
228                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
229                 / IP(src=self.endpoint, dst=self.itf.src)
230                 / UDP(sport=self.port, dport=self.itf.port)
231             )
232         else:
233             return (
234                 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
235                 / IPv6(src=self.endpoint, dst=self.itf.src)
236                 / UDP(sport=self.port, dport=self.itf.port)
237             )
238
239     def noise_reset(self):
240         self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
241
242     def noise_init(self, public_key=None):
243         self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
244         self.noise.set_psks(psk=bytes(bytearray(32)))
245
246         if not public_key:
247             public_key = self.itf.public_key
248
249         # local/this private
250         self.noise.set_keypair_from_private_bytes(
251             Keypair.STATIC, private_key_bytes(self.private_key)
252         )
253         # remote's public
254         self.noise.set_keypair_from_public_bytes(
255             Keypair.REMOTE_STATIC, public_key_bytes(public_key)
256         )
257
258         self.noise.start_handshake()
259
260     def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
261         self.verify_header(p, is_ip6)
262
263         wg_pkt = Wireguard(p[Raw])
264
265         if is_resp:
266             self._test.assertEqual(wg_pkt[Wireguard].message_type, 2)
267             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
268             self._test.assertEqual(wg_pkt[WireguardResponse].mac2, bytes([0] * 16))
269         else:
270             self._test.assertEqual(wg_pkt[Wireguard].message_type, 1)
271             self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
272             self._test.assertEqual(wg_pkt[WireguardInitiation].mac2, bytes([0] * 16))
273
274         # collect info from wg packet (initiation or response)
275         src = get_field_bytes(p[IPv6 if is_ip6 else IP], "src")
276         sport = p[UDP].sport.to_bytes(2, byteorder="big")
277         if is_resp:
278             mac1 = wg_pkt[WireguardResponse].mac1
279             sender_index = wg_pkt[WireguardResponse].sender_index
280         else:
281             mac1 = wg_pkt[WireguardInitiation].mac1
282             sender_index = wg_pkt[WireguardInitiation].sender_index
283
284         # make cookie reply
285         cookie_reply = Wireguard() / WireguardCookieReply()
286         cookie_reply[Wireguard].message_type = 3
287         cookie_reply[Wireguard].reserved_zero = 0
288         cookie_reply[WireguardCookieReply].receiver_index = sender_index
289         nonce = get_random_bytes(24)
290         cookie_reply[WireguardCookieReply].nonce = nonce
291
292         # generate cookie data
293         changing_secret = get_random_bytes(32)
294         self.last_sent_cookie = blake2s(
295             src + sport, digest_size=16, key=changing_secret
296         ).digest()
297
298         # encrypt cookie data
299         cipher = ChaCha20_Poly1305.new(key=self.cookie_key, nonce=nonce)
300         cipher.update(mac1)
301         ciphertext, tag = cipher.encrypt_and_digest(self.last_sent_cookie)
302         cookie_reply[WireguardCookieReply].encrypted_cookie = ciphertext + tag
303
304         # prepare cookie reply to be sent
305         cookie_reply = self.mk_tunnel_header(tx_itf, is_ip6) / cookie_reply
306
307         return cookie_reply
308
309     def consume_cookie(self, p, is_ip6=False):
310         self.verify_header(p, is_ip6)
311
312         cookie_reply = Wireguard(p[Raw])
313
314         self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
315         self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
316         self._test.assertEqual(
317             cookie_reply[WireguardCookieReply].receiver_index, self.receiver_index
318         )
319
320         # collect info from cookie reply
321         nonce = cookie_reply[WireguardCookieReply].nonce
322         encrypted_cookie = cookie_reply[WireguardCookieReply].encrypted_cookie
323         ciphertext, tag = encrypted_cookie[:16], encrypted_cookie[16:]
324
325         # decrypt cookie data
326         cipher = ChaCha20_Poly1305.new(key=self.itf.cookie_key, nonce=nonce)
327         cipher.update(self.last_mac1)
328         self.last_received_cookie = cipher.decrypt_and_verify(ciphertext, tag)
329
330     def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
331         self.noise.set_as_initiator()
332         self.noise_init(public_key)
333
334         p = Wireguard() / WireguardInitiation()
335
336         p[Wireguard].message_type = 1
337         p[Wireguard].reserved_zero = 0
338         p[WireguardInitiation].sender_index = self.receiver_index
339
340         # some random data for the message
341         #  lifted from the noise protocol's wireguard example
342         now = datetime.datetime.now()
343         tai = struct.pack(
344             "!qi",
345             4611686018427387914 + int(now.timestamp()),
346             int(now.microsecond * 1e3),
347         )
348         b = self.noise.write_message(payload=tai)
349
350         # load noise into init message
351         p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
352         p[WireguardInitiation].encrypted_static = b[32:80]
353         p[WireguardInitiation].encrypted_timestamp = b[80:108]
354
355         # generate the mac1 hash
356         mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
357         mac1 = blake2s(bytes(p)[0:116], digest_size=16, key=mac_key).digest()
358         p[WireguardInitiation].mac1 = mac1
359         self.last_mac1 = mac1
360
361         # generate the mac2 hash
362         if self.last_received_cookie:
363             mac2 = blake2s(
364                 bytes(p)[0:132], digest_size=16, key=self.last_received_cookie
365             ).digest()
366             p[WireguardInitiation].mac2 = mac2
367             self.last_received_cookie = None
368         else:
369             p[WireguardInitiation].mac2 = bytearray(16)
370
371         p = self.mk_tunnel_header(tx_itf, is_ip6) / p
372
373         return p
374
375     def verify_header(self, p, is_ip6=False):
376         if is_ip6 is False:
377             self._test.assertEqual(p[IP].src, self.itf.src)
378             self._test.assertEqual(p[IP].dst, self.endpoint)
379             self._test.assert_packet_checksums_valid(p)
380         else:
381             self._test.assertEqual(p[IPv6].src, self.itf.src)
382             self._test.assertEqual(p[IPv6].dst, self.endpoint)
383             self._test.assert_packet_checksums_valid(p, False)
384         self._test.assertEqual(p[UDP].sport, self.itf.port)
385         self._test.assertEqual(p[UDP].dport, self.port)
386
387     def consume_init(self, p, tx_itf, is_ip6=False, is_mac2=False):
388         self.noise.set_as_responder()
389         self.noise_init(self.itf.public_key)
390         self.verify_header(p, is_ip6)
391
392         init = Wireguard(p[Raw])
393
394         self._test.assertEqual(init[Wireguard].message_type, 1)
395         self._test.assertEqual(init[Wireguard].reserved_zero, 0)
396
397         self.sender = init[WireguardInitiation].sender_index
398
399         # validate the mac1 hash
400         mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
401         mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
402         self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
403
404         # validate the mac2 hash
405         if is_mac2:
406             self._test.assertNotEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
407             self._test.assertNotEqual(self.last_sent_cookie, None)
408             mac2 = blake2s(
409                 bytes(init)[0:-16], digest_size=16, key=self.last_sent_cookie
410             ).digest()
411             self._test.assertEqual(init[WireguardInitiation].mac2, mac2)
412             self.last_sent_cookie = None
413         else:
414             self._test.assertEqual(init[WireguardInitiation].mac2, bytes([0] * 16))
415
416         # this passes only unencrypted_ephemeral, encrypted_static,
417         # encrypted_timestamp fields of the init
418         payload = self.noise.read_message(bytes(init)[8:-32])
419
420         # build the response
421         b = self.noise.write_message()
422         mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
423         resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
424             sender_index=self.receiver_index,
425             receiver_index=self.sender,
426             unencrypted_ephemeral=b[0:32],
427             encrypted_nothing=b[32:],
428         )
429         mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
430         resp[WireguardResponse].mac1 = mac1
431         self.last_mac1 = mac1
432
433         resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
434         self._test.assertTrue(self.noise.handshake_finished)
435
436         return resp
437
438     def consume_response(self, p, is_ip6=False):
439         self.verify_header(p, is_ip6)
440
441         resp = Wireguard(p[Raw])
442
443         self._test.assertEqual(resp[Wireguard].message_type, 2)
444         self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
445         self._test.assertEqual(
446             resp[WireguardResponse].receiver_index, self.receiver_index
447         )
448
449         self.sender = resp[Wireguard].sender_index
450
451         payload = self.noise.read_message(bytes(resp)[12:60])
452         self._test.assertEqual(payload, b"")
453         self._test.assertTrue(self.noise.handshake_finished)
454
455     def decrypt_transport(self, p, is_ip6=False):
456         self.verify_header(p, is_ip6)
457
458         p = Wireguard(p[Raw])
459         self._test.assertEqual(p[Wireguard].message_type, 4)
460         self._test.assertEqual(p[Wireguard].reserved_zero, 0)
461         self._test.assertEqual(
462             p[WireguardTransport].receiver_index, self.receiver_index
463         )
464
465         d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
466         return d
467
468     def encrypt_transport(self, p):
469         return self.noise.encrypt(bytes(p))
470
471     def validate_encapped(self, rxs, tx, is_tunnel_ip6=False, is_transport_ip6=False):
472         ret_rxs = []
473         for rx in rxs:
474             rx = self.decrypt_transport(rx, is_tunnel_ip6)
475             if is_transport_ip6 is False:
476                 rx = IP(rx)
477                 # check the original packet is present
478                 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
479                 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
480             else:
481                 rx = IPv6(rx)
482                 # check the original packet is present
483                 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
484                 self._test.assertEqual(rx[IPv6].hlim, tx[IPv6].hlim - 1)
485             ret_rxs.append(rx)
486         return ret_rxs
487
488     def want_events(self):
489         self._test.vapi.want_wireguard_peer_events(
490             enable_disable=1,
491             pid=os.getpid(),
492             peer_index=self.index,
493             sw_if_index=self.itf.sw_if_index,
494         )
495
496     def wait_event(self, expect, timeout=5):
497         rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
498         self._test.assertEqual(rv.flags, expect)
499         self._test.assertEqual(rv.peer_index, self.index)
500
501
502 def is_handshake_init(p):
503     wg_p = Wireguard(p[Raw])
504
505     return wg_p[Wireguard].message_type == 1
506
507
508 @unittest.skipIf(
509     "wireguard" in config.excluded_plugins, "Exclude Wireguard plugin tests"
510 )
511 @tag_run_solo
512 class TestWg(VppTestCase):
513     """Wireguard Test Case"""
514
515     error_str = compile(r"Error")
516
517     wg4_output_node_name = "/err/wg4-output-tun/"
518     wg4_input_node_name = "/err/wg4-input/"
519     wg6_output_node_name = "/err/wg6-output-tun/"
520     wg6_input_node_name = "/err/wg6-input/"
521     kp4_error = wg4_output_node_name + "Keypair error"
522     mac4_error = wg4_input_node_name + "Invalid MAC handshake"
523     peer4_in_err = wg4_input_node_name + "Peer error"
524     peer4_out_err = wg4_output_node_name + "Peer error"
525     kp6_error = wg6_output_node_name + "Keypair error"
526     mac6_error = wg6_input_node_name + "Invalid MAC handshake"
527     peer6_in_err = wg6_input_node_name + "Peer error"
528     peer6_out_err = wg6_output_node_name + "Peer error"
529     cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
530     cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
531     ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
532     ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
533
534     @classmethod
535     def setUpClass(cls):
536         super(TestWg, cls).setUpClass()
537         try:
538             cls.create_pg_interfaces(range(3))
539             for i in cls.pg_interfaces:
540                 i.admin_up()
541                 i.config_ip4()
542                 i.config_ip6()
543                 i.resolve_arp()
544                 i.resolve_ndp()
545
546         except Exception:
547             super(TestWg, cls).tearDownClass()
548             raise
549
550     @classmethod
551     def tearDownClass(cls):
552         super(TestWg, cls).tearDownClass()
553
554     def setUp(self):
555         super(VppTestCase, self).setUp()
556         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
557         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
558         self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
559         self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
560         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
561         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
562         self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
563         self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
564         self.base_cookie_dec4_err = self.statistics.get_err_counter(
565             self.cookie_dec4_err
566         )
567         self.base_cookie_dec6_err = self.statistics.get_err_counter(
568             self.cookie_dec6_err
569         )
570         self.base_ratelimited4_err = self.statistics.get_err_counter(
571             self.ratelimited4_err
572         )
573         self.base_ratelimited6_err = self.statistics.get_err_counter(
574             self.ratelimited6_err
575         )
576
577     def send_and_assert_no_replies_ignoring_init(
578         self, intf, pkts, remark="", timeout=None
579     ):
580         self.pg_send(intf, pkts)
581
582         def _filter_out_fn(p):
583             return is_ipv6_misc(p) or is_handshake_init(p)
584
585         try:
586             if not timeout:
587                 timeout = 1
588             for i in self.pg_interfaces:
589                 i.assert_nothing_captured(
590                     timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
591                 )
592                 timeout = 0.1
593         finally:
594             pass
595
596     def test_wg_interface(self):
597         """Simple interface creation"""
598         port = 12312
599
600         # Create interface
601         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
602
603         self.logger.info(self.vapi.cli("sh int"))
604
605         # delete interface
606         wg0.remove_vpp_config()
607
608     def test_handshake_hash(self):
609         """test hashing an init message"""
610         # a init packet generated by linux given the key below
611         h = (
612             "0100000098b9032b"
613             "55cc4b39e73c3d24"
614             "a2a1ab884b524a81"
615             "1808bb86640fb70d"
616             "e93154fec1879125"
617             "ab012624a27f0b75"
618             "c0a2582f438ddb5f"
619             "8e768af40b4ab444"
620             "02f9ff473e1b797e"
621             "80d39d93c5480c82"
622             "a3d4510f70396976"
623             "586fb67300a5167b"
624             "ae6ca3ff3dfd00eb"
625             "59be198810f5aa03"
626             "6abc243d2155ee4f"
627             "2336483900aef801"
628             "08752cd700000000"
629             "0000000000000000"
630             "00000000"
631         )
632
633         b = bytearray.fromhex(h)
634         tgt = Wireguard(b)
635
636         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
637         pub = X25519PublicKey.from_public_bytes(pubb)
638
639         self.assertEqual(pubb, public_key_bytes(pub))
640
641         # strip the macs and build a new packet
642         init = b[0:-32]
643         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
644         init += blake2s(init, digest_size=16, key=mac_key).digest()
645         init += b"\x00" * 16
646
647         act = Wireguard(init)
648
649         self.assertEqual(tgt, act)
650
651     def _test_wg_send_cookie_tmpl(self, is_resp, is_ip6):
652         port = 12323
653
654         # create wg interface
655         if is_ip6:
656             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
657             wg0.admin_up()
658             wg0.config_ip6()
659         else:
660             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
661             wg0.admin_up()
662             wg0.config_ip4()
663
664         self.pg_enable_capture(self.pg_interfaces)
665         self.pg_start()
666
667         # create a peer
668         if is_ip6:
669             peer_1 = VppWgPeer(
670                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
671             ).add_vpp_config()
672         else:
673             peer_1 = VppWgPeer(
674                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
675             ).add_vpp_config()
676         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
677
678         if is_resp:
679             # skip the first automatic handshake
680             self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
681
682             # prepare and send a handshake initiation
683             # expect the peer to send a handshake response
684             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
685             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
686         else:
687             # wait for the peer to send a handshake initiation
688             rxs = self.pg1.get_capture(1, timeout=2)
689
690         # prepare and send a wrong cookie reply
691         # expect no replies and the cookie error incremented
692         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
693         cookie.nonce = b"1234567890"
694         self.send_and_assert_no_replies(self.pg1, [cookie], timeout=0.1)
695         if is_ip6:
696             self.assertEqual(
697                 self.base_cookie_dec6_err + 1,
698                 self.statistics.get_err_counter(self.cookie_dec6_err),
699             )
700         else:
701             self.assertEqual(
702                 self.base_cookie_dec4_err + 1,
703                 self.statistics.get_err_counter(self.cookie_dec4_err),
704             )
705
706         # prepare and send a correct cookie reply
707         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
708         self.pg_send(self.pg1, [cookie])
709
710         # wait for the peer to send a handshake initiation with mac2 set
711         rxs = self.pg1.get_capture(1, timeout=6)
712
713         # verify the initiation and its mac2
714         peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6, is_mac2=True)
715
716         # remove configs
717         peer_1.remove_vpp_config()
718         wg0.remove_vpp_config()
719
720     def test_wg_send_cookie_on_init_v4(self):
721         """Send cookie on handshake initiation (v4)"""
722         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=False)
723
724     def test_wg_send_cookie_on_init_v6(self):
725         """Send cookie on handshake initiation (v6)"""
726         self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=True)
727
728     def test_wg_send_cookie_on_resp_v4(self):
729         """Send cookie on handshake response (v4)"""
730         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=False)
731
732     def test_wg_send_cookie_on_resp_v6(self):
733         """Send cookie on handshake response (v6)"""
734         self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=True)
735
736     def _test_wg_receive_cookie_tmpl(self, is_resp, is_ip6):
737         port = 12323
738
739         # create wg interface
740         if is_ip6:
741             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
742             wg0.admin_up()
743             wg0.config_ip6()
744         else:
745             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
746             wg0.admin_up()
747             wg0.config_ip4()
748
749         self.pg_enable_capture(self.pg_interfaces)
750         self.pg_start()
751
752         # create a peer
753         if is_ip6:
754             peer_1 = VppWgPeer(
755                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
756             ).add_vpp_config()
757         else:
758             peer_1 = VppWgPeer(
759                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
760             ).add_vpp_config()
761         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
762
763         if is_resp:
764             # wait for the peer to send a handshake initiation
765             rxs = self.pg1.get_capture(1, timeout=2)
766             # prepare and send a bunch of handshake responses
767             # expect to switch to under load state
768             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
769             txs = [resp] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
770             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
771             # reset noise to be able to turn into initiator later
772             peer_1.noise_reset()
773         else:
774             # skip the first automatic handshake
775             self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
776
777             # prepare and send a bunch of handshake initiations
778             # expect to switch to under load state
779             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
780             txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
781             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
782
783         # expect the peer to send a cookie reply
784         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
785
786         # prepare and send a handshake initiation with wrong mac2
787         # expect a cookie reply
788         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
789         init.mac2 = b"1234567890"
790         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
791         peer_1.consume_cookie(rxs[0], is_ip6=is_ip6)
792
793         # prepare and send a handshake initiation with correct mac2
794         # expect a handshake response
795         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
796         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
797
798         # verify the response
799         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
800
801         # clear up under load state
802         self.sleep(UNDER_LOAD_INTERVAL)
803
804         # remove configs
805         peer_1.remove_vpp_config()
806         wg0.remove_vpp_config()
807
808     def test_wg_receive_cookie_on_init_v4(self):
809         """Receive cookie on handshake initiation (v4)"""
810         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=False)
811
812     def test_wg_receive_cookie_on_init_v6(self):
813         """Receive cookie on handshake initiation (v6)"""
814         self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=True)
815
816     def test_wg_receive_cookie_on_resp_v4(self):
817         """Receive cookie on handshake response (v4)"""
818         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=False)
819
820     def test_wg_receive_cookie_on_resp_v6(self):
821         """Receive cookie on handshake response (v6)"""
822         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=True)
823
824     def test_wg_under_load_interval(self):
825         """Under load interval"""
826         port = 12323
827
828         # create wg interface
829         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
830         wg0.admin_up()
831         wg0.config_ip4()
832
833         self.pg_enable_capture(self.pg_interfaces)
834         self.pg_start()
835
836         # create a peer
837         peer_1 = VppWgPeer(
838             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
839         ).add_vpp_config()
840         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
841
842         # skip the first automatic handshake
843         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
844
845         # prepare and send a bunch of handshake initiations
846         # expect to switch to under load state
847         init = peer_1.mk_handshake(self.pg1)
848         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
849         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
850
851         # expect the peer to send a cookie reply
852         peer_1.consume_cookie(rxs[-1])
853
854         # sleep till the next counting interval
855         # expect under load state is still active
856         self.sleep(HANDSHAKE_COUNTING_INTERVAL)
857
858         # prepare and send a handshake initiation with wrong mac2
859         # expect a cookie reply
860         init = peer_1.mk_handshake(self.pg1)
861         init.mac2 = b"1234567890"
862         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
863         peer_1.consume_cookie(rxs[0])
864
865         # sleep till the end of being under load
866         # expect under load state is over
867         self.sleep(UNDER_LOAD_INTERVAL - HANDSHAKE_COUNTING_INTERVAL)
868
869         # prepare and send a handshake initiation with wrong mac2
870         # expect a handshake response
871         init = peer_1.mk_handshake(self.pg1)
872         init.mac2 = b"1234567890"
873         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
874
875         # verify the response
876         peer_1.consume_response(rxs[0])
877
878         # remove configs
879         peer_1.remove_vpp_config()
880         wg0.remove_vpp_config()
881
882     def _test_wg_handshake_ratelimiting_tmpl(self, is_ip6):
883         port = 12323
884
885         # create wg interface
886         if is_ip6:
887             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
888             wg0.admin_up()
889             wg0.config_ip6()
890         else:
891             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
892             wg0.admin_up()
893             wg0.config_ip4()
894
895         self.pg_enable_capture(self.pg_interfaces)
896         self.pg_start()
897
898         # create a peer
899         if is_ip6:
900             peer_1 = VppWgPeer(
901                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
902             ).add_vpp_config()
903         else:
904             peer_1 = VppWgPeer(
905                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
906             ).add_vpp_config()
907         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
908
909         # skip the first automatic handshake
910         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
911
912         # prepare and send a bunch of handshake initiations
913         # expect to switch to under load state
914         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
915         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
916         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
917
918         # expect the peer to send a cookie reply
919         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
920
921         # prepare and send a bunch of handshake initiations with correct mac2
922         # expect a handshake response and then ratelimiting
923         NUM_TO_REJECT = 10
924         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
925         txs = [init] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + NUM_TO_REJECT)
926
927         # TODO: Deterimine why no handshake response is sent back if test is
928         #       not run in as part of the test suite.  It fails only very occasionally
929         #       when run solo.
930         #
931         #       Until then, if no response, don't fail trying to verify it.
932         #       The error counter test still verifies that the correct number of
933         #       handshake initiaions are ratelimited.
934         try:
935             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
936         except:
937             self.logger.debug(
938                 f"{self._testMethodDoc}: send_and_expect_some() failed to get any response packets."
939             )
940             rxs = None
941             pass
942
943         if is_ip6:
944             self.assertEqual(
945                 self.base_ratelimited6_err + NUM_TO_REJECT,
946                 self.statistics.get_err_counter(self.ratelimited6_err),
947             )
948         else:
949             self.assertEqual(
950                 self.base_ratelimited4_err + NUM_TO_REJECT,
951                 self.statistics.get_err_counter(self.ratelimited4_err),
952             )
953
954         # verify the response
955         if rxs is not None:
956             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
957
958         # clear up under load state
959         self.sleep(UNDER_LOAD_INTERVAL)
960
961         # remove configs
962         peer_1.remove_vpp_config()
963         wg0.remove_vpp_config()
964
965     def test_wg_handshake_ratelimiting_v4(self):
966         """Handshake ratelimiting (v4)"""
967         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=False)
968
969     def test_wg_handshake_ratelimiting_v6(self):
970         """Handshake ratelimiting (v6)"""
971         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=True)
972
973     def test_wg_handshake_ratelimiting_multi_peer(self):
974         """Handshake ratelimiting (multiple peer)"""
975         port = 12323
976
977         # create wg interface
978         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
979         wg0.admin_up()
980         wg0.config_ip4()
981
982         self.pg_enable_capture(self.pg_interfaces)
983         self.pg_start()
984
985         # create two peers
986         NUM_PEERS = 2
987         self.pg1.generate_remote_hosts(NUM_PEERS)
988         self.pg1.configure_ipv4_neighbors()
989
990         peer_1 = VppWgPeer(
991             self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
992         ).add_vpp_config()
993         peer_2 = VppWgPeer(
994             self, wg0, self.pg1.remote_hosts[1].ip4, port + 1, ["10.11.4.0/24"]
995         ).add_vpp_config()
996         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 2)
997
998         # skip the first automatic handshake
999         self.pg1.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
1000
1001         # (peer_1) prepare and send a bunch of handshake initiations
1002         # expect not to switch to under load state
1003         init_1 = peer_1.mk_handshake(self.pg1)
1004         txs = [init_1] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
1005         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1006
1007         # (peer_1) expect the peer to send a handshake response
1008         peer_1.consume_response(rxs[0])
1009         peer_1.noise_reset()
1010
1011         # (peer_1) send another bunch of handshake initiations
1012         # expect to switch to under load state
1013         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1014
1015         # (peer_1) expect the peer to send a cookie reply
1016         peer_1.consume_cookie(rxs[-1])
1017
1018         # (peer_2) prepare and send a handshake initiation
1019         # expect a cookie reply
1020         init_2 = peer_2.mk_handshake(self.pg1)
1021         rxs = self.send_and_expect(self.pg1, [init_2], self.pg1)
1022         peer_2.consume_cookie(rxs[0])
1023
1024         # (peer_1) (peer_2) prepare and send a bunch of handshake initiations with correct mac2
1025         # expect a handshake response and then ratelimiting
1026         PEER_1_NUM_TO_REJECT = 2
1027         PEER_2_NUM_TO_REJECT = 5
1028         init_1 = peer_1.mk_handshake(self.pg1)
1029         txs = [init_1] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_1_NUM_TO_REJECT)
1030         init_2 = peer_2.mk_handshake(self.pg1)
1031         txs += [init_2] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + PEER_2_NUM_TO_REJECT)
1032         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1033
1034         self.assertTrue(
1035             self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT
1036             < self.statistics.get_err_counter(self.ratelimited4_err)
1037             <= self.base_ratelimited4_err + PEER_1_NUM_TO_REJECT + PEER_2_NUM_TO_REJECT
1038         )
1039
1040         # (peer_1) (peer_2) verify the response
1041         peer_1.consume_response(rxs[0])
1042         peer_2.consume_response(rxs[1])
1043
1044         # clear up under load state
1045         self.sleep(UNDER_LOAD_INTERVAL)
1046
1047         # remove configs
1048         peer_1.remove_vpp_config()
1049         peer_2.remove_vpp_config()
1050         wg0.remove_vpp_config()
1051
1052     def _test_wg_peer_roaming_on_handshake_tmpl(self, is_endpoint_set, is_resp, is_ip6):
1053         port = 12323
1054
1055         # create wg interface
1056         if is_ip6:
1057             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1058             wg0.admin_up()
1059             wg0.config_ip6()
1060         else:
1061             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1062             wg0.admin_up()
1063             wg0.config_ip4()
1064
1065         self.pg_enable_capture(self.pg_interfaces)
1066         self.pg_start()
1067
1068         # create more remote hosts
1069         NUM_REMOTE_HOSTS = 2
1070         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1071         if is_ip6:
1072             self.pg1.configure_ipv6_neighbors()
1073         else:
1074             self.pg1.configure_ipv4_neighbors()
1075
1076         # create a peer
1077         if is_ip6:
1078             peer_1 = VppWgPeer(
1079                 test=self,
1080                 itf=wg0,
1081                 endpoint=self.pg1.remote_hosts[0].ip6 if is_endpoint_set else "::",
1082                 port=port + 1 if is_endpoint_set else 0,
1083                 allowed_ips=["1::3:0/112"],
1084             ).add_vpp_config()
1085         else:
1086             peer_1 = VppWgPeer(
1087                 test=self,
1088                 itf=wg0,
1089                 endpoint=self.pg1.remote_hosts[0].ip4 if is_endpoint_set else "0.0.0.0",
1090                 port=port + 1 if is_endpoint_set else 0,
1091                 allowed_ips=["10.11.3.0/24"],
1092             ).add_vpp_config()
1093         self.assertTrue(peer_1.query_vpp_config())
1094
1095         if is_resp:
1096             # wait for the peer to send a handshake initiation
1097             rxs = self.pg1.get_capture(1, timeout=2)
1098             # prepare a handshake response
1099             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1100             # change endpoint
1101             if is_ip6:
1102                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1103                 resp[IPv6].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1104             else:
1105                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1106                 resp[IP].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1107             # send the handshake response
1108             # expect a keepalive message sent to the new endpoint
1109             rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1110             # verify the keepalive message
1111             b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1112             self.assertEqual(0, len(b))
1113         else:
1114             # change endpoint
1115             if is_ip6:
1116                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1117             else:
1118                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1119             # prepare and send a handshake initiation
1120             # expect a handshake response sent to the new endpoint
1121             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
1122             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
1123             # verify the response
1124             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
1125         self.assertTrue(peer_1.query_vpp_config())
1126
1127         # remove configs
1128         peer_1.remove_vpp_config()
1129         wg0.remove_vpp_config()
1130
1131     def test_wg_peer_roaming_on_init_v4(self):
1132         """Peer roaming on handshake initiation (v4)"""
1133         self._test_wg_peer_roaming_on_handshake_tmpl(
1134             is_endpoint_set=False, is_resp=False, is_ip6=False
1135         )
1136
1137     def test_wg_peer_roaming_on_init_v6(self):
1138         """Peer roaming on handshake initiation (v6)"""
1139         self._test_wg_peer_roaming_on_handshake_tmpl(
1140             is_endpoint_set=False, is_resp=False, is_ip6=True
1141         )
1142
1143     def test_wg_peer_roaming_on_resp_v4(self):
1144         """Peer roaming on handshake response (v4)"""
1145         self._test_wg_peer_roaming_on_handshake_tmpl(
1146             is_endpoint_set=True, is_resp=True, is_ip6=False
1147         )
1148
1149     def test_wg_peer_roaming_on_resp_v6(self):
1150         """Peer roaming on handshake response (v6)"""
1151         self._test_wg_peer_roaming_on_handshake_tmpl(
1152             is_endpoint_set=True, is_resp=True, is_ip6=True
1153         )
1154
1155     def _test_wg_peer_roaming_on_data_tmpl(self, is_async, is_ip6):
1156         self.vapi.wg_set_async_mode(is_async)
1157         port = 12323
1158
1159         # create wg interface
1160         if is_ip6:
1161             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1162             wg0.admin_up()
1163             wg0.config_ip6()
1164         else:
1165             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1166             wg0.admin_up()
1167             wg0.config_ip4()
1168
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171
1172         # create more remote hosts
1173         NUM_REMOTE_HOSTS = 2
1174         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1175         if is_ip6:
1176             self.pg1.configure_ipv6_neighbors()
1177         else:
1178             self.pg1.configure_ipv4_neighbors()
1179
1180         # create a peer
1181         if is_ip6:
1182             peer_1 = VppWgPeer(
1183                 self, wg0, self.pg1.remote_hosts[0].ip6, port + 1, ["1::3:0/112"]
1184             ).add_vpp_config()
1185         else:
1186             peer_1 = VppWgPeer(
1187                 self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
1188             ).add_vpp_config()
1189         self.assertTrue(peer_1.query_vpp_config())
1190
1191         # create a route to rewrite traffic into the wg interface
1192         if is_ip6:
1193             r1 = VppIpRoute(
1194                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1195             ).add_vpp_config()
1196         else:
1197             r1 = VppIpRoute(
1198                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1199             ).add_vpp_config()
1200
1201         # wait for the peer to send a handshake initiation
1202         rxs = self.pg1.get_capture(1, timeout=2)
1203
1204         # prepare and send a handshake response
1205         # expect a keepalive message
1206         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1207         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1208
1209         # verify the keepalive message
1210         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1211         self.assertEqual(0, len(b))
1212
1213         # change endpoint
1214         if is_ip6:
1215             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1216         else:
1217             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1218
1219         # prepare and send a data packet
1220         # expect endpoint change
1221         if is_ip6:
1222             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1223         else:
1224             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1225         data = (
1226             peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
1227             / Wireguard(message_type=4, reserved_zero=0)
1228             / WireguardTransport(
1229                 receiver_index=peer_1.sender,
1230                 counter=0,
1231                 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1232                     ip_header / UDP(sport=222, dport=223) / Raw()
1233                 ),
1234             )
1235         )
1236         rxs = self.send_and_expect(self.pg1, [data], self.pg0)
1237         if is_ip6:
1238             self.assertEqual(rxs[0][IPv6].dst, self.pg0.remote_ip6)
1239             self.assertEqual(rxs[0][IPv6].hlim, 19)
1240         else:
1241             self.assertEqual(rxs[0][IP].dst, self.pg0.remote_ip4)
1242             self.assertEqual(rxs[0][IP].ttl, 19)
1243         self.assertTrue(peer_1.query_vpp_config())
1244
1245         # prepare and send a packet that will be rewritten into the wg interface
1246         # expect a data packet sent to the new endpoint
1247         if is_ip6:
1248             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1249         else:
1250             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1251         p = (
1252             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1253             / ip_header
1254             / UDP(sport=555, dport=556)
1255             / Raw()
1256         )
1257         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
1258
1259         # verify the data packet
1260         peer_1.validate_encapped(rxs, p, is_tunnel_ip6=is_ip6, is_transport_ip6=is_ip6)
1261
1262         # remove configs
1263         r1.remove_vpp_config()
1264         peer_1.remove_vpp_config()
1265         wg0.remove_vpp_config()
1266
1267     def test_wg_peer_roaming_on_data_v4_sync(self):
1268         """Peer roaming on data packet (v4, sync)"""
1269         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=False)
1270
1271     def test_wg_peer_roaming_on_data_v6_sync(self):
1272         """Peer roaming on data packet (v6, sync)"""
1273         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=True)
1274
1275     def test_wg_peer_roaming_on_data_v4_async(self):
1276         """Peer roaming on data packet (v4, async)"""
1277         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=False)
1278
1279     def test_wg_peer_roaming_on_data_v6_async(self):
1280         """Peer roaming on data packet (v6, async)"""
1281         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=True)
1282
1283     def test_wg_peer_resp(self):
1284         """Send handshake response IPv4 tunnel"""
1285         port = 12323
1286
1287         # Create interfaces
1288         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1289         wg0.admin_up()
1290         wg0.config_ip4()
1291
1292         self.pg_enable_capture(self.pg_interfaces)
1293         self.pg_start()
1294
1295         peer_1 = VppWgPeer(
1296             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1297         ).add_vpp_config()
1298         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1299
1300         r1 = VppIpRoute(
1301             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1302         ).add_vpp_config()
1303
1304         # wait for the peer to send a handshake
1305         rx = self.pg1.get_capture(1, timeout=2)
1306
1307         # consume the handshake in the noise protocol and
1308         # generate the response
1309         resp = peer_1.consume_init(rx[0], self.pg1)
1310
1311         # send the response, get keepalive
1312         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1313
1314         for rx in rxs:
1315             b = peer_1.decrypt_transport(rx)
1316             self.assertEqual(0, len(b))
1317
1318         # send a packets that are routed into the tunnel
1319         p = (
1320             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1321             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1322             / UDP(sport=555, dport=556)
1323             / Raw(b"\x00" * 80)
1324         )
1325
1326         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1327
1328         peer_1.validate_encapped(rxs, p)
1329
1330         # send packets into the tunnel, expect to receive them on
1331         # the other side
1332         p = [
1333             (
1334                 peer_1.mk_tunnel_header(self.pg1)
1335                 / Wireguard(message_type=4, reserved_zero=0)
1336                 / WireguardTransport(
1337                     receiver_index=peer_1.sender,
1338                     counter=ii,
1339                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1340                         (
1341                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1342                             / UDP(sport=222, dport=223)
1343                             / Raw()
1344                         )
1345                     ),
1346                 )
1347             )
1348             for ii in range(255)
1349         ]
1350
1351         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1352
1353         for rx in rxs:
1354             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1355             self.assertEqual(rx[IP].ttl, 19)
1356
1357         r1.remove_vpp_config()
1358         peer_1.remove_vpp_config()
1359         wg0.remove_vpp_config()
1360
1361     def test_wg_peer_resp_ipv6(self):
1362         """Send handshake response IPv6 tunnel"""
1363         port = 12323
1364
1365         # Create interfaces
1366         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1367         wg0.admin_up()
1368         wg0.config_ip4()
1369
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pg_start()
1372
1373         peer_1 = VppWgPeer(
1374             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1375         ).add_vpp_config()
1376         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1377
1378         r1 = VppIpRoute(
1379             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1380         ).add_vpp_config()
1381
1382         # wait for the peer to send a handshake
1383         rx = self.pg1.get_capture(1, timeout=2)
1384
1385         # consume the handshake in the noise protocol and
1386         # generate the response
1387         resp = peer_1.consume_init(rx[0], self.pg1, is_ip6=True)
1388
1389         # send the response, get keepalive
1390         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1391
1392         for rx in rxs:
1393             b = peer_1.decrypt_transport(rx, True)
1394             self.assertEqual(0, len(b))
1395
1396         # send a packets that are routed into the tunnel
1397         p = (
1398             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1399             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1400             / UDP(sport=555, dport=556)
1401             / Raw(b"\x00" * 80)
1402         )
1403
1404         rxs = self.send_and_expect(self.pg0, p * 2, self.pg1)
1405         peer_1.validate_encapped(rxs, p, True)
1406
1407         # send packets into the tunnel, expect to receive them on
1408         # the other side
1409         p = [
1410             (
1411                 peer_1.mk_tunnel_header(self.pg1, True)
1412                 / Wireguard(message_type=4, reserved_zero=0)
1413                 / WireguardTransport(
1414                     receiver_index=peer_1.sender,
1415                     counter=ii,
1416                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1417                         (
1418                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1419                             / UDP(sport=222, dport=223)
1420                             / Raw()
1421                         )
1422                     ),
1423                 )
1424             )
1425             for ii in range(255)
1426         ]
1427
1428         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1429
1430         for rx in rxs:
1431             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1432             self.assertEqual(rx[IP].ttl, 19)
1433
1434         r1.remove_vpp_config()
1435         peer_1.remove_vpp_config()
1436         wg0.remove_vpp_config()
1437
1438     def test_wg_peer_v4o4(self):
1439         """Test v4o4"""
1440
1441         port = 12333
1442
1443         # Create interfaces
1444         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1445         wg0.admin_up()
1446         wg0.config_ip4()
1447
1448         peer_1 = VppWgPeer(
1449             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1450         ).add_vpp_config()
1451         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1452
1453         r1 = VppIpRoute(
1454             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1455         ).add_vpp_config()
1456         r2 = VppIpRoute(
1457             self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
1458         ).add_vpp_config()
1459
1460         # route a packet into the wg interface
1461         #  use the allowed-ip prefix
1462         #  this is dropped because the peer is not initiated
1463         p = (
1464             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1465             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1466             / UDP(sport=555, dport=556)
1467             / Raw()
1468         )
1469         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1470         self.assertEqual(
1471             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1472         )
1473
1474         # route a packet into the wg interface
1475         #  use a not allowed-ip prefix
1476         #  this is dropped because there is no matching peer
1477         p = (
1478             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1479             / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
1480             / UDP(sport=555, dport=556)
1481             / Raw()
1482         )
1483         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1484         self.assertEqual(
1485             self.base_peer4_out_err + 1,
1486             self.statistics.get_err_counter(self.peer4_out_err),
1487         )
1488
1489         # send a handsake from the peer with an invalid MAC
1490         p = peer_1.mk_handshake(self.pg1)
1491         p[WireguardInitiation].mac1 = b"foobar"
1492         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1493         self.assertEqual(
1494             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1495         )
1496
1497         # send a handsake from the peer but signed by the wrong key.
1498         p = peer_1.mk_handshake(
1499             self.pg1, False, X25519PrivateKey.generate().public_key()
1500         )
1501         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1502         self.assertEqual(
1503             self.base_peer4_in_err + 1,
1504             self.statistics.get_err_counter(self.peer4_in_err),
1505         )
1506
1507         # send a valid handsake init for which we expect a response
1508         p = peer_1.mk_handshake(self.pg1)
1509
1510         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1511
1512         peer_1.consume_response(rx[0])
1513
1514         # route a packet into the wg interface
1515         #  this is dropped because the peer is still not initiated
1516         p = (
1517             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1518             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1519             / UDP(sport=555, dport=556)
1520             / Raw()
1521         )
1522         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1523         self.assertEqual(
1524             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1525         )
1526
1527         # send a data packet from the peer through the tunnel
1528         # this completes the handshake
1529         p = (
1530             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1531             / UDP(sport=222, dport=223)
1532             / Raw()
1533         )
1534         d = peer_1.encrypt_transport(p)
1535         p = peer_1.mk_tunnel_header(self.pg1) / (
1536             Wireguard(message_type=4, reserved_zero=0)
1537             / WireguardTransport(
1538                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1539             )
1540         )
1541         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1542
1543         for rx in rxs:
1544             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1545             self.assertEqual(rx[IP].ttl, 19)
1546
1547         # send a packets that are routed into the tunnel
1548         p = (
1549             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1550             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1551             / UDP(sport=555, dport=556)
1552             / Raw(b"\x00" * 80)
1553         )
1554
1555         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1556
1557         for rx in rxs:
1558             rx = IP(peer_1.decrypt_transport(rx))
1559
1560             # check the original packet is present
1561             self.assertEqual(rx[IP].dst, p[IP].dst)
1562             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1563
1564         # send packets into the tunnel, expect to receive them on
1565         # the other side
1566         p = [
1567             (
1568                 peer_1.mk_tunnel_header(self.pg1)
1569                 / Wireguard(message_type=4, reserved_zero=0)
1570                 / WireguardTransport(
1571                     receiver_index=peer_1.sender,
1572                     counter=ii + 1,
1573                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1574                         (
1575                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1576                             / UDP(sport=222, dport=223)
1577                             / Raw()
1578                         )
1579                     ),
1580                 )
1581             )
1582             for ii in range(255)
1583         ]
1584
1585         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1586
1587         for rx in rxs:
1588             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1589             self.assertEqual(rx[IP].ttl, 19)
1590
1591         r1.remove_vpp_config()
1592         r2.remove_vpp_config()
1593         peer_1.remove_vpp_config()
1594         wg0.remove_vpp_config()
1595
1596     def test_wg_peer_v6o6(self):
1597         """Test v6o6"""
1598
1599         port = 12343
1600
1601         # Create interfaces
1602         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1603         wg0.admin_up()
1604         wg0.config_ip6()
1605
1606         peer_1 = VppWgPeer(
1607             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
1608         ).add_vpp_config()
1609         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1610
1611         r1 = VppIpRoute(
1612             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1613         ).add_vpp_config()
1614         r2 = VppIpRoute(
1615             self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
1616         ).add_vpp_config()
1617
1618         # route a packet into the wg interface
1619         #  use the allowed-ip prefix
1620         #  this is dropped because the peer is not initiated
1621
1622         p = (
1623             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1624             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1625             / UDP(sport=555, dport=556)
1626             / Raw()
1627         )
1628         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1629
1630         self.assertEqual(
1631             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1632         )
1633
1634         # route a packet into the wg interface
1635         #  use a not allowed-ip prefix
1636         #  this is dropped because there is no matching peer
1637         p = (
1638             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1639             / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
1640             / UDP(sport=555, dport=556)
1641             / Raw()
1642         )
1643         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1644         self.assertEqual(
1645             self.base_peer6_out_err + 1,
1646             self.statistics.get_err_counter(self.peer6_out_err),
1647         )
1648
1649         # send a handsake from the peer with an invalid MAC
1650         p = peer_1.mk_handshake(self.pg1, True)
1651         p[WireguardInitiation].mac1 = b"foobar"
1652         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1653
1654         self.assertEqual(
1655             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1656         )
1657
1658         # send a handsake from the peer but signed by the wrong key.
1659         p = peer_1.mk_handshake(
1660             self.pg1, True, X25519PrivateKey.generate().public_key()
1661         )
1662         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1663         self.assertEqual(
1664             self.base_peer6_in_err + 1,
1665             self.statistics.get_err_counter(self.peer6_in_err),
1666         )
1667
1668         # send a valid handsake init for which we expect a response
1669         p = peer_1.mk_handshake(self.pg1, True)
1670
1671         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1672
1673         peer_1.consume_response(rx[0], True)
1674
1675         # route a packet into the wg interface
1676         #  this is dropped because the peer is still not initiated
1677         p = (
1678             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1679             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1680             / UDP(sport=555, dport=556)
1681             / Raw()
1682         )
1683         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1684         self.assertEqual(
1685             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1686         )
1687
1688         # send a data packet from the peer through the tunnel
1689         # this completes the handshake
1690         p = (
1691             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1692             / UDP(sport=222, dport=223)
1693             / Raw()
1694         )
1695         d = peer_1.encrypt_transport(p)
1696         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1697             Wireguard(message_type=4, reserved_zero=0)
1698             / WireguardTransport(
1699                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1700             )
1701         )
1702         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1703
1704         for rx in rxs:
1705             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1706             self.assertEqual(rx[IPv6].hlim, 19)
1707
1708         # send a packets that are routed into the tunnel
1709         p = (
1710             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1711             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1712             / UDP(sport=555, dport=556)
1713             / Raw(b"\x00" * 80)
1714         )
1715
1716         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1717
1718         for rx in rxs:
1719             rx = IPv6(peer_1.decrypt_transport(rx, True))
1720
1721             # check the original packet is present
1722             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1723             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1724
1725         # send packets into the tunnel, expect to receive them on
1726         # the other side
1727         p = [
1728             (
1729                 peer_1.mk_tunnel_header(self.pg1, True)
1730                 / Wireguard(message_type=4, reserved_zero=0)
1731                 / WireguardTransport(
1732                     receiver_index=peer_1.sender,
1733                     counter=ii + 1,
1734                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1735                         (
1736                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1737                             / UDP(sport=222, dport=223)
1738                             / Raw()
1739                         )
1740                     ),
1741                 )
1742             )
1743             for ii in range(255)
1744         ]
1745
1746         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1747
1748         for rx in rxs:
1749             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1750             self.assertEqual(rx[IPv6].hlim, 19)
1751
1752         r1.remove_vpp_config()
1753         r2.remove_vpp_config()
1754         peer_1.remove_vpp_config()
1755         wg0.remove_vpp_config()
1756
1757     def test_wg_peer_v6o4(self):
1758         """Test v6o4"""
1759
1760         port = 12353
1761
1762         # Create interfaces
1763         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1764         wg0.admin_up()
1765         wg0.config_ip6()
1766
1767         peer_1 = VppWgPeer(
1768             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
1769         ).add_vpp_config()
1770         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1771
1772         r1 = VppIpRoute(
1773             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1774         ).add_vpp_config()
1775
1776         # route a packet into the wg interface
1777         #  use the allowed-ip prefix
1778         #  this is dropped because the peer is not initiated
1779         p = (
1780             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1781             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1782             / UDP(sport=555, dport=556)
1783             / Raw()
1784         )
1785         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1786         self.assertEqual(
1787             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1788         )
1789
1790         # send a handsake from the peer with an invalid MAC
1791         p = peer_1.mk_handshake(self.pg1)
1792         p[WireguardInitiation].mac1 = b"foobar"
1793         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1794
1795         self.assertEqual(
1796             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1797         )
1798
1799         # send a handsake from the peer but signed by the wrong key.
1800         p = peer_1.mk_handshake(
1801             self.pg1, False, X25519PrivateKey.generate().public_key()
1802         )
1803         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1804         self.assertEqual(
1805             self.base_peer4_in_err + 1,
1806             self.statistics.get_err_counter(self.peer4_in_err),
1807         )
1808
1809         # send a valid handsake init for which we expect a response
1810         p = peer_1.mk_handshake(self.pg1)
1811
1812         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1813
1814         peer_1.consume_response(rx[0])
1815
1816         # route a packet into the wg interface
1817         #  this is dropped because the peer is still not initiated
1818         p = (
1819             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1820             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1821             / UDP(sport=555, dport=556)
1822             / Raw()
1823         )
1824         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1825         self.assertEqual(
1826             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1827         )
1828
1829         # send a data packet from the peer through the tunnel
1830         # this completes the handshake
1831         p = (
1832             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1833             / UDP(sport=222, dport=223)
1834             / Raw()
1835         )
1836         d = peer_1.encrypt_transport(p)
1837         p = peer_1.mk_tunnel_header(self.pg1) / (
1838             Wireguard(message_type=4, reserved_zero=0)
1839             / WireguardTransport(
1840                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1841             )
1842         )
1843         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1844
1845         for rx in rxs:
1846             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1847             self.assertEqual(rx[IPv6].hlim, 19)
1848
1849         # send a packets that are routed into the tunnel
1850         p = (
1851             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1852             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1853             / UDP(sport=555, dport=556)
1854             / Raw(b"\x00" * 80)
1855         )
1856
1857         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1858
1859         for rx in rxs:
1860             rx = IPv6(peer_1.decrypt_transport(rx))
1861
1862             # check the original packet is present
1863             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1864             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1865
1866         # send packets into the tunnel, expect to receive them on
1867         # the other side
1868         p = [
1869             (
1870                 peer_1.mk_tunnel_header(self.pg1)
1871                 / Wireguard(message_type=4, reserved_zero=0)
1872                 / WireguardTransport(
1873                     receiver_index=peer_1.sender,
1874                     counter=ii + 1,
1875                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1876                         (
1877                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1878                             / UDP(sport=222, dport=223)
1879                             / Raw()
1880                         )
1881                     ),
1882                 )
1883             )
1884             for ii in range(255)
1885         ]
1886
1887         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1888
1889         for rx in rxs:
1890             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1891             self.assertEqual(rx[IPv6].hlim, 19)
1892
1893         r1.remove_vpp_config()
1894         peer_1.remove_vpp_config()
1895         wg0.remove_vpp_config()
1896
1897     def test_wg_peer_v4o6(self):
1898         """Test v4o6"""
1899
1900         port = 12363
1901
1902         # Create interfaces
1903         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1904         wg0.admin_up()
1905         wg0.config_ip4()
1906
1907         peer_1 = VppWgPeer(
1908             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1909         ).add_vpp_config()
1910         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1911
1912         r1 = VppIpRoute(
1913             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1914         ).add_vpp_config()
1915
1916         # route a packet into the wg interface
1917         #  use the allowed-ip prefix
1918         #  this is dropped because the peer is not initiated
1919         p = (
1920             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1921             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1922             / UDP(sport=555, dport=556)
1923             / Raw()
1924         )
1925         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1926         self.assertEqual(
1927             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1928         )
1929
1930         # send a handsake from the peer with an invalid MAC
1931         p = peer_1.mk_handshake(self.pg1, True)
1932         p[WireguardInitiation].mac1 = b"foobar"
1933         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1934         self.assertEqual(
1935             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1936         )
1937
1938         # send a handsake from the peer but signed by the wrong key.
1939         p = peer_1.mk_handshake(
1940             self.pg1, True, X25519PrivateKey.generate().public_key()
1941         )
1942         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1943         self.assertEqual(
1944             self.base_peer6_in_err + 1,
1945             self.statistics.get_err_counter(self.peer6_in_err),
1946         )
1947
1948         # send a valid handsake init for which we expect a response
1949         p = peer_1.mk_handshake(self.pg1, True)
1950
1951         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1952
1953         peer_1.consume_response(rx[0], True)
1954
1955         # route a packet into the wg interface
1956         #  this is dropped because the peer is still not initiated
1957         p = (
1958             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1959             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1960             / UDP(sport=555, dport=556)
1961             / Raw()
1962         )
1963         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1964         self.assertEqual(
1965             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1966         )
1967
1968         # send a data packet from the peer through the tunnel
1969         # this completes the handshake
1970         p = (
1971             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1972             / UDP(sport=222, dport=223)
1973             / Raw()
1974         )
1975         d = peer_1.encrypt_transport(p)
1976         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1977             Wireguard(message_type=4, reserved_zero=0)
1978             / WireguardTransport(
1979                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1980             )
1981         )
1982         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1983
1984         for rx in rxs:
1985             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1986             self.assertEqual(rx[IP].ttl, 19)
1987
1988         # send a packets that are routed into the tunnel
1989         p = (
1990             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1991             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1992             / UDP(sport=555, dport=556)
1993             / Raw(b"\x00" * 80)
1994         )
1995
1996         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1997
1998         for rx in rxs:
1999             rx = IP(peer_1.decrypt_transport(rx, True))
2000
2001             # check the original packet is present
2002             self.assertEqual(rx[IP].dst, p[IP].dst)
2003             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2004
2005         # send packets into the tunnel, expect to receive them on
2006         # the other side
2007         p = [
2008             (
2009                 peer_1.mk_tunnel_header(self.pg1, True)
2010                 / Wireguard(message_type=4, reserved_zero=0)
2011                 / WireguardTransport(
2012                     receiver_index=peer_1.sender,
2013                     counter=ii + 1,
2014                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2015                         (
2016                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2017                             / UDP(sport=222, dport=223)
2018                             / Raw()
2019                         )
2020                     ),
2021                 )
2022             )
2023             for ii in range(255)
2024         ]
2025
2026         rxs = self.send_and_expect(self.pg1, p, self.pg0)
2027
2028         for rx in rxs:
2029             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2030             self.assertEqual(rx[IP].ttl, 19)
2031
2032         r1.remove_vpp_config()
2033         peer_1.remove_vpp_config()
2034         wg0.remove_vpp_config()
2035
2036     def test_wg_multi_peer(self):
2037         """multiple peer setup"""
2038         port = 12373
2039
2040         # Create interfaces
2041         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2042         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2043         wg0.admin_up()
2044         wg1.admin_up()
2045
2046         # Check peer counter
2047         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2048
2049         self.pg_enable_capture(self.pg_interfaces)
2050         self.pg_start()
2051
2052         # Create many peers on sencond interface
2053         NUM_PEERS = 16
2054         self.pg2.generate_remote_hosts(NUM_PEERS)
2055         self.pg2.configure_ipv4_neighbors()
2056         self.pg1.generate_remote_hosts(NUM_PEERS)
2057         self.pg1.configure_ipv4_neighbors()
2058
2059         peers_1 = []
2060         peers_2 = []
2061         routes_1 = []
2062         routes_2 = []
2063         for i in range(NUM_PEERS):
2064             peers_1.append(
2065                 VppWgPeer(
2066                     self,
2067                     wg0,
2068                     self.pg1.remote_hosts[i].ip4,
2069                     port + 1 + i,
2070                     ["10.0.%d.4/32" % i],
2071                 ).add_vpp_config()
2072             )
2073             routes_1.append(
2074                 VppIpRoute(
2075                     self,
2076                     "10.0.%d.4" % i,
2077                     32,
2078                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2079                 ).add_vpp_config()
2080             )
2081
2082             peers_2.append(
2083                 VppWgPeer(
2084                     self,
2085                     wg1,
2086                     self.pg2.remote_hosts[i].ip4,
2087                     port + 100 + i,
2088                     ["10.100.%d.4/32" % i],
2089                 ).add_vpp_config()
2090             )
2091             routes_2.append(
2092                 VppIpRoute(
2093                     self,
2094                     "10.100.%d.4" % i,
2095                     32,
2096                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2097                 ).add_vpp_config()
2098             )
2099
2100         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2101
2102         self.logger.info(self.vapi.cli("show wireguard peer"))
2103         self.logger.info(self.vapi.cli("show wireguard interface"))
2104         self.logger.info(self.vapi.cli("show adj 37"))
2105         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
2106         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
2107
2108         # remove routes
2109         for r in routes_1:
2110             r.remove_vpp_config()
2111         for r in routes_2:
2112             r.remove_vpp_config()
2113
2114         # remove peers
2115         for p in peers_1:
2116             self.assertTrue(p.query_vpp_config())
2117             p.remove_vpp_config()
2118         for p in peers_2:
2119             self.assertTrue(p.query_vpp_config())
2120             p.remove_vpp_config()
2121
2122         wg0.remove_vpp_config()
2123         wg1.remove_vpp_config()
2124
2125     def test_wg_multi_interface(self):
2126         """Multi-tunnel on the same port"""
2127         port = 12500
2128
2129         # Create many wireguard interfaces
2130         NUM_IFS = 4
2131         self.pg1.generate_remote_hosts(NUM_IFS)
2132         self.pg1.configure_ipv4_neighbors()
2133         self.pg0.generate_remote_hosts(NUM_IFS)
2134         self.pg0.configure_ipv4_neighbors()
2135
2136         self.pg_enable_capture(self.pg_interfaces)
2137         self.pg_start()
2138
2139         # Create interfaces with a peer on each
2140         peers = []
2141         routes = []
2142         wg_ifs = []
2143         for i in range(NUM_IFS):
2144             # Use the same port for each interface
2145             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2146             wg0.admin_up()
2147             wg0.config_ip4()
2148             wg_ifs.append(wg0)
2149             peers.append(
2150                 VppWgPeer(
2151                     self,
2152                     wg0,
2153                     self.pg1.remote_hosts[i].ip4,
2154                     port + 1 + i,
2155                     ["10.0.%d.0/24" % i],
2156                 ).add_vpp_config()
2157             )
2158
2159             routes.append(
2160                 VppIpRoute(
2161                     self,
2162                     "10.0.%d.0" % i,
2163                     24,
2164                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
2165                 ).add_vpp_config()
2166             )
2167
2168         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
2169
2170         # skip the first automatic handshake
2171         self.pg1.get_capture(NUM_IFS, timeout=HANDSHAKE_JITTER)
2172
2173         for i in range(NUM_IFS):
2174             # send a valid handsake init for which we expect a response
2175             p = peers[i].mk_handshake(self.pg1)
2176             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2177             peers[i].consume_response(rx[0])
2178
2179             # send a data packet from the peer through the tunnel
2180             # this completes the handshake
2181             p = (
2182                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
2183                 / UDP(sport=222, dport=223)
2184                 / Raw()
2185             )
2186             d = peers[i].encrypt_transport(p)
2187             p = peers[i].mk_tunnel_header(self.pg1) / (
2188                 Wireguard(message_type=4, reserved_zero=0)
2189                 / WireguardTransport(
2190                     receiver_index=peers[i].sender,
2191                     counter=0,
2192                     encrypted_encapsulated_packet=d,
2193                 )
2194             )
2195             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
2196             for rx in rxs:
2197                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2198                 self.assertEqual(rx[IP].ttl, 19)
2199
2200         # send a packets that are routed into the tunnel
2201         for i in range(NUM_IFS):
2202             p = (
2203                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2204                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
2205                 / UDP(sport=555, dport=556)
2206                 / Raw(b"\x00" * 80)
2207             )
2208
2209             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
2210
2211             for rx in rxs:
2212                 rx = IP(peers[i].decrypt_transport(rx))
2213
2214                 # check the oringial packet is present
2215                 self.assertEqual(rx[IP].dst, p[IP].dst)
2216                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2217
2218         # send packets into the tunnel
2219         for i in range(NUM_IFS):
2220             p = [
2221                 (
2222                     peers[i].mk_tunnel_header(self.pg1)
2223                     / Wireguard(message_type=4, reserved_zero=0)
2224                     / WireguardTransport(
2225                         receiver_index=peers[i].sender,
2226                         counter=ii + 1,
2227                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
2228                             (
2229                                 IP(
2230                                     src="10.0.%d.4" % i,
2231                                     dst=self.pg0.remote_hosts[i].ip4,
2232                                     ttl=20,
2233                                 )
2234                                 / UDP(sport=222, dport=223)
2235                                 / Raw()
2236                             )
2237                         ),
2238                     )
2239                 )
2240                 for ii in range(64)
2241             ]
2242
2243             rxs = self.send_and_expect(self.pg1, p, self.pg0)
2244
2245             for rx in rxs:
2246                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2247                 self.assertEqual(rx[IP].ttl, 19)
2248
2249         for r in routes:
2250             r.remove_vpp_config()
2251         for p in peers:
2252             p.remove_vpp_config()
2253         for i in wg_ifs:
2254             i.remove_vpp_config()
2255
2256     def test_wg_event(self):
2257         """Test events"""
2258         port = 12600
2259         ESTABLISHED_FLAG = (
2260             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
2261         )
2262         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
2263
2264         # Create interfaces
2265         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2266         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2267         wg0.admin_up()
2268         wg1.admin_up()
2269
2270         # Check peer counter
2271         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2272
2273         self.pg_enable_capture(self.pg_interfaces)
2274         self.pg_start()
2275
2276         # Create peers
2277         NUM_PEERS = 2
2278         self.pg2.generate_remote_hosts(NUM_PEERS)
2279         self.pg2.configure_ipv4_neighbors()
2280         self.pg1.generate_remote_hosts(NUM_PEERS)
2281         self.pg1.configure_ipv4_neighbors()
2282
2283         peers_0 = []
2284         peers_1 = []
2285         routes_0 = []
2286         routes_1 = []
2287         for i in range(NUM_PEERS):
2288             peers_0.append(
2289                 VppWgPeer(
2290                     self,
2291                     wg0,
2292                     self.pg1.remote_hosts[i].ip4,
2293                     port + 1 + i,
2294                     ["10.0.%d.4/32" % i],
2295                 ).add_vpp_config()
2296             )
2297             routes_0.append(
2298                 VppIpRoute(
2299                     self,
2300                     "10.0.%d.4" % i,
2301                     32,
2302                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2303                 ).add_vpp_config()
2304             )
2305
2306             peers_1.append(
2307                 VppWgPeer(
2308                     self,
2309                     wg1,
2310                     self.pg2.remote_hosts[i].ip4,
2311                     port + 100 + i,
2312                     ["10.100.%d.4/32" % i],
2313                 ).add_vpp_config()
2314             )
2315             routes_1.append(
2316                 VppIpRoute(
2317                     self,
2318                     "10.100.%d.4" % i,
2319                     32,
2320                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2321                 ).add_vpp_config()
2322             )
2323
2324         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2325
2326         # skip the first automatic handshake
2327         self.pg1.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
2328         self.pg2.get_capture(NUM_PEERS, timeout=HANDSHAKE_JITTER)
2329
2330         # Want events from the first perr of wg0
2331         # and from all wg1 peers
2332         peers_0[0].want_events()
2333         wg1.want_events()
2334
2335         for i in range(NUM_PEERS):
2336             # wg0 peers: send a valid handsake init for which we expect a response
2337             p = peers_0[i].mk_handshake(self.pg1)
2338             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2339             peers_0[i].consume_response(rx[0])
2340
2341             # wg0 peers: send empty packet, it means successful connection (WIREGUARD_PEER_ESTABLISHED)
2342             keepalive = peers_0[i].encrypt_transport(0)
2343             p = peers_0[i].mk_tunnel_header(self.pg1) / (
2344                 Wireguard(message_type=4, reserved_zero=0)
2345                 / WireguardTransport(
2346                     receiver_index=peers_0[i].sender,
2347                     counter=0,
2348                     encrypted_encapsulated_packet=keepalive,
2349                 )
2350             )
2351             # TODO: Figure out wny there are sometimes wg packets received here
2352             # self.send_and_assert_no_replies(self.pg1, [p])
2353             self.pg_send(self.pg1, [p])
2354
2355             # wg0 peers: wait for established flag
2356             if i == 0:
2357                 peers_0[0].wait_event(ESTABLISHED_FLAG)
2358
2359             # wg1 peers: send a valid handsake init for which we expect a response
2360             p = peers_1[i].mk_handshake(self.pg2)
2361             rx = self.send_and_expect(self.pg2, [p], self.pg2)
2362             peers_1[i].consume_response(rx[0])
2363
2364             # wg1 peers: send empty packet, it means successful connection (WIREGUARD_PEER_ESTABLISHED)
2365             keepalive = peers_1[i].encrypt_transport(0)
2366             p = peers_1[i].mk_tunnel_header(self.pg2) / (
2367                 Wireguard(message_type=4, reserved_zero=0)
2368                 / WireguardTransport(
2369                     receiver_index=peers_1[i].sender,
2370                     counter=0,
2371                     encrypted_encapsulated_packet=keepalive,
2372                 )
2373             )
2374             self.send_and_assert_no_replies(self.pg2, [p])
2375
2376         # wg1 peers: wait for established flag
2377         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
2378
2379         # remove routes
2380         for r in routes_0:
2381             r.remove_vpp_config()
2382         for r in routes_1:
2383             r.remove_vpp_config()
2384
2385         # remove peers
2386         for i in range(NUM_PEERS):
2387             self.assertTrue(peers_0[i].query_vpp_config())
2388             peers_0[i].remove_vpp_config()
2389             if i == 0:
2390                 peers_0[i].wait_event(0)
2391                 peers_0[i].wait_event(DEAD_FLAG)
2392         for p in peers_1:
2393             self.assertTrue(p.query_vpp_config())
2394             p.remove_vpp_config()
2395             p.wait_event(0)
2396             p.wait_event(DEAD_FLAG)
2397
2398         wg0.remove_vpp_config()
2399         wg1.remove_vpp_config()
2400
2401     def test_wg_sending_handshake_when_admin_down(self):
2402         """Sending handshake when admin down"""
2403         port = 12323
2404
2405         # create wg interface
2406         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2407         wg0.config_ip4()
2408
2409         # create a peer
2410         peer_1 = VppWgPeer(
2411             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2412         ).add_vpp_config()
2413         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2414
2415         self.pg_enable_capture(self.pg_interfaces)
2416         self.pg_start()
2417
2418         # wait for the peer to send a handshake initiation
2419         # expect no handshakes
2420         for i in range(2):
2421             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2422
2423         self.pg_enable_capture(self.pg_interfaces)
2424         self.pg_start()
2425
2426         # administratively enable the wg interface
2427         # expect the peer to send a handshake initiation
2428         wg0.admin_up()
2429         rxs = self.pg1.get_capture(1, timeout=2)
2430         peer_1.consume_init(rxs[0], self.pg1)
2431
2432         self.pg_enable_capture(self.pg_interfaces)
2433         self.pg_start()
2434
2435         # administratively disable the wg interface
2436         # expect no handshakes
2437         wg0.admin_down()
2438         for i in range(6):
2439             self.pg1.assert_nothing_captured(remark="handshake packet(s) sent")
2440
2441         # remove configs
2442         peer_1.remove_vpp_config()
2443         wg0.remove_vpp_config()
2444
2445     def test_wg_sending_data_when_admin_down(self):
2446         """Sending data when admin down"""
2447         port = 12323
2448
2449         # create wg interface
2450         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2451         wg0.admin_up()
2452         wg0.config_ip4()
2453
2454         self.pg_enable_capture(self.pg_interfaces)
2455         self.pg_start()
2456
2457         # create a peer
2458         peer_1 = VppWgPeer(
2459             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2460         ).add_vpp_config()
2461         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2462
2463         # create a route to rewrite traffic into the wg interface
2464         r1 = VppIpRoute(
2465             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2466         ).add_vpp_config()
2467
2468         # wait for the peer to send a handshake initiation
2469         rxs = self.pg1.get_capture(1, timeout=2)
2470
2471         # prepare and send a handshake response
2472         # expect a keepalive message
2473         resp = peer_1.consume_init(rxs[0], self.pg1)
2474         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2475
2476         # verify the keepalive message
2477         b = peer_1.decrypt_transport(rxs[0])
2478         self.assertEqual(0, len(b))
2479
2480         # prepare and send a packet that will be rewritten into the wg interface
2481         # expect a data packet sent
2482         p = (
2483             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2484             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2485             / UDP(sport=555, dport=556)
2486             / Raw()
2487         )
2488         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2489
2490         # verify the data packet
2491         peer_1.validate_encapped(rxs, p)
2492
2493         # administratively disable the wg interface
2494         wg0.admin_down()
2495
2496         # send a packet that will be rewritten into the wg interface
2497         # expect no data packets sent
2498         self.send_and_assert_no_replies(self.pg0, [p])
2499
2500         # administratively enable the wg interface
2501         # expect the peer to send a handshake initiation
2502         wg0.admin_up()
2503         peer_1.noise_reset()
2504         rxs = self.pg1.get_capture(1, timeout=2)
2505         resp = peer_1.consume_init(rxs[0], self.pg1)
2506
2507         # send a packet that will be rewritten into the wg interface
2508         # expect no data packets sent because the peer is not initiated
2509         self.send_and_assert_no_replies(self.pg0, [p])
2510         self.assertEqual(
2511             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
2512         )
2513
2514         # send a handshake response and expect a keepalive message
2515         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2516
2517         # verify the keepalive message
2518         b = peer_1.decrypt_transport(rxs[0])
2519         self.assertEqual(0, len(b))
2520
2521         # send a packet that will be rewritten into the wg interface
2522         # expect a data packet sent
2523         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2524
2525         # verify the data packet
2526         peer_1.validate_encapped(rxs, p)
2527
2528         # remove configs
2529         r1.remove_vpp_config()
2530         peer_1.remove_vpp_config()
2531         wg0.remove_vpp_config()
2532
2533     def _test_wg_large_packet_tmpl(self, is_async, is_ip6):
2534         self.vapi.wg_set_async_mode(is_async)
2535         port = 12323
2536
2537         # create wg interface
2538         if is_ip6:
2539             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
2540             wg0.admin_up()
2541             wg0.config_ip6()
2542         else:
2543             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2544             wg0.admin_up()
2545             wg0.config_ip4()
2546
2547         self.pg_enable_capture(self.pg_interfaces)
2548         self.pg_start()
2549
2550         # create a peer
2551         if is_ip6:
2552             peer_1 = VppWgPeer(
2553                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
2554             ).add_vpp_config()
2555         else:
2556             peer_1 = VppWgPeer(
2557                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2558             ).add_vpp_config()
2559         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2560
2561         # create a route to rewrite traffic into the wg interface
2562         if is_ip6:
2563             r1 = VppIpRoute(
2564                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
2565             ).add_vpp_config()
2566         else:
2567             r1 = VppIpRoute(
2568                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2569             ).add_vpp_config()
2570
2571         # wait for the peer to send a handshake initiation
2572         rxs = self.pg1.get_capture(1, timeout=2)
2573
2574         # prepare and send a handshake response
2575         # expect a keepalive message
2576         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
2577         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2578
2579         # verify the keepalive message
2580         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
2581         self.assertEqual(0, len(b))
2582
2583         # prepare and send data packets
2584         # expect to receive them decrypted
2585         if is_ip6:
2586             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
2587         else:
2588             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2589         packet_len_opts = (
2590             2500,  # two buffers
2591             1500,  # one buffer
2592             4500,  # three buffers
2593             1910 if is_ip6 else 1950,  # auth tag is not contiguous
2594         )
2595         txs = []
2596         for l in packet_len_opts:
2597             txs.append(
2598                 peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
2599                 / Wireguard(message_type=4, reserved_zero=0)
2600                 / WireguardTransport(
2601                     receiver_index=peer_1.sender,
2602                     counter=len(txs),
2603                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2604                         ip_header / UDP(sport=222, dport=223) / Raw(b"\xfe" * l)
2605                     ),
2606                 )
2607             )
2608         rxs = self.send_and_expect(self.pg1, txs, self.pg0)
2609
2610         # verify decrypted packets
2611         for i, l in enumerate(packet_len_opts):
2612             if is_ip6:
2613                 self.assertEqual(rxs[i][IPv6].dst, self.pg0.remote_ip6)
2614                 self.assertEqual(rxs[i][IPv6].hlim, ip_header.hlim - 1)
2615             else:
2616                 self.assertEqual(rxs[i][IP].dst, self.pg0.remote_ip4)
2617                 self.assertEqual(rxs[i][IP].ttl, ip_header.ttl - 1)
2618             self.assertEqual(len(rxs[i][Raw]), l)
2619             self.assertEqual(bytes(rxs[i][Raw]), b"\xfe" * l)
2620
2621         # prepare and send packets that will be rewritten into the wg interface
2622         # expect data packets sent
2623         if is_ip6:
2624             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
2625         else:
2626             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2627         packet_len_opts = (
2628             2500,  # two buffers
2629             1500,  # one buffer
2630             4500,  # three buffers
2631             1980 if is_ip6 else 2000,  # no free space to write auth tag
2632         )
2633         txs = []
2634         for l in packet_len_opts:
2635             txs.append(
2636                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2637                 / ip_header
2638                 / UDP(sport=555, dport=556)
2639                 / Raw(b"\xfe" * l)
2640             )
2641         rxs = self.send_and_expect(self.pg0, txs, self.pg1)
2642
2643         # verify the data packets
2644         rxs_decrypted = peer_1.validate_encapped(
2645             rxs, ip_header, is_tunnel_ip6=is_ip6, is_transport_ip6=is_ip6
2646         )
2647
2648         for i, l in enumerate(packet_len_opts):
2649             self.assertEqual(len(rxs_decrypted[i][Raw]), l)
2650             self.assertEqual(bytes(rxs_decrypted[i][Raw]), b"\xfe" * l)
2651
2652         # remove configs
2653         r1.remove_vpp_config()
2654         peer_1.remove_vpp_config()
2655         wg0.remove_vpp_config()
2656
2657     def test_wg_large_packet_v4_sync(self):
2658         """Large packet (v4, sync)"""
2659         self._test_wg_large_packet_tmpl(is_async=False, is_ip6=False)
2660
2661     def test_wg_large_packet_v6_sync(self):
2662         """Large packet (v6, sync)"""
2663         self._test_wg_large_packet_tmpl(is_async=False, is_ip6=True)
2664
2665     def test_wg_large_packet_v4_async(self):
2666         """Large packet (v4, async)"""
2667         self._test_wg_large_packet_tmpl(is_async=True, is_ip6=False)
2668
2669     def test_wg_large_packet_v6_async(self):
2670         """Large packet (v6, async)"""
2671         self._test_wg_large_packet_tmpl(is_async=True, is_ip6=True)
2672
2673     def test_wg_lack_of_buf_headroom(self):
2674         """Lack of buffer's headroom (v6 vxlan over v6 wg)"""
2675         port = 12323
2676
2677         # create wg interface
2678         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
2679         wg0.admin_up()
2680         wg0.config_ip6()
2681
2682         self.pg_enable_capture(self.pg_interfaces)
2683         self.pg_start()
2684
2685         # create a peer
2686         peer_1 = VppWgPeer(
2687             self, wg0, self.pg1.remote_ip6, port + 1, ["::/0"]
2688         ).add_vpp_config()
2689         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2690
2691         # create a route to enable communication between wg interface addresses
2692         r1 = VppIpRoute(
2693             self, wg0.remote_ip6, 128, [VppRoutePath("0.0.0.0", wg0.sw_if_index)]
2694         ).add_vpp_config()
2695
2696         # wait for the peer to send a handshake initiation
2697         rxs = self.pg1.get_capture(1, timeout=2)
2698
2699         # prepare and send a handshake response
2700         # expect a keepalive message
2701         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=True)
2702         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2703
2704         # verify the keepalive message
2705         b = peer_1.decrypt_transport(rxs[0], is_ip6=True)
2706         self.assertEqual(0, len(b))
2707
2708         # create vxlan interface over the wg interface
2709         vxlan0 = VppVxlanTunnel(self, src=wg0.local_ip6, dst=wg0.remote_ip6, vni=1111)
2710         vxlan0.add_vpp_config()
2711
2712         # create bridge domain
2713         bd1 = VppBridgeDomain(self, bd_id=1)
2714         bd1.add_vpp_config()
2715
2716         # add the vxlan interface and pg0 to the bridge domain
2717         bd1_ports = (
2718             VppBridgeDomainPort(self, bd1, vxlan0).add_vpp_config(),
2719             VppBridgeDomainPort(self, bd1, self.pg0).add_vpp_config(),
2720         )
2721
2722         # prepare and send packets that will be rewritten into the vxlan interface
2723         # expect they to be rewritten into the wg interface then and data packets sent
2724         tx = (
2725             Ether(dst="00:00:00:00:00:01", src="00:00:00:00:00:02")
2726             / IPv6(src="::1", dst="::2", hlim=20)
2727             / UDP(sport=1111, dport=1112)
2728             / Raw(b"\xfe" * 1900)
2729         )
2730         rxs = self.send_and_expect(self.pg0, [tx] * 5, self.pg1)
2731
2732         # verify the data packet
2733         for rx in rxs:
2734             rx_decrypted = IPv6(peer_1.decrypt_transport(rx, is_ip6=True))
2735
2736             self.assertEqual(rx_decrypted[VXLAN].vni, vxlan0.vni)
2737             inner = rx_decrypted[VXLAN].payload
2738
2739             # check the original packet is present
2740             self.assertEqual(inner[IPv6].dst, tx[IPv6].dst)
2741             self.assertEqual(inner[IPv6].hlim, tx[IPv6].hlim)
2742             self.assertEqual(len(inner[Raw]), len(tx[Raw]))
2743             self.assertEqual(bytes(inner[Raw]), bytes(tx[Raw]))
2744
2745         # remove configs
2746         for bdp in bd1_ports:
2747             bdp.remove_vpp_config()
2748         bd1.remove_vpp_config()
2749         vxlan0.remove_vpp_config()
2750         r1.remove_vpp_config()
2751         peer_1.remove_vpp_config()
2752         wg0.remove_vpp_config()
2753
2754
2755 @tag_fixme_vpp_debug
2756 class WireguardHandoffTests(TestWg):
2757     """Wireguard Tests in multi worker setup"""
2758
2759     vpp_worker_count = 2
2760
2761     def test_wg_peer_init(self):
2762         """Handoff"""
2763
2764         port = 12383
2765
2766         # Create interfaces
2767         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2768         wg0.admin_up()
2769         wg0.config_ip4()
2770
2771         self.pg_enable_capture(self.pg_interfaces)
2772         self.pg_start()
2773
2774         peer_1 = VppWgPeer(
2775             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
2776         ).add_vpp_config()
2777         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2778
2779         r1 = VppIpRoute(
2780             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2781         ).add_vpp_config()
2782
2783         # skip the first automatic handshake
2784         self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
2785
2786         # send a valid handsake init for which we expect a response
2787         p = peer_1.mk_handshake(self.pg1)
2788
2789         rx = self.send_and_expect(self.pg1, [p], self.pg1)
2790
2791         peer_1.consume_response(rx[0])
2792
2793         # send a data packet from the peer through the tunnel
2794         # this completes the handshake and pins the peer to worker 0
2795         p = (
2796             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2797             / UDP(sport=222, dport=223)
2798             / Raw()
2799         )
2800         d = peer_1.encrypt_transport(p)
2801         p = peer_1.mk_tunnel_header(self.pg1) / (
2802             Wireguard(message_type=4, reserved_zero=0)
2803             / WireguardTransport(
2804                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
2805             )
2806         )
2807         rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
2808
2809         for rx in rxs:
2810             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2811             self.assertEqual(rx[IP].ttl, 19)
2812
2813         # send a packets that are routed into the tunnel
2814         # and pins the peer tp worker 1
2815         pe = (
2816             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2817             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2818             / UDP(sport=555, dport=556)
2819             / Raw(b"\x00" * 80)
2820         )
2821         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
2822         peer_1.validate_encapped(rxs, pe)
2823
2824         # send packets into the tunnel, from the other worker
2825         p = [
2826             (
2827                 peer_1.mk_tunnel_header(self.pg1)
2828                 / Wireguard(message_type=4, reserved_zero=0)
2829                 / WireguardTransport(
2830                     receiver_index=peer_1.sender,
2831                     counter=ii + 1,
2832                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
2833                         (
2834                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
2835                             / UDP(sport=222, dport=223)
2836                             / Raw()
2837                         )
2838                     ),
2839                 )
2840             )
2841             for ii in range(255)
2842         ]
2843
2844         rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
2845
2846         for rx in rxs:
2847             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
2848             self.assertEqual(rx[IP].ttl, 19)
2849
2850         # send a packets that are routed into the tunnel
2851         # from worker 0
2852         rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
2853
2854         peer_1.validate_encapped(rxs, pe)
2855
2856         r1.remove_vpp_config()
2857         peer_1.remove_vpp_config()
2858         wg0.remove_vpp_config()
2859
2860     @unittest.skip("test disabled")
2861     def test_wg_multi_interface(self):
2862         """Multi-tunnel on the same port"""
2863
2864
2865 @unittest.skipIf(
2866     "wireguard" in config.excluded_plugins, "Exclude Wireguard plugin tests"
2867 )
2868 @tag_run_solo
2869 class TestWgFIB(VppTestCase):
2870     """Wireguard FIB Test Case"""
2871
2872     @classmethod
2873     def setUpClass(cls):
2874         super(TestWgFIB, cls).setUpClass()
2875
2876     @classmethod
2877     def tearDownClass(cls):
2878         super(TestWgFIB, cls).tearDownClass()
2879
2880     def setUp(self):
2881         super(TestWgFIB, self).setUp()
2882
2883         self.create_pg_interfaces(range(2))
2884
2885         for i in self.pg_interfaces:
2886             i.admin_up()
2887             i.config_ip4()
2888
2889     def tearDown(self):
2890         for i in self.pg_interfaces:
2891             i.unconfig_ip4()
2892             i.admin_down()
2893         super(TestWgFIB, self).tearDown()
2894
2895     def test_wg_fib_tracking(self):
2896         """FIB tracking"""
2897         port = 12323
2898
2899         # create wg interface
2900         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2901         wg0.admin_up()
2902         wg0.config_ip4()
2903
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906
2907         # create a peer
2908         peer_1 = VppWgPeer(
2909             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
2910         ).add_vpp_config()
2911         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
2912
2913         # create a route to rewrite traffic into the wg interface
2914         r1 = VppIpRoute(
2915             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
2916         ).add_vpp_config()
2917
2918         # resolve ARP and expect the adjacency to update
2919         self.pg1.resolve_arp()
2920
2921         # wait for the peer to send a handshake initiation
2922         rxs = self.pg1.get_capture(2, timeout=6)
2923
2924         # prepare and send a handshake response
2925         # expect a keepalive message
2926         resp = peer_1.consume_init(rxs[1], self.pg1)
2927         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
2928
2929         # verify the keepalive message
2930         b = peer_1.decrypt_transport(rxs[0])
2931         self.assertEqual(0, len(b))
2932
2933         # prepare and send a packet that will be rewritten into the wg interface
2934         # expect a data packet sent
2935         p = (
2936             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2937             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
2938             / UDP(sport=555, dport=556)
2939             / Raw()
2940         )
2941         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
2942
2943         # verify the data packet
2944         peer_1.validate_encapped(rxs, p)
2945
2946         # remove configs
2947         r1.remove_vpp_config()
2948         peer_1.remove_vpp_config()
2949         wg0.remove_vpp_config()