4 from framework import VppTestCase, VppTestRunner
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from vpp_papi_provider import SYSLOG_SEVERITY
9 from syslog_rfc5424_parser import SyslogMessage, ParseError
10 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
13 class TestSyslog(VppTestCase):
14 """ Syslog Protocol Test Cases """
18 super(TestSyslog, cls).setUpClass()
21 cls.pg0, = cls.create_pg_interfaces(range(1))
27 super(TestSyslog, cls).tearDownClass()
# Class-level teardown hook: does nothing beyond delegating to the parent
# class's tearDownClass() through super().
# NOTE(review): the decorator line for this method is not visible here -
# confirm it is declared as a classmethod.
31 def tearDownClass(cls):
32 super(TestSyslog, cls).tearDownClass()
# Build a "test syslog ..." CLI command string from the arguments and hand it
# to self.vapi.cli().
# NOTE(review): the rest of the signature and the docstring delimiters are
# not visible here; the docstring lists a `msg` parameter - confirm.
34 def syslog_generate(self, facility, severity, appname, msgid, sd=None,
37 Generate syslog message
39 :param facility: facility value
40 :param severity: severity level
41 :param appname: application name that originate message
42 :param msgid: message identifier
43 :param sd: structured data (optional)
44 :param msg: free-form message (optional)
# CLI keyword for each facility, looked up by indexing with `facility`.
# Indexes 10 and 15 hold empty strings, so those two facility values produce
# an empty keyword in the command.
46 facility_str = ['kernel', 'user-level', 'mail-system',
47 'system-daemons', 'security-authorization', 'syslogd',
48 'line-printer', 'network-news', 'uucp', 'clock-daemon',
49 '', 'ftp-daemon', 'ntp-subsystem', 'log-audit',
50 'log-alert', '', 'local0', 'local1', 'local2',
51 'local3', 'local4', 'local5', 'local6', 'local7']
# CLI keyword for each severity, looked up by indexing with `severity`.
53 severity_str = ['emergency', 'alert', 'critical', 'error', 'warning',
54 'notice', 'informational', 'debug']
# The command starts with the facility and severity keywords. The format
# string has four %s slots; the last two arguments are not visible here
# (presumably appname and msgid - TODO confirm).
56 cli_str = "test syslog %s %s %s %s" % (facility_str[facility],
57 severity_str[severity],
# For every structured-data ID append " sd-id <id>", followed by one
# " sd-param <name> <value>" per parameter under that ID.
# NOTE(review): `sd` defaults to None; any guard before this loop is not
# visible here - confirm one exists.
61 for sd_id, sd_params in sd.items():
62 cli_str += " sd-id %s" % (sd_id)
63 for name, value in sd_params.items():
64 cli_str += " sd-param %s %s" % (name, value)
# Append the free-form message text, then run the assembled command.
66 cli_str += " %s" % (msg)
67 self.vapi.cli(cli_str)
# Decode a syslog payload, parse it with SyslogMessage.parse() and assert
# that each parsed field equals the expected value passed in.
# NOTE(review): the rest of the signature and the docstring delimiters are
# not visible here; the docstring lists a `msg` parameter - confirm.
69 def syslog_verify(self, data, facility, severity, appname, msgid, sd=None,
74 :param data: syslog message
75 :param facility: facility value
76 :param severity: severity level
77 :param appname: application name that originate message
78 :param msgid: message identifier
79 :param sd: structured data (optional)
80 :param msg: free-form message (optional)
# `data` is decoded as UTF-8 text before being given to the parser.
82 message = data.decode('utf-8')
# `message` is rebound from the decoded text to the parsed result.
# NOTE(review): the `try:` header and the body of the ParseError handler are
# not visible here.
86 message = SyslogMessage.parse(message)
87 except ParseError as e:
# Field-by-field comparison of the parsed message with the expected values.
91 self.assertEqual(message.facility, facility)
92 self.assertEqual(message.severity, severity)
93 self.assertEqual(message.appname, appname)
94 self.assertEqual(message.msgid, msgid)
95 self.assertEqual(message.msg, msg)
96 self.assertEqual(message.sd, sd)
# These two are fixed expectations rather than arguments: version must be 1
# and the hostname must equal pg0's local IPv4 address.
97 self.assertEqual(message.version, 1)
98 self.assertEqual(message.hostname, self.pg0.local_ip4)
# End-to-end syslog test: configure the sender, generate messages through
# syslog_generate(), and check the captured packets and their parsed content
# with syslog_verify(). Covers the severity filter and a custom collector port.
# NOTE(review): several call arguments and try/except headers are not visible
# here, so the comments below describe only the lines that are.
100 def test_syslog(self):
101 """ Syslog Protocol test """
# Set the sender with pg0's local address as source and pg0's remote address
# as collector, then read the configuration back.
102 self.vapi.syslog_set_sender(src_address=self.pg0.local_ip4n,
103 collector_address=self.pg0.remote_ip4n)
104 config = self.vapi.syslog_get_sender()
# Read-back checks. Port 514, VRF 0 and max size 480 were not passed to the
# setter above (presumably its defaults - TODO confirm).
105 self.assertEqual(str(config.collector_address),
107 self.assertEqual(config.collector_port, 514)
108 self.assertEqual(str(config.src_address), self.pg0.local_ip4)
109 self.assertEqual(config.vrf_id, 0)
110 self.assertEqual(config.max_msg_size, 480)
# Fixtures: free-form message text and two structured-data dicts. sd2 has an
# additional 'examplePriority@32473' SD-ID.
114 msg = 'this is message'
115 sd1 = {'exampleSDID@32473': {'iut': '3',
116 'eventSource': 'App',
118 sd2 = {'exampleSDID@32473': {'iut': '3',
119 'eventSource': 'App',
121 'examplePriority@32473': {'class': 'high'}}
# Case 1: generate a local7 message and fetch the capture from pg0.
123 self.pg_enable_capture(self.pg_interfaces)
124 self.syslog_generate(SyslogFacility.local7,
130 capture = self.pg0.get_capture(1)
# The packet must go from pg0's local to pg0's remote IPv4 address, use UDP
# destination port 514, and pass the checksum validation helper.
132 self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
133 self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
134 self.assertEqual(capture[0][UDP].dport, 514)
135 self.assert_packet_checksums_valid(capture[0], False)
# Logs the packet as invalid (presumably inside an exception handler whose
# header is not visible here - TODO confirm).
# NOTE(review): no import of `ppp` is visible here - confirm it is imported.
137 self.logger.error(ppp("invalid packet:", capture[0]))
# Parse the UDP payload and compare it with the expected fields.
139 self.syslog_verify(capture[0][Raw].load,
140 SyslogFacility.local7,
# Case 2: set the severity filter to WARN, confirm it reads back, generate a
# local7 message and assert that pg0 captured nothing. The severity of that
# message is not visible here.
147 self.pg_enable_capture(self.pg_interfaces)
148 self.vapi.syslog_set_filter(SYSLOG_SEVERITY.WARN)
# NOTE(review): the local name `filter` shadows the builtin filter().
149 filter = self.vapi.syslog_get_filter()
150 self.assertEqual(filter.severity, SYSLOG_SEVERITY.WARN)
151 self.syslog_generate(SyslogFacility.local7,
157 self.pg0.assert_nothing_captured()
# Case 3: a local6 message at warning severity is captured and verified.
159 self.pg_enable_capture(self.pg_interfaces)
160 self.syslog_generate(SyslogFacility.local6,
161 SyslogSeverity.warning,
166 capture = self.pg0.get_capture(1)
167 self.syslog_verify(capture[0][Raw].load,
168 SyslogFacility.local6,
169 SyslogSeverity.warning,
# Case 4: reconfigure the sender with collector port 12345 (addresses passed
# positionally this time) and confirm the port reads back.
175 self.vapi.syslog_set_sender(self.pg0.local_ip4n,
176 self.pg0.remote_ip4n,
177 collector_port=12345)
178 config = self.vapi.syslog_get_sender()
179 self.assertEqual(config.collector_port, 12345)
# A local5 message must now carry UDP destination port 12345.
181 self.pg_enable_capture(self.pg_interfaces)
182 self.syslog_generate(SyslogFacility.local5,
188 capture = self.pg0.get_capture(1)
190 self.assertEqual(capture[0][UDP].dport, 12345)
192 self.logger.error(ppp("invalid packet:", capture[0]))
194 self.syslog_verify(capture[0][Raw].load,
195 SyslogFacility.local5,
# Script entry point: when the module is executed directly, run unittest with
# VppTestRunner as the test runner.
# NOTE(review): no import of `unittest` is visible here - confirm it is
# imported at the top of the file.
203 if __name__ == '__main__':
204 unittest.main(testRunner=VppTestRunner)