# HG changeset patch # User Patrick Mezard # Date 1240435498 -7200 # Node ID ba8e91a7c077b88645a7a5680060decee17e457b # Parent 15326640167664b630a676ede8ba6f6345840ea0 Add 'updateexternals' to synchronize externals with remote repo. To synchronize definitions in working copy .hgexternals with remote svn repository: $ hg svn updateexternals To synchronize them with .hgexternals at revision REV: $ hg svn updateexternals REV Last synchronized externals referenced are stored in .hg/svn/externals (a dump of the synchronized .hgexternals). diff --git a/__init__.py b/__init__.py --- a/__init__.py +++ b/__init__.py @@ -26,6 +26,7 @@ import svncommands import tag_repo import util import wrappers +import svnexternals def reposetup(ui, repo): if not util.is_svn_repo(repo): diff --git a/svncommands.py b/svncommands.py --- a/svncommands.py +++ b/svncommands.py @@ -10,6 +10,7 @@ import hg_delta_editor import svnwrap import util import utility_commands +import svnexternals def incoming(ui, svn_url, hg_repo_path, skipto_rev=0, stupid=None, @@ -228,6 +229,7 @@ table = { 'help': help, 'rebuildmeta': rebuildmeta, 'incoming': incoming, + 'updateexternals': svnexternals.updateexternals, } table.update(utility_commands.table) diff --git a/svnexternals.py b/svnexternals.py --- a/svnexternals.py +++ b/svnexternals.py @@ -1,6 +1,8 @@ import cStringIO +import os, re, shutil, stat, subprocess from mercurial import util as hgutil +from mercurial.i18n import _ class externalsfile(dict): """Map svn directories to lists of externals entries. @@ -56,7 +58,7 @@ class externalsfile(dict): if target is None or not line: continue self.setdefault(target, []).append(line[1:]) - + def diff(ext1, ext2): """Compare 2 externalsfile and yield tuples like (dir, value1, value2) where value1 is the external value in ext1 for dir or None, and @@ -70,3 +72,203 @@ def diff(ext1, ext2): for d in ext2: if d not in ext1: yield d, None, '\n'.join(ext2[d]) + +class BadDefinition(Exception): + pass + +re_defold = re.compile(r'^(.*?)\s+(?:-r\s*(\d+)\s+)?([a-zA-Z]+://.*)$') +re_defnew = re.compile(r'^(?:-r\s*(\d+)\s+)?((?:[a-zA-Z]+://|\^/).*)\s+(.*)$') +re_pegrev = re.compile(r'^(.*)@(\d+)$') +re_scheme = re.compile(r'^[a-zA-Z]+://') + +def parsedefinition(line): + """Parse an external definition line, return a tuple (path, rev, source) + or raise BadDefinition. + """ + # The parsing is probably not correct wrt path with whitespaces or + # potential quotes. svn documentation is not really talkative about + # these either. + line = line.strip() + m = re_defnew.search(line) + if m: + rev, source, path = m.group(1, 2, 3) + else: + m = re_defold.search(line) + if not m: + raise BadDefinition() + path, rev, source = m.group(1, 2, 3) + # Look for peg revisions + m = re_pegrev.search(source) + if m: + source, rev = m.group(1, 2) + return (path, rev, source) + +def parsedefinitions(ui, repo, svnroot, exts): + """Return (targetdir, revision, source) tuples. Fail if nested + targetdirs are detected. source is an svn project URL. + """ + defs = [] + for base in sorted(exts): + for line in exts[base]: + try: + path, rev, source = parsedefinition(line) + except BadDefinition: + ui.warn(_('ignoring invalid external definition: %r' % line)) + continue + if re_scheme.search(source): + pass + elif source.startswith('^/'): + source = svnroot + source[1:] + else: + ui.warn(_('ignoring unsupported non-fully qualified external: %r' % source)) + continue + wpath = hgutil.pconvert(os.path.join(base, path)) + wpath = hgutil.canonpath(repo.root, '', wpath) + defs.append((wpath, rev, source)) + # Check target dirs are not nested + defs.sort() + for i, d in enumerate(defs): + for d2 in defs[i+1:]: + if d2[0].startswith(d[0] + '/'): + raise hgutil.Abort(_('external directories cannot nest:\n%s\n%s') + % (d[0], d2[0])) + return defs + +def computeactions(ui, repo, svnroot, ext1, ext2): + + def listdefs(data): + defs = {} + exts = externalsfile() + exts.read(data) + for d in parsedefinitions(ui, repo, svnroot, exts): + defs[d[0]] = d + return defs + + ext1 = listdefs(ext1) + ext2 = listdefs(ext2) + for wp1 in ext1: + if wp1 in ext2: + yield 'u', ext2[wp1] + else: + yield 'd', ext1[wp1] + for wp2 in ext2: + if wp2 not in ext1: + yield 'u', ext2[wp2] + +def getsvninfo(svnurl): + """Return a tuple (url, root) for supplied svn URL or working + directory path. + """ + # Yes, this is ugly, but good enough for now + args = ['svn', 'info', '--xml', svnurl] + shell = os.name == 'nt' + p = subprocess.Popen(args, stdout=subprocess.PIPE, shell=shell) + stdout = p.communicate()[0] + if p.returncode: + raise hgutil.Abort(_('cannot get information about %s') + % svnurl) + m = re.search(r'(.*)', stdout, re.S) + if not m: + raise hgutil.Abort(_('cannot find SVN repository root from %s') + % svnurl) + root = m.group(1).rstrip('/') + + m = re.search(r'(.*)', stdout, re.S) + if not m: + raise hgutil.Abort(_('cannot find SVN repository URL from %s') % svnurl) + url = m.group(1) + + m = re.search(r']+revision="([^"]+)"', stdout, re.S) + if not m: + raise hgutil.Abort(_('cannot find SVN revision from %s') % svnurl) + rev = m.group(1) + return url, root, rev + +class externalsupdater: + def __init__(self, ui, repo): + self.repo = repo + self.ui = ui + + def update(self, wpath, rev, source): + path = self.repo.wjoin(wpath) + revspec = [] + if rev: + revspec = ['-r', rev] + if os.path.isdir(path): + exturl, extroot, extrev = getsvninfo(path) + if source == exturl: + if extrev != rev: + self.ui.status(_('updating external on %s@%s\n') % + (wpath, rev or 'HEAD')) + cwd = os.path.join(self.repo.root, path) + self.svn(['update'] + revspec, cwd) + return + self.delete(wpath) + cwd, dest = os.path.split(path) + cwd = os.path.join(self.repo.root, cwd) + if not os.path.isdir(cwd): + os.makedirs(cwd) + self.ui.status(_('fetching external %s@%s\n') % (wpath, rev or 'HEAD')) + self.svn(['co'] + revspec + [source, dest], cwd) + + def delete(self, wpath): + path = self.repo.wjoin(wpath) + if os.path.isdir(path): + self.ui.status(_('removing external %s\n') % wpath) + + def onerror(function, path, excinfo): + if function is not os.remove: + raise + # read-only files cannot be unlinked under Windows + s = os.stat(path) + if (s.st_mode & stat.S_IWRITE) != 0: + raise + os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE) + os.remove(path) + + shutil.rmtree(path, onerror=onerror) + return 1 + + def svn(self, args, cwd): + args = ['svn'] + args + self.ui.debug(_('updating externals: %r, cwd=%s\n') % (args, cwd)) + shell = os.name == 'nt' + subprocess.check_call(args, cwd=cwd, shell=shell) + +def updateexternals(ui, args, repo, **opts): + """update repository externals + """ + if len(args) > 1: + raise hgutil.Abort(_('updateexternals expects at most one changeset')) + node = None + if args: + node = args[0] + + try: + svnurl = file(repo.join('svn/url'), 'rb').read() + except: + raise hgutil.Abort(_('failed to retrieve original svn URL')) + svnroot = getsvninfo(svnurl)[1] + + # Retrieve current externals status + try: + oldext = file(repo.join('svn/externals'), 'rb').read() + except IOError: + oldext = '' + newext = '' + ctx = repo[node] + if '.hgsvnexternals' in ctx: + newext = ctx['.hgsvnexternals'].data() + + updater = externalsupdater(ui, repo) + actions = computeactions(ui, repo, svnroot, oldext, newext) + for action, ext in actions: + if action == 'u': + updater.update(ext[0], ext[1], ext[2]) + elif action == 'd': + updater.delete(ext[0]) + else: + raise hgutil.Abort(_('unknown update actions: %r') % action) + + file(repo.join('svn/externals'), 'wb').write(newext) + diff --git a/tests/test_externals.py b/tests/test_externals.py --- a/tests/test_externals.py +++ b/tests/test_externals.py @@ -1,9 +1,10 @@ -import unittest +import os, unittest + +from mercurial import commands import svnexternals import test_util - class TestFetchExternals(test_util.TestBase): def test_externalsfile(self): f = svnexternals.externalsfile() @@ -28,6 +29,28 @@ class TestFetchExternals(test_util.TestB for t in f: self.assertEqual(f[t], f2[t]) + def test_parsedefinitions(self): + # Taken from svn book + samples = [ + ('third-party/sounds http://svn.example.com/repos/sounds', + ('third-party/sounds', None, 'http://svn.example.com/repos/sounds')), + ('third-party/skins -r148 http://svn.example.com/skinproj', + ('third-party/skins', '148', 'http://svn.example.com/skinproj')), + ('third-party/skins -r 148 http://svn.example.com/skinproj', + ('third-party/skins', '148', 'http://svn.example.com/skinproj')), + ('http://svn.example.com/repos/sounds third-party/sounds', + ('third-party/sounds', None, 'http://svn.example.com/repos/sounds')), + ('-r148 http://svn.example.com/skinproj third-party/skins', + ('third-party/skins', '148', 'http://svn.example.com/skinproj')), + ('-r 148 http://svn.example.com/skinproj third-party/skins', + ('third-party/skins', '148', 'http://svn.example.com/skinproj')), + ('http://svn.example.com/skin-maker@21 third-party/skins/toolkit', + ('third-party/skins/toolkit', '21', 'http://svn.example.com/skin-maker')), + ] + + for line, expected in samples: + self.assertEqual(expected, svnexternals.parsedefinition(line)) + def test_externals(self, stupid=False): repo = self._load_fixture_and_fetch('externals.svndump', stupid=stupid) @@ -78,6 +101,29 @@ class TestFetchExternals(test_util.TestB def test_externals_stupid(self): self.test_externals(True) + def test_updateexternals(self): + def checkdeps(deps, nodeps, repo, rev=None): + svnexternals.updateexternals(ui, [rev], repo) + for d in deps: + p = os.path.join(repo.root, d) + self.assertTrue(os.path.isdir(p), + 'missing: %s@%r' % (d, rev)) + for d in nodeps: + p = os.path.join(repo.root, d) + self.assertTrue(not os.path.isdir(p), + 'unexpected: %s@%r' % (d, rev)) + + ui = test_util.ui.ui() + repo = self._load_fixture_and_fetch('externals.svndump', stupid=0) + commands.update(ui, repo) + checkdeps(['deps/project1'], [], repo, 0) + checkdeps(['deps/project1', 'deps/project2'], [], repo, 1) + checkdeps(['subdir/deps/project1', 'subdir2/deps/project1', + 'deps/project2'], + ['deps/project1'], repo, 2) + checkdeps(['subdir/deps/project1', 'deps/project2'], + ['subdir2/deps/project1'], repo, 3) + checkdeps(['subdir/deps/project1'], ['deps/project2'], repo, 4) class TestPushExternals(test_util.TestBase): def setUp(self):