Commit 4b8ee224 authored by Martin Aspeli's avatar Martin Aspeli

Merge c108040 from 2.12 branch (OFS File/Image lifecycle events)

parent 9a317161
......@@ -46,6 +46,10 @@ from OFS.Cache import Cacheable
from OFS.PropertyManager import PropertyManager
from OFS.SimpleItem import Item_w__name__
from zope.event import notify
from zope.lifecycleevent import ObjectModifiedEvent
from zope.lifecycleevent import ObjectCreatedEvent
manage_addFileForm = DTMLFile('dtml/imageAdd',
globals(),
Kind='File',
......@@ -69,12 +73,16 @@ def manage_addFile(self, id, file='', title='', precondition='',
# First, we create the file without data:
self._setObject(id, File(id,title,'',content_type, precondition))
newFile = self._getOb(id)
# Now we "upload" the data. By doing this in two steps, we
# can use a database trick to make the upload more efficient.
if file:
self._getOb(id).manage_upload(file)
newFile.manage_upload(file)
if content_type:
self._getOb(id).content_type=content_type
newFile.content_type=content_type
notify(ObjectCreatedEvent(newFile))
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(self.absolute_url()+'/manage_main')
......@@ -469,6 +477,9 @@ class File(Persistent, Implicit, PropertyManager,
self.update_data(filedata, content_type, len(filedata))
else:
self.ZCacheable_invalidate()
notify(ObjectModifiedEvent(self))
if REQUEST:
message="Saved changes."
return self.manage_main(self,REQUEST,manage_tabs_message=message)
......@@ -488,6 +499,8 @@ class File(Persistent, Implicit, PropertyManager,
'application/octet-stream')
self.update_data(data, content_type, size)
notify(ObjectModifiedEvent(self))
if REQUEST:
message="Saved changes."
return self.manage_main(self,REQUEST,manage_tabs_message=message)
......@@ -666,12 +679,16 @@ def manage_addImage(self, id, file, title='', precondition='', content_type='',
# First, we create the image without data:
self._setObject(id, Image(id,title,'',content_type, precondition))
newFile = self._getOb(id)
# Now we "upload" the data. By doing this in two steps, we
# can use a database trick to make the upload more efficient.
if file:
self._getOb(id).manage_upload(file)
newFile.manage_upload(file)
if content_type:
self._getOb(id).content_type=content_type
newFile.content_type=content_type
notify(ObjectCreatedEvent(newFile))
if REQUEST is not None:
try: url=self.DestinationURL()
......
......@@ -8,6 +8,8 @@ import os, sys
import time
from cStringIO import StringIO
from Acquisition import aq_base
from OFS.Application import Application
from OFS.SimpleItem import SimpleItem
from OFS.Cache import ZCM_MANAGERS
......@@ -19,6 +21,12 @@ from Testing.makerequest import makerequest
from zExceptions import Redirect
import transaction
import OFS.Image
from zope.component import adapter
from zope.lifecycleevent.interfaces import IObjectModifiedEvent
from zope.lifecycleevent.interfaces import IObjectCreatedEvent
try:
here = os.path.dirname(os.path.abspath(__file__))
except:
......@@ -57,7 +65,7 @@ class DummyCache:
mtime_func=None):
self.get = ob
if self.si:
return si
return self.si
def ZCache_invalidate(self, ob):
self.invalidated = ob
......@@ -78,6 +86,38 @@ class DummyCacheManager(SimpleItem):
def ZCacheManager_getCache(self):
return ADummyCache
class EventCatcher(object):
def __init__(self):
self.created = []
self.modified = []
self.setUp()
def setUp(self):
from zope.component import provideHandler
provideHandler(self.handleCreated)
provideHandler(self.handleModified)
def tearDown(self):
from zope.component import getSiteManager
getSiteManager().unregisterHandler(self.handleCreated)
getSiteManager().unregisterHandler(self.handleModified)
def reset(self):
self.created = []
self.modified = []
@adapter(IObjectCreatedEvent)
def handleCreated(self, event):
if isinstance(event.object, OFS.Image.File):
self.created.append(event)
@adapter(IObjectModifiedEvent)
def handleModified(self, event):
if isinstance(event.object, OFS.Image.File):
self.modified.append(event)
class FileTests(unittest.TestCase):
data = open(filedata, 'rb').read()
content_type = 'application/octet-stream'
......@@ -85,6 +125,8 @@ class FileTests(unittest.TestCase):
def setUp( self ):
self.connection = makeConnection()
self.eventCatcher = EventCatcher()
try:
r = self.connection.root()
a = Application()
......@@ -108,6 +150,15 @@ class FileTests(unittest.TestCase):
transaction.begin()
self.file = getattr( self.app, 'file' )
# Since we do the create here, let's test the events here too
self.assertEquals(1, len(self.eventCatcher.created))
self.failUnless(aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
self.assertEquals(1, len(self.eventCatcher.modified))
self.failUnless(aq_base(self.eventCatcher.created[0].object) is aq_base(self.file))
self.eventCatcher.reset()
def tearDown( self ):
del self.file
transaction.abort()
......@@ -118,6 +169,8 @@ class FileTests(unittest.TestCase):
del self.connection
ADummyCache.clear()
self.eventCatcher.tearDown()
def testViewImageOrFile(self):
self.assertRaises(Redirect, self.file.view_image_or_file, 'foo')
......@@ -153,18 +206,24 @@ class FileTests(unittest.TestCase):
self.assertEqual(self.file.content_type, 'text/plain')
self.failUnless(ADummyCache.invalidated)
self.failUnless(ADummyCache.set)
self.assertEquals(1, len(self.eventCatcher.modified))
self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testManageEditWithoutFileData(self):
self.file.manage_edit('foobar', 'text/plain')
self.assertEqual(self.file.title, 'foobar')
self.assertEqual(self.file.content_type, 'text/plain')
self.failUnless(ADummyCache.invalidated)
self.assertEquals(1, len(self.eventCatcher.modified))
self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testManageUpload(self):
f = StringIO('jammyjohnson')
self.file.manage_upload(f)
self.assertEqual(self.file.data, 'jammyjohnson')
self.assertEqual(self.file.content_type, 'application/octet-stream')
self.assertEquals(1, len(self.eventCatcher.modified))
self.failUnless(self.eventCatcher.modified[0].object is self.file)
def testIfModSince(self):
now = time.time()
......@@ -302,3 +361,4 @@ def test_suite():
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment