diff --git a/ChangeLog b/ChangeLog index df232b9..76fd581 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2009-04-25 Thomas Vander Stichele + + * morituri/test/test_common_renamer.py (added): + * morituri/common/renamer.py (added): + Add a way of doing transactional file renames, as well as their + metafile updates. + 2009-04-21 Thomas Vander Stichele * morituri/common/task.py: diff --git a/morituri/common/renamer.py b/morituri/common/renamer.py new file mode 100644 index 0000000..6bf5e49 --- /dev/null +++ b/morituri/common/renamer.py @@ -0,0 +1,224 @@ +# -*- Mode: Python; test-case-name: morituri.test.test_common_renamer -*- +# vi:si:et:sw=4:sts=4:ts=4 + +# Morituri - for those about to RIP + +# Copyright (C) 2009 Thomas Vander Stichele + +# This file is part of morituri. +# +# morituri is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# morituri is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with morituri. If not, see . + +import os +import sys +import tempfile + +""" +Rename files on file system and inside metafiles in a resumable way. +""" + + +class Operator(object): + def __init__(self, statePath, key): + self._todo = [] + self._done = [] + self._statePath = statePath + self._key = key + self._resuming = False + + def addOperation(self, operation): + """ + Add an operation. + """ + self._todo.append(operation) + + def load(self): + """ + Load state from the given state path using the given key. + Verifies the state. + """ + todo = os.path.join(self._statePath, self._key + '.todo') + handle = open(todo, 'r') + lines = [] + for line in handle.readlines(): + lines.append(line) + name, data = line.split(' ', 1) + cls = globals()[name] + operation = cls.deserialize(data) + self._todo.append(operation) + + + done = os.path.join(self._statePath, self._key + '.done') + i = 0 + if os.path.exists(done): + handle = open(done, 'r') + for i, line in enumerate(handle.readlines()): + assert line == lines[i], "line %s is different than %s" % ( + line, lines[i]) + self._done.append(self._todo[i]) + + # last task done is i; check if the next one might have gotten done. + self._resuming = True + + + def save(self): + """ + Saves the state to the given state path using the given key. + """ + # only save todo first time + todo = os.path.join(self._statePath, self._key + '.todo') + if not os.path.exists(todo): + handle = open(todo, 'w') + for o in self._todo: + name = o.__class__.__name__ + data = o.serialize() + handle.write('%s %s\n' % (name, data)) + handle.close() + + # save done every time + done = os.path.join(self._statePath, self._key + '.done') + handle = open(done, 'w') + for o in self._done: + name = o.__class__.__name__ + data = o.serialize() + handle.write('%s %s\n' % (name, data)) + handle.close() + + def start(self): + """ + Execute the operations + """ + + def next(self): + operation = self._todo[len(self._done)] + if self._resuming: + operation.redo() + self._resuming = False + else: + operation.do() + + self._done.append(operation) + self.save() + + +class FileRenamer(Operator): + def addRename(self, source, destination): + """ + Add a rename operation. + + @param source: source filename + @type source: str + @param destination: destination filename + @type destination: str + """ + + +class Operation(object): + def verify(self): + """ + Check if the operation will succeed in the current conditions. + Consider this a pre-flight check. + + Does not eliminate the need to handle errors as they happen. + """ + + def do(self): + """ + Perform the operation. + """ + pass + + def redo(self): + """ + Perform the operation, without knowing if it already has been + (partly) performed. + """ + self.do() + + def serialize(self): + """ + Serialize the operation. + The return value should bu usable with L{deserialize} + + @rtype: str + """ + + def deserialize(cls, data): + """ + Deserialize the operation with the given operation data. + + @type data: str + """ + raise NotImplementedError + deserialize = classmethod(deserialize) + + +class RenameFile(Operation): + def __init__(self, source, destination): + self._source = source + self._destination = destination + + def verify(self): + assert os.path.exists(self._source) + assert not os.path.exists(self._destination) + + def do(self): + os.rename(self._source, self._destination) + + def serialize(self): + return '"%s" "%s"' % (self._source, self._destination) + + def deserialize(cls, data): + _, source, __, destination, ___ = data.split('"') + return RenameFile(source, destination) + deserialize = classmethod(deserialize) + + def __eq__(self, other): + return self._source == other._source \ + and self._destination == other._destination + + +class RenameInFile(Operation): + def __init__(self, path, source, destination): + self._path = path + self._source = source + self._destination = destination + + def verify(self): + assert os.path.exists(self._path) + # check if the source exists in the given file + + def do(self): + input = open(self._path) + (fd, name) = tempfile.mkstemp(suffix='.morituri') + + for s in input: + os.write(fd, s.replace(self._source, self._destination)) + + input.close() + os.close(fd) + os.rename(name, self._path) + + def serialize(self): + return '"%s" "%s" "%s"' % (self._path, self._source, self._destination) + + def deserialize(cls, data): + _, path, __, source, ___, destination, ____ = data.split('"') + return RenameInFile(path, source, destination) + deserialize = classmethod(deserialize) + + def __eq__(self, other): + return self._source == other._source \ + and self._destination == other._destination \ + and self._path == other._path diff --git a/morituri/test/test_common_renamer.py b/morituri/test/test_common_renamer.py new file mode 100644 index 0000000..e2d824e --- /dev/null +++ b/morituri/test/test_common_renamer.py @@ -0,0 +1,135 @@ +# -*- Mode: Python; test-case-name: morituri.test.test_image_cue -*- +# vi:si:et:sw=4:sts=4:ts=4 + +import os +import stat +import tempfile + +import unittest + +from morituri.common import renamer + +class RenameInFileTestcase(unittest.TestCase): + def setUp(self): + (fd, self._path) = tempfile.mkstemp(suffix='morituri') + os.write(fd, 'This is a test\nThis is another\n') + os.close(fd) + + def testVerify(self): + o = renamer.RenameInFile(self._path, 'is is a', 'at was some') + self.assertEquals(o.verify(), None) + os.unlink(self._path) + self.assertRaises(AssertionError, o.verify) + + def testDo(self): + o = renamer.RenameInFile(self._path, 'is is a', 'at was some') + o.do() + output = open(self._path).read() + self.assertEquals(output, 'That was some test\nThat was somenother\n') + + def testSerialize(self): + o = renamer.RenameInFile(self._path, 'is is a', 'at was some') + data = o.serialize() + o2 = renamer.RenameInFile.deserialize(data) + o2.do() + output = open(self._path).read() + self.assertEquals(output, 'That was some test\nThat was somenother\n') + +class RenameFileTestcase(unittest.TestCase): + def setUp(self): + (fd, self._source) = tempfile.mkstemp(suffix='morituri') + os.write(fd, 'This is a test\nThis is another\n') + os.close(fd) + (fd, self._destination) = tempfile.mkstemp(suffix='morituri') + os.close(fd) + os.unlink(self._destination) + self._operation = renamer.RenameFile(self._source, self._destination) + + def testVerify(self): + self.assertEquals(self._operation.verify(), None) + + handle = open(self._destination, 'w') + handle.close() + self.assertRaises(AssertionError, self._operation.verify) + + os.unlink(self._destination) + self.assertEquals(self._operation.verify(), None) + + os.unlink(self._source) + self.assertRaises(AssertionError, self._operation.verify) + + def testDo(self): + self._operation.do() + output = open(self._destination).read() + self.assertEquals(output, 'This is a test\nThis is another\n') + + def testSerialize(self): + data = self._operation.serialize() + o = renamer.RenameFile.deserialize(data) + o.do() + output = open(self._destination).read() + self.assertEquals(output, 'This is a test\nThis is another\n') + +class OperatorTestCase(unittest.TestCase): + def setUp(self): + self._statePath = tempfile.mkdtemp(suffix='.morituri') + self._operator = renamer.Operator(self._statePath, 'test') + + (fd, self._source) = tempfile.mkstemp(suffix='morituri') + os.write(fd, 'This is a test\nThis is another\n') + os.close(fd) + (fd, self._destination) = tempfile.mkstemp(suffix='morituri') + os.close(fd) + os.unlink(self._destination) + self._operator.addOperation( + renamer.RenameInFile(self._source, 'is is a', 'at was some')) + self._operator.addOperation( + renamer.RenameFile(self._source, self._destination)) + + def testLoadNoneDone(self): + self._operator.save() + + o = renamer.Operator(self._statePath, 'test') + o.load() + + self.assertEquals(o._todo, self._operator._todo) + self.assertEquals(o._done, []) + + def testLoadOneDone(self): + self.assertEquals(len(self._operator._done), 0) + self._operator.save() + self._operator.next() + self.assertEquals(len(self._operator._done), 1) + + o = renamer.Operator(self._statePath, 'test') + o.load() + + self.assertEquals(len(o._done), 1) + self.assertEquals(o._todo, self._operator._todo) + self.assertEquals(o._done, self._operator._done) + + # now continue + o.next() + self.assertEquals(len(o._done), 2) + + def testLoadOneInterrupted(self): + self.assertEquals(len(self._operator._done), 0) + self._operator.save() + + # cheat by doing a task without saving + self._operator._todo[0].do() + + self.assertEquals(len(self._operator._done), 0) + + o = renamer.Operator(self._statePath, 'test') + o.load() + + self.assertEquals(len(o._done), 0) + self.assertEquals(o._todo, self._operator._todo) + self.assertEquals(o._done, self._operator._done) + + # now continue, resuming + o.next() + self.assertEquals(len(o._done), 1) + o.next() + self.assertEquals(len(o._done), 2)