X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Flibraries%2Fpython%2FSFC%2FVerifyPacket.py;fp=resources%2Flibraries%2Fpython%2FSFC%2FVerifyPacket.py;h=66bd09899d19c012c27c7b51ca708c5df319f9b9;hp=0000000000000000000000000000000000000000;hb=5ce772f3b2d83c31eda5c2abdcd6cc4547ecc6c8;hpb=1e59ee6e39f0604be721ac8ebdb36be14b738bd8 diff --git a/resources/libraries/python/SFC/VerifyPacket.py b/resources/libraries/python/SFC/VerifyPacket.py new file mode 100644 index 0000000000..66bd09899d --- /dev/null +++ b/resources/libraries/python/SFC/VerifyPacket.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python +# Copyright (c) 2017 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module defines the common functions. +""" + +import ipaddress + +from scapy.layers.inet import IP, UDP +from scapy.all import Raw +from scapy.utils import rdpcap +from resources.libraries.python.constants import Constants as con +from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon +from resources.libraries.python.SFC.TunnelProtocol import VxLAN, VxLANGPE, NSH + +from robot.api import logger + +def valid_ipv4(ipaddr): + """Check if IP address has the correct IPv4 address format. + + :param ipaddr: IP address. + :type ipaddr: str + :returns: True in case of correct IPv4 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv4Address(unicode(ipaddr)) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +def valid_ipv6(ipaddr): + """Check if IP address has the correct IPv6 address format. + + :param ipaddr: IP address. + :type ipaddr: str + :returns: True in case of correct IPv6 address format, + otherwise return False. + :rtype: bool + """ + try: + ipaddress.IPv6Address(unicode(ipaddr)) + return True + except (AttributeError, ipaddress.AddressValueError): + return False + + +class VerifyPacket(object): + """Define some functions for the test filed verify.""" + + @staticmethod + def check_vxlan_protocol(payload_data): + """ + verify the vxlan protocol in the payload data. + + :param payload_data: the payload data in the packet. + :type payload_data: str + :returns: none + :raises RuntimeError: If the vxlan protocol field verify fails. + """ + # get the vxlan packet and check it + vxlan_pkt = VxLAN(payload_data[0:8]) + if vxlan_pkt.flags != sfccon.VxLAN_FLAGS: + raise RuntimeError("Unexpected Vxlan flags: {0}". + format(vxlan_pkt.flags)) + + if vxlan_pkt.vni != sfccon.VxLAN_DEFAULT_VNI: + raise RuntimeError("Unexpected VNI flag: {0}".format(vxlan_pkt.vni)) + + @staticmethod + def check_vxlangpe_nsh_protocol(payload_data, test_type): + """ + verify the vxlangpe and nsh protocol in the payload data. + + :param payload_data: the payload data in the packet. + :param test_type: the functional test type. + :type payload_data: str + :type test_type: str + :returns: none + :raises RuntimeError: If the vxlangpe and nsh protocol + field verify fails. + """ + # get the vxlan-gpe packet and check it + vxlangpe_pkt = VxLANGPE(payload_data[0:8]) + if vxlangpe_pkt.flags != sfccon.VxLANGPE_FLAGS: + raise RuntimeError("Unexpected Vxlan-GPE flags: {0}". + format(vxlangpe_pkt.flags)) + + if vxlangpe_pkt.nextproto != sfccon.VxLANGPE_NEXT_PROTOCOL: + raise RuntimeError("next protocol not the NSH") + + if vxlangpe_pkt.vni != sfccon.VxLANGPE_DEFAULT_VNI: + raise RuntimeError("Unexpected VNI flag: {0}". + format(vxlangpe_pkt.vni)) + + # get the NSH packet and check it + nsh_pkt = NSH(payload_data[8:32]) + if nsh_pkt.flags != sfccon.NSH_FLAGS: + raise RuntimeError("Unexpected NSH flags: {0}". + format(nsh_pkt.flags)) + + if nsh_pkt.length != sfccon.NSH_HEADER_LENGTH: + raise RuntimeError("NSH length {0} incorrect". + format(nsh_pkt.length)) + + if nsh_pkt.MDtype != sfccon.NSH_DEFAULT_MDTYPE: + raise RuntimeError("NSH MD-Type {0} incorrect". + format(nsh_pkt.MDtype)) + + if nsh_pkt.nextproto != sfccon.NSH_NEXT_PROTOCOL: + raise RuntimeError("NSH next protocol {0} incorrect". + format(nsh_pkt.nextproto)) + + if test_type == "Proxy Outbound" or test_type == "SFF": + expect_nsi = sfccon.NSH_DEFAULT_NSI - 1 + else: + expect_nsi = sfccon.NSH_DEFAULT_NSI + + nsp_nsi = nsh_pkt.nsp_nsi + nsp = nsp_nsi >> 8 + nsi = nsp_nsi & 0x000000FF + if nsp != sfccon.NSH_DEFAULT_NSP: + raise RuntimeError("NSH Service Path ID {0} incorrect".format(nsp)) + + if nsi != expect_nsi: + raise RuntimeError("NSH Service Index {0} incorrect".format(nsi)) + + nsh_c1 = nsh_pkt.c1 + if nsh_c1 != sfccon.NSH_DEFAULT_C1: + raise RuntimeError("NSH c1 {0} incorrect".format(nsh_c1)) + + nsh_c2 = nsh_pkt.c2 + if nsh_c2 != sfccon.NSH_DEFAULT_C2: + raise RuntimeError("NSH c2 {0} incorrect".format(nsh_c2)) + + nsh_c3 = nsh_pkt.c3 + if nsh_c3 != sfccon.NSH_DEFAULT_C3: + raise RuntimeError("NSH c3 {0} incorrect".format(nsh_c3)) + + nsh_c4 = nsh_pkt.c4 + if nsh_c4 != sfccon.NSH_DEFAULT_C4: + raise RuntimeError("NSH c4 {0} incorrect".format(nsh_c4)) + + + @staticmethod + def check_the_nsh_sfc_packet(frame_size, test_type): + """ + verify the NSH SFC functional test loopback packet field + is correct. + + :param frame_size: the origin frame size. + :param test_type: the test type. + (Classifier, Proxy Inbound, Proxy Outbound, SFF). + :type frame_size: Integer + :type test_type: str + :returns: none + :raises RuntimeError: If the packet field verify fails. + """ + + rx_pcapfile = '{0}/nsh_sfc_tests/sfc_scripts/temp_packet.pcap' \ + .format(con.REMOTE_FW_DIR) + + logger.trace('read pcap file:{0}'.format(rx_pcapfile)) + + packets = rdpcap(rx_pcapfile) + if len(packets) < 1: + raise RuntimeError("No packet is received!") + + ether = packets[0] + + origin_size = int(frame_size) + if test_type == "Classifier": + expect_pkt_len = origin_size + 74 - 4 + elif test_type == "Proxy Inbound": + expect_pkt_len = origin_size - 24 - 4 + elif test_type == "Proxy Outbound": + expect_pkt_len = origin_size + 24 - 4 + else: + expect_pkt_len = origin_size - 4 + + recv_pkt_len = len(ether) + if recv_pkt_len != expect_pkt_len: + raise RuntimeError("Received packet size {0} not " \ + "the expect size {1}".format(recv_pkt_len, \ + expect_pkt_len)) + + if not ether.haslayer(IP): + raise RuntimeError("Not a IPv4 packet") + + pkt_proto = ether[IP].proto + if pkt_proto != sfccon.UDP_PROTOCOL: + raise RuntimeError("Not a UDP packet , {0}".format(pkt_proto)) + + if test_type == "Proxy Inbound": + expect_udp_port = sfccon.VxLAN_UDP_PORT + else: + expect_udp_port = sfccon.VxLANGPE_UDP_PORT + + dst_port = ether[UDP].dport + if dst_port != expect_udp_port: + raise RuntimeError("UDP dest port must be {0}, {1}". + format(expect_udp_port, dst_port)) + + payload_data = ether[Raw].load + + if test_type == "Proxy Inbound": + VerifyPacket.check_vxlan_protocol(payload_data) + else: + VerifyPacket.check_vxlangpe_nsh_protocol(payload_data, test_type)