3 # Copyright (c) 2018 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Send ICMPv6 echo request from one TG port to DUT port or to another TG port
17 through DUT node(s) and send reply back. Also verify hop limit processing."""
22 # pylint: disable=no-name-in-module
23 # pylint: disable=import-error
24 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
27 from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
28 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
29 from scapy.all import Ether
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.PacketVerifier import checksum_equal
33 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
# Script body: send an ICMPv6 echo request from the source TG interface, check
# it (hop limit, id/seq, checksum) on the destination TG interface, answer
# with an echo reply and check that reply back on the source interface.
# NOTE(review): this excerpt has gaps. hop_limit, echo_id, echo_seq,
# src_sent_packets, dst_sent_packets, ipv6 and cksum are used below but are
# assigned on lines that are not visible here, as are the loop/if headers
# that the receive sections rely on.
#
# Declare the arguments the script accepts. Two lists are passed to
# TrafficScriptArg; presumably the first is required and the second
# (['is_dst_tg']) optional - confirm against TrafficScriptArg.
37 args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
38 'src_ip', 'dst_ip', 'h_num'], ['is_dst_tg'])
# Rx/Tx queues bound to the interface named by the 'tx_if' argument (source
# side). NOTE(review): 'tx_if' and 'rx_if' are not in the lists above -
# presumably TrafficScriptArg provides them by default; confirm.
40 src_rxq = RxQueue(args.get_arg('tx_if'))
41 src_txq = TxQueue(args.get_arg('tx_if'))
# MAC/IP addressing of both endpoints plus the MAC of each side's next hop.
43 src_mac = args.get_arg('src_mac')
44 dst_mac = args.get_arg('dst_mac')
45 src_nh_mac = args.get_arg('src_nh_mac')
46 dst_nh_mac = args.get_arg('dst_nh_mac')
47 src_ip = args.get_arg('src_ip')
48 dst_ip = args.get_arg('dst_ip')
# h_num is converted to int; the hop limit checks below subtract it from
# hop_limit once per traversed direction.
49 hop_num = int(args.get_arg('h_num'))
# is_dst_tg is True when the argument is the string 'True' or is empty;
# any other value yields False.
51 is_dst_tg = True if args.get_arg('is_dst_tg') in ['True', ''] else False
# Destination-side queues (interface 'rx_if') are created only when the
# destination is a TG port; otherwise both names are None.
52 dst_rxq = RxQueue(args.get_arg('rx_if')) if is_dst_tg else None
53 dst_txq = TxQueue(args.get_arg('rx_if')) if is_dst_tg else None
# Build a neighbor advertisement for src_ip that carries src_mac as the
# link-layer address option, record it in src_sent_packets and transmit it on
# the source interface. The *_sent_packets lists are later handed to recv();
# presumably so the receiver can ignore the script's own frames - confirm
# against RxQueue.recv.
62 # send ICMPv6 neighbor advertisement message
63 pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
64 IPv6(src=src_ip, dst='ff02::1:ff00:2') /
65 ICMPv6ND_NA(tgt=src_ip, R=0) /
66 ICMPv6NDOptDstLLAddr(lladdr=src_mac))
67 src_sent_packets.append(pkt_send)
68 src_txq.send(pkt_send)
# Same advertisement for the destination side (dst_ip / dst_mac), sent on the
# destination interface.
# NOTE(review): dst_txq is None when is_dst_tg is False, so this section needs
# an is_dst_tg guard; no such guard is visible in this excerpt - confirm it
# exists in the full file.
71 # send ICMPv6 neighbor advertisement message
72 pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
73 IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
74 ICMPv6ND_NA(tgt=dst_ip, R=0) /
75 ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
76 dst_sent_packets.append(pkt_send)
77 dst_txq.send(pkt_send)
# The echo request is addressed at L2 to the source-side next hop
# (src_nh_mac) and at L3 to dst_ip, with hop limit hop_limit and the
# echo_id/echo_seq pair that the checks below compare against.
79 # send ICMPv6 echo request from first TG interface
80 pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
81 IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
82 ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
83 src_sent_packets.append(pkt_send)
84 src_txq.send(pkt_send)
# Receive on the destination interface; 2 is the first argument of recv()
# (presumably a timeout in seconds - confirm) and dst_sent_packets the second.
87 # receive ICMPv6 echo request on second TG interface
89 ether = dst_rxq.recv(2, dst_sent_packets)
# NOTE(review): the condition guarding this raise is not visible here. The
# message says "echo reply" although this phase waits for the echo request -
# looks like a copy-paste from the reply phase; confirm before changing.
91 raise RuntimeError('ICMPv6 echo reply Rx timeout')
# Frames carrying a neighbor solicitation are not the packet under test.
# NOTE(review): the statement that actually skips to the next frame is not
# visible in this excerpt.
93 if ether.haslayer(ICMPv6ND_NS):
94 # read another packet in the queue if the current one is
98 # otherwise process the current packet
# Anything without an IPv6 layer is rejected outright.
101 if not ether.haslayer(IPv6):
102 raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
103 format(ether.__repr__()))
# The request must arrive with the sent hop limit reduced by hop_num.
# NOTE(review): ipv6 is assigned on a line not visible here (presumably the
# IPv6 layer of ether).
107 # verify hop limit processing
108 if ipv6.hlim != (hop_limit - hop_num):
109 raise RuntimeError('Invalid hop limit {0} should be {1}'.
110 format(ipv6.hlim, hop_limit - hop_num))
# The IPv6 payload must be an ICMPv6 echo request.
112 if not ipv6.haslayer(ICMPv6EchoRequest):
113 raise RuntimeError('Unexpected packet with no IPv6 ICMP received '
114 '{0}'.format(ipv6.__repr__()))
116 icmpv6 = ipv6[ICMPv6EchoRequest]
# NOTE(review): the error text below says "echo reply" but the packet checked
# here is the echo request.
118 # check identifier and sequence number
119 if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
120 raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
121 'seq {1} should be ID {2} seq {3}'.
122 format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# Checksum verification: the ICMPv6 message is serialized with str() and
# parsed again, then tmp.cksum is compared with cksum via checksum_equal().
# NOTE(review): cksum is assigned on a line not visible here (presumably the
# received checksum, saved before the field is cleared so scapy recomputes
# it). str(packet) as a serializer presumably relies on Python 2 semantics -
# confirm the interpreter version before porting.
127 tmp = ICMPv6EchoRequest(str(icmpv6))
128 if not checksum_equal(tmp.cksum, cksum):
129 raise RuntimeError('Invalid checksum {0} should be {1}'.
130 format(cksum, tmp.cksum))
# The reply goes back towards src_ip via the destination-side next hop
# (dst_nh_mac); its hop limit is the received hop limit minus one, and it
# reuses the same echo_id/echo_seq.
132 # send ICMPv6 echo reply from second TG interface
133 pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
134 IPv6(src=dst_ip, dst=src_ip, hlim=(ipv6.hlim - 1)) /
135 ICMPv6EchoReply(id=echo_id, seq=echo_seq))
136 dst_sent_packets.append(pkt_send)
137 dst_txq.send(pkt_send)
# Receive on the source interface, passing src_sent_packets as the second
# recv() argument.
139 # receive ICMPv6 echo reply on first TG interface
141 ether = src_rxq.recv(2, src_sent_packets)
# NOTE(review): the condition guarding this raise is not visible here.
143 raise RuntimeError('ICMPv6 echo reply Rx timeout')
# Neighbor solicitations are skipped here as well; the skipping statement
# itself is not visible in this excerpt.
145 if ether.haslayer(ICMPv6ND_NS):
146 # read another packet in the queue if the current one is ICMPv6ND_NS
149 # otherwise process the current packet
152 if not ether.haslayer(IPv6):
153 raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
154 format(ether.__repr__()))
# Expected reply hop limit: hop_limit minus hop_num once per counted
# direction (two when the destination is a TG port, otherwise one), minus one
# more for the decrement described in the comment below.
158 # verify hop limit processing; destination node decrements hlim by one in
159 # outgoing ICMPv6 Echo Reply
160 directions = 2 if is_dst_tg else 1
161 hop_limit_reply = hop_limit - directions * hop_num - 1
162 if ipv6.hlim != hop_limit_reply:
163 raise RuntimeError('Invalid hop limit {0} should be {1}'.
164 format(ipv6.hlim, hop_limit_reply))
# The IPv6 payload must be an ICMPv6 echo reply.
166 if not ipv6.haslayer(ICMPv6EchoReply):
167 raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
168 format(ipv6.__repr__()))
170 icmpv6 = ipv6[ICMPv6EchoReply]
172 # check identifier and sequence number
173 if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
174 raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
175 'seq {1} should be ID {2} seq {3}'.
176 format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# Same checksum verification as for the request, this time re-parsing the
# message as an echo reply; cksum is again assigned outside the visible lines.
181 tmp = ICMPv6EchoReply(str(icmpv6))
182 if not checksum_equal(tmp.cksum, cksum):
183 raise RuntimeError('Invalid checksum {0} should be {1}'.
184 format(cksum, tmp.cksum))
189 if __name__ == "__main__":