Fixed strings with format splitting
[csit.git] / resources / traffic_scripts / icmpv6_echo_req_resp.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send ICMPv6 echo request from one TG port to another through DUT nodes and
17    send reply back. Also verify hop limit processing."""
18
19 import sys
20 import logging
21 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
22 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
25 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
26 from scapy.all import Ether
27
28
29 def main():
30     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
31                              'src_ip', 'dst_ip', 'h_num'])
32
33     src_rxq = RxQueue(args.get_arg('rx_if'))
34     src_txq = TxQueue(args.get_arg('rx_if'))
35     dst_rxq = RxQueue(args.get_arg('tx_if'))
36     dst_txq = TxQueue(args.get_arg('tx_if'))
37
38     src_mac = args.get_arg('src_mac')
39     dst_mac = args.get_arg('dst_mac')
40     src_nh_mac = args.get_arg('src_nh_mac')
41     dst_nh_mac = args.get_arg('dst_nh_mac')
42     src_ip = args.get_arg('src_ip')
43     dst_ip = args.get_arg('dst_ip')
44     hop_num = int(args.get_arg('h_num'))
45     hop_limit = 64
46     echo_id = 0xa
47     echo_seq = 0x1
48
49     src_sent_packets = []
50     dst_sent_packets = []
51
52     # send ICMPv6 neighbor advertisement message
53     pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
54                       IPv6(src=src_ip, dst='ff02::1:ff00:2') /
55                       ICMPv6ND_NA(tgt=src_ip, R=0) /
56                       ICMPv6NDOptDstLLAddr(lladdr=src_mac))
57     src_sent_packets.append(pkt_send)
58     src_txq.send(pkt_send)
59     pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
60                       IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
61                       ICMPv6ND_NA(tgt=dst_ip, R=0) /
62                       ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
63     dst_sent_packets.append(pkt_send)
64     dst_txq.send(pkt_send)
65
66     # send ICMPv6 echo request from first TG interface
67     pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
68                       IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
69                       ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
70     src_sent_packets.append(pkt_send)
71     src_txq.send(pkt_send)
72
73     # receive ICMPv6 echo request on second TG interface
74     ether = dst_rxq.recv(2, dst_sent_packets)
75     if ether is None:
76         raise RuntimeError('ICMPv6 echo reply Rx timeout')
77
78     if not ether.haslayer(IPv6):
79         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
80             ether.__repr__()))
81
82     ipv6 = ether['IPv6']
83
84     # verify hop limit processing
85     if ipv6.hlim != (hop_limit - hop_num):
86         raise RuntimeError(
87             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
88                                                          hop_limit - hop_num))
89
90     if not ipv6.haslayer(ICMPv6EchoRequest):
91         raise RuntimeError(
92             'Unexpected packet with no IPv6 ICMP received {0}'.format(
93                 ipv6.__repr__()))
94
95     icmpv6 = ipv6['ICMPv6 Echo Request']
96
97     # check identifier and sequence number
98     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
99         raise RuntimeError(
100             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
101             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
102
103     # verify checksum
104     cksum = icmpv6.cksum
105     del icmpv6.cksum
106     tmp = ICMPv6EchoRequest(str(icmpv6))
107     if tmp.cksum != cksum:
108         raise RuntimeError(
109             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
110
111     # send ICMPv6 echo reply from second TG interface
112     pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
113                       IPv6(src=dst_ip, dst=src_ip) /
114                       ICMPv6EchoReply(id=echo_id, seq=echo_seq))
115     dst_sent_packets.append(pkt_send)
116     dst_txq.send(pkt_send)
117
118     # receive ICMPv6 echo reply on first TG interface
119     ether = src_rxq.recv(2, src_sent_packets)
120     if ether is None:
121         raise RuntimeError('ICMPv6 echo reply Rx timeout')
122
123     if not ether.haslayer(IPv6):
124         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
125             ether.__repr__()))
126
127     ipv6 = ether['IPv6']
128
129     # verify hop limit processing
130     if ipv6.hlim != (hop_limit - hop_num):
131         raise RuntimeError(
132             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
133                                                          hop_limit - hop_num))
134
135     if not ipv6.haslayer(ICMPv6EchoReply):
136         raise RuntimeError(
137             'Unexpected packet with no IPv6 ICMP received {0}'.format(
138                 ipv6.__repr__()))
139
140     icmpv6 = ipv6['ICMPv6 Echo Reply']
141
142     # check identifier and sequence number
143     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
144         raise RuntimeError(
145             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
146             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
147
148     # verify checksum
149     cksum = icmpv6.cksum
150     del icmpv6.cksum
151     tmp = ICMPv6EchoReply(str(icmpv6))
152     if tmp.cksum != cksum:
153         raise RuntimeError(
154             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
155
156     sys.exit(0)
157
158 if __name__ == "__main__":
159     main()