Commit ae2b2cff authored by Jim Fulton

Reworked the timeout thread a little:

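- to use `with` statements to acquire and release the condition lock
- to use call_from_thread to close the connection of a timed-out client
- to try to recover from errors (that should never happen :)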

Added timeout thread liveness to server status
parent e27cdbdc
...@@ -1270,6 +1270,7 @@ class StorageServer: ...@@ -1270,6 +1270,7 @@ class StorageServer:
status = self.stats[storage_id].__dict__.copy() status = self.stats[storage_id].__dict__.copy()
status['connections'] = len(status['connections']) status['connections'] = len(status['connections'])
status['waiting'] = len(self._waiting[storage_id]) status['waiting'] = len(self._waiting[storage_id])
status['timeout-thread-is-alive'] = self.timeouts[storage_id].isAlive()
return status return status
def _level_for_waiting(waiting): def _level_for_waiting(waiting):
...@@ -1288,6 +1289,9 @@ class StubTimeoutThread: ...@@ -1288,6 +1289,9 @@ class StubTimeoutThread:
def end(self, client): def end(self, client):
pass pass
isAlive = lambda self: 'stub'
class TimeoutThread(threading.Thread): class TimeoutThread(threading.Thread):
"""Monitors transaction progress and generates timeouts.""" """Monitors transaction progress and generates timeouts."""
...@@ -1305,32 +1309,25 @@ class TimeoutThread(threading.Thread): ...@@ -1305,32 +1309,25 @@ class TimeoutThread(threading.Thread):
def begin(self, client): def begin(self, client):
# Called from the restart code the "main" thread, whenever the # Called from the restart code the "main" thread, whenever the
# storage lock is being acquired. (Serialized by asyncore.) # storage lock is being acquired. (Serialized by asyncore.)
self._cond.acquire() with self._cond:
try:
assert self._client is None assert self._client is None
self._client = client self._client = client
self._deadline = time.time() + self._timeout self._deadline = time.time() + self._timeout
self._cond.notify() self._cond.notify()
finally:
self._cond.release()
def end(self, client): def end(self, client):
# Called from the "main" thread whenever the storage lock is # Called from the "main" thread whenever the storage lock is
# being released. (Serialized by asyncore.) # being released. (Serialized by asyncore.)
self._cond.acquire() with self._cond:
try:
assert self._client is not None assert self._client is not None
assert self._client is client assert self._client is client
self._client = None self._client = None
self._deadline = None self._deadline = None
finally:
self._cond.release()
def run(self): def run(self):
# Code running in the thread. # Code running in the thread.
while 1: while 1:
self._cond.acquire() with self._cond:
try:
while self._deadline is None: while self._deadline is None:
self._cond.wait() self._cond.wait()
howlong = self._deadline - time.time() howlong = self._deadline - time.time()
...@@ -1338,12 +1335,16 @@ class TimeoutThread(threading.Thread): ...@@ -1338,12 +1335,16 @@ class TimeoutThread(threading.Thread):
# Prevent reporting timeout more than once # Prevent reporting timeout more than once
self._deadline = None self._deadline = None
client = self._client # For the howlong <= 0 branch below client = self._client # For the howlong <= 0 branch below
finally:
self._cond.release()
if howlong <= 0: if howlong <= 0:
client.log("Transaction timeout after %s seconds" % client.log("Transaction timeout after %s seconds" %
self._timeout) self._timeout, logging.ERROR)
client.connection.trigger.pull_trigger(client.connection.close) try:
client.connection.call_from_thread(client.connection.close)
except:
client.log("Timeout failure", logging.CRITICAL,
exc_info=sys.exc_info())
self.end(client)
else: else:
time.sleep(howlong) time.sleep(howlong)
......
...@@ -1274,7 +1274,7 @@ def test_server_status(): ...@@ -1274,7 +1274,7 @@ def test_server_status():
""" """
You can get server status using the server_status method. You can get server status using the server_status method.
>>> addr, _ = start_server() >>> addr, _ = start_server(zeo_conf=dict(transaction_timeout=1))
>>> db = ZEO.DB(addr) >>> db = ZEO.DB(addr)
>>> import pprint >>> import pprint
>>> pprint.pprint(db.storage.server_status(), width=1) >>> pprint.pprint(db.storage.server_status(), width=1)
...@@ -1288,6 +1288,7 @@ def test_server_status(): ...@@ -1288,6 +1288,7 @@ def test_server_status():
'lock_time': None, 'lock_time': None,
'start': 'Tue May 4 10:55:20 2010', 'start': 'Tue May 4 10:55:20 2010',
'stores': 1, 'stores': 1,
'timeout-thread-is-alive': True,
'verifying_clients': 0, 'verifying_clients': 0,
'waiting': 0} 'waiting': 0}
......
...@@ -344,6 +344,7 @@ statistics using the server_status method: ...@@ -344,6 +344,7 @@ statistics using the server_status method:
'lock_time': 1272653598.693882, 'lock_time': 1272653598.693882,
'start': 'Fri Apr 30 14:53:18 2010', 'start': 'Fri Apr 30 14:53:18 2010',
'stores': 13, 'stores': 13,
'timeout-thread-is-alive': 'stub',
'verifying_clients': 0, 'verifying_clients': 0,
'waiting': 9} 'waiting': 9}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment