Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
7367d088
Commit
7367d088
authored
May 02, 2011
by
Vinay Sajip
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Added tests to improve coverage.
parent
0255973a
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
339 additions
and
108 deletions
+339
-108
Lib/test/test_logging.py
Lib/test/test_logging.py
+339
-108
No files found.
Lib/test/test_logging.py
View file @
7367d088
...
@@ -33,6 +33,7 @@ import errno
...
@@ -33,6 +33,7 @@ import errno
import
pickle
import
pickle
import
io
import
io
import
gc
import
gc
from
http.server
import
HTTPServer
,
BaseHTTPRequestHandler
import
json
import
json
import
os
import
os
import
queue
import
queue
...
@@ -40,7 +41,8 @@ import re
...
@@ -40,7 +41,8 @@ import re
import
select
import
select
import
smtpd
import
smtpd
import
socket
import
socket
from
socketserver
import
ThreadingTCPServer
,
StreamRequestHandler
from
socketserver
import
(
ThreadingUDPServer
,
DatagramRequestHandler
,
ThreadingTCPServer
,
StreamRequestHandler
)
import
struct
import
struct
import
sys
import
sys
import
tempfile
import
tempfile
...
@@ -49,6 +51,7 @@ from test.support import TestHandler, Matcher
...
@@ -49,6 +51,7 @@ from test.support import TestHandler, Matcher
import
textwrap
import
textwrap
import
time
import
time
import
unittest
import
unittest
from
urllib.parse
import
urlparse
,
parse_qs
import
warnings
import
warnings
import
weakref
import
weakref
try
:
try
:
...
@@ -596,9 +599,8 @@ class StreamHandlerTest(BaseTest):
...
@@ -596,9 +599,8 @@ class StreamHandlerTest(BaseTest):
logging.raiseExceptions = old_raise
logging.raiseExceptions = old_raise
sys.stderr = old_stderr
sys.stderr = old_stderr
# -- The following section could be moved into a server_helper.py module
TEST_SMTP_PORT = 9025
# -- if it proves to be of wider utility than just test_logging
class TestSMTPChannel(smtpd.SMTPChannel):
class TestSMTPChannel(smtpd.SMTPChannel):
def __init__(self, server, conn, addr, sockmap):
def __init__(self, server, conn, addr, sockmap):
...
@@ -642,6 +644,7 @@ class TestSMTPServer(smtpd.SMTPServer):
...
@@ -642,6 +644,7 @@ class TestSMTPServer(smtpd.SMTPServer):
# try to re-use a server port if possible
# try to re-use a server port if possible
self.set_reuse_addr()
self.set_reuse_addr()
self.bind(addr)
self.bind(addr)
self.port = sock.getsockname()[1]
self.listen(5)
self.listen(5)
except:
except:
self.close()
self.close()
...
@@ -666,23 +669,162 @@ class TestSMTPServer(smtpd.SMTPServer):
...
@@ -666,23 +669,162 @@ class TestSMTPServer(smtpd.SMTPServer):
def serve_forever(self, poll_interval):
def serve_forever(self, poll_interval):
asyncore.loop(poll_interval, map=self.sockmap)
asyncore.loop(poll_interval, map=self.sockmap)
def stop(self):
def stop(self
, timeout=None
):
self.close()
self.close()
self._thread.join()
self._thread.join(
timeout
)
self._thread = None
self._thread = None
class ControlMixin(object):
"""
This mixin is used to start a server on a separate thread, and
shut it down programmatically. Request handling is simplified - instead
of needing to derive a suitable RequestHandler subclass, you just
provide a callable which will be passed each received request to be
processed.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request. This handler is called on the
server thread, effectively meaning that requests are
processed serially. While not quite Web scale ;-),
this should be fine for testing applications.
:param poll_interval: The polling interval in seconds.
"""
def __init__(self, handler, poll_interval):
self._thread = None
self.poll_interval = poll_interval
self._handler = handler
self.ready = threading.Event()
def start(self):
"""
Create a daemon thread to run the server, and start it.
"""
self._thread = t = threading.Thread(target=self.serve_forever,
args=(self.poll_interval,))
t.setDaemon(True)
t.start()
def serve_forever(self, poll_interval):
self.ready.set()
super(ControlMixin, self).serve_forever(poll_interval)
def stop(self, timeout=None):
"""
Tell the server thread to stop, and wait for it to do so.
"""
self.shutdown()
if self._thread is not None:
self._thread.join(timeout)
self._thread = None
self.server_close()
self.ready.clear()
class TestHTTPServer(ControlMixin, HTTPServer):
"""
An HTTP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval in seconds.
"""
def __init__(self, addr, handler, poll_interval=0.5, log=False):
class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
def __getattr__(self, name, default=None):
if name.startswith('do_'):
return self.process_request
raise AttributeError(name)
def process_request(self):
self.server._handler(self)
def log_message(self, format, *args):
if log:
super(DelegatingHTTPRequestHandler,
self).log_message(format, *args)
HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
ControlMixin.__init__(self, handler, poll_interval)
class TestTCPServer(ControlMixin, ThreadingTCPServer):
"""
A TCP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a single
parameter - the request - in order to process the request.
:param poll_interval: The polling interval in seconds.
:bind_and_activate: If True (the default), binds the server and starts it
listening. If False, you need to call
:meth:`server_bind` and :meth:`server_activate` at
some later time before calling :meth:`start`, so that
the server will set up the socket and listen on it.
"""
allow_reuse_address = True
def __init__(self, addr, handler, poll_interval=0.5,
bind_and_activate=True):
class DelegatingTCPRequestHandler(StreamRequestHandler):
def handle(self):
self.server._handler(self)
ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
bind_and_activate)
ControlMixin.__init__(self, handler, poll_interval)
def server_bind(self):
super(TestTCPServer, self).server_bind()
self.port = self.socket.getsockname()[1]
class TestUDPServer(ControlMixin, ThreadingUDPServer):
"""
A UDP server which is controllable using :class:`ControlMixin`.
:param addr: A tuple with the IP address and port to listen on.
:param handler: A handler callable which will be called with a
single parameter - the request - in order to
process the request.
:param poll_interval: The polling interval for shutdown requests,
in seconds.
:bind_and_activate: If True (the default), binds the server and
starts it listening. If False, you need to
call :meth:`server_bind` and
:meth:`server_activate` at some later time
before calling :meth:`start`, so that the server will
set up the socket and listen on it.
"""
def __init__(self, addr, handler, poll_interval=0.5, bind_and_activate=True):
class DelegatingUDPRequestHandler(DatagramRequestHandler):
def handle(self):
self.server._handler(self)
ThreadingUDPServer.__init__(self, addr, DelegatingUDPRequestHandler,
bind_and_activate)
ControlMixin.__init__(self, handler, poll_interval)
def server_bind(self):
super(TestUDPServer, self).server_bind()
self.port = self.socket.getsockname()[1]
# - end of server_helper section
class SMTPHandlerTest(BaseTest):
class SMTPHandlerTest(BaseTest):
def test_basic(self):
def test_basic(self):
addr = ('localhost', TEST_SMTP_PORT)
sockmap = {}
server = TestSMTPServer(('localhost', 0), self.process_message, 0.001,
sockmap)
server.start()
addr = ('localhost', server.port)
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
self.assertEqual(h.toaddrs, ['you'])
self.assertEqual(h.toaddrs, ['you'])
self.messages = []
self.messages = []
sockmap = {}
server = TestSMTPServer(addr, self.process_message, 0.001, sockmap)
server.start()
r = logging.makeLogRecord({'msg': 'Hello'})
r = logging.makeLogRecord({'msg': 'Hello'})
self.handled = threading.Event()
h.handle(r)
h.handle(r)
self.handled.wait()
server.stop()
server.stop()
self.assertEqual(len(self.messages), 1)
self.assertEqual(len(self.messages), 1)
peer, mailfrom, rcpttos, data = self.messages[0]
peer, mailfrom, rcpttos, data = self.messages[0]
...
@@ -694,7 +836,7 @@ class SMTPHandlerTest(BaseTest):
...
@@ -694,7 +836,7 @@ class SMTPHandlerTest(BaseTest):
def process_message(self, *args):
def process_message(self, *args):
self.messages.append(args)
self.messages.append(args)
self.handled.set()
class MemoryHandlerTest(BaseTest):
class MemoryHandlerTest(BaseTest):
...
@@ -1068,68 +1210,6 @@ class ConfigFileTest(BaseTest):
...
@@ -1068,68 +1210,6 @@ class ConfigFileTest(BaseTest):
# Original logger output is empty.
# Original logger output is empty.
self.assert_log_lines([])
self.assert_log_lines([])
class LogRecordStreamHandler(StreamRequestHandler):
"""Handler for a streaming logging request. It saves the log message in the
TCP server's 'log_output' attribute."""
TCP_LOG_END = "
!!!
END
!!!
"
def handle(self):
"""Handle multiple requests - each expected to be of 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally."""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack("
>
L
", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unpickle(chunk)
record = logging.makeLogRecord(obj)
self.handle_log_record(record)
def unpickle(self, data):
return pickle.loads(data)
def handle_log_record(self, record):
# If the end-of-messages sentinel is seen, tell the server to
# terminate.
if self.TCP_LOG_END in record.msg:
self.server.abort = 1
return
self.server.log_output += record.msg + "
\
n
"
class LogRecordSocketReceiver(ThreadingTCPServer):
"""A simple-minded TCP socket-based logging receiver suitable for test
purposes."""
allow_reuse_address = 1
log_output = ""
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = False
self.timeout = 0.1
self.finished = threading.Event()
def serve_until_stopped(self):
while not self.abort:
rd, wr, ex = select.select([self.socket.fileno()], [], [],
self.timeout)
if rd:
self.handle_request()
# Notify the main thread that we're about to exit
self.finished.set()
# close the listen socket
self.server_close()
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
class SocketHandlerTest(BaseTest):
...
@@ -1140,51 +1220,54 @@ class SocketHandlerTest(BaseTest):
...
@@ -1140,51 +1220,54 @@ class SocketHandlerTest(BaseTest):
"""Set up a TCP server to receive log messages, and a SocketHandler
"""Set up a TCP server to receive log messages, and a SocketHandler
pointing to that server's address and port."""
pointing to that server's address and port."""
BaseTest.setUp(self)
BaseTest.setUp(self)
self.tcpserver = LogRecordSocketReceiver(port=0)
addr = ('localhost', 0)
self.port = self.tcpserver.socket.getsockname()[1]
self.server = server = TestTCPServer(addr, self.handle_socket,
self.threads = [
0.01)
threading.Thread(target=self.tcpserver.serve_until_stopped)]
server.start()
for thread in self.threads:
server.ready.wait()
thread.start()
self.sock_hdlr = logging.handlers.SocketHandler('localhost',
server.port)
self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
self.log_output = ''
self.sock_hdlr.setFormatter(self.root_formatter)
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sock_hdlr)
self.root_logger.addHandler(self.sock_hdlr)
self.handled = threading.Semaphore(0)
def tearDown(self):
def tearDown(self):
"""Shutdown the TCP server."""
"""Shutdown the TCP server."""
try:
try:
if hasattr(self, 'tcpserver'):
self.server.stop(2.0)
self.tcpserver.abort = True
del self.tcpserver
self.root_logger.removeHandler(self.sock_hdlr)
self.root_logger.removeHandler(self.sock_hdlr)
self.sock_hdlr.close()
self.sock_hdlr.close()
for thread in self.threads:
thread.join(2.0)
finally:
finally:
BaseTest.tearDown(self)
BaseTest.tearDown(self)
def get_output(self):
def handle_socket(self, request):
"""Get the log output as received by the TCP server."""
conn = request.connection
# Signal the TCP receiver and wait for it to terminate.
while True:
self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
chunk = conn.recv(4)
self.tcpserver.finished.wait(2.0)
if len(chunk) < 4:
return self.tcpserver.log_output
break
slen = struct.unpack("
>
L
", chunk)[0]
chunk = conn.recv(slen)
while len(chunk) < slen:
chunk = chunk + conn.recv(slen - len(chunk))
obj = pickle.loads(chunk)
record = logging.makeLogRecord(obj)
self.log_output += record.msg + '
\
n
'
self.handled.release()
def test_output(self):
def test_output(self):
# The log message sent to the SocketHandler is properly received.
# The log message sent to the SocketHandler is properly received.
logger = logging.getLogger("
tcp
")
logger = logging.getLogger("
tcp
")
logger.error("
spam
")
logger.error("
spam
")
self.handled.acquire()
logger.debug("
eggs
")
logger.debug("
eggs
")
self.assertEqual(self.get_output(), "
spam
\
neggs
\
n
")
self.handled.acquire()
self.assertEqual(self.log_output, "
spam
\
neggs
\
n
")
def test_noserver(self):
def test_noserver(self):
# Kill the server
# Kill the server
self.tcpserver.abort = True
self.server.stop(2.0)
del self.tcpserver
for thread in self.threads:
thread.join(2.0)
#The logging call should try to connect, which should fail
#The logging call should try to connect, which should fail
try:
try:
raise RuntimeError('Deliberate mistake')
raise RuntimeError('Deliberate mistake')
...
@@ -1196,6 +1279,143 @@ class SocketHandlerTest(BaseTest):
...
@@ -1196,6 +1279,143 @@ class SocketHandlerTest(BaseTest):
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
self.root_logger.error('Nor this')
self.root_logger.error('Nor this')
@unittest.skipUnless(threading, 'Threading required for this test.')
class DatagramHandlerTest(BaseTest):
"""Test for DatagramHandler."""
def setUp(self):
"""Set up a UDP server to receive log messages, and a DatagramHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
addr = ('localhost', 0)
self.server = server = TestUDPServer(addr, self.handle_datagram,
0.01)
server.start()
server.ready.wait()
self.sock_hdlr = logging.handlers.DatagramHandler('localhost',
server.port)
self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sock_hdlr)
self.handled = threading.Event()
def tearDown(self):
"""Shutdown the UDP server."""
try:
self.server.stop(2.0)
self.root_logger.removeHandler(self.sock_hdlr)
self.sock_hdlr.close()
finally:
BaseTest.tearDown(self)
def handle_datagram(self, request):
slen = struct.pack('>L', 0) # length of prefix
packet = request.packet[len(slen):]
obj = pickle.loads(packet)
record = logging.makeLogRecord(obj)
self.log_output += record.msg + '
\
n
'
self.handled.set()
def test_output(self):
# The log message sent to the DatagramHandler is properly received.
logger = logging.getLogger("
udp
")
logger.error("
spam
")
self.handled.wait()
self.assertEqual(self.log_output, "
spam
\
n
")
@unittest.skipUnless(threading, 'Threading required for this test.')
class SysLogHandlerTest(BaseTest):
"""Test for SysLogHandler using UDP."""
def setUp(self):
"""Set up a UDP server to receive log messages, and a SysLogHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
addr = ('localhost', 0)
self.server = server = TestUDPServer(addr, self.handle_datagram,
0.01)
server.start()
server.ready.wait()
self.sl_hdlr = logging.handlers.SysLogHandler(('localhost',
server.port))
self.log_output = ''
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.sl_hdlr)
self.handled = threading.Event()
def tearDown(self):
"""Shutdown the UDP server."""
try:
self.server.stop(2.0)
self.root_logger.removeHandler(self.sl_hdlr)
self.sl_hdlr.close()
finally:
BaseTest.tearDown(self)
def handle_datagram(self, request):
self.log_output = request.packet
self.handled.set()
def test_output(self):
# The log message sent to the SysLogHandler is properly received.
logger = logging.getLogger("
slh
")
logger.error("
sp
\
xe4m
")
self.handled.wait()
self.assertEqual(self.log_output, b'<11>
\
xef
\
xbb
\
xbf
sp
\
xc3
\
xa4
m
\
x00
')
@unittest.skipUnless(threading, 'Threading required for this test.')
class HTTPHandlerTest(BaseTest):
"""Test for HTTPHandler."""
def setUp(self):
"""Set up an HTTP server to receive log messages, and a HTTPHandler
pointing to that server's address and port."""
BaseTest.setUp(self)
addr = ('localhost', 0)
self.server = server = TestHTTPServer(addr, self.handle_request,
0.01)
server.start()
server.ready.wait()
host = 'localhost:%d' % server.server_port
self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob')
self.log_data = None
self.root_logger.removeHandler(self.root_logger.handlers[0])
self.root_logger.addHandler(self.h_hdlr)
self.handled = threading.Event()
def tearDown(self):
"""Shutdown the UDP server."""
try:
self.server.stop(2.0)
self.root_logger.removeHandler(self.h_hdlr)
self.h_hdlr.close()
finally:
BaseTest.tearDown(self)
def handle_request(self, request):
self.log_data = urlparse(request.path)
request.send_response(200)
self.handled.set()
def test_output(self):
# The log message sent to the SysLogHandler is properly received.
logger = logging.getLogger("
http
")
msg = "
sp
\
xe4m
"
logger.error(msg)
self.handled.wait()
self.assertEqual(self.log_data.path, '/frob')
d = parse_qs(self.log_data.query)
self.assertEqual(d['name'], ['http'])
self.assertEqual(d['funcName'], ['test_output'])
self.assertEqual(d['msg'], [msg])
class MemoryTest(BaseTest):
class MemoryTest(BaseTest):
"""Test memory persistence of logger objects."""
"""Test memory persistence of logger objects."""
...
@@ -3129,7 +3349,7 @@ class BaseFileTest(BaseTest):
...
@@ -3129,7 +3349,7 @@ class BaseFileTest(BaseTest):
def assertLogFile(self, filename):
def assertLogFile(self, filename):
"
Assert
a
log
file
is
there
and
register
it
for
deletion
"
"
Assert
a
log
file
is
there
and
register
it
for
deletion
"
self.assertTrue(os.path.exists(filename),
self.assertTrue(os.path.exists(filename),
msg="
Log
file
%
r
does
not
exist
")
msg="
Log
file
%
r
does
not
exist
"
% filename
)
self.rmfiles.append(filename)
self.rmfiles.append(filename)
...
@@ -3181,8 +3401,18 @@ class RotatingFileHandlerTest(BaseFileTest):
...
@@ -3181,8 +3401,18 @@ class RotatingFileHandlerTest(BaseFileTest):
rh.close()
rh.close()
class TimedRotatingFileHandlerTest(BaseFileTest):
class TimedRotatingFileHandlerTest(BaseFileTest):
# test methods added below
# other test methods added below
pass
def test_rollover(self):
fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S')
r = logging.makeLogRecord({'msg': 'testing'})
fh.emit(r)
self.assertLogFile(self.fn)
time.sleep(1.0)
fh.emit(r)
now = datetime.datetime.now()
prevsec = now - datetime.timedelta(seconds=1)
suffix = prevsec.strftime("
.
%
Y
-%
m
-%
d_
%
H
-%
M
-%
S
")
self.assertLogFile(self.fn + suffix)
def secs(**kw):
def secs(**kw):
return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
...
@@ -3238,14 +3468,15 @@ for when, exp in (('S', 1),
...
@@ -3238,14 +3468,15 @@ for when, exp in (('S', 1),
def test_main():
def test_main():
run_unittest(BuiltinLevelsTest, BasicFilterTest,
run_unittest(BuiltinLevelsTest, BasicFilterTest,
CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
CustomLevelsAndFiltersTest, HandlerTest, MemoryHandlerTest,
ConfigFileTest, SocketHandlerTest,
Memory
Test,
ConfigFileTest, SocketHandlerTest,
DatagramHandler
Test,
EncodingTest, WarningsTest, ConfigDictTest, Manager
Test,
MemoryTest, EncodingTest, WarningsTest, ConfigDict
Test,
FormatterTest, BufferingFormatterTest, StreamHandl
erTest,
ManagerTest, FormatterTest, BufferingFormatt
erTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandl
erTest,
StreamHandlerTest, LogRecordFactoryTest, ChildLogg
erTest,
ShutdownTest, ModuleLevelMiscTest, BasicConfig
Test,
QueueHandlerTest, ShutdownTest, ModuleLevelMisc
Test,
LoggerAdapterTest, LoggerTest, SMTPHandl
erTest,
BasicConfigTest, LoggerAdapterTest, Logg
erTest,
FileHandlerTest, RotatingFileHandlerTest,
SMTPHandlerTest,
FileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LogRecordTest, ExceptionTest,
LastResortTest, LogRecordTest, ExceptionTest,
SysLogHandlerTest, HTTPHandlerTest,
TimedRotatingFileHandlerTest
TimedRotatingFileHandlerTest
)
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment