3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for IPsec verification."""
21 from ipaddress import ip_address
22 from scapy.layers.l2 import Ether
23 from scapy.layers.inet import IP, TCP
24 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
25 from scapy.packet import Raw
27 from .TrafficScriptArg import TrafficScriptArg
28 from .PacketVerifier import RxQueue, TxQueue
def check_ipv4(pkt_recv, dscp):
    """Validate a received IPv4 packet.

    Three checks are applied in order and the first failure raises:
    the packet has an IPv4 layer, its DSCP equals the expected value,
    and it has a TCP layer.

    :param pkt_recv: Received packet to verify.
    :param dscp: DSCP value the packet is expected to carry.
    :type pkt_recv: scapy.Ether
    :type dscp: int
    :raises RuntimeError: If the packet is not IPv4, carries a different
        DSCP, or is not TCP.
    """
    if not pkt_recv.haslayer(IP):
        raise RuntimeError(f"Not an IPv4 packet received: {pkt_recv!r}")

    ipv4_header = pkt_recv[IP]
    # Shift out the two low-order bits of the ToS byte to get the DSCP.
    rx_dscp = ipv4_header.tos >> 2
    if rx_dscp != dscp:
        raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")

    if pkt_recv.haslayer(TCP):
        return
    raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")
def check_ipv6(pkt_recv, dscp):
    """Validate a received IPv6 packet.

    Three checks are applied in order and the first failure raises:
    the packet has an IPv6 layer, its DSCP equals the expected value,
    and it has a TCP layer.

    :param pkt_recv: Received packet to verify.
    :param dscp: DSCP value the packet is expected to carry.
    :type pkt_recv: scapy.Ether
    :type dscp: int
    :raises RuntimeError: If the packet is not IPv6, carries a different
        DSCP, or is not TCP.
    """
    if not pkt_recv.haslayer(IPv6):
        raise RuntimeError(f"Not an IPv6 packet received: {pkt_recv!r}")

    ipv6_header = pkt_recv[IPv6]
    # Shift out the two low-order bits of the traffic class to get the DSCP.
    rx_dscp = ipv6_header.tc >> 2
    if rx_dscp != dscp:
        raise RuntimeError(f"Invalid DSCP {rx_dscp} should be {dscp}")

    if pkt_recv.haslayer(TCP):
        return
    raise RuntimeError(f"Not a TCP packet received: {pkt_recv!r}")
71 # TODO: Pylint says too-many-locals and too-many-statements. Refactor!
73 """Send and receive TCP packet."""
74 args = TrafficScriptArg(
75 [u"src_mac", u"dst_mac", u"src_ip", u"dst_ip", u"dscp"]
78 rxq = RxQueue(args.get_arg(u"rx_if"))
79 txq = TxQueue(args.get_arg(u"tx_if"))
81 src_mac = args.get_arg(u"src_mac")
82 dst_mac = args.get_arg(u"dst_mac")
83 src_ip = args.get_arg(u"src_ip")
84 dst_ip = args.get_arg(u"dst_ip")
85 dscp = int(args.get_arg(u"dscp"))
87 ip_layer = IPv6 if ip_address(src_ip).version == 6 else IP
90 pkt_send = (Ether(src=src_mac, dst=dst_mac) /
91 ip_layer(src=src_ip, dst=dst_ip) /
95 sent_packets.append(pkt_send)
99 pkt_recv = rxq.recv(2, sent_packets)
101 raise RuntimeError(u"ICMPv6 echo reply Rx timeout")
103 if pkt_recv.haslayer(ICMPv6ND_NS):
104 # read another packet in the queue if the current one is ICMPv6ND_NS
107 # otherwise process the current packet
111 raise RuntimeError(u"Rx timeout")
114 check_ipv4(pkt_recv, dscp)
116 check_ipv6(pkt_recv, dscp)
121 if __name__ == u"__main__":