Mercurial > hgsubversion
view tests/test_rebuildmeta.py @ 933:a9f315eae67c
push: use native rebase instead of our rebase wrapper
Our rebase wrapper doesn't quite do the right thing with respect to a
just-pushed revision. In particular, it will try to rebase the
just-pushed revision on top of the version of that revision we just
pulled down from svn. This will sometimes result in a local revision
with an identical commit message as the revision from svn, but no file
changes.
This changes the rebase portion of the push command to instead use the
native rebase with a revset that excludes the revision we just pushed
to svn from the set to be rebased. It also moves to a single strip
operation that removes all of the revisions based on a pre-push or
partially pushed revision. This moves to a separate rebase and strip
operation since we now need to strip revisions we are not rebasing.
author | David Schleimer <dschleimer@fb.com> |
---|---|
date | Mon, 24 Sep 2012 10:18:28 -0700 |
parents | 5bacb9c63e3e |
children | 3b43b1c45e2e |
line wrap: on
line source
# test_util is imported first, exactly as in the original file.
# NOTE(review): presumably it prepares the import path for the modules
# below -- confirm before reordering.
import test_util

import os
import pickle
import unittest

from mercurial import context
from mercurial import extensions
from mercurial import hg
from mercurial import ui

from hgsubversion import svncommands
from hgsubversion import svnmeta

# These test repositories have harmless skew in rebuildmeta for the
# last-pulled-rev because the last rev in svn causes absolutely no
# changes in hg.
expect_youngest_skew = [('file_mixed_with_branches.svndump', False, False),
                        ('file_mixed_with_branches.svndump', True, False),
                        ('unrelatedbranch.svndump', False, False),
                        ('unrelatedbranch.svndump', True, False),
                        ]


def _call_forbidding_children(testcase, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` while changectx.children() is forbidden.

    ``context.changectx.children`` is wrapped so that any call to it fails
    ``testcase``; the attribute that was in place before the call is put
    back afterwards, whether or not ``func`` raised.

    Returns whatever ``func`` returns.
    """
    def failfn(orig, ctx):
        testcase.fail('calling %s is forbidden; it can cause massive '
                      'slowdowns when rebuilding large repositories' % orig)

    origchildren = getattr(context.changectx, 'children')
    extensions.wrapfunction(context.changectx, 'children', failfn)
    try:
        return func(*args, **kwargs)
    finally:
        # remove the wrapper
        context.changectx.children = origchildren


def _read_file(path):
    """Return the full contents of ``path``, closing the file afterwards."""
    with open(path) as fp:
        return fp.read()


def _load_pickle(path):
    """Unpickle and return the object stored in ``path``.

    The file is opened in the default mode, as the original code did, and
    is closed once loading finishes.
    """
    with open(path) as fp:
        return pickle.load(fp)


def _do_case(self, name, stupid, single):
    """Fetch fixture ``name`` and check rebuildmeta/updatemeta against it.

    Two scenarios are exercised:

    * a full clone of the fetched repository, on which ``rebuildmeta`` is
      run;
    * a clone limited to ``rev=[0]``, on which ``rebuildmeta`` is run, then
      the remaining changesets are pulled and ``updatemeta`` is run.

    After each scenario the metadata of the clone is compared with that of
    the originally fetched repository via ``_run_assertions``.

    ``stupid`` is forwarded to ``load_and_fetch``; ``single`` selects the
    'single' layout instead of 'auto'.
    """
    subdir = test_util.subdir.get(name, '')
    layout = 'single' if single else 'auto'
    _repo, repo_path = self.load_and_fetch(name, subdir=subdir,
                                           stupid=stupid, layout=layout)
    self.assertTrue(len(self.repo) > 0)
    url = test_util.fileurl(repo_path + subdir)
    u = ui.ui()

    # Scenario 1: rebuild the metadata of a complete clone.
    wc2_path = self.wc_path + '_clone'
    src, dest = test_util.hgclone(u, self.wc_path, wc2_path, update=False)
    src = test_util.getlocalpeer(src)
    dest = test_util.getlocalpeer(dest)
    _call_forbidding_children(self, svncommands.rebuildmeta, u, dest,
                              args=[url])
    self._run_assertions(name, stupid, single, src, dest, u)

    # Scenario 2: rebuild on a partial clone, pull the rest, then update.
    wc3_path = self.wc_path + '_partial'
    src, dest = test_util.hgclone(u, self.wc_path, wc3_path, update=False,
                                  rev=[0])
    srcrepo = test_util.getlocalpeer(src)
    dest = test_util.getlocalpeer(dest)
    _call_forbidding_children(self, svncommands.rebuildmeta, u, dest,
                              args=[url])
    # pull() is given the object returned by hgclone, not the local peer.
    dest.pull(src)
    _call_forbidding_children(self, svncommands.updatemeta, u, dest,
                              args=[url])
    self._run_assertions(name, stupid, single, srcrepo, dest, u)


def _run_assertions(self, name, stupid, single, src, dest, u):
    """Assert that the svn metadata of ``dest`` matches that of ``src``.

    Checks, in order: both repositories have an ``svn`` metadata directory;
    each listed metadata file exists on both sides and has identical
    contents (except 'lastpulled' for the cases in ``expect_youngest_skew``,
    which must differ); branch tags are equal; and the pickled
    ``branch_info`` entries are consistent.
    """
    self.assertTrue(os.path.isdir(os.path.join(src.path, 'svn')),
                    'no .hg/svn directory in the source!')
    # Check the destination's directory; the original re-checked src.path.
    self.assertTrue(os.path.isdir(os.path.join(dest.path, 'svn')),
                    'no .hg/svn directory in the destination!')
    # Re-open the destination so freshly written state is read from disk.
    dest = hg.repository(u, os.path.dirname(dest.path))
    for tf in ('lastpulled', 'rev_map', 'uuid', 'tagmap', 'layout',
               'subdir',):
        stf = os.path.join(src.path, 'svn', tf)
        self.assertTrue(os.path.isfile(stf), '%r is missing!' % stf)
        dtf = os.path.join(dest.path, 'svn', tf)
        self.assertTrue(os.path.isfile(dtf), '%r is missing!' % dtf)
        old, new = _read_file(stf), _read_file(dtf)
        if (tf == 'lastpulled' and
                (name, stupid, single) in expect_youngest_skew):
            self.assertNotEqual(
                old, new, 'rebuildmeta unexpected match on youngest rev!')
            continue
        self.assertMultiLineEqual(old, new, tf + ' differs')
    self.assertEqual(src.branchtags(), dest.branchtags())
    srcbi = _load_pickle(os.path.join(src.path, 'svn', 'branch_info'))
    destbi = _load_pickle(os.path.join(dest.path, 'svn', 'branch_info'))
    self.assertEqual(sorted(srcbi.keys()), sorted(destbi.keys()))
    revkeys = svnmeta.SVNMeta(dest).revmap.keys()
    for branch in destbi:
        srcinfo = srcbi[branch]
        destinfo = destbi[branch]
        # NOTE(review): entries look like (parent branch, parent revision,
        # latest revision); only the last is confirmed by the message
        # below -- verify against the code that writes branch_info.
        if srcinfo[:2] == (None, 0) or destinfo[:2] == (None, 0):
            self.assertTrue(srcinfo[2] <= destinfo[2],
                            'Latest revision for %s decreased from %d to %d!'
                            % (branch or 'default', srcinfo[2], destinfo[2]))
            self.assertEqual(srcinfo[0], destinfo[0])
        else:
            # Largest revmap key whose second item equals srcinfo[0] and
            # whose first item does not exceed srcinfo[1]; its first item
            # must match what the destination recorded.
            pr = max(key for key in revkeys
                     if key[1] == srcinfo[0] and key[0] <= srcinfo[1])[0]
            self.assertEqual(pr, destinfo[1])
            self.assertEqual(srcinfo[2], destinfo[2])


def buildmethod(case, name, stupid, single):
    """Return a test method named ``name`` that runs ``_do_case`` on ``case``.

    The generated method's docstring states the fixture, the replay mode
    ('stupid' or 'real') and the layout ('single' or 'standard').
    """
    def m(self):
        return self._do_case(case, stupid, single)

    m.__name__ = name
    m.__doc__ = ('Test rebuildmeta on %s with %s replay. (%s)' %
                 (case,
                  'stupid' if stupid else 'real',
                  'single' if single else 'standard',
                  )
                 )
    return m


# Fixtures for which no test methods are generated.
skip = set([
    'project_root_not_repo_root.svndump',
    'corrupt.svndump',
])

attrs = {'_do_case': _do_case,
         '_run_assertions': _run_assertions,
         }
# Generate three test methods (standard, stupid replay, single layout) for
# every .svndump fixture that is not skipped.
for case in [f for f in os.listdir(test_util.FIXTURES)
             if f.endswith('.svndump')]:
    # this fixture results in an empty repository, don't use it
    if case in skip:
        continue
    bname = 'test_' + case[:-len('.svndump')]
    attrs[bname] = buildmethod(case, bname, False, False)
    name = bname + '_stupid'
    attrs[name] = buildmethod(case, name, True, False)
    name = bname + '_single'
    attrs[name] = buildmethod(case, name, False, True)

RebuildMetaTests = type('RebuildMetaTests', (test_util.TestBase,), attrs)


def suite():
    """Return a TestSuite containing all generated RebuildMetaTests."""
    all_tests = [
        unittest.TestLoader().loadTestsFromTestCase(RebuildMetaTests),
    ]
    return unittest.TestSuite(all_tests)