Mercurial > hgsubversion
view tests/test_util.py @ 342:76c833526fbc
Use fallbacks in the wrappers for Subversion support, instead of prefixes.
The change only applies to the ambiguous URL schemes: file, http and
https. The others - svn+ssh and svn - behave the same as previously.
For http and https, the wrapping is implemented in 'svnrepo.py': only
when the attempt to create an httprepo instance fails will the URL
be considered as a Subversion URL.
For file, the ambiguity is treated much like the way the Mercurial core
distinguishes bundles from directories. In this case, a directory that
looks like a Subversion repository will *not* be considered for a
Mercurial clone.
The command lines are more similar to before this refactor. The only
option added to push & pull is --stupid; others are only added to
clone. Any of these options specified to clone will be added to the
generated '.hgrc'.
Also, the -r/--rev option now works for clone & push.
author | Dan Villiom Podlaski Christiansen <danchr@gmail.com> |
---|---|
date | Wed, 20 May 2009 18:38:01 +0200 |
parents | 46e69be8e2c8 |
children | 537de0300510 |
line wrap: on
line source
import errno import os import subprocess import shutil import StringIO import stat import tempfile import unittest import urllib from mercurial import context from mercurial import commands from mercurial import hg from mercurial import node from mercurial import ui from hgsubversion import util # Fixtures that need to be pulled at a subdirectory of the repo path subdir = {'truncatedhistory.svndump': '/project2', 'fetch_missing_files_subdir.svndump': '/foo', 'empty_dir_in_trunk_not_repo_root.svndump': '/project', } FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'fixtures') def fileurl(path): path = os.path.abspath(path) drive, path = os.path.splitdrive(path) path = urllib.pathname2url(path) if drive: drive = '/' + drive url = 'file://%s%s' % (drive, path) return url def load_svndump_fixture(path, fixture_name): '''Loads an svnadmin dump into a fresh repo at path, which should not already exist. ''' if os.path.exists(path): rmtree(path) subprocess.call(['svnadmin', 'create', path,], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) inp = open(os.path.join(FIXTURES, fixture_name)) proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=inp, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) proc.communicate() def load_fixture_and_fetch(fixture_name, repo_path, wc_path, stupid=False, subdir='', noupdate=True): load_svndump_fixture(repo_path, fixture_name) if subdir: repo_path += '/' + subdir _ui = ui.ui() _ui.setconfig('hgsubversion', 'stupid', str(stupid)) commands.clone(_ui, fileurl(repo_path), wc_path, noupdate=noupdate) _ui = ui.ui() _ui.setconfig('hgsubversion', 'stupid', str(stupid)) return hg.repository(_ui, wc_path) def rmtree(path): # Read-only files cannot be removed under Windows for root, dirs, files in os.walk(path): for f in files: f = os.path.join(root, f) try: s = os.stat(f) except OSError, e: if e.errno == errno.ENOENT: continue raise if (s.st_mode & stat.S_IWRITE) == 0: os.chmod(f, s.st_mode | stat.S_IWRITE) shutil.rmtree(path) 
class TestBase(unittest.TestCase):
    """Common scaffolding for tests that need a Subversion repository and
    a Mercurial working copy.

    setUp() creates a temporary directory holding a minimal '.hgrc' that
    enables hgsubversion, and derives self.repo_path (Subversion side) and
    self.wc_path (Mercurial side) inside it. tearDown() removes the
    directory again.
    """

    def setUp(self):
        """Create the temporary directory and '.hgrc', and redirect
        ui.write_err to ui.write for the duration of the test.
        """
        self.oldwd = os.getcwd()
        self.tmpdir = tempfile.mkdtemp(
            'svnwrap_test', dir=os.environ.get('HGSUBVERSION_TEST_TEMP', None))
        self.hgrc = os.path.join(self.tmpdir, '.hgrc')
        os.environ['HGRCPATH'] = self.hgrc
        rc = open(self.hgrc, 'w')
        # Close the file explicitly so the configuration is flushed to disk
        # before anything reads it, instead of relying on the file object
        # being garbage collected.
        try:
            for l in '[extensions]', 'hgsubversion=':
                print >> rc, l
        finally:
            rc.close()

        self.repo_path = '%s/testrepo' % self.tmpdir
        self.wc_path = '%s/testrepo_wc' % self.tmpdir

        # Previously, we had a MockUI class that wrapped ui, and giving access
        # to the stream. The ui.pushbuffer() and ui.popbuffer() can be used
        # instead. Using the regular UI class, with all stderr redirected to
        # stdout ensures that the test setup is much more similar to usage
        # setups.
        # self.patch holds (original write_err, replacement); tearDown()
        # uses the first element to undo the patch.
        self.patch = (ui.ui.write_err, ui.ui.write)
        setattr(ui.ui, self.patch[0].func_name, self.patch[1])

    def tearDown(self):
        """Remove the temporary directory, return to the original working
        directory and restore the original ui.write_err.
        """
        rmtree(self.tmpdir)
        os.chdir(self.oldwd)
        setattr(ui.ui, self.patch[0].func_name, self.patch[0])

    def _load_fixture_and_fetch(self, fixture_name, subdir='', stupid=False):
        """Load fixture_name into self.repo_path, clone it into self.wc_path
        and return the resulting repository.
        """
        return load_fixture_and_fetch(fixture_name, self.repo_path,
                                      self.wc_path, subdir=subdir,
                                      stupid=stupid)

    # define this as a property so that it reloads anytime we need it
    @property
    def repo(self):
        """A repository object for self.wc_path, built anew on each access."""
        return hg.repository(ui.ui(), self.wc_path)

    def pushrevisions(self, stupid=False):
        """Push the working copy repository and assert that the number of
        local revisions is the same before and after the push.
        """
        before = len(self.repo)
        # NOTE(review): self.repo builds a new repository object on every
        # access, so the ui configured on the next line may not be the one
        # handed to commands.push below - confirm that the 'stupid' setting
        # actually reaches the push.
        self.repo.ui.setconfig('hgsubversion', 'stupid', str(stupid))
        commands.push(self.repo.ui, self.repo)
        after = len(self.repo)
        self.assertEqual(0, after - before)

    def svnls(self, path, rev='HEAD'):
        """Return the sorted output lines of a recursive 'svn ls' of path
        (relative to self.repo_path) at revision rev, with leading and
        trailing slashes stripped from each entry.

        Raises Exception, including the command output, if svn exits with
        a non-zero status.
        """
        path = self.repo_path + '/' + path
        path = util.normalize_url(fileurl(path))
        args = ['svn', 'ls', '-r', rev, '-R', path]
        p = subprocess.Popen(args,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # stderr is merged into stdout above, so the second element returned
        # by communicate() is always None; the diagnostics live in stdout.
        stdout = p.communicate()[0]
        if p.returncode:
            raise Exception('svn ls failed on %s: %r' % (path, stdout))
        entries = [e.strip('/') for e in stdout.splitlines()]
        entries.sort()
        return entries

    def commitchanges(self, changes, parent='tip'):
        """Commit changes to mercurial directory

        'changes' is a sequence of tuples (source, dest, data). It can look
        like:
        - (source, source, data) to set source content to data
        - (source, dest, None) to set dest content to source one, and mark it
          as copied from source.
        - (source, dest, data) to set dest content to data, and mark it as
          copied from source.
        - (source, None, None) to remove source.

        The commit is created on top of 'parent', the working copy is then
        cleaned to the new revision, and the new node id is returned.
        """
        repo = self.repo
        parentctx = repo[parent]

        changed, removed = [], []
        for source, dest, newdata in changes:
            if dest is None:
                removed.append(source)
            else:
                changed.append(dest)

        def filectxfn(repo, memctx, path):
            # Removed files are signalled by raising IOError.
            if path in removed:
                raise IOError()
            # Use the first entry whose destination matches the path.
            entry = [e for e in changes if path == e[1]][0]
            source, dest, newdata = entry
            if newdata is None:
                newdata = parentctx[source].data()
            copied = None
            if source != dest:
                copied = source
            return context.memfilectx(path=dest,
                                      data=newdata,
                                      islink=False,
                                      isexec=False,
                                      copied=copied)

        ctx = context.memctx(repo,
                             (parentctx.node(), node.nullid),
                             'automated test',
                             changed + removed,
                             filectxfn,
                             'an_author',
                             '2008-10-07 20:59:48 -0500')
        nodeid = repo.commitctx(ctx)
        # Reload the repository before updating the working copy to the
        # freshly committed revision.
        repo = self.repo
        hg.clean(repo, nodeid)
        return nodeid

    def assertchanges(self, changes, ctx):
        """Assert that all 'changes' (as defined in commitchanges()) went
        into ctx.
        """
        for source, dest, data in changes:
            if dest is None:
                self.assertTrue(source not in ctx)
                continue
            self.assertTrue(dest in ctx)
            if data is None:
                # No explicit data: the content must match the source file
                # in the first parent of ctx.
                data = ctx.parents()[0][source].data()
            self.assertEqual(ctx[dest].data(), data)
            if dest != source:
                copy = ctx[dest].renamed()
                self.assertEqual(copy[0], source)