CSIT-124: IPv4 encapsulations - IP4-GRE-IP4
[csit.git] / resources / traffic_scripts / send_gre_check_gre_headers.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends a UDP encapsulated into GRE packet from one
16 interface to the other, where GRE encapsulated packet is expected.
17 """
18
19 import sys
20
21 from robot.api import logger
22 from scapy.all import Ether
23 from scapy.layers.inet import IP_PROTOS
24 from scapy.layers.inet import IP
25 from scapy.layers.inet import UDP
26 from scapy.layers.l2 import GRE
27
28 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
29 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
30
31
32 def main():
33     args = TrafficScriptArg(
34         ['tx_dst_mac', 'tx_src_mac', 'tx_outer_dst_ip', 'tx_outer_src_ip',
35          'tx_inner_dst_ip', 'tx_inner_src_ip', 'rx_dst_mac', 'rx_src_mac',
36          'rx_outer_dst_ip', 'rx_outer_src_ip'])
37
38     tx_if = args.get_arg('tx_if')
39     rx_if = args.get_arg('rx_if')
40     tx_dst_mac = args.get_arg('tx_dst_mac')
41     tx_src_mac = args.get_arg('tx_src_mac')
42     tx_outer_dst_ip = args.get_arg('tx_outer_dst_ip')
43     tx_outer_src_ip = args.get_arg('tx_outer_src_ip')
44     tx_inner_dst_ip = args.get_arg('tx_inner_dst_ip')
45     tx_inner_src_ip = args.get_arg('tx_inner_src_ip')
46     rx_dst_mac = args.get_arg('rx_dst_mac')
47     rx_src_mac = args.get_arg('rx_src_mac')
48     rx_outer_dst_ip = args.get_arg('rx_outer_dst_ip')
49     rx_outer_src_ip = args.get_arg('rx_outer_src_ip')
50     udp_src = 1234
51     udp_dst = 2345
52     udp_payload = 'udp_payload'
53
54     rxq = RxQueue(rx_if)
55     txq = TxQueue(tx_if)
56     sent_packets = []
57
58     tx_pkt_raw = Ether(dst=tx_dst_mac, src=tx_src_mac) / \
59         IP(src=tx_outer_src_ip, dst=tx_outer_dst_ip) / \
60         GRE() / \
61         IP(src=tx_inner_src_ip, dst=tx_inner_dst_ip) / \
62         UDP(dport=udp_dst, sport=udp_src) / \
63         udp_payload
64
65     sent_packets.append(tx_pkt_raw)
66     txq.send(tx_pkt_raw)
67     ether = rxq.recv(2, ignore=sent_packets)
68
69     if ether is None:
70         raise RuntimeError("ICMP echo Rx timeout")
71
72     # Check RX headers
73     if ether.dst != rx_dst_mac:
74         raise RuntimeError("Matching of received destination MAC unsuccessful.")
75     logger.debug("Comparison of received destination MAC: OK.")
76
77     if ether.src != rx_src_mac:
78         raise RuntimeError("Matching of received source MAC unsuccessful.")
79     logger.debug("Comparison of received source MAC: OK.")
80
81     if ether['IP'].src != rx_outer_src_ip:
82         raise RuntimeError("Matching of received outer source IP unsuccessful.")
83     logger.debug("Comparison of received outer source IP: OK.")
84
85     if ether['IP'].dst != rx_outer_dst_ip:
86         raise RuntimeError(
87             "Matching of received outer destination IP unsuccessful.")
88     logger.debug("Comparison of received outer destination IP: OK.")
89
90     if ether['IP'].proto != IP_PROTOS.gre:
91         raise RuntimeError("IP protocol is not GRE.")
92     logger.debug("Comparison of received GRE protocol: OK.")
93
94     if ether['IP']['GRE']['IP'].src != tx_inner_src_ip:
95         raise RuntimeError("Matching of received inner source IP unsuccessful.")
96     logger.debug("Comparison of received inner source IP: OK.")
97
98     if ether['IP']['GRE']['IP'].dst != tx_inner_dst_ip:
99         raise RuntimeError(
100             "Matching of received inner destination IP unsuccessful.")
101     logger.debug("Comparison of received inner destination IP: OK.")
102
103     # check udp
104     udp = ether['IP']['GRE']['IP']['UDP']
105     if udp.dport != udp_dst:
106         raise RuntimeError("UDP dport error {} != {}.".
107                            format(udp.dport, udp_dst))
108     print "UDP dport: OK."
109
110     if udp.sport != udp_src:
111         raise RuntimeError("UDP sport error {} != {}.".
112                            format(udp.sport, udp_src))
113     print "UDP sport: OK."
114
115     if str(udp.payload) != udp_payload:
116         raise RuntimeError("UDP payload check unsuccessful {} != {}.".
117                            format(udp.payload, udp_payload))
118     print "UDP payload: OK."
119
120     sys.exit(0)
121
122
123 if __name__ == "__main__":
124     main()