from vpp_papi import mac_pton
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
-from vpp_papi_provider import SYSLOG_SEVERITY
from io import BytesIO
from vpp_papi import VppEnum
from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
def config_flags(self):
return VppEnum.vl_api_nat_config_flags_t
+ @property
+ def SYSLOG_SEVERITY(self):
+ return VppEnum.vl_api_syslog_severity_t
+
def clear_nat44(self):
"""
Clear NAT44 configuration.
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
self.vapi.nat_ha_set_listener(ip_address='0.0.0.0', port=0,
path_mtu=512)
def test_syslog_apmap(self):
""" Test syslog address and port mapping creation and deletion """
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
def test_syslog_sess(self):
""" Test syslog session creation and deletion """
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg2.local_ip4n, self.pg2.remote_ip4n)
self.nat44_add_address(self.nat_addr)
flags = self.config_flags.NAT_IS_INSIDE
sw_if_index=self.pg0.sw_if_index)
self.vapi.nat64_add_del_interface(is_add=1, flags=0,
sw_if_index=self.pg1.sw_if_index)
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.INFO)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
self.vapi.syslog_set_sender(self.pg3.local_ip4n, self.pg3.remote_ip4n)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
self.ipfix_src_port = 4739
self.ipfix_domain_id = 1
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.EMERG)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG)
self.vapi.nat_set_timeouts(udp=300, tcp_established=7440,
tcp_transitory=240, icmp=60)
import unittest
from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import QOS_SOURCE
from vpp_sub_interface import VppDot1QSubint
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
+from vpp_papi import VppEnum
NUM_PKTS = 67
class TestQOS(VppTestCase):
""" QOS Test Case """
+ # Note: Since the enums aren't created dynamically until after
+ # the papi client attaches to VPP, we put it in a property to
+ # ensure it is the value at runtime, not at module load time.
+ @property
+ def QOS_SOURCE(self):
+ return VppEnum.vl_api_qos_source_t
+
@classmethod
def setUpClass(cls):
super(TestQOS, cls).setUpClass()
# Bind interface pgN to table n
#
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
1)
self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
2,
1)
self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
3,
1)
self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
4,
1)
# Enable QoS recording on IP input for pg0
#
self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1)
#
# remove the map on pg2 and pg3, now expect an unchanged IP tos
#
self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
2,
0)
self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
3,
0)
self.logger.info(self.vapi.cli("sh int feat pg2"))
# disable the input recording on pg0
#
self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
0)
#
# disable the egress map on pg1 and pg4
#
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
0)
self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
4,
0)
# on Pg1
#
self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.MPLS,
+ self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
1,
1)
# enable MPLS QoS recording on the input Pg0 and IP egress marking
# on Pg1
#
- self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.MPLS,
- 1)
+ self.vapi.qos_record_enable_disable(
+ self.pg0.sw_if_index,
+ self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
+ 1)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
1)
# cleanup
#
self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
0)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.MPLS,
+ self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
1,
0)
- self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
- QOS_SOURCE.MPLS,
- 0)
+ self.vapi.qos_record_enable_disable(
+ self.pg0.sw_if_index,
+ self.QOS_SOURCE.QOS_API_SOURCE_MPLS,
+ 0)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
0)
self.vapi.qos_egress_map_delete(1)
#
# enable VLAN QoS recording/marking on the input Pg0 subinterface and
#
- self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
- QOS_SOURCE.VLAN,
- 1)
+ self.vapi.qos_record_enable_disable(
+ sub_if.sw_if_index,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
+ 1)
self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
- QOS_SOURCE.VLAN,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
1,
1)
# IP marking/recording on pg1
#
self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
1)
sub_if.unconfig_ip4()
sub_if.unconfig_ip6()
- self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
- QOS_SOURCE.VLAN,
- 0)
+ self.vapi.qos_record_enable_disable(
+ sub_if.sw_if_index,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
+ 0)
self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
- QOS_SOURCE.VLAN,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN,
1,
0)
self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
0)
self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
- QOS_SOURCE.IP,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
1,
0)
from util import ppp
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
-from vpp_papi_provider import SYSLOG_SEVERITY
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
+from vpp_papi import VppEnum
class TestSyslog(VppTestCase):
""" Syslog Protocol Test Cases """
+ @property
+ def SYSLOG_SEVERITY(self):
+ return VppEnum.vl_api_syslog_severity_t
+
@classmethod
def setUpClass(cls):
super(TestSyslog, cls).setUpClass()
msg)
self.pg_enable_capture(self.pg_interfaces)
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
+ self.vapi.syslog_set_filter(
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
filter = self.vapi.syslog_get_filter()
- self.assertEqual(filter.severity, SYSLOG_SEVERITY.WARN)
+ self.assertEqual(filter.severity,
+ self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
self.syslog_generate(SyslogFacility.local7,
SyslogSeverity.info,
appname,