Mercurial > hgsubversion
view tests/comprehensive/test_rebuildmeta.py @ 1541:adba152a5177
test_util: make developer warnings cause our tests to fail
Previously hgsubversion didn't get any help from develwarn calls,
because we don't use core's testrunner. The upstream configitem
refactor was easier to hunt down with develwarn treated as a failure,
which prompted this code. It also exposed several locking correctness
bugs, so it feels like it's worth keeping around long term.
We don't bother with develwarn on anything so old that we don't try to
use smartset. The smartset deprecation was not handled gracefully, so
hg complains for many versions without actually breaking anything, and
it'd pointlessly complicate our code to handle smartset living in
multiple places between when the warning was introduced and the
present.
author | Augie Fackler <raf@durin42.com> |
---|---|
date | Sun, 29 Oct 2017 09:07:12 -0400 |
parents | b98ff95b5861 |
children | 207a0f26f797 |
line wrap: on
line source
# Comprehensive rebuildmeta/updatemeta tests: one generated test method per
# svndump fixture and layout, collected into RebuildMetaTests at the bottom.
import os
import unittest
import sys

# wrapped in a try/except because of weirdness in how
# run.py works as compared to nose.
try:
    import test_util
except ImportError:
    sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
    import test_util

from mercurial import context
from mercurial import extensions
from mercurial import hg
from mercurial import localrepo
from mercurial import ui
from mercurial import util as hgutil

from hgsubversion import compathacks
from hgsubversion import svncommands
from hgsubversion import svnmeta
from hgsubversion import util

# These test repositories have harmless skew in rebuildmeta for the
# last-pulled-rev because the last rev in svn causes absolutely no
# changes in hg.
# Entries are matched against (fixture name, self.stupid, single).
expect_youngest_skew = [('file_mixed_with_branches.svndump', False, False),
                        ('file_mixed_with_branches.svndump', True, False),
                        ('unrelatedbranch.svndump', False, False),
                        ('unrelatedbranch.svndump', True, False),
                        ]


def _do_case(self, name, layout):
    """Check rebuildmeta and updatemeta against a freshly fetched fixture.

    The fixture ``name`` is fetched with ``layout``.  Two clones of the
    resulting working copy are then exercised:

    * a full clone, on which ``rebuildmeta`` is run;
    * a clone limited to rev 0, on which ``rebuildmeta`` is run, the
      remaining changesets are pulled, and ``updatemeta`` is run.

    After each scenario the clone's svn metadata is compared with the
    original's via ``_run_assertions``.  While the svn commands run,
    ``changectx.children()`` is wrapped so that any call fails the test.
    """
    subdir = test_util.subdir.get(name, '')
    single = layout == 'single'
    u = test_util.testui()
    # NOTE(review): ``config`` is filled in for the custom layout but is not
    # passed to any call in this function -- confirm whether it was meant to
    # be handed to load_and_fetch().
    config = {}
    if layout == 'custom':
        for branch, path in test_util.custom.get(name, {}).iteritems():
            config['hgsubversionbranch.%s' % branch] = path
            u.setconfig('hgsubversionbranch', branch, path)
    repo, repo_path = self.load_and_fetch(name, subdir=subdir, layout=layout)
    self.assertTrue(test_util.repolen(self.repo) > 0)

    def failfn(orig, ctx):
        self.fail('calling %s is forbidden; it can cause massive slowdowns '
                  'when rebuilding large repositories' % orig)

    origchildren = getattr(context.changectx, 'children')

    def run_guarded(command, target):
        # Run ``command`` with a wrapper installed that prevents calling
        # changectx.children(); the original attribute is always restored.
        extensions.wrapfunction(context.changectx, 'children', failfn)
        try:
            command(u, target,
                    args=[test_util.fileurl(repo_path + subdir), ])
        finally:
            # remove the wrapper
            context.changectx.children = origchildren

    # Scenario 1: rebuild the metadata of a complete clone.
    wc2_path = self.wc_path + '_clone'
    src, dest = test_util.hgclone(u, self.wc_path, wc2_path, update=False)
    src = test_util.getlocalpeer(src)
    dest = test_util.getlocalpeer(dest)

    run_guarded(svncommands.rebuildmeta, dest)
    self._run_assertions(name, single, src, dest, u)

    # Scenario 2: rebuild on a clone holding only rev 0, pull the rest,
    # then bring the metadata up to date with updatemeta.
    wc3_path = self.wc_path + '_partial'
    src, dest = test_util.hgclone(u,
                                  self.wc_path,
                                  wc3_path,
                                  update=False,
                                  rev=[0])
    srcrepo = test_util.getlocalpeer(src)
    dest = test_util.getlocalpeer(dest)

    run_guarded(svncommands.rebuildmeta, dest)

    if hgutil.safehasattr(localrepo.localrepository, 'pull'):
        dest.pull(src)
    else:
        # Mercurial >= 3.2
        from mercurial import exchange
        exchange.pull(dest, src)

    run_guarded(svncommands.updatemeta, dest)
    self._run_assertions(name, single, srcrepo, dest, u)


def _run_assertions(self, name, single, src, dest, u):
    """Assert that ``dest`` carries the same svn metadata as ``src``.

    Compares the files under ``<repo.path>/svn`` of both repositories, their
    branch maps, and the per-branch entries of ``branch_info``.  ``name`` and
    ``single`` are only used to look the case up in ``expect_youngest_skew``.
    """
    self.assertTrue(os.path.isdir(os.path.join(src.path, 'svn')),
                    'no .hg/svn directory in the source!')
    self.assertTrue(os.path.isdir(os.path.join(dest.path, 'svn')),
                    'no .hg/svn directory in the destination!')
    # Reopen the destination so the checks below use a fresh repo object.
    dest = hg.repository(u, os.path.dirname(dest.path))

    for tf in ('lastpulled', 'rev_map', 'uuid', 'tagmap', 'layout', 'subdir',):
        stf = os.path.join(src.path, 'svn', tf)
        dtf = os.path.join(dest.path, 'svn', tf)
        # the generation of tagmap is lazy so it doesn't strictly need to exist
        # if it's not being used
        if not stf.endswith('tagmap'):
            self.assertTrue(os.path.isfile(stf), '%r is missing!' % stf)
        if not dtf.endswith('tagmap'):
            # Report the full destination path, matching the source message.
            self.assertTrue(os.path.isfile(dtf), '%r is missing!' % dtf)

        old, new = None, None
        if os.path.isfile(stf) and os.path.isfile(dtf):
            old = util.load(stf, resave=False)
            new = util.load(dtf, resave=False)

        skewed = (name, self.stupid, single) in expect_youngest_skew
        if tf == 'lastpulled' and skewed:
            # For these fixtures the rebuilt value is expected to differ.
            self.assertNotEqual(
                old, new,
                'rebuildmeta unexpected match on lastpulled: '
                'old %d new %d, case %r %r %r' % (
                    old, new, name, self.stupid, single))
            continue
        self.assertEqual(
            old, new,
            '%s differs old: %r new %r'% (tf, old, new))

    # The branch map comparison does not depend on which metadata file is
    # being inspected, so it is checked once after the per-file loop.
    try:
        self.assertEqual(src.branchmap(), dest.branchmap())
    except AttributeError:
        # hg 2.8 and earlier
        self.assertEqual(src.branchtags(), dest.branchtags())

    srcbi = util.load(os.path.join(src.path, 'svn', 'branch_info'))
    destbi = util.load(os.path.join(dest.path, 'svn', 'branch_info'))
    self.assertEqual(sorted(srcbi.keys()), sorted(destbi.keys()))
    revmap = svnmeta.SVNMeta(dest).revmap
    # revmap disables __iter__ intentionally to avoid possible slow code
    # (not using database index in SqliteRevMap)
    # we need to fetch all keys so enable it by setting _allowiter
    revmap._allowiter = True
    revkeys = revmap.keys()
    for branch in destbi:
        srcinfo = srcbi[branch]
        destinfo = destbi[branch]
        if srcinfo[:2] == (None, 0) or destinfo[:2] == (None, 0):
            self.assertTrue(srcinfo[2] <= destinfo[2],
                            'Latest revision for %s decreased from %d to %d!'
                            % (branch or 'default', srcinfo[2], destinfo[2]))
            self.assertEqual(srcinfo[0], destinfo[0])
        else:
            # Greatest revmap key whose second element equals srcinfo[0] and
            # whose first element does not exceed srcinfo[1].
            pr = max(key for key in revkeys
                     if key[1] == srcinfo[0] and key[0] <= srcinfo[1])[0]
            self.assertEqual(pr, destinfo[1])
            self.assertEqual(srcinfo[2], destinfo[2])


def buildmethod(case, name, layout):
    """Return a test method named ``name`` running ``_do_case(case, layout)``."""
    def m(self):
        return self._do_case(case, layout)
    m.__name__ = name
    m.__doc__ = ('Test rebuildmeta on %s (%s)' % (case, layout))
    return m


# Fixtures for which no test methods are generated.
skip = set([
    'project_root_not_repo_root.svndump',
    'corrupt.svndump',
])

attrs = {'_do_case': _do_case,
         '_run_assertions': _run_assertions,
         'stupid_mode_tests': True,
         }
for case in [f for f in os.listdir(test_util.FIXTURES) if f.endswith('.svndump')]:
    # this fixture results in an empty repository, don't use it
    if case in skip:
        continue
    bname = 'test_' + case[:-len('.svndump')]
    attrs[bname] = buildmethod(case, bname, 'auto')
    attrs[bname + '_single'] = buildmethod(case, bname + '_single', 'single')
    if case in test_util.custom:
        # NOTE(review): the '_custom' variant is built with layout 'single',
        # so _do_case's ``layout == 'custom'`` branch is never reached from
        # this module -- confirm whether 'custom' was intended here.
        attrs[bname + '_custom'] = buildmethod(case, bname + '_custom',
                                               'single')

RebuildMetaTests = type('RebuildMetaTests', (test_util.TestBase,), attrs)