view tests/test_fetch_branches.py @ 890:78db88de9622

Partial metadata rebuilding For highly active subversion repositories, it can be excruciatingly slow to pull updates one at a time from subversion. One way around this is to setup another mercurial repo that pulls new commits from svn periodicly (say every 5 minutes). When you want to update your repository, you can pull commits from this mercurial repository via native mercurial protocols, which will be much faster than pulling directly from svn. Unfortunately, your metadata will be out of date after doing so. Highly active repositories also tend to be very large, which means that it takes a long time to rebuild your metadata from scratch. To address this, this adds support to do a partial rebuild on the metadata by processing only revisions that have been added to the repository after the last revision we processed. With the rev map 1k revisions (~2 days) behind tip updatemeta is dramatically faster than rebuild meta: $ hg --time svn updatemeta Time: real 0.570 secs (user 0.480+0.000 sys 0.060+0.000) $ hg --time svn rebuildmeta Time: real 129.160 secs (user 128.570+0.000 sys 0.320+0.000)
author David Schleimer <dschleimer@fb.com>
date Sat, 12 May 2012 07:28:23 -0700
parents 20e73b5ab6f7
children a80b01ceb1fc
line wrap: on
line source

import test_util

import unittest

from mercurial import hg
from mercurial import node
from mercurial import util as hgutil

class TestFetchBranches(test_util.TestBase):
    def _load_fixture_and_fetch_with_anchor(self, fixture_name, anchor):
        repo_path = self.load_svndump(fixture_name)
        source = '%s#%s' % (test_util.fileurl(repo_path), anchor)
        test_util.hgclone(self.ui(), source, self.wc_path)
        return hg.repository(self.ui(), self.wc_path)

    def branches(self, repo):
        hctxs = [repo[hn] for hn in repo.heads()]
        openbranches = set(ctx.branch() for ctx in hctxs if
                           ctx.extra().get('close', None) != '1')
        closedbranches = set(ctx.branch() for ctx in hctxs if
                             ctx.extra().get('close', None) == '1')
        return sorted(openbranches), sorted(closedbranches)

    def openbranches(self, repo):
        return self.branches(repo)[0]

    def test_rename_branch_parent(self, stupid=False):
        repo = self._load_fixture_and_fetch('rename_branch_parent_dir.svndump',
                                            stupid=stupid)
        heads = [repo[n] for n in repo.heads()]
        heads = dict([(ctx.branch(), ctx) for ctx in heads])
        # Let these tests disabled yet as the fix is not obvious
        self.assertEqual(['dev_branch'], self.openbranches(repo))

    def test_rename_branch_parent_stupid(self):
        self.test_rename_branch_parent(stupid=True)

    def test_unrelatedbranch(self, stupid=False):
        repo = self._load_fixture_and_fetch('unrelatedbranch.svndump',
                                            stupid=stupid)
        heads = [repo[n] for n in repo.heads()]
        heads = dict([(ctx.branch(), ctx) for ctx in heads])
        # Let these tests disabled yet as the fix is not obvious
        self.assertEqual(heads['branch1'].manifest().keys(), ['b'])
        self.assertEqual(heads['branch2'].manifest().keys(), ['a', 'b'])

    def test_unrelatedbranch_stupid(self):
        self.test_unrelatedbranch(True)

    def test_unorderedbranch(self, stupid=False):
        repo = self._load_fixture_and_fetch('unorderedbranch.svndump',
                                            stupid=stupid)
        r = repo['branch']
        self.assertEqual(0, r.parents()[0].rev())
        self.assertEqual(['a', 'c', 'z'], sorted(r.manifest()))

    def test_unorderedbranch_stupid(self):
        self.test_unorderedbranch(True)

    def test_renamed_branch_to_trunk(self, stupid=False):
        repo = self._load_fixture_and_fetch('branch_rename_to_trunk.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['default'].parents()[0].branch(), 'dev_branch')
        self.assert_('iota' in repo['default'])
        self.assertEqual(repo['old_trunk'].parents()[0].branch(), 'default')
        self.assert_('iota' not in repo['old_trunk'])
        expected = ['default', 'old_trunk']
        self.assertEqual(self.openbranches(repo), expected)

    def test_renamed_branch_to_trunk_stupid(self):
        self.test_renamed_branch_to_trunk(stupid=True)

    def test_replace_trunk_with_branch(self, stupid=False):
        repo = self._load_fixture_and_fetch('replace_trunk_with_branch.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['default'].parents()[0].branch(), 'test')
        self.assertEqual(repo['tip'].branch(), 'default')
        self.assertEqual(repo['tip'].extra().get('close'), '1')
        self.assertEqual(self.openbranches(repo), ['default'])

    def test_copybeforeclose(self, stupid=False):
        repo = self._load_fixture_and_fetch('copybeforeclose.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['tip'].branch(), 'test')
        self.assertEqual(repo['test'].extra().get('close'), '1')
        self.assertEqual(repo['test']['b'].data(), 'a\n')

    def test_copybeforeclose_stupid(self):
        self.test_copybeforeclose(True)

    def test_replace_trunk_with_branch_stupid(self):
        self.test_replace_trunk_with_branch(stupid=True)

    def test_branch_create_with_dir_delete_works(self, stupid=False):
        repo = self._load_fixture_and_fetch('branch_create_with_dir_delete.svndump',
                                            stupid=stupid)
        self.assertEqual(repo['tip'].manifest().keys(),
                         ['alpha', 'beta', 'iota', 'gamma', ])

    def test_branch_tip_update_to_default(self, stupid=False):
        repo = self._load_fixture_and_fetch('unorderedbranch.svndump',
                                            stupid=stupid, noupdate=False)
        self.assertEqual(repo[None].branch(), 'default')
        self.assertTrue('tip' not in repo[None].tags())

    def test_branch_tip_update_to_default_stupid(self):
        self.test_branch_tip_update_to_default(True)

    def test_branch_pull_anchor(self):
        self.assertRaises(hgutil.Abort,
                          self._load_fixture_and_fetch_with_anchor,
                          'unorderedbranch.svndump', 'NaN')
        repo = self._load_fixture_and_fetch_with_anchor(
            'unorderedbranch.svndump', '4')
        self.assertTrue('c' not in repo.branchtags())

    def test_branches_weird_moves(self, stupid=False):
        repo = self._load_fixture_and_fetch('renamedproject.svndump',
                                            stupid=stupid,
                                            subdir='project')
        heads = [repo[n] for n in repo.heads()]
        heads = dict((ctx.branch(), ctx) for ctx in heads)
        mdefault = sorted(heads['default'].manifest().keys())
        mbranch = sorted(heads['branch'].manifest().keys())
        self.assertEqual(mdefault, ['a', 'b', 'd/a'])
        self.assertEqual(mbranch, ['a'])

    def test_branches_weird_moves_stupid(self):
        self.test_branches_weird_moves(True)

    def test_branch_delete_parent_dir(self, stupid=False):
        repo = self._load_fixture_and_fetch('branch_delete_parent_dir.svndump',
                                            stupid=stupid)
        openb, closedb = self.branches(repo)
        self.assertEqual(openb, [])
        self.assertEqual(closedb, ['dev_branch'])
        self.assertEqual(list(repo['dev_branch']), ['foo'])

    def test_replace_branch_with_branch(self, stupid=False):
        repo = self._load_fixture_and_fetch('replace_branch_with_branch.svndump',
                                            stupid=stupid)
        self.assertEqual(7, len(repo))
        # tip is former topological branch1 being closed
        ctx = repo['tip']
        self.assertEqual('1', ctx.extra().get('close', '0'))
        self.assertEqual('branch1', ctx.branch())
        # r5 is where the replacement takes place
        ctx = repo[5]
        self.assertEqual(set(['a', 'c', 'dir/e', 'dir2/e', 'f', 'g']), set(ctx))
        self.assertEqual('0', ctx.extra().get('close', '0'))
        self.assertEqual('branch1', ctx.branch())
        self.assertEqual('c\n', ctx['c'].data())
        self.assertEqual('d\n', ctx['a'].data())
        self.assertEqual('e\n', ctx['dir/e'].data())
        self.assertEqual('e\n', ctx['dir2/e'].data())
        self.assertEqual('f\n', ctx['f'].data())
        self.assertEqual('g\n', ctx['g'].data())
        for f in ctx:
            self.assertTrue(not ctx[f].renamed())

    def test_replace_branch_with_branch_stupid(self, stupid=False):
        self.test_replace_branch_with_branch(True)

def suite():
    all_tests = [unittest.TestLoader().loadTestsFromTestCase(TestFetchBranches),
          ]
    return unittest.TestSuite(all_tests)