-#!/usr/bin/env python
+#!/usr/bin/env python3
import unittest
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
from util import ppp
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
-from vpp_papi_provider import SYSLOG_SEVERITY
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
+from vpp_papi import VppEnum
class TestSyslog(VppTestCase):
- """ Syslog Protocol Test Cases """
+ """Syslog Protocol Test Cases"""
+
+ @property
+ def SYSLOG_SEVERITY(self):
+ return VppEnum.vl_api_syslog_severity_t
@classmethod
def setUpClass(cls):
super(TestSyslog, cls).setUpClass()
try:
- cls.create_pg_interfaces(range(1))
+ (cls.pg0,) = cls.create_pg_interfaces(range(1))
cls.pg0.admin_up()
cls.pg0.config_ip4()
cls.pg0.resolve_arp()
super(TestSyslog, cls).tearDownClass()
raise
- def syslog_generate(self, facility, severity, appname, msgid, sd=None,
- msg=None):
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSyslog, cls).tearDownClass()
+
+ def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
"""
Generate syslog message
:param facility: facility value
:param severity: severity level
:param appname: application name that originate message
- :param msgid: message indetifier
+ :param msgid: message identifier
:param sd: structured data (optional)
:param msg: free-form message (optional)
"""
- facility_str = ['kernel', 'user-level', 'mail-system',
- 'system-daemons', 'security-authorization', 'syslogd',
- 'line-printer', 'network-news', 'uucp', 'clock-daemon',
- '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
- 'log-alert', '', 'local0', 'local1', 'local2',
- 'local3', 'local4', 'local5', 'local6', 'local7']
-
- severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
- 'notice', 'informational', 'debug']
-
- cli_str = "test syslog %s %s %s %s" % (facility_str[facility],
- severity_str[severity],
- appname,
- msgid)
+ facility_str = [
+ "kernel",
+ "user-level",
+ "mail-system",
+ "system-daemons",
+ "security-authorization",
+ "syslogd",
+ "line-printer",
+ "network-news",
+ "uucp",
+ "clock-daemon",
+ "",
+ "ftp-daemon",
+ "ntp-subsystem",
+ "log-audit",
+ "log-alert",
+ "",
+ "local0",
+ "local1",
+ "local2",
+ "local3",
+ "local4",
+ "local5",
+ "local6",
+ "local7",
+ ]
+
+ severity_str = [
+ "emergency",
+ "alert",
+ "critical",
+ "error",
+ "warning",
+ "notice",
+ "informational",
+ "debug",
+ ]
+
+ cli_str = "test syslog %s %s %s %s" % (
+ facility_str[facility],
+ severity_str[severity],
+ appname,
+ msgid,
+ )
if sd is not None:
for sd_id, sd_params in sd.items():
cli_str += " sd-id %s" % (sd_id)
cli_str += " %s" % (msg)
self.vapi.cli(cli_str)
- def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
- msg=None):
+ def syslog_verify(
+ self, data, facility, severity, appname, msgid, sd=None, msg=None
+ ):
"""
Verify syslog message
:param facility: facility value
:param severity: severity level
:param appname: application name that originate message
- :param msgid: message indetifier
+ :param msgid: message identifier
:param sd: structured data (optional)
:param msg: free-form message (optional)
"""
- message = data.decode('utf-8')
+ message = data.decode("utf-8")
if sd is None:
sd = {}
try:
message = SyslogMessage.parse(message)
+ except ParseError as e:
+ self.logger.error(e)
+ raise
+ else:
self.assertEqual(message.facility, facility)
self.assertEqual(message.severity, severity)
self.assertEqual(message.appname, appname)
self.assertEqual(message.sd, sd)
self.assertEqual(message.version, 1)
self.assertEqual(message.hostname, self.pg0.local_ip4)
- except ParseError as e:
- self.logger.error(e)
def test_syslog(self):
- """ Syslog Protocol test """
- self.vapi.syslog_set_sender(self.pg0.remote_ip4n, self.pg0.local_ip4n)
+ """Syslog Protocol test"""
+ self.vapi.syslog_set_sender(
+ src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
+ )
config = self.vapi.syslog_get_sender()
- self.assertEqual(str(config.collector_address),
- self.pg0.remote_ip4)
+ self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
self.assertEqual(config.collector_port, 514)
self.assertEqual(str(config.src_address), self.pg0.local_ip4)
self.assertEqual(config.vrf_id, 0)
self.assertEqual(config.max_msg_size, 480)
- appname = 'test'
- msgid = 'testMsg'
- msg = 'this is message'
- sd1 = {'exampleSDID@32473': {'iut': '3',
- 'eventSource': 'App',
- 'eventID': '1011'}}
- sd2 = {'exampleSDID@32473': {'iut': '3',
- 'eventSource': 'App',
- 'eventID': '1011'},
- 'examplePriority@32473': {'class': 'high'}}
+ appname = "test"
+ msgid = "testMsg"
+ msg = "this is message"
+ sd1 = {
+ "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
+ }
+ sd2 = {
+ "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
+ "examplePriority@32473": {"class": "high"},
+ }
self.pg_enable_capture(self.pg_interfaces)
- self.syslog_generate(SyslogFacility.local7,
- SyslogSeverity.info,
- appname,
- msgid,
- None,
- msg)
+ self.syslog_generate(
+ SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
+ )
capture = self.pg0.get_capture(1)
try:
self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
self.assertEqual(capture[0][UDP].dport, 514)
self.assert_packet_checksums_valid(capture[0], False)
except:
- self.logger.error(ppp("invalid packet:", capture[0]))
- raise
- self.syslog_verify(capture[0][Raw].load,
- SyslogFacility.local7,
- SyslogSeverity.info,
- appname,
- msgid,
- None,
- msg)
+ self.logger.error(ppp("invalid packet:", capture[0]))
+ raise
+ self.syslog_verify(
+ capture[0][Raw].load,
+ SyslogFacility.local7,
+ SyslogSeverity.info,
+ appname,
+ msgid,
+ None,
+ msg,
+ )
self.pg_enable_capture(self.pg_interfaces)
- self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
+ self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
filter = self.vapi.syslog_get_filter()
- self.assertEqual(filter.severity, SYSLOG_SEVERITY.WARN)
- self.syslog_generate(SyslogFacility.local7,
- SyslogSeverity.info,
- appname,
- msgid,
- None,
- msg)
+ self.assertEqual(filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
+ self.syslog_generate(
+ SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
+ )
self.pg0.assert_nothing_captured()
self.pg_enable_capture(self.pg_interfaces)
- self.syslog_generate(SyslogFacility.local6,
- SyslogSeverity.warning,
- appname,
- msgid,
- sd1,
- msg)
+ self.syslog_generate(
+ SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
+ )
capture = self.pg0.get_capture(1)
- self.syslog_verify(capture[0][Raw].load,
- SyslogFacility.local6,
- SyslogSeverity.warning,
- appname,
- msgid,
- sd1,
- msg)
-
- self.vapi.syslog_set_sender(self.pg0.remote_ip4n,
- self.pg0.local_ip4n,
- collector_port=12345)
+ self.syslog_verify(
+ capture[0][Raw].load,
+ SyslogFacility.local6,
+ SyslogSeverity.warning,
+ appname,
+ msgid,
+ sd1,
+ msg,
+ )
+
+ self.vapi.syslog_set_sender(
+ self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
+ )
config = self.vapi.syslog_get_sender()
self.assertEqual(config.collector_port, 12345)
self.pg_enable_capture(self.pg_interfaces)
- self.syslog_generate(SyslogFacility.local5,
- SyslogSeverity.err,
- appname,
- msgid,
- sd2,
- None)
+ self.syslog_generate(
+ SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
+ )
capture = self.pg0.get_capture(1)
try:
self.assertEqual(capture[0][UDP].dport, 12345)
except:
- self.logger.error(ppp("invalid packet:", capture[0]))
- raise
- self.syslog_verify(capture[0][Raw].load,
- SyslogFacility.local5,
- SyslogSeverity.err,
- appname,
- msgid,
- sd2,
- None)
-
-
-if __name__ == '__main__':
+ self.logger.error(ppp("invalid packet:", capture[0]))
+ raise
+ self.syslog_verify(
+ capture[0][Raw].load,
+ SyslogFacility.local5,
+ SyslogSeverity.err,
+ appname,
+ msgid,
+ sd2,
+ None,
+ )
+
+
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)