Mercurial > hgsubversion
view tests/test_fetch_branches.py @ 937:fb6f6b7fa5a5
editor: implement file batons
The concept of current.file is incorrect: svn_delta.h documents the open
file lifetime as:
* 5. When the producer calls @c open_file or @c add_file, either:
*
* (a) The producer must follow with any changes to the file
* (@c change_file_prop and/or @c apply_textdelta, as applicable),
* followed by a @c close_file call, before issuing any other file
* or directory calls, or
*
* (b) The producer must follow with a @c change_file_prop call if
* it is applicable, before issuing any other file or directory
* calls; later, after all directory batons including the root
* have been closed, the producer must issue @c apply_textdelta
* and @c close_file calls.
So, an open file can be kept open until after the root directory is
closed and have deltas applied afterwards. In the meantime, other files
may have been opened and patched, overwriting the current.file variable.
This patch fixes it by introducing file batons bound to file paths, and
using them to deduce the correct target in apply_textdelta(). In theory,
open files could be put in a staging area until they are closed and
moved into the RevisionData. But the current code registers files copied
during a directory copy as open files and these will not receive a
close_file() event. This separation will be enforced later.
author | Patrick Mezard <patrick@mezard.eu> |
---|---|
date | Sun, 23 Sep 2012 19:52:48 +0200 |
parents | 20e73b5ab6f7 |
children | a80b01ceb1fc |
line wrap: on
line source
import test_util

import unittest

from mercurial import hg
from mercurial import node
from mercurial import util as hgutil


class TestFetchBranches(test_util.TestBase):
    """Branch-related conversion tests driven by svn dump fixtures.

    Most tests come in pairs: ``test_x(stupid=False)`` plus a
    ``test_x_stupid`` wrapper which re-runs the same checks with
    ``stupid=True`` forwarded to ``_load_fixture_and_fetch``.
    """

    def _load_fixture_and_fetch_with_anchor(self, fixture_name, anchor):
        """Clone ``fixture_name`` using a ``url#anchor`` source.

        The dump is loaded with ``self.load_svndump``, cloned into
        ``self.wc_path`` and the resulting repository object is returned.
        """
        repo_path = self.load_svndump(fixture_name)
        source = '%s#%s' % (test_util.fileurl(repo_path), anchor)
        test_util.hgclone(self.ui(), source, self.wc_path)
        return hg.repository(self.ui(), self.wc_path)

    def branches(self, repo):
        """Return ``(open, closed)`` sorted branch name lists for repo heads.

        A head is considered closed when its ``extra()`` mapping has
        ``'close'`` set to ``'1'``; every other head is considered open.
        """
        hctxs = [repo[hn] for hn in repo.heads()]
        openbranches = set()
        closedbranches = set()
        for ctx in hctxs:
            if ctx.extra().get('close', None) == '1':
                closedbranches.add(ctx.branch())
            else:
                openbranches.add(ctx.branch())
        return sorted(openbranches), sorted(closedbranches)

    def openbranches(self, repo):
        """Return the sorted names of the branches having an open head."""
        return self.branches(repo)[0]

    def test_rename_branch_parent(self, stupid=False):
        repo = self._load_fixture_and_fetch('rename_branch_parent_dir.svndump',
                                            stupid=stupid)
        self.assertEqual(['dev_branch'], self.openbranches(repo))

    def test_rename_branch_parent_stupid(self):
        self.test_rename_branch_parent(stupid=True)

    def test_unrelatedbranch(self, stupid=False):
        repo = self._load_fixture_and_fetch('unrelatedbranch.svndump',
                                            stupid=stupid)
        heads = dict((ctx.branch(), ctx)
                     for ctx in [repo[n] for n in repo.heads()])
        # Sort the manifest keys: their iteration order is not part of
        # what is being tested here.
        self.assertEqual(sorted(heads['branch1'].manifest().keys()), ['b'])
        self.assertEqual(sorted(heads['branch2'].manifest().keys()),
                         ['a', 'b'])

    def test_unrelatedbranch_stupid(self):
        self.test_unrelatedbranch(True)

    def test_unorderedbranch(self, stupid=False):
        repo = self._load_fixture_and_fetch('unorderedbranch.svndump',
                                            stupid=stupid)
        r = repo['branch']
        self.assertEqual(0, r.parents()[0].rev())
        self.assertEqual(['a', 'c', 'z'], sorted(r.manifest()))

    def test_unorderedbranch_stupid(self):
        self.test_unorderedbranch(True)

    def test_renamed_branch_to_trunk(self, stupid=False):
        repo = self._load_fixture_and_fetch('branch_rename_to_trunk.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['default'].parents()[0].branch(), 'dev_branch')
        self.assertTrue('iota' in repo['default'])
        self.assertEqual(repo['old_trunk'].parents()[0].branch(), 'default')
        self.assertTrue('iota' not in repo['old_trunk'])
        expected = ['default', 'old_trunk']
        self.assertEqual(self.openbranches(repo), expected)

    def test_renamed_branch_to_trunk_stupid(self):
        self.test_renamed_branch_to_trunk(stupid=True)

    def test_replace_trunk_with_branch(self, stupid=False):
        repo = self._load_fixture_and_fetch(
            'replace_trunk_with_branch.svndump', stupid=stupid)
        self.assertEqual(repo['default'].parents()[0].branch(), 'test')
        self.assertEqual(repo['tip'].branch(), 'default')
        self.assertEqual(repo['tip'].extra().get('close'), '1')
        self.assertEqual(self.openbranches(repo), ['default'])

    def test_copybeforeclose(self, stupid=False):
        repo = self._load_fixture_and_fetch('copybeforeclose.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['tip'].branch(), 'test')
        self.assertEqual(repo['test'].extra().get('close'), '1')
        self.assertEqual(repo['test']['b'].data(), 'a\n')

    def test_copybeforeclose_stupid(self):
        self.test_copybeforeclose(True)

    def test_replace_trunk_with_branch_stupid(self):
        self.test_replace_trunk_with_branch(stupid=True)

    def test_branch_create_with_dir_delete_works(self, stupid=False):
        repo = self._load_fixture_and_fetch(
            'branch_create_with_dir_delete.svndump', stupid=stupid)
        # Compare sorted names so the check does not depend on the order in
        # which the manifest happens to enumerate its keys.
        self.assertEqual(sorted(repo['tip'].manifest().keys()),
                         ['alpha', 'beta', 'gamma', 'iota'])

    def test_branch_tip_update_to_default(self, stupid=False):
        repo = self._load_fixture_and_fetch('unorderedbranch.svndump',
                                            stupid=stupid, noupdate=False)
        self.assertEqual(repo[None].branch(), 'default')
        self.assertTrue('tip' not in repo[None].tags())

    def test_branch_tip_update_to_default_stupid(self):
        self.test_branch_tip_update_to_default(True)

    def test_branch_pull_anchor(self):
        # A non-numeric anchor must abort the clone.
        self.assertRaises(hgutil.Abort,
                          self._load_fixture_and_fetch_with_anchor,
                          'unorderedbranch.svndump', 'NaN')
        repo = self._load_fixture_and_fetch_with_anchor(
            'unorderedbranch.svndump', '4')
        self.assertTrue('c' not in repo.branchtags())

    def test_branches_weird_moves(self, stupid=False):
        repo = self._load_fixture_and_fetch('renamedproject.svndump',
                                            stupid=stupid,
                                            subdir='project')
        heads = dict((ctx.branch(), ctx)
                     for ctx in [repo[n] for n in repo.heads()])
        mdefault = sorted(heads['default'].manifest().keys())
        mbranch = sorted(heads['branch'].manifest().keys())
        self.assertEqual(mdefault, ['a', 'b', 'd/a'])
        self.assertEqual(mbranch, ['a'])

    def test_branches_weird_moves_stupid(self):
        self.test_branches_weird_moves(True)

    def test_branch_delete_parent_dir(self, stupid=False):
        repo = self._load_fixture_and_fetch('branch_delete_parent_dir.svndump',
                                            stupid=stupid)
        openb, closedb = self.branches(repo)
        self.assertEqual(openb, [])
        self.assertEqual(closedb, ['dev_branch'])
        self.assertEqual(list(repo['dev_branch']), ['foo'])

    def test_replace_branch_with_branch(self, stupid=False):
        repo = self._load_fixture_and_fetch(
            'replace_branch_with_branch.svndump', stupid=stupid)
        self.assertEqual(7, len(repo))
        # tip is former topological branch1 being closed
        ctx = repo['tip']
        self.assertEqual('1', ctx.extra().get('close', '0'))
        self.assertEqual('branch1', ctx.branch())
        # r5 is where the replacement takes place
        ctx = repo[5]
        self.assertEqual(set(['a', 'c', 'dir/e', 'dir2/e', 'f', 'g']),
                         set(ctx))
        self.assertEqual('0', ctx.extra().get('close', '0'))
        self.assertEqual('branch1', ctx.branch())
        self.assertEqual('c\n', ctx['c'].data())
        self.assertEqual('d\n', ctx['a'].data())
        self.assertEqual('e\n', ctx['dir/e'].data())
        self.assertEqual('e\n', ctx['dir2/e'].data())
        self.assertEqual('f\n', ctx['f'].data())
        self.assertEqual('g\n', ctx['g'].data())
        # None of the files in the replacement revision may be recorded
        # as renamed.
        for f in ctx:
            self.assertTrue(not ctx[f].renamed())

    def test_replace_branch_with_branch_stupid(self, stupid=False):
        # The ``stupid`` parameter is ignored (the stupid variant is always
        # run); it is only kept so the signature stays unchanged.
        self.test_replace_branch_with_branch(True)


def suite():
    """Return a TestSuite holding every test of TestFetchBranches."""
    all_tests = [
        unittest.TestLoader().loadTestsFromTestCase(TestFetchBranches),
    ]
    return unittest.TestSuite(all_tests)