8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
20 from cryptography.hazmat.primitives.asymmetric.x25519 import (
24 from cryptography.hazmat.primitives.serialization import (
30 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
31 from cryptography.hazmat.primitives.hmac import HMAC
32 from cryptography.hazmat.backends import default_backend
33 from noise.connection import NoiseConnection, Keypair
35 from vpp_ipip_tun_interface import VppIpIpTunInterface
36 from vpp_interface import VppInterface
37 from vpp_ip_route import VppIpRoute, VppRoutePath
38 from vpp_object import VppObject
39 from vpp_papi import VppEnum
40 from framework import VppTestCase
41 from re import compile
44 """ TestWg is a subclass of VPPTestCase classes.
def private_key_bytes(k):
    """Serialize private key *k* to its raw byte representation.

    Calls ``k.private_bytes`` with raw encoding, raw private format and
    no encryption, and returns whatever that call yields.
    """
    encoding = Encoding.Raw
    key_format = PrivateFormat.Raw
    return k.private_bytes(encoding, key_format, NoEncryption())
def public_key_bytes(k):
    """Serialize public key *k* to its raw byte representation.

    Calls ``k.public_bytes`` with raw encoding and raw public format and
    returns whatever that call yields.
    """
    encoding = Encoding.Raw
    key_format = PublicFormat.Raw
    return k.public_bytes(encoding, key_format)
59 class VppWgInterface(VppInterface):
61 VPP WireGuard interface
64 def __init__(self, test, src, port):
65 super(VppWgInterface, self).__init__(test)
69 self.private_key = X25519PrivateKey.generate()
70 self.public_key = self.private_key.public_key()
def public_key_bytes(self):
    """Return this interface's public key as raw bytes.

    Delegates to the module-level ``public_key_bytes`` helper.
    """
    key = self.public_key
    return public_key_bytes(key)
def private_key_bytes(self):
    """Return this interface's private key as raw bytes.

    Delegates to the module-level ``private_key_bytes`` helper.
    """
    key = self.private_key
    return private_key_bytes(key)
78 def add_vpp_config(self):
79 r = self.test.vapi.wireguard_interface_create(
81 "user_instance": 0xFFFFFFFF,
84 "private_key": private_key_bytes(self.private_key),
85 "generate_key": False,
88 self.set_sw_if_index(r.sw_if_index)
89 self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
    """Delete this WireGuard interface from VPP.

    Issues ``wireguard_interface_delete`` for the interface's
    ``sw_if_index`` through the test's API handle.
    """
    vapi = self.test.vapi
    vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
95 def query_vpp_config(self):
96 ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
99 t.interface.sw_if_index == self._sw_if_index
100 and str(t.interface.src_ip) == self.src
101 and t.interface.port == self.port
102 and t.interface.private_key == private_key_bytes(self.private_key)
107 def want_events(self, peer_index=0xFFFFFFFF):
108 self.test.vapi.want_wireguard_peer_events(
111 sw_if_index=self._sw_if_index,
112 peer_index=peer_index,
def wait_events(self, expect, peers, timeout=5):
    """Wait for one ``wireguard_peer_event`` per entry of *peers*.

    Events are consumed in order: the n-th event received must carry the
    n-th peer index from *peers* and its flags must equal *expect*.

    :param expect: flags value every received event must have
    :param peers: iterable of expected peer indices, in arrival order
    :param timeout: forwarded unchanged to ``vapi.wait_for_event``
    """
    test = self.test
    # Iterate the peers directly rather than by index, so any iterable
    # (not only a sized sequence) is accepted.
    for peer_index in peers:
        event = test.vapi.wait_for_event(timeout, "wireguard_peer_event")
        test.assertEqual(event.peer_index, peer_index)
        test.assertEqual(event.flags, expect)
122 return self.object_id()
125 return "wireguard-%d" % self._sw_if_index
128 def find_route(test, prefix, is_ip6, table_id=0):
129 routes = test.vapi.ip_route_dump(table_id, is_ip6)
132 if table_id == e.route.table_id and str(e.route.prefix) == str(prefix):
137 NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
138 NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"
141 class VppWgPeer(VppObject):
142 def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
145 self.endpoint = endpoint
147 self.allowed_ips = allowed_ips
148 self.persistent_keepalive = persistent_keepalive
150 # remote peer's public
151 self.private_key = X25519PrivateKey.generate()
152 self.public_key = self.private_key.public_key()
154 self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
156 def add_vpp_config(self, is_ip6=False):
157 rv = self._test.vapi.wireguard_peer_add(
159 "public_key": self.public_key_bytes(),
161 "endpoint": self.endpoint,
162 "n_allowed_ips": len(self.allowed_ips),
163 "allowed_ips": self.allowed_ips,
164 "sw_if_index": self.itf.sw_if_index,
165 "persistent_keepalive": self.persistent_keepalive,
168 self.index = rv.peer_index
169 self.receiver_index = self.index + 1
170 self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
    """Remove this peer from VPP.

    Issues ``wireguard_peer_remove`` for the peer's ``index`` through the
    test's API handle.
    """
    vapi = self._test.vapi
    vapi.wireguard_peer_remove(peer_index=self.index)
177 return "wireguard-peer-%s" % self.index
def public_key_bytes(self):
    """Return this peer's public key as raw bytes.

    Delegates to the module-level ``public_key_bytes`` helper.
    """
    key = self.public_key
    return public_key_bytes(key)
182 def query_vpp_config(self):
183 peers = self._test.vapi.wireguard_peers_dump()
187 p.peer.public_key == self.public_key_bytes()
188 and p.peer.port == self.port
189 and str(p.peer.endpoint) == self.endpoint
190 and p.peer.sw_if_index == self.itf.sw_if_index
191 and len(self.allowed_ips) == p.peer.n_allowed_ips
193 self.allowed_ips.sort()
194 p.peer.allowed_ips.sort()
196 for (a1, a2) in zip(self.allowed_ips, p.peer.allowed_ips):
197 if str(a1) != str(a2):
def set_responder(self):
    """Put this peer's noise connection into the responder role."""
    connection = self.noise
    connection.set_as_responder()
205 def mk_tunnel_header(self, tx_itf, is_ip6=False):
208 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
209 / IP(src=self.endpoint, dst=self.itf.src)
210 / UDP(sport=self.port, dport=self.itf.port)
214 Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
215 / IPv6(src=self.endpoint, dst=self.itf.src)
216 / UDP(sport=self.port, dport=self.itf.port)
219 def noise_init(self, public_key=None):
220 self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
221 self.noise.set_psks(psk=bytes(bytearray(32)))
224 public_key = self.itf.public_key
227 self.noise.set_keypair_from_private_bytes(
228 Keypair.STATIC, private_key_bytes(self.private_key)
231 self.noise.set_keypair_from_public_bytes(
232 Keypair.REMOTE_STATIC, public_key_bytes(public_key)
235 self.noise.start_handshake()
237 def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
238 self.noise.set_as_initiator()
239 self.noise_init(public_key)
241 p = Wireguard() / WireguardInitiation()
243 p[Wireguard].message_type = 1
244 p[Wireguard].reserved_zero = 0
245 p[WireguardInitiation].sender_index = self.receiver_index
247 # some random data for the message
248 # lifted from the noise protocol's wireguard example
249 now = datetime.datetime.now()
252 4611686018427387914 + int(now.timestamp()),
253 int(now.microsecond * 1e3),
255 b = self.noise.write_message(payload=tai)
257 # load noise into init message
258 p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
259 p[WireguardInitiation].encrypted_static = b[32:80]
260 p[WireguardInitiation].encrypted_timestamp = b[80:108]
262 # generate the mac1 hash
263 mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
264 p[WireguardInitiation].mac1 = blake2s(
265 bytes(p)[0:116], digest_size=16, key=mac_key
267 p[WireguardInitiation].mac2 = bytearray(16)
269 p = self.mk_tunnel_header(tx_itf, is_ip6) / p
273 def verify_header(self, p, is_ip6=False):
275 self._test.assertEqual(p[IP].src, self.itf.src)
276 self._test.assertEqual(p[IP].dst, self.endpoint)
278 self._test.assertEqual(p[IPv6].src, self.itf.src)
279 self._test.assertEqual(p[IPv6].dst, self.endpoint)
280 self._test.assertEqual(p[UDP].sport, self.itf.port)
281 self._test.assertEqual(p[UDP].dport, self.port)
282 self._test.assert_packet_checksums_valid(p)
284 def consume_init(self, p, tx_itf, is_ip6=False):
285 self.noise.set_as_responder()
286 self.noise_init(self.itf.public_key)
287 self.verify_header(p, is_ip6)
289 init = Wireguard(p[Raw])
291 self._test.assertEqual(init[Wireguard].message_type, 1)
292 self._test.assertEqual(init[Wireguard].reserved_zero, 0)
294 self.sender = init[WireguardInitiation].sender_index
297 mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
298 mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
299 self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
301 # this passes only unencrypted_ephemeral, encrypted_static,
302 # encrypted_timestamp fields of the init
303 payload = self.noise.read_message(bytes(init)[8:-32])
306 b = self.noise.write_message()
307 mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
308 resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
309 sender_index=self.receiver_index,
310 receiver_index=self.sender,
311 unencrypted_ephemeral=b[0:32],
312 encrypted_nothing=b[32:],
314 mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
315 resp[WireguardResponse].mac1 = mac1
317 resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
318 self._test.assertTrue(self.noise.handshake_finished)
322 def consume_response(self, p, is_ip6=False):
323 self.verify_header(p, is_ip6)
325 resp = Wireguard(p[Raw])
327 self._test.assertEqual(resp[Wireguard].message_type, 2)
328 self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
329 self._test.assertEqual(
330 resp[WireguardResponse].receiver_index, self.receiver_index
333 self.sender = resp[Wireguard].sender_index
335 payload = self.noise.read_message(bytes(resp)[12:60])
336 self._test.assertEqual(payload, b"")
337 self._test.assertTrue(self.noise.handshake_finished)
339 def decrypt_transport(self, p, is_ip6=False):
340 self.verify_header(p, is_ip6)
342 p = Wireguard(p[Raw])
343 self._test.assertEqual(p[Wireguard].message_type, 4)
344 self._test.assertEqual(p[Wireguard].reserved_zero, 0)
345 self._test.assertEqual(
346 p[WireguardTransport].receiver_index, self.receiver_index
349 d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
def encrypt_transport(self, p):
    """Encrypt *p* with this peer's noise connection.

    *p* is converted with ``bytes()`` first; the result of
    ``self.noise.encrypt`` on those bytes is returned.
    """
    plaintext = bytes(p)
    return self.noise.encrypt(plaintext)
355 def validate_encapped(self, rxs, tx, is_ip6=False):
358 rx = IP(self.decrypt_transport(rx))
360 # check the original packet is present
361 self._test.assertEqual(rx[IP].dst, tx[IP].dst)
362 self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
364 rx = IPv6(self.decrypt_transport(rx))
366 # check the original packet is present
367 self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
368 self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl - 1)
370 def want_events(self):
371 self._test.vapi.want_wireguard_peer_events(
374 peer_index=self.index,
375 sw_if_index=self.itf.sw_if_index,
def wait_event(self, expect, timeout=5):
    """Wait for a single ``wireguard_peer_event`` and validate it.

    The event's flags must equal *expect* and its peer index must equal
    this peer's ``index``; *timeout* is forwarded unchanged to
    ``vapi.wait_for_event``.
    """
    checker = self._test
    event = checker.vapi.wait_for_event(timeout, "wireguard_peer_event")
    # Flags are verified before the peer index.
    checker.assertEqual(event.flags, expect)
    checker.assertEqual(event.peer_index, self.index)
384 class TestWg(VppTestCase):
385 """Wireguard Test Case"""
387 error_str = compile(r"Error")
389 wg4_output_node_name = "/err/wg4-output-tun/"
390 wg4_input_node_name = "/err/wg4-input/"
391 wg6_output_node_name = "/err/wg6-output-tun/"
392 wg6_input_node_name = "/err/wg6-input/"
393 kp4_error = wg4_output_node_name + "Keypair error"
394 mac4_error = wg4_input_node_name + "Invalid MAC handshake"
395 peer4_error = wg4_input_node_name + "Peer error"
396 kp6_error = wg6_output_node_name + "Keypair error"
397 mac6_error = wg6_input_node_name + "Invalid MAC handshake"
398 peer6_error = wg6_input_node_name + "Peer error"
402 super(TestWg, cls).setUpClass()
404 cls.create_pg_interfaces(range(3))
405 for i in cls.pg_interfaces:
413 super(TestWg, cls).tearDownClass()
417 def tearDownClass(cls):
418 super(TestWg, cls).tearDownClass()
421 super(VppTestCase, self).setUp()
422 self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
423 self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
424 self.base_peer4_err = self.statistics.get_err_counter(self.peer4_error)
425 self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
426 self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
427 self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error)
429 def test_wg_interface(self):
430 """Simple interface creation"""
434 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
436 self.logger.info(self.vapi.cli("sh int"))
439 wg0.remove_vpp_config()
441 def test_handshake_hash(self):
442 """test hashing an init message"""
443 # an init packet generated by Linux given the key below
466 b = bytearray.fromhex(h)
469 pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
470 pub = X25519PublicKey.from_public_bytes(pubb)
472 self.assertEqual(pubb, public_key_bytes(pub))
474 # strip the macs and build a new packet
476 mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
477 init += blake2s(init, digest_size=16, key=mac_key).digest()
480 act = Wireguard(init)
482 self.assertEqual(tgt, act)
484 def test_wg_peer_resp(self):
485 """Send handshake response"""
489 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
493 self.pg_enable_capture(self.pg_interfaces)
497 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
499 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
502 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
505 # wait for the peer to send a handshake
506 rx = self.pg1.get_capture(1, timeout=2)
508 # consume the handshake in the noise protocol and
509 # generate the response
510 resp = peer_1.consume_init(rx[0], self.pg1)
512 # send the response, get keepalive
513 rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
516 b = peer_1.decrypt_transport(rx)
517 self.assertEqual(0, len(b))
519 # send packets that are routed into the tunnel
521 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
522 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
523 / UDP(sport=555, dport=556)
527 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
529 peer_1.validate_encapped(rxs, p)
531 # send packets into the tunnel, expect to receive them on
535 peer_1.mk_tunnel_header(self.pg1)
536 / Wireguard(message_type=4, reserved_zero=0)
537 / WireguardTransport(
538 receiver_index=peer_1.sender,
540 encrypted_encapsulated_packet=peer_1.encrypt_transport(
542 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
543 / UDP(sport=222, dport=223)
552 rxs = self.send_and_expect(self.pg1, p, self.pg0)
555 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
556 self.assertEqual(rx[IP].ttl, 19)
558 r1.remove_vpp_config()
559 peer_1.remove_vpp_config()
560 wg0.remove_vpp_config()
562 def test_wg_peer_v4o4(self):
568 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
573 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
575 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
578 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
581 # route a packet into the wg interface
582 # use the allowed-ip prefix
583 # this is dropped because the peer is not initiated
585 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
586 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
587 / UDP(sport=555, dport=556)
590 self.send_and_assert_no_replies(self.pg0, [p])
592 self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
595 # send a handshake from the peer with an invalid MAC
596 p = peer_1.mk_handshake(self.pg1)
597 p[WireguardInitiation].mac1 = b"foobar"
598 self.send_and_assert_no_replies(self.pg1, [p])
600 self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
603 # send a handshake from the peer but signed by the wrong key.
604 p = peer_1.mk_handshake(
605 self.pg1, False, X25519PrivateKey.generate().public_key()
607 self.send_and_assert_no_replies(self.pg1, [p])
609 self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
612 # send a valid handshake init for which we expect a response
613 p = peer_1.mk_handshake(self.pg1)
615 rx = self.send_and_expect(self.pg1, [p], self.pg1)
617 peer_1.consume_response(rx[0])
619 # route a packet into the wg interface
620 # this is dropped because the peer is still not initiated
622 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
623 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
624 / UDP(sport=555, dport=556)
627 self.send_and_assert_no_replies(self.pg0, [p])
629 self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
632 # send a data packet from the peer through the tunnel
633 # this completes the handshake
635 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
636 / UDP(sport=222, dport=223)
639 d = peer_1.encrypt_transport(p)
640 p = peer_1.mk_tunnel_header(self.pg1) / (
641 Wireguard(message_type=4, reserved_zero=0)
642 / WireguardTransport(
643 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
646 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
649 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
650 self.assertEqual(rx[IP].ttl, 19)
652 # send packets that are routed into the tunnel
654 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
655 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
656 / UDP(sport=555, dport=556)
660 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
663 rx = IP(peer_1.decrypt_transport(rx))
665 # check the original packet is present
666 self.assertEqual(rx[IP].dst, p[IP].dst)
667 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
669 # send packets into the tunnel, expect to receive them on
673 peer_1.mk_tunnel_header(self.pg1)
674 / Wireguard(message_type=4, reserved_zero=0)
675 / WireguardTransport(
676 receiver_index=peer_1.sender,
678 encrypted_encapsulated_packet=peer_1.encrypt_transport(
680 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
681 / UDP(sport=222, dport=223)
690 rxs = self.send_and_expect(self.pg1, p, self.pg0)
693 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
694 self.assertEqual(rx[IP].ttl, 19)
696 r1.remove_vpp_config()
697 peer_1.remove_vpp_config()
698 wg0.remove_vpp_config()
700 def test_wg_peer_v6o6(self):
706 wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
711 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
712 ).add_vpp_config(True)
713 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
716 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
719 # route a packet into the wg interface
720 # use the allowed-ip prefix
721 # this is dropped because the peer is not initiated
724 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
725 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
726 / UDP(sport=555, dport=556)
729 self.send_and_assert_no_replies(self.pg0, [p])
732 self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
735 # send a handshake from the peer with an invalid MAC
736 p = peer_1.mk_handshake(self.pg1, True)
737 p[WireguardInitiation].mac1 = b"foobar"
738 self.send_and_assert_no_replies(self.pg1, [p])
741 self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
744 # send a handshake from the peer but signed by the wrong key.
745 p = peer_1.mk_handshake(
746 self.pg1, True, X25519PrivateKey.generate().public_key()
748 self.send_and_assert_no_replies(self.pg1, [p])
750 self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
753 # send a valid handshake init for which we expect a response
754 p = peer_1.mk_handshake(self.pg1, True)
756 rx = self.send_and_expect(self.pg1, [p], self.pg1)
758 peer_1.consume_response(rx[0], True)
760 # route a packet into the wg interface
761 # this is dropped because the peer is still not initiated
763 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
764 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
765 / UDP(sport=555, dport=556)
768 self.send_and_assert_no_replies(self.pg0, [p])
770 self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
773 # send a data packet from the peer through the tunnel
774 # this completes the handshake
776 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
777 / UDP(sport=222, dport=223)
780 d = peer_1.encrypt_transport(p)
781 p = peer_1.mk_tunnel_header(self.pg1, True) / (
782 Wireguard(message_type=4, reserved_zero=0)
783 / WireguardTransport(
784 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
787 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
790 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
791 self.assertEqual(rx[IPv6].hlim, 19)
793 # send packets that are routed into the tunnel
795 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
796 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
797 / UDP(sport=555, dport=556)
801 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
804 rx = IPv6(peer_1.decrypt_transport(rx, True))
806 # check the original packet is present
807 self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
808 self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
810 # send packets into the tunnel, expect to receive them on
814 peer_1.mk_tunnel_header(self.pg1, True)
815 / Wireguard(message_type=4, reserved_zero=0)
816 / WireguardTransport(
817 receiver_index=peer_1.sender,
819 encrypted_encapsulated_packet=peer_1.encrypt_transport(
821 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
822 / UDP(sport=222, dport=223)
831 rxs = self.send_and_expect(self.pg1, p, self.pg0)
834 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
835 self.assertEqual(rx[IPv6].hlim, 19)
837 r1.remove_vpp_config()
838 peer_1.remove_vpp_config()
839 wg0.remove_vpp_config()
841 def test_wg_peer_v6o4(self):
847 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
852 self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
853 ).add_vpp_config(True)
854 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
857 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
860 # route a packet into the wg interface
861 # use the allowed-ip prefix
862 # this is dropped because the peer is not initiated
864 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
865 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
866 / UDP(sport=555, dport=556)
869 self.send_and_assert_no_replies(self.pg0, [p])
871 self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
874 # send a handshake from the peer with an invalid MAC
875 p = peer_1.mk_handshake(self.pg1)
876 p[WireguardInitiation].mac1 = b"foobar"
877 self.send_and_assert_no_replies(self.pg1, [p])
880 self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
883 # send a handshake from the peer but signed by the wrong key.
884 p = peer_1.mk_handshake(
885 self.pg1, False, X25519PrivateKey.generate().public_key()
887 self.send_and_assert_no_replies(self.pg1, [p])
889 self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
892 # send a valid handshake init for which we expect a response
893 p = peer_1.mk_handshake(self.pg1)
895 rx = self.send_and_expect(self.pg1, [p], self.pg1)
897 peer_1.consume_response(rx[0])
899 # route a packet into the wg interface
900 # this is dropped because the peer is still not initiated
902 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
903 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
904 / UDP(sport=555, dport=556)
907 self.send_and_assert_no_replies(self.pg0, [p])
909 self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
912 # send a data packet from the peer through the tunnel
913 # this completes the handshake
915 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
916 / UDP(sport=222, dport=223)
919 d = peer_1.encrypt_transport(p)
920 p = peer_1.mk_tunnel_header(self.pg1) / (
921 Wireguard(message_type=4, reserved_zero=0)
922 / WireguardTransport(
923 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
926 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
929 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
930 self.assertEqual(rx[IPv6].hlim, 19)
932 # send packets that are routed into the tunnel
934 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
935 / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
936 / UDP(sport=555, dport=556)
940 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
943 rx = IPv6(peer_1.decrypt_transport(rx))
945 # check the original packet is present
946 self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
947 self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
949 # send packets into the tunnel, expect to receive them on
953 peer_1.mk_tunnel_header(self.pg1)
954 / Wireguard(message_type=4, reserved_zero=0)
955 / WireguardTransport(
956 receiver_index=peer_1.sender,
958 encrypted_encapsulated_packet=peer_1.encrypt_transport(
960 IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
961 / UDP(sport=222, dport=223)
970 rxs = self.send_and_expect(self.pg1, p, self.pg0)
973 self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
974 self.assertEqual(rx[IPv6].hlim, 19)
976 r1.remove_vpp_config()
977 peer_1.remove_vpp_config()
978 wg0.remove_vpp_config()
980 def test_wg_peer_v4o6(self):
986 wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
991 self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
993 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
996 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
999 # route a packet into the wg interface
1000 # use the allowed-ip prefix
1001 # this is dropped because the peer is not initiated
1003 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1004 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1005 / UDP(sport=555, dport=556)
1008 self.send_and_assert_no_replies(self.pg0, [p])
1010 self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1013 # send a handshake from the peer with an invalid MAC
1014 p = peer_1.mk_handshake(self.pg1, True)
1015 p[WireguardInitiation].mac1 = b"foobar"
1016 self.send_and_assert_no_replies(self.pg1, [p])
1018 self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1021 # send a handshake from the peer but signed by the wrong key.
1022 p = peer_1.mk_handshake(
1023 self.pg1, True, X25519PrivateKey.generate().public_key()
1025 self.send_and_assert_no_replies(self.pg1, [p])
1027 self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
1030 # send a valid handshake init for which we expect a response
1031 p = peer_1.mk_handshake(self.pg1, True)
1033 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1035 peer_1.consume_response(rx[0], True)
1037 # route a packet into the wg interface
1038 # this is dropped because the peer is still not initiated
1040 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1041 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1042 / UDP(sport=555, dport=556)
1045 self.send_and_assert_no_replies(self.pg0, [p])
1047 self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1050 # send a data packet from the peer through the tunnel
1051 # this completes the handshake
1053 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1054 / UDP(sport=222, dport=223)
1057 d = peer_1.encrypt_transport(p)
1058 p = peer_1.mk_tunnel_header(self.pg1, True) / (
1059 Wireguard(message_type=4, reserved_zero=0)
1060 / WireguardTransport(
1061 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1064 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1067 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1068 self.assertEqual(rx[IP].ttl, 19)
1070 # send packets that are routed into the tunnel
1072 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1073 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1074 / UDP(sport=555, dport=556)
1078 rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1081 rx = IP(peer_1.decrypt_transport(rx, True))
1083 # check the original packet is present
1084 self.assertEqual(rx[IP].dst, p[IP].dst)
1085 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1087 # send packets into the tunnel, expect to receive them on
1091 peer_1.mk_tunnel_header(self.pg1, True)
1092 / Wireguard(message_type=4, reserved_zero=0)
1093 / WireguardTransport(
1094 receiver_index=peer_1.sender,
1096 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1098 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1099 / UDP(sport=222, dport=223)
1105 for ii in range(255)
1108 rxs = self.send_and_expect(self.pg1, p, self.pg0)
1111 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1112 self.assertEqual(rx[IP].ttl, 19)
1114 r1.remove_vpp_config()
1115 peer_1.remove_vpp_config()
1116 wg0.remove_vpp_config()
1118 def test_wg_multi_peer(self):
1119 """multiple peer setup"""
1123 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1124 wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1128 # Check peer counter
1129 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1131 self.pg_enable_capture(self.pg_interfaces)
1134 # Create many peers on second interface
1136 self.pg2.generate_remote_hosts(NUM_PEERS)
1137 self.pg2.configure_ipv4_neighbors()
1138 self.pg1.generate_remote_hosts(NUM_PEERS)
1139 self.pg1.configure_ipv4_neighbors()
1145 for i in range(NUM_PEERS):
1150 self.pg1.remote_hosts[i].ip4,
1152 ["10.0.%d.4/32" % i],
1160 [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1168 self.pg2.remote_hosts[i].ip4,
1170 ["10.100.%d.4/32" % i],
1178 [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1182 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1184 self.logger.info(self.vapi.cli("show wireguard peer"))
1185 self.logger.info(self.vapi.cli("show wireguard interface"))
1186 self.logger.info(self.vapi.cli("show adj 37"))
1187 self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
1188 self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
1192 r.remove_vpp_config()
1194 r.remove_vpp_config()
1198 self.assertTrue(p.query_vpp_config())
1199 p.remove_vpp_config()
1201 self.assertTrue(p.query_vpp_config())
1202 p.remove_vpp_config()
1204 wg0.remove_vpp_config()
1205 wg1.remove_vpp_config()
1207 def test_wg_multi_interface(self):
1208 """Multi-tunnel on the same port"""
1211 # Create many wireguard interfaces
1213 self.pg1.generate_remote_hosts(NUM_IFS)
1214 self.pg1.configure_ipv4_neighbors()
1215 self.pg0.generate_remote_hosts(NUM_IFS)
1216 self.pg0.configure_ipv4_neighbors()
1218 # Create interfaces with a peer on each
1222 for i in range(NUM_IFS):
1223 # Use the same port for each interface
1224 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1232 self.pg1.remote_hosts[i].ip4,
1234 ["10.0.%d.0/24" % i],
1243 [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
1247 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
1249 for i in range(NUM_IFS):
1250 # send a valid handshake init for which we expect a response
1251 p = peers[i].mk_handshake(self.pg1)
1252 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1253 peers[i].consume_response(rx[0])
1255 # send a data packet from the peer through the tunnel
1256 # this completes the handshake
1258 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
1259 / UDP(sport=222, dport=223)
1262 d = peers[i].encrypt_transport(p)
1263 p = peers[i].mk_tunnel_header(self.pg1) / (
1264 Wireguard(message_type=4, reserved_zero=0)
1265 / WireguardTransport(
1266 receiver_index=peers[i].sender,
1268 encrypted_encapsulated_packet=d,
1271 rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1273 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1274 self.assertEqual(rx[IP].ttl, 19)
1276 # send packets that are routed into the tunnel
1277 for i in range(NUM_IFS):
1279 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1280 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
1281 / UDP(sport=555, dport=556)
1285 rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
1288 rx = IP(peers[i].decrypt_transport(rx))
1290 # check the original packet is present
1291 self.assertEqual(rx[IP].dst, p[IP].dst)
1292 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1294 # send packets into the tunnel
1295 for i in range(NUM_IFS):
1298 peers[i].mk_tunnel_header(self.pg1)
1299 / Wireguard(message_type=4, reserved_zero=0)
1300 / WireguardTransport(
1301 receiver_index=peers[i].sender,
1303 encrypted_encapsulated_packet=peers[i].encrypt_transport(
1306 src="10.0.%d.4" % i,
1307 dst=self.pg0.remote_hosts[i].ip4,
1310 / UDP(sport=222, dport=223)
1319 rxs = self.send_and_expect(self.pg1, p, self.pg0)
1322 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
1323 self.assertEqual(rx[IP].ttl, 19)
1326 r.remove_vpp_config()
1328 p.remove_vpp_config()
1330 i.remove_vpp_config()
# Event test: creates two WireGuard interfaces (wg0, wg1), subscribes to peer
# events, runs handshake init/response exchanges and waits for the ESTABLISHED
# flag, then removes the peers and waits for the DEAD flag.
# NOTE(review): `port`, `NUM_PEERS`, `peers_0`, `peers_1`, and the `r` / `p`
# names used during teardown are bound on lines that are not visible in this
# listing - confirm against the full file.
1332 def test_wg_event(self):
# Peer flag values (from the API enum) that the event waits below match on.
1335 ESTABLISHED_FLAG = (
1336 VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
1338 DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
# Create the two WireGuard interfaces: wg0 uses pg1's local IPv4 address and
# `port`, wg1 uses pg2's local IPv4 address and `port + 1`.
1341 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1342 wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1346 # Check peer counter: the peer dump is expected to be empty at this point
1347 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1349 self.pg_enable_capture(self.pg_interfaces)
# Generate NUM_PEERS remote hosts on pg2 and on pg1 and configure their IPv4
# neighbors; the hosts are indexed per peer in the loop below.
1354 self.pg2.generate_remote_hosts(NUM_PEERS)
1355 self.pg2.configure_ipv4_neighbors()
1356 self.pg1.generate_remote_hosts(NUM_PEERS)
1357 self.pg1.configure_ipv4_neighbors()
# Per-peer setup. Only argument fragments of the calls are visible here:
# index i pairs pg1.remote_hosts[i] with "10.0.<i>.4/32" and a route path via
# wg0, and pg2.remote_hosts[i] with "10.100.<i>.4/32" and a route path via wg1.
# NOTE(review): presumably these are the peer endpoint / allowed-prefix
# arguments and the matching /32 routes - confirm against the full file.
1363 for i in range(NUM_PEERS):
1368 self.pg1.remote_hosts[i].ip4,
1370 ["10.0.%d.4/32" % i],
1378 [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1386 self.pg2.remote_hosts[i].ip4,
1388 ["10.100.%d.4/32" % i],
1396 [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
# The dump must now report NUM_PEERS * 2 peers.
1400 self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
1402 # Want events from the first peer of wg0
1403 # and from all wg1 peers
1404 peers_0[0].want_events()
1407 for i in range(NUM_PEERS):
1408 # send a valid handshake init for which we expect a response
1409 p = peers_0[i].mk_handshake(self.pg1)
1410 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1411 peers_0[i].consume_response(rx[0])
# wait for the ESTABLISHED event of the first wg0 peer
1413 peers_0[0].wait_event(ESTABLISHED_FLAG)
# same handshake init/response exchange for the wg1 peer, over pg2
1415 p = peers_1[i].mk_handshake(self.pg2)
1416 rx = self.send_and_expect(self.pg2, [p], self.pg2)
1417 peers_1[i].consume_response(rx[0])
# wait for ESTABLISHED events for the indices of peers_1[0] and peers_1[1]
1419 wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
# Teardown of `r` objects.
# NOTE(review): presumably the routes created above - the enclosing loop
# headers are not visible here; confirm.
1423 r.remove_vpp_config()
1425 r.remove_vpp_config()
# Each wg0 peer must still be present in VPP before it is removed.
1428 for i in range(NUM_PEERS):
1429 self.assertTrue(peers_0[i].query_vpp_config())
1430 peers_0[i].remove_vpp_config()
# After removal, wait for an event with value 0 and then one with DEAD_FLAG
# (presumably the argument is the expected peer flags - confirm).
1432 peers_0[i].wait_event(0)
1433 peers_0[i].wait_event(DEAD_FLAG)
# Same presence check, removal and DEAD wait for `p`.
1435 self.assertTrue(p.query_vpp_config())
1436 p.remove_vpp_config()
1438 p.wait_event(DEAD_FLAG)
# Finally remove both WireGuard interfaces.
1440 wg0.remove_vpp_config()
1441 wg1.remove_vpp_config()
1444 class WireguardHandoffTests(TestWg):
1445 """Wireguard Tests in multi worker setup"""
1447 vpp_worker_count = 2
# Multi-worker test of a single WireGuard peer: after a handshake, traffic is
# sent through the tunnel in both directions, with send_and_expect called
# with worker=0 and worker=1 in turn.
# NOTE(review): `port`, `peer_1`, `r1`, `pe` and the `rx` used in the
# assertions are bound on lines that are not visible in this listing -
# confirm against the full file.
1449 def test_wg_peer_init(self):
# Create the WireGuard interface on pg1's local IPv4 address.
1455 wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
# Argument fragment of a call whose head is not visible: pg1's remote IPv4
# address, `port + 1`, and the prefixes 10.11.2.0/24 and 10.11.3.0/24.
1460 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
# Exactly one peer must be reported by the dump.
1462 self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
# Argument fragment: 10.11.3.0/24 with a route path via 10.11.3.1 on wg0.
1465 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1468 # send a valid handshake init for which we expect a response
1469 p = peer_1.mk_handshake(self.pg1)
1471 rx = self.send_and_expect(self.pg1, [p], self.pg1)
1473 peer_1.consume_response(rx[0])
1475 # send a data packet from the peer through the tunnel
1476 # this completes the handshake and pins the peer to worker 0
1478 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1479 / UDP(sport=222, dport=223)
# Encrypt the inner packet and wrap it in the tunnel header, a Wireguard
# header with message_type=4 and a WireguardTransport layer with counter=0.
1482 d = peer_1.encrypt_transport(p)
1483 p = peer_1.mk_tunnel_header(self.pg1) / (
1484 Wireguard(message_type=4, reserved_zero=0)
1485 / WireguardTransport(
1486 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
# Inject on pg1 using worker 0 and expect the decapsulated packet on pg0.
1489 rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
# The inner packet was sent with ttl=20; ttl 19 is expected on receipt.
1492 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1493 self.assertEqual(rx[IP].ttl, 19)
1495 # send packets that are routed into the tunnel
1496 # and pin the peer to worker 1
1498 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1499 / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1500 / UDP(sport=555, dport=556)
# 255 copies of `pe` go in on pg0 (worker 1); the captures from pg1 are
# checked by the peer's validate_encapped helper.
1503 rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
1504 peer_1.validate_encapped(rxs, pe)
1506 # send packets into the tunnel, from the other worker
1509 peer_1.mk_tunnel_header(self.pg1)
1510 / Wireguard(message_type=4, reserved_zero=0)
1511 / WireguardTransport(
1512 receiver_index=peer_1.sender,
1514 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1516 IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1517 / UDP(sport=222, dport=223)
# 255 packets are built, one per value of ii.
1523 for ii in range(255)
# Inject the transport packets on pg1 using worker 1, expecting them on pg0.
1526 rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
# Same destination and TTL checks as for the first data packet.
1529 self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1530 self.assertEqual(rx[IP].ttl, 19)
1532 # send packets that are routed into the tunnel
# This time the 255 copies of `pe` are sent using worker 0.
1534 rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
1536 peer_1.validate_encapped(rxs, pe)
# Teardown: remove r1, then the peer, then the WireGuard interface.
1538 r1.remove_vpp_config()
1539 peer_1.remove_vpp_config()
1540 wg0.remove_vpp_config()
1542 @unittest.skip("test disabled")
1543 def test_wg_multi_interface(self):
1544 """Multi-tunnel on the same port"""