Commit a68b6569 authored by Priscila Manhaes's avatar Priscila Manhaes

implemented setMetadata for ffmpeghandler

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@45466 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 649f3906
...@@ -90,7 +90,7 @@ class Handler(object): ...@@ -90,7 +90,7 @@ class Handler(object):
for data in metadata: for data in metadata:
if len(data) != 0: if len(data) != 0:
key, value = data.split(':') key, value = data.split(':')
metadata_dict[key.strip()] = value.strip() metadata_dict[key.strip().capitalize()] = value.strip()
self.input.trash() self.input.trash()
return metadata_dict return metadata_dict
...@@ -99,5 +99,25 @@ class Handler(object): ...@@ -99,5 +99,25 @@ class Handler(object):
Keyword arguments: Keyword arguments:
metadata -- expected an dictionary with metadata. metadata -- expected an dictionary with metadata.
""" """
raise NotImplementedError output_url = mktemp(suffix=".%s" % self.input.source_format,
dir=self.input.directory_name)
command = ["ffmpeg",
"-i",
self.input.getUrl(),
"-y",
output_url]
index = 3
for metadata in metadata_dict:
command.insert(index, "-metadata")
command.insert(index+1, "%s=%s"%(metadata, metadata_dict[metadata]))
index += 2
try:
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE,
close_fds=True,
env=self.environment).communicate()
self.input.reload(output_url)
return self.input.getContent()
finally:
self.input.trash()
...@@ -30,29 +30,34 @@ from magic import Magic ...@@ -30,29 +30,34 @@ from magic import Magic
from cloudooo.handler.ffmpeg.handler import Handler from cloudooo.handler.ffmpeg.handler import Handler
from cloudooo.handler.tests.handlerTestCase import HandlerTestCase, make_suite from cloudooo.handler.tests.handlerTestCase import HandlerTestCase, make_suite
file_detector = Magic(mime=True)
class TestHandler(HandlerTestCase): class TestHandler(HandlerTestCase):
def afterSetUp(self): def afterSetUp(self):
self.data = open("./data/test.ogv").read() self.data = open("./data/test.ogv").read()
kw = dict(env=dict(PATH=self.env_path)) self.kw = dict(env=dict(PATH=self.env_path))
self.input = Handler(self.tmp_url, self.data, "ogv", **kw) self.input = Handler(self.tmp_url, self.data, "ogv", **self.kw)
self.file_detector = Magic(mime=True)
def testConvertVideo(self): def testConvertVideo(self):
"""Test coversion of video to another format""" """Test coversion of video to another format"""
output_data = self.input.convert("mpeg") output_data = self.input.convert("mpeg")
file_format = file_detector.from_buffer(output_data) file_format = self.file_detector.from_buffer(output_data)
self.assertEquals(file_format, 'video/mpeg') self.assertEquals(file_format, 'video/mpeg')
def testgetMetadata(self): def testgetMetadata(self):
"""Test if metadata is extracted from""" """Test if metadata is extracted from"""
output_metadata = self.input.getMetadata() output_metadata = self.input.getMetadata()
self.assertEquals(output_metadata, {'ENCODER': 'Lavf52.64.2'}) self.assertEquals(output_metadata, {'Encoder': 'Lavf52.64.2'})
def testsetMetadata(self): def testsetMetadata(self):
""" Test if metadata are inserted correclty """ """ Test if metadata are inserted correclty """
self.assertRaises(NotImplementedError, self.input.setMetadata) metadata_dict = {"title": "Set Metadata Test", "creator": "cloudooo"}
output = self.input.setMetadata(metadata_dict)
handler = Handler(self.tmp_url, output, "ogv", **self.kw)
metadata = handler.getMetadata()
self.assertEquals(metadata["Title"], "Set Metadata Test")
self.assertEquals(metadata["Creator"], "cloudooo")
def testConvertAudio(self): def testConvertAudio(self):
"""Test coversion of audio to another format""" """Test coversion of audio to another format"""
...@@ -60,7 +65,7 @@ class TestHandler(HandlerTestCase): ...@@ -60,7 +65,7 @@ class TestHandler(HandlerTestCase):
kw = dict(env=dict(PATH=self.env_path)) kw = dict(env=dict(PATH=self.env_path))
self.input = Handler(self.tmp_url, self.data, "ogg", **kw) self.input = Handler(self.tmp_url, self.data, "ogg", **kw)
output_data = self.input.convert("wav") output_data = self.input.convert("wav")
file_format = file_detector.from_buffer(output_data) file_format = self.file_detector.from_buffer(output_data)
# XXX this might expect 'audio/vnd.wave' but magic only got 'audio/x-wav' # XXX this might expect 'audio/vnd.wave' but magic only got 'audio/x-wav'
self.assertEquals(file_format, 'audio/x-wav') self.assertEquals(file_format, 'audio/x-wav')
......
...@@ -42,11 +42,11 @@ class TestServer(HandlerTestCase): ...@@ -42,11 +42,11 @@ class TestServer(HandlerTestCase):
"""Creates a connection with cloudooo server""" """Creates a connection with cloudooo server"""
self.proxy = ServerProxy("http://%s:%s/RPC2" % \ self.proxy = ServerProxy("http://%s:%s/RPC2" % \
(self.hostname, self.cloudooo_port), allow_none=True) (self.hostname, self.cloudooo_port), allow_none=True)
self.data = open(join('data', 'test.ogv'), 'r').read()
def testConvertVideo(self): def testConvertVideo(self):
"""Converts ogv video to mpeg format""" """Converts ogv video to mpeg format"""
data = open(join('data', 'test.ogv'), 'r').read() video = self.proxy.convertFile(encodestring(self.data),
video = self.proxy.convertFile(encodestring(data),
"ogv", "ogv",
"mpeg") "mpeg")
mime = Magic(mime=True) mime = Magic(mime=True)
...@@ -55,9 +55,17 @@ class TestServer(HandlerTestCase): ...@@ -55,9 +55,17 @@ class TestServer(HandlerTestCase):
def testGetMetadata(self): def testGetMetadata(self):
"""test if metadata are extracted correctly""" """test if metadata are extracted correctly"""
metadata = self.proxy.getFileMetadataItemList(encodestring(self.data), "ogv")
self.assertEquals(metadata["Encoder"], 'Lavf52.64.2')
def testSetMetadata(self): def testSetMetadata(self):
"""Test if metadata is inserted correctly""" """Test if metadata is inserted correctly"""
new_data = self.proxy.updateFileMetadata(encodestring(self.data),
"ogv",
{"title": "Server Set Metadata Test"})
metadata = self.proxy.getFileMetadataItemList(new_data, "ogv")
self.assertEquals(metadata["Encoder"], 'Lavf52.64.2')
self.assertEquals(metadata["Title"], 'Server Set Metadata Test')
def test_suite(): def test_suite():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment