changeset 291:ba8e91a7c077

Add 'updateexternals' to synchronize externals with remote repo. To synchronize definitions in working copy .hgexternals with remote svn repository: $ hg svn updateexternals To synchronize them with .hgexternals at revision REV: $ hg svn updateexternals REV Last synchronized externals referenced are stored in .hg/svn/externals (a dump of the synchronized .hgexternals).
author Patrick Mezard <pmezard@gmail.com>
date Wed, 22 Apr 2009 23:24:58 +0200
parents 153266401676
children ce676eff002b b6a9cdee2f68
files __init__.py svncommands.py svnexternals.py tests/test_externals.py
diffstat 4 files changed, 254 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/__init__.py
+++ b/__init__.py
@@ -26,6 +26,7 @@ import svncommands
 import tag_repo
 import util
 import wrappers
+import svnexternals
 
 def reposetup(ui, repo):
     if not util.is_svn_repo(repo):
--- a/svncommands.py
+++ b/svncommands.py
@@ -10,6 +10,7 @@ import hg_delta_editor
 import svnwrap
 import util
 import utility_commands
+import svnexternals
 
 
 def incoming(ui, svn_url, hg_repo_path, skipto_rev=0, stupid=None,
@@ -228,6 +229,7 @@ table = {
     'help': help,
     'rebuildmeta': rebuildmeta,
     'incoming': incoming,
+    'updateexternals': svnexternals.updateexternals,
 }
 
 table.update(utility_commands.table)
--- a/svnexternals.py
+++ b/svnexternals.py
@@ -1,6 +1,8 @@
 import cStringIO
 
+import os, re, shutil, stat, subprocess
 from mercurial import util as hgutil
+from mercurial.i18n import _
 
 class externalsfile(dict):
     """Map svn directories to lists of externals entries.
@@ -56,7 +58,7 @@ class externalsfile(dict):
                 if target is None or not line:
                     continue
                 self.setdefault(target, []).append(line[1:])
-            
+
 def diff(ext1, ext2):
     """Compare 2 externalsfile and yield tuples like (dir, value1, value2)
     where value1 is the external value in ext1 for dir or None, and
@@ -70,3 +72,203 @@ def diff(ext1, ext2):
     for d in ext2:
         if d not in ext1:
             yield d, None, '\n'.join(ext2[d])
+
+class BadDefinition(Exception):
+    pass
+
+re_defold = re.compile(r'^(.*?)\s+(?:-r\s*(\d+)\s+)?([a-zA-Z]+://.*)$')
+re_defnew = re.compile(r'^(?:-r\s*(\d+)\s+)?((?:[a-zA-Z]+://|\^/).*)\s+(.*)$')
+re_pegrev = re.compile(r'^(.*)@(\d+)$')
+re_scheme = re.compile(r'^[a-zA-Z]+://')
+
+def parsedefinition(line):
+    """Parse an external definition line, return a tuple (path, rev, source)
+    or raise BadDefinition.
+    """
+    # The parsing is probably not correct wrt path with whitespaces or
+    # potential quotes. svn documentation is not really talkative about
+    # these either.
+    line = line.strip()
+    m = re_defnew.search(line)
+    if m:
+        rev, source, path = m.group(1, 2, 3)
+    else:
+        m = re_defold.search(line)
+        if not m:
+            raise BadDefinition()
+        path, rev, source = m.group(1, 2, 3)
+    # Look for peg revisions
+    m = re_pegrev.search(source)
+    if m:
+        source, rev = m.group(1, 2)
+    return (path, rev, source)
+
+def parsedefinitions(ui, repo, svnroot, exts):
+    """Return (targetdir, revision, source) tuples. Fail if nested
+    targetdirs are detected. source is an svn project URL.
+    """
+    defs = []
+    for base in sorted(exts):
+        for line in exts[base]:
+            try:
+                path, rev, source = parsedefinition(line)
+            except BadDefinition:
+                ui.warn(_('ignoring invalid external definition: %r' % line))
+                continue
+            if re_scheme.search(source):
+                pass
+            elif source.startswith('^/'):
+                source = svnroot + source[1:]
+            else:
+                ui.warn(_('ignoring unsupported non-fully qualified external: %r' % source))
+                continue
+            wpath = hgutil.pconvert(os.path.join(base, path))
+            wpath = hgutil.canonpath(repo.root, '', wpath)
+            defs.append((wpath, rev, source))
+    # Check target dirs are not nested
+    defs.sort()
+    for i, d in enumerate(defs):
+        for d2 in defs[i+1:]:
+            if d2[0].startswith(d[0] + '/'):
+                raise hgutil.Abort(_('external directories cannot nest:\n%s\n%s')
+                                   % (d[0], d2[0]))
+    return defs
+
+def computeactions(ui, repo, svnroot, ext1, ext2):
+
+    def listdefs(data):
+        defs = {}
+        exts = externalsfile()
+        exts.read(data)
+        for d in parsedefinitions(ui, repo, svnroot, exts):
+            defs[d[0]] = d
+        return defs
+
+    ext1 = listdefs(ext1)
+    ext2 = listdefs(ext2)
+    for wp1 in ext1:
+        if wp1 in ext2:
+            yield 'u', ext2[wp1]
+        else:
+            yield 'd', ext1[wp1]
+    for wp2 in ext2:
+        if wp2 not in ext1:
+            yield 'u', ext2[wp2]
+
+def getsvninfo(svnurl):
+    """Return a tuple (url, root) for supplied svn URL or working
+    directory path.
+    """
+    # Yes, this is ugly, but good enough for now
+    args = ['svn', 'info', '--xml', svnurl]
+    shell = os.name == 'nt'
+    p = subprocess.Popen(args, stdout=subprocess.PIPE, shell=shell)
+    stdout = p.communicate()[0]
+    if p.returncode:
+        raise hgutil.Abort(_('cannot get information about %s')
+                         % svnurl)
+    m = re.search(r'<root>(.*)</root>', stdout, re.S)
+    if not m:
+        raise hgutil.Abort(_('cannot find SVN repository root from %s')
+                           % svnurl)
+    root = m.group(1).rstrip('/')
+
+    m = re.search(r'<url>(.*)</url>', stdout, re.S)
+    if not m:
+        raise hgutil.Abort(_('cannot find SVN repository URL from %s') % svnurl)
+    url = m.group(1)
+
+    m = re.search(r'<entry[^>]+revision="([^"]+)"', stdout, re.S)
+    if not m:
+        raise hgutil.Abort(_('cannot find SVN revision from %s') % svnurl)
+    rev = m.group(1)
+    return url, root, rev
+
+class externalsupdater:
+    def __init__(self, ui, repo):
+        self.repo = repo
+        self.ui = ui
+
+    def update(self, wpath, rev, source):
+        path = self.repo.wjoin(wpath)
+        revspec = []
+        if rev:
+            revspec = ['-r', rev]
+        if os.path.isdir(path):
+            exturl, extroot, extrev = getsvninfo(path)
+            if source == exturl:
+                if extrev != rev:
+                    self.ui.status(_('updating external on %s@%s\n') %
+                                   (wpath, rev or 'HEAD'))
+                    cwd = os.path.join(self.repo.root, path)
+                    self.svn(['update'] + revspec, cwd)
+                return
+            self.delete(wpath)
+        cwd, dest = os.path.split(path)
+        cwd = os.path.join(self.repo.root, cwd)
+        if not os.path.isdir(cwd):
+            os.makedirs(cwd)
+        self.ui.status(_('fetching external %s@%s\n') % (wpath, rev or 'HEAD'))
+        self.svn(['co'] + revspec + [source, dest], cwd)
+
+    def delete(self, wpath):
+        path = self.repo.wjoin(wpath)
+        if os.path.isdir(path):
+            self.ui.status(_('removing external %s\n') % wpath)
+
+            def onerror(function, path, excinfo):
+                if function is not os.remove:
+                    raise
+                # read-only files cannot be unlinked under Windows
+                s = os.stat(path)
+                if (s.st_mode & stat.S_IWRITE) != 0:
+                    raise
+                os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE)
+                os.remove(path)
+
+            shutil.rmtree(path, onerror=onerror)
+            return 1
+
+    def svn(self, args, cwd):
+        args = ['svn'] + args
+        self.ui.debug(_('updating externals: %r, cwd=%s\n') % (args, cwd))
+        shell = os.name == 'nt'
+        subprocess.check_call(args, cwd=cwd, shell=shell)
+
+def updateexternals(ui, args, repo, **opts):
+    """update repository externals
+    """
+    if len(args) > 1:
+        raise hgutil.Abort(_('updateexternals expects at most one changeset'))
+    node = None
+    if args:
+        node = args[0]
+
+    try:
+        svnurl = file(repo.join('svn/url'), 'rb').read()
+    except:
+        raise hgutil.Abort(_('failed to retrieve original svn URL'))
+    svnroot = getsvninfo(svnurl)[1]
+
+    # Retrieve current externals status
+    try:
+        oldext = file(repo.join('svn/externals'), 'rb').read()
+    except IOError:
+        oldext = ''
+    newext = ''
+    ctx = repo[node]
+    if '.hgsvnexternals' in ctx:
+        newext = ctx['.hgsvnexternals'].data()
+
+    updater = externalsupdater(ui, repo)
+    actions = computeactions(ui, repo, svnroot, oldext, newext)
+    for action, ext in actions:
+        if action == 'u':
+            updater.update(ext[0], ext[1], ext[2])
+        elif action == 'd':
+            updater.delete(ext[0])
+        else:
+            raise hgutil.Abort(_('unknown update actions: %r') % action)
+
+    file(repo.join('svn/externals'), 'wb').write(newext)
+
--- a/tests/test_externals.py
+++ b/tests/test_externals.py
@@ -1,9 +1,10 @@
-import unittest
+import os, unittest
+
+from mercurial import commands
 
 import svnexternals
 import test_util
 
-
 class TestFetchExternals(test_util.TestBase):
     def test_externalsfile(self):
         f = svnexternals.externalsfile()
@@ -28,6 +29,28 @@ class TestFetchExternals(test_util.TestB
         for t in f:
             self.assertEqual(f[t], f2[t])
 
+    def test_parsedefinitions(self):
+        # Taken from svn book
+        samples = [
+            ('third-party/sounds             http://svn.example.com/repos/sounds',
+             ('third-party/sounds', None, 'http://svn.example.com/repos/sounds')),
+            ('third-party/skins -r148        http://svn.example.com/skinproj',
+             ('third-party/skins', '148', 'http://svn.example.com/skinproj')),
+            ('third-party/skins -r 148        http://svn.example.com/skinproj',
+             ('third-party/skins', '148', 'http://svn.example.com/skinproj')),
+            ('http://svn.example.com/repos/sounds third-party/sounds',
+             ('third-party/sounds', None, 'http://svn.example.com/repos/sounds')),
+            ('-r148 http://svn.example.com/skinproj third-party/skins',
+             ('third-party/skins', '148', 'http://svn.example.com/skinproj')),
+            ('-r 148 http://svn.example.com/skinproj third-party/skins',
+             ('third-party/skins', '148', 'http://svn.example.com/skinproj')),
+            ('http://svn.example.com/skin-maker@21 third-party/skins/toolkit',
+             ('third-party/skins/toolkit', '21', 'http://svn.example.com/skin-maker')),
+            ]
+        
+        for line, expected in samples:
+            self.assertEqual(expected, svnexternals.parsedefinition(line))
+
     def test_externals(self, stupid=False):
         repo = self._load_fixture_and_fetch('externals.svndump', stupid=stupid)
 
@@ -78,6 +101,29 @@ class TestFetchExternals(test_util.TestB
     def test_externals_stupid(self):
         self.test_externals(True)
 
+    def test_updateexternals(self):
+        def checkdeps(deps, nodeps, repo, rev=None):
+            svnexternals.updateexternals(ui, [rev], repo)
+            for d in deps:
+                p = os.path.join(repo.root, d)
+                self.assertTrue(os.path.isdir(p), 
+                                'missing: %s@%r' % (d, rev))
+            for d in nodeps:
+                p = os.path.join(repo.root, d)
+                self.assertTrue(not os.path.isdir(p),
+                                'unexpected: %s@%r' % (d, rev))
+
+        ui = test_util.ui.ui()
+        repo = self._load_fixture_and_fetch('externals.svndump', stupid=0)
+        commands.update(ui, repo)
+        checkdeps(['deps/project1'], [], repo, 0)
+        checkdeps(['deps/project1', 'deps/project2'], [], repo, 1)
+        checkdeps(['subdir/deps/project1', 'subdir2/deps/project1', 
+                   'deps/project2'], 
+                  ['deps/project1'], repo, 2)
+        checkdeps(['subdir/deps/project1', 'deps/project2'], 
+                  ['subdir2/deps/project1'], repo, 3)
+        checkdeps(['subdir/deps/project1'], ['deps/project2'], repo, 4)
 
 class TestPushExternals(test_util.TestBase):
     def setUp(self):