Mercurial > hgsubversion
view tests/test_util.py @ 717:ae5968ffe6fe
svnwrap: fix handling of quotable URLs (fixes #197, refs #132)
The way hgsubversion handles URLs that may or may not be quoted is
somewhat fragile. As part of fixing issue 132 in 925ff8c5989c, the
path component of URLs was always quoted. The URL has been attempted
encoded since the initial check-in.
The fix from 925ff8c5989c was incomplete; reverting it allows us to
clone a URL with a '~' in it.[1] Encoding the URL as UTF-8 seldom
works as expected, as the default string encoding is ASCII, causing
Python to be unable to decode any URL containing an 8-bit
character.
The core problem here is that we don't know whether the URL specified
by the user is quoted or not. Rather than trying to deal with this
ourselves, we pass the problem on to Subversion. Then, we obtain the
URL from the RA instance, where it is always quoted. (It's worth
noting that the editor interface, on the other hand, always deals with
unquoted paths...)
Thus, the following invariants should apply to SubversionRepo
attributes:
- svn_url and root will always be quoted.
- subdir will always be unquoted.
Tests are added that verify that it won't affect the conversion
whether a URL is specified in quoted or unquoted form. Furthermore, a
test fixture for this is added *twice*, so that we can thoroughly test
both quoted and unquoted URLs. I'm not adding a test dedicated to
tildes in URLs; it doesn't seem necessary.
[1] Such as <https://svn.kenai.com/svn/winsw~subversion>.
author | Dan Villiom Podlaski Christiansen <danchr@gmail.com> |
---|---|
date | Mon, 04 Oct 2010 21:00:36 -0500 |
parents | 38ebf7714cdf |
children | ae52a3b30cfb |
line wrap: on
line source
import StringIO import difflib import errno import gettext import imp import os import shutil import stat import subprocess import sys import tempfile import unittest import urllib _rootdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, _rootdir) from mercurial import cmdutil from mercurial import commands from mercurial import context from mercurial import dispatch from mercurial import hg from mercurial import i18n from mercurial import node from mercurial import ui from mercurial import extensions try: from unittest2 import SkipTest except ImportError: try: from nose import SkipTest except ImportError: SkipTest = None from hgsubversion import util # Documentation for Subprocess.Popen() says: # "Note that on Windows, you cannot set close_fds to true and # also redirect the standard handles by setting stdin, stdout or # stderr." canCloseFds='win32' not in sys.platform if not 'win32' in sys.platform: def kill_process(popen_obj): os.kill(popen_obj.pid, 9) else: import ctypes from ctypes.wintypes import BOOL, DWORD, HANDLE, UINT def win_status_check(result, func, args): if result == 0: raise ctypes.WinError() return args def WINAPI(returns, func, *params): assert len(params) % 2 == 0 func.argtypes = tuple(params[0::2]) func.resvalue = returns func.errcheck = win_status_check return func # dwDesiredAccess PROCESS_TERMINATE = 0x0001 OpenProcess = WINAPI(HANDLE, ctypes.windll.kernel32.OpenProcess, DWORD, 'dwDesiredAccess', BOOL, 'bInheritHandle', DWORD, 'dwProcessId', ) CloseHandle = WINAPI(BOOL, ctypes.windll.kernel32.CloseHandle, HANDLE, 'hObject' ) TerminateProcess = WINAPI(BOOL, ctypes.windll.kernel32.TerminateProcess, HANDLE, 'hProcess', UINT, 'uExitCode' ) def kill_process(popen_obj): phnd = OpenProcess(PROCESS_TERMINATE, False, popen_obj.pid) TerminateProcess(phnd, 1) CloseHandle(phnd) # Fixtures that need to be pulled at a subdirectory of the repo path subdir = {'truncatedhistory.svndump': '/project2', 'fetch_missing_files_subdir.svndump': '/foo', 'empty_dir_in_trunk_not_repo_root.svndump': '/project', 'project_root_not_repo_root.svndump': '/dummyproj', 'project_name_with_space.svndump': '/project name', 'non_ascii_path_1.svndump': '/b\xC3\xB8b', 'non_ascii_path_2.svndump': '/b%C3%B8b', } FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'fixtures') def requiresoption(option): '''Skip a test if commands.clone does not take the specified option.''' def decorator(fn): for entry in cmdutil.findcmd('clone', commands.table)[1][1]: if entry[1] == option: return fn # no match found, so skip if SkipTest: def skip(*args, **kwargs): raise SkipTest('test requires clone to accept %s' % option) skip.__name__ = fn.__name__ return skip # no skipping support, so erase decorated method return if not isinstance(option, str): raise TypeError('requiresoption takes a string argument') return decorator def filtermanifest(manifest): return filter(lambda x: x not in ('.hgtags', '.hgsvnexternals', ), manifest) def fileurl(path): path = os.path.abspath(path).replace(os.sep, '/') drive, path = os.path.splitdrive(path) if drive: drive = '/' + drive url = 'file://%s%s' % (drive, path) return url def testui(stupid=False, layout='auto', startrev=0): u = ui.ui() bools = {True: 'true', False: 'false'} u.setconfig('ui', 'quiet', bools[True]) u.setconfig('extensions', 'hgsubversion', '') u.setconfig('hgsubversion', 'stupid', bools[stupid]) u.setconfig('hgsubversion', 'layout', layout) u.setconfig('hgsubversion', 'startrev', startrev) return u def load_svndump_fixture(path, fixture_name): '''Loads an svnadmin dump into a fresh repo at path, which should not already exist. ''' if os.path.exists(path): rmtree(path) subprocess.call(['svnadmin', 'create', path,], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) inp = open(os.path.join(FIXTURES, fixture_name)) proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=inp, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) proc.communicate() def load_fixture_and_fetch(fixture_name, repo_path, wc_path, stupid=False, subdir='', noupdate=True, layout='auto', startrev=0): load_svndump_fixture(repo_path, fixture_name) if subdir: repo_path += '/' + subdir cmd = [ 'clone', '--layout=%s' % layout, '--startrev=%s' % startrev, fileurl(repo_path), wc_path, ] if stupid: cmd.append('--stupid') if noupdate: cmd.append('--noupdate') dispatch.dispatch(cmd) return hg.repository(testui(), wc_path) def rmtree(path): # Read-only files cannot be removed under Windows for root, dirs, files in os.walk(path): for f in files: f = os.path.join(root, f) try: s = os.stat(f) except OSError, e: if e.errno == errno.ENOENT: continue raise if (s.st_mode & stat.S_IWRITE) == 0: os.chmod(f, s.st_mode | stat.S_IWRITE) shutil.rmtree(path) def _verify_our_modules(): ''' Verify that hgsubversion was imported from the correct location. The correct location is any location within the parent directory of the directory containing this file. ''' for modname, module in sys.modules.iteritems(): if not module or not modname.startswith('hgsubversion.'): continue modloc = module.__file__ cp = os.path.commonprefix((os.path.abspath(__file__), modloc)) assert cp.rstrip(os.sep) == _rootdir, ( 'Module location verification failed: hgsubversion was imported ' 'from the wrong path!' ) class TestBase(unittest.TestCase): def setUp(self): _verify_our_modules() self.oldenv = dict([(k, os.environ.get(k, None), ) for k in ('LANG', 'LC_ALL', 'HGRCPATH', )]) self.oldt = i18n.t os.environ['LANG'] = os.environ['LC_ALL'] = 'C' i18n.t = gettext.translation('hg', i18n.localedir, fallback=True) self.oldwd = os.getcwd() self.tmpdir = tempfile.mkdtemp( 'svnwrap_test', dir=os.environ.get('HGSUBVERSION_TEST_TEMP', None)) self.hgrc = os.path.join(self.tmpdir, '.hgrc') os.environ['HGRCPATH'] = self.hgrc rc = open(self.hgrc, 'w') for l in '[extensions]', 'hgsubversion=': print >> rc, l self.repo_path = '%s/testrepo' % self.tmpdir self.wc_path = '%s/testrepo_wc' % self.tmpdir self.svn_wc = None # Previously, we had a MockUI class that wrapped ui, and giving access # to the stream. The ui.pushbuffer() and ui.popbuffer() can be used # instead. Using the regular UI class, with all stderr redirected to # stdout ensures that the test setup is much more similar to usage # setups. self.patch = (ui.ui.write_err, ui.ui.write) setattr(ui.ui, self.patch[0].func_name, self.patch[1]) def tearDown(self): for var, val in self.oldenv.iteritems(): if val is None: del os.environ[var] else: os.environ[var] = val i18n.t = self.oldt rmtree(self.tmpdir) os.chdir(self.oldwd) setattr(ui.ui, self.patch[0].func_name, self.patch[0]) _verify_our_modules() def assertStringEqual(self, l, r): try: self.assertEqual(l, r, 'failed string equality check, see stdout for details') except: add_nl = lambda li: map(lambda x: x+'\n', li) print 'failed expectation:' print ''.join(difflib.unified_diff( add_nl(l.splitlines()), add_nl(r.splitlines()), fromfile='expected', tofile='got')) raise def ui(self, stupid=False, layout='auto'): return testui(stupid, layout) def _load_fixture_and_fetch(self, fixture_name, subdir=None, stupid=False, layout='auto', startrev=0): if layout == 'single': if subdir is None: subdir = 'trunk' elif subdir is None: subdir = '' return load_fixture_and_fetch(fixture_name, self.repo_path, self.wc_path, subdir=subdir, stupid=stupid, layout=layout, startrev=startrev) def _add_svn_rev(self, changes): '''changes is a dict of filename -> contents''' if self.svn_wc is None: self.svn_wc = os.path.join(self.tmpdir, 'testsvn_wc') subprocess.call([ 'svn', 'co', '-q', fileurl(self.repo_path), self.svn_wc ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for filename, contents in changes.iteritems(): # filenames are / separated filename = filename.replace('/', os.path.sep) filename = os.path.join(self.svn_wc, filename) open(filename, 'w').write(contents) # may be redundant subprocess.call(['svn', 'add', '-q', filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) subprocess.call([ 'svn', 'commit', '-q', self.svn_wc, '-m', 'test changes'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # define this as a property so that it reloads anytime we need it @property def repo(self): return hg.repository(testui(), self.wc_path) def pushrevisions(self, stupid=False, expected_extra_back=0): before = len(self.repo) self.repo.ui.setconfig('hgsubversion', 'stupid', str(stupid)) res = commands.push(self.repo.ui, self.repo) after = len(self.repo) self.assertEqual(expected_extra_back, after - before) return res def svnls(self, path, rev='HEAD'): path = self.repo_path + '/' + path path = util.normalize_url(fileurl(path)) args = ['svn', 'ls', '-r', rev, '-R', path] p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, stderr = p.communicate() if p.returncode: raise Exception('svn ls failed on %s: %r' % (path, stderr)) entries = [e.strip('/') for e in stdout.splitlines()] entries.sort() return entries def commitchanges(self, changes, parent='tip', message='automated test'): """Commit changes to mercurial directory 'changes' is a sequence of tuples (source, dest, data). It can look like: - (source, source, data) to set source content to data - (source, dest, None) to set dest content to source one, and mark it as copied from source. - (source, dest, data) to set dest content to data, and mark it as copied from source. - (source, None, None) to remove source. """ repo = self.repo parentctx = repo[parent] changed, removed = [], [] for source, dest, newdata in changes: if dest is None: removed.append(source) else: changed.append(dest) def filectxfn(repo, memctx, path): if path in removed: raise IOError(errno.ENOENT, "File \"%s\" no longer exists" % path) entry = [e for e in changes if path == e[1]][0] source, dest, newdata = entry if newdata is None: newdata = parentctx[source].data() copied = None if source != dest: copied = source return context.memfilectx(path=dest, data=newdata, islink=False, isexec=False, copied=copied) ctx = context.memctx(repo, (parentctx.node(), node.nullid), message, changed + removed, filectxfn, 'an_author', '2008-10-07 20:59:48 -0500') nodeid = repo.commitctx(ctx) repo = self.repo hg.clean(repo, nodeid) return nodeid def assertchanges(self, changes, ctx): """Assert that all 'changes' (as in defined in commitchanged()) went into ctx. """ for source, dest, data in changes: if dest is None: self.assertTrue(source not in ctx) continue self.assertTrue(dest in ctx) if data is None: data = ctx.parents()[0][source].data() self.assertEqual(ctx[dest].data(), data) if dest != source: copy = ctx[dest].renamed() self.assertEqual(copy[0], source) def assertMultiLineEqual(self, first, second, msg=None): """Assert that two multi-line strings are equal. (Based on Py3k code.) """ self.assert_(isinstance(first, str), ('First argument is not a string')) self.assert_(isinstance(second, str), ('Second argument is not a string')) if first != second: diff = ''.join(difflib.unified_diff(first.splitlines(True), second.splitlines(True), fromfile='a', tofile='b')) msg = '%s\n%s' % (msg or '', diff) raise self.failureException, msg def draw(self, repo): """Helper function displaying a repository graph, especially useful when debugging comprehensive tests. """ # Could be more elegant, but it works with stock hg _ui = ui.ui() _ui.setconfig('extensions', 'graphlog', '') extensions.loadall(_ui) graphlog = extensions.find('graphlog') templ = """\ changeset: {rev}:{node|short} branch: {branches} tags: {tags} summary: {desc|firstline} files: {files} """ graphlog.graphlog(_ui, repo, rev=None, template=templ)