+ def test_wg_peer_v6o6(self):
+ """ Test v6o6"""
+
+ port = 12343
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v6o4(self):
+ """ Test v6o4"""
+
+ port = 12353
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip4,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip4,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac4_err + 1,
+ self.statistics.get_err_counter(self.mac4_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ False,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer4_err + 1,
+ self.statistics.get_err_counter(self.peer4_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0])
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v4o6(self):
+ """ Test v4o6"""
+
+ port = 12363
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip4()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["10.11.3.0/24"]).add_vpp_config()
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 1,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 2,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IP(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IP].dst, p[IP].dst)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+