import socket
+import unittest
from scapy.layers.inet import IP, ICMP
-from scapy.layers.l2 import Ether
-from scapy.layers.ipsec import *
+from scapy.layers.l2 import Ether, Raw
+from scapy.layers.ipsec import SecurityAssociation, AH
-from framework import VppTestCase
-from vpp_ip_route import VppIpRoute
-
-from util import ppp
+from framework import VppTestCase, VppTestRunner
class TestIpsecAh(VppTestCase):
l_stopaddr,
r_startaddr,
r_stopaddr,
- protocol=51)
+ protocol=socket.IPPROTO_AH)
cls.vapi.ipsec_spd_add_del_entry(
spd_id,
l_startaddr,
l_stopaddr,
r_startaddr,
r_stopaddr,
- protocol=51,
+ protocol=socket.IPPROTO_AH,
is_outbound=0)
l_startaddr = l_stopaddr = socket.inet_pton(
socket.AF_INET, cls.remote_pg0_lb_addr)
l_stopaddr,
r_startaddr,
r_stopaddr,
- protocol=51)
+ protocol=socket.IPPROTO_AH)
cls.vapi.ipsec_spd_add_del_entry(
spd_id,
l_startaddr,
l_stopaddr,
r_startaddr,
r_stopaddr,
- protocol=51,
+ protocol=socket.IPPROTO_AH,
is_outbound=0)
l_startaddr = l_stopaddr = cls.pg2.local_ip4n
r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
for Pkts in recv_pkts:
Pkts[AH].padding = Pkts[AH].icv[12:]
Pkts[AH].icv = Pkts[AH].icv[:12]
- decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
+ self.local_tra_sa.decrypt(Pkts[IP])
finally:
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec"))
self.pg1, send_pkts, self.pg0, count=count)
# ESP TUN VPP encryption verification
for recv_pkt in recv_pkts:
- recv_pkt[IP] = recv_pkt[IP] / IP(recv_pkt[AH].icv[12:])
- recv_pkt[AH].icv = recv_pkt[AH].icv[:12]
decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
finally: