1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# -*- coding: UTF-8 -*-
# -*- Mode: Python; py-indent-offset: 4 -*-
# Authors: Nik Kim <fafhrd@legco.biz>
# Module version: the slice [11:-2] strips the leading "$Revision: " and the
# trailing " $" from the revision keyword string, leaving '1.3'.
__version__ = '$Revision: 1.3 $'[11:-2]
import sys, time, threading
from DateTime import DateTime
from Globals import InitializeClass
from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager
from zLOG import LOG, INFO, ERROR
from AccessControl import ClassSecurityInfo, Permissions
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
# Module-level version number. Nothing in the visible code reads it;
# presumably it is meant to be compared with TimerService._version -- TODO
# confirm against the rest of the product.
current_version = 1
# Single module-level lock: TimerService.process_timer tries to take it
# without blocking and returns immediately when it is already held, so only
# one timer run executes at a time.
processing_lock = threading.Lock()
class TimerService(SimpleItem):
    """Timer service: every object that wants timer events subscribes here.

    Subscribers are remembered as physical-path strings in ``_subscribers``
    and are traversed to each time a timer tick or a shutdown notification
    has to be delivered.
    """

    id = 'timer_service'
    title = 'TimerService'

    security = ClassSecurityInfo()
    security.declareObjectPublic()

    icon = 'misc_/TimerService/timer_icon.gif'

    # Not read anywhere in this class.
    max_size = 0

    manage_options = (
        ({'label': 'Subscribers', 'action': 'manage_viewSubscriptions'},))

    security.declareProtected(
        Permissions.view_management_screens, 'manage_viewSubscriptions')
    manage_viewSubscriptions = PageTemplateFile(
        'zpt/view_subscriptions',
        globals(),
        __name__='manage_viewSubscriptions'
    )

    # Class-level default; instances created through __init__ get 1.
    _version = 0

    def __init__(self, id='timer_service'):
        """Create a service with an empty subscriber list.

        The ``id`` argument is accepted but not used: the class-level ``id``
        attribute applies.
        """
        self._subscribers = []
        self._version = 1

    def _resolveSubscribers(self):
        """Return the subscribed objects that can currently be traversed to.

        Paths whose traversal raises ``KeyError`` are skipped. This keeps the
        service from crashing when a site does not exist yet: while a new
        portal is being created, the timer service starts to "ping" the
        portal before the Zope transaction that creates it is committed.

        Each path is resolved on its own, so one missing subscriber does not
        prevent the remaining subscribers from being notified.
        """
        resolved = []
        for path in self._subscribers:
            try:
                resolved.append(self.unrestrictedTraverse(path))
            except KeyError:
                continue
        return resolved

    security.declarePublic('process_shutdown')
    def process_shutdown(self, phase, time_in_phase):
        """Forward a shutdown notification to every subscriber that handles it.

        ``phase`` and ``time_in_phase`` are passed through unchanged, as
        keyword arguments, to each subscriber's ``process_shutdown``.
        Subscribers without a ``process_shutdown`` attribute are skipped.

        Any exception raised by a subscriber is logged and re-raised, so the
        subscribers after it are not notified.
        """
        for subscriber in self._resolveSubscribers():
            handler = getattr(subscriber, 'process_shutdown', None)
            if handler is None:
                continue
            try:
                handler(phase=phase, time_in_phase=time_in_phase)
            except:
                # Bare except on purpose: log whatever was raised, then
                # propagate it unchanged.
                LOG('TimerService', ERROR, 'Process shutdown error',
                    error=sys.exc_info())
                raise

    security.declarePublic('process_timer')
    def process_timer(self, interval):
        """Deliver one timer tick to every subscriber.

        Each subscriber's ``process_timer`` is called with ``interval`` and
        three ``DateTime`` values: the current tick, the previous tick
        (``tick - interval``) and the next tick (``tick + interval``).
        ``interval`` is added to ``time.time()``, so it is presumably a
        number of seconds -- confirm with the caller.

        Returns immediately, doing nothing, when another run already holds
        the module-level ``processing_lock``. Any exception raised by a
        subscriber is logged and re-raised; the lock is released either way.
        """
        # Non-blocking acquire: make sure only one processing runs at a time
        # and abort if another one is currently running.
        if not processing_lock.acquire(0):
            return
        try:
            subscriptions = self._resolveSubscribers()

            tick = time.time()
            prev_tick = tick - interval
            next_tick = tick + interval

            for subscriber in subscriptions:
                try:
                    subscriber.process_timer(
                        interval, DateTime(tick),
                        DateTime(prev_tick), DateTime(next_tick))
                except:
                    # Bare except on purpose: log whatever was raised, then
                    # propagate it unchanged.
                    LOG('TimerService', ERROR, 'Process timer error',
                        error=sys.exc_info())
                    raise
        finally:
            # When processing is done, release the lock.
            processing_lock.release()

    def subscribe(self, ob):
        """Register ``ob`` as a subscriber, stored by its physical path.

        Subscribing an object whose path is already registered is a no-op.
        """
        path = '/'.join(ob.getPhysicalPath())
        subscribers = self._subscribers
        if path not in subscribers:
            subscribers.append(path)
            # The list was mutated in place; re-assigning the attribute is
            # presumably what lets the persistence machinery notice the
            # change -- TODO confirm.
            self._subscribers = subscribers

    security.declareProtected(
        Permissions.view_management_screens, 'unsubscribeByPath')
    def unsubscribeByPath(self, path):
        # Remove the subscription stored under the path string ``path``;
        # unknown paths are ignored.
        #
        # NOTE(review): documented with comments rather than a docstring so
        # that this edit does not change whether ZPublisher will publish the
        # method (the original had no docstring) -- confirm the intent.
        subscribers = self._subscribers
        if path in subscribers:
            subscribers.remove(path)
            # Re-assign after the in-place mutation (see subscribe).
            self._subscribers = subscribers

    def unsubscribe(self, ob):
        """Remove the subscription of ``ob``, looked up by its physical path.

        Unsubscribing an object that is not registered is a no-op.
        """
        self.unsubscribeByPath('/'.join(ob.getPhysicalPath()))

    security.declareProtected(
        Permissions.view_management_screens, 'lisSubscriptions')
    def lisSubscriptions(self):
        """Return the list of subscribed paths.

        This is the internal list itself, not a copy.
        """
        return self._subscribers

    security.declareProtected(
        Permissions.view_management_screens, 'manage_removeSubscriptions')
    def manage_removeSubscriptions(self, no, REQUEST=None):
        """Remove the subscriptions at the given positions.

        ``no`` is either a sequence of positions into ``lisSubscriptions()``
        or a single position; each position may be an int or a string that
        ``int()`` accepts. Raises ``ValueError`` for a non-numeric position
        and ``IndexError`` for one that is out of range.

        When ``REQUEST`` is given, the response is redirected to
        ``manage_viewSubscriptions`` afterwards.
        """
        try:
            # A scalar (e.g. the string "12" submitted for a single selected
            # row) must be treated as one position, not iterated character
            # by character.
            indexes = [int(no)]
        except TypeError:
            # int() rejects sequences: convert the positions one by one.
            indexes = [int(n) for n in no]

        subs = self.lisSubscriptions()
        # Resolve every position to its path before removing anything, so
        # removals cannot shift the positions still to be processed.
        remove_list = [subs[index] for index in indexes]
        for sub in remove_list:
            self.unsubscribeByPath(sub)

        if REQUEST is not None:
            REQUEST.RESPONSE.redirect('manage_viewSubscriptions')
# Presumably finalizes the declarations collected in TimerService.security
# (Zope's InitializeClass) -- behavior is defined outside this file.
InitializeClass(TimerService)