Mercurial > hgsubversion
view hgsubversion/svnexternals.py @ 1446:2eba84031a78
RevMap: no longer take a meta
author | Augie Fackler <raf@durin42.com> |
---|---|
date | Mon, 06 Jun 2016 00:51:41 -0400 |
parents | 85981b27e740 |
children | 77da55e0baa4 |
line wrap: on
line source
import cStringIO

import os, re, shutil, stat, subprocess

from mercurial import util as hgutil
from mercurial.i18n import _
from mercurial import subrepo

try:
    from mercurial import scmutil
    canonpath = scmutil.canonpath
except (ImportError, AttributeError):
    # canonpath is looked up in pathutil when scmutil does not provide it
    from mercurial import pathutil
    canonpath = pathutil.canonpath

import util

class externalsfile(dict):
    """Map svn directories to lists of externals entries.

    Keys are target directories ('' stands for the root, serialized as
    '.'), values are lists of definition lines. The serialized form is a
    sequence of "[target]" headers, each followed by its definition
    lines indented by a single space.
    """
    def __init__(self):
        super(externalsfile, self).__init__()
        self.encoding = 'utf-8'

    def __setitem__(self, key, value):
        """Store the externals of directory key.

        value may be None, a string (split into lines) or a list of
        lines. An empty value removes the entry instead of storing it,
        and '.' is normalized to ''.
        """
        if value is None:
            value = []
        elif isinstance(value, basestring):
            value = value.splitlines()
        if key == '.':
            key = ''
        if not value:
            if key in self:
                del self[key]
        else:
            super(externalsfile, self).__setitem__(key, value)

    def write(self):
        """Return the serialized content, with targets in sorted order."""
        fp = cStringIO.StringIO()
        for target in sorted(self):
            lines = self[target]
            if not lines:
                continue
            if not target:
                target = '.'
            fp.write('[%s]\n' % target)
            for l in lines:
                l = ' ' + l + '\n'
                fp.write(l)
        return fp.getvalue()

    def read(self, data):
        """Replace the current content with the entries parsed from data.

        Raise hgutil.Abort if a section header is not closed by ']'.
        Lines which neither start a section nor begin with a space are
        ignored, as are definition lines seen before any section header.
        """
        self.clear()
        fp = cStringIO.StringIO(data)
        target = None
        for line in fp.readlines():
            if not line.strip():
                continue
            if line.startswith('['):
                line = line.strip()
                if line[-1] != ']':
                    raise hgutil.Abort('invalid externals section name: %s'
                                       % line)
                target = line[1:-1]
                if target == '.':
                    target = ''
            elif line.startswith(' '):
                line = line.rstrip('\n')
                if target is None or not line:
                    continue
                # Drop the single leading space added by write()
                self.setdefault(target, []).append(line[1:])

def diff(ext1, ext2):
    """Compare 2 externalsfile and return a list of tuples like (dir,
    value1, value2) where value1 is the external value in ext1 for dir
    or None, and value2 the same in ext2.

    Values are the definition lines joined with newlines. Directories
    with identical definitions on both sides are not reported.
    """
    changes = []
    for d in ext1:
        if d not in ext2:
            changes.append((d, '\n'.join(ext1[d]), None))
        elif ext1[d] != ext2[d]:
            changes.append((d, '\n'.join(ext1[d]), '\n'.join(ext2[d])))
    for d in ext2:
        if d not in ext1:
            changes.append((d, None, '\n'.join(ext2[d])))
    return changes

class BadDefinition(Exception):
    """Raised when an external definition line cannot be parsed."""
    pass

# Old syntax: "path [-r REV] URL"
re_defold = re.compile(r'^\s*(.*?)\s+(?:-r\s*(\d+|\{REV\})\s+)?([a-zA-Z+]+://.*)\s*$')
# New syntax: "[-r REV] URL[@PEG] path", where URL may be relative ("^/...")
re_defnew = re.compile(r'^\s*(?:-r\s*(\d+|\{REV\})\s+)?((?:[a-zA-Z+]+://|\^/).*)\s+(\S+)\s*$')
re_scheme = re.compile(r'^[a-zA-Z+]+://')

def parsedefinition(line):
    """Parse an external definition line, return a tuple (path, rev,
    source, pegrev, norevline) or raise BadDefinition.

    rev and pegrev are strings or None. norevline is the input line with
    a numeric revision replaced by the '{REV}' placeholder; it is the
    unmodified line when there is no revision or when the revision is
    not an integer.
    """
    # The parsing is probably not correct wrt path with whitespaces or
    # potential quotes. svn documentation is not really talkative about
    # these either.
    pegrev, revgroup = None, 1
    m = re_defnew.search(line)
    if m:
        rev, source, path = m.group(1, 2, 3)
        if '@' in source:
            source, pegrev = source.rsplit('@', 1)
    else:
        m = re_defold.search(line)
        if not m:
            raise BadDefinition()
        # The revision is the second group in the old syntax
        revgroup = 2
        path, rev, source = m.group(1, 2, 3)
    try:
        int(rev) # ensure revision is int()able, so we bail otherwise
        norevline = line[:m.start(revgroup)] + '{REV}' + line[m.end(revgroup):]
    except (TypeError, ValueError):
        norevline = line
    return (path, rev, source, pegrev, norevline)

class RelativeSourceError(Exception):
    """Raised when a relative external source cannot be resolved."""
    pass

def resolvedots(url):
    """
    Fix references that include .. entries.
    Scans a URL for .. type entries and resolves them but will not allow any
    number of ..s to take us out of domain so http://.. will raise an exception.

    Tests, (Don't know how to construct a round trip for this so doctest):
    >>> # Relative URL within servers svn area
    >>> resolvedots(
    ...    "http://some.svn.server/svn/some_repo/../other_repo")
    'http://some.svn.server/svn/other_repo'
    >>> # Complex One
    >>> resolvedots(
    ...    "http://some.svn.server/svn/repo/../other/repo/../../other_repo")
    'http://some.svn.server/svn/other_repo'
    >>> # Another Complex One
    >>> resolvedots(
    ...    "http://some.svn.server/svn/repo/dir/subdir/../../../other_repo/dir")
    'http://some.svn.server/svn/other_repo/dir'
    >>> # Last Complex One - SVN Allows this & seen it used even if it is BAD!
    >>> resolvedots(
    ...    "http://svn.server/svn/my_repo/dir/subdir/../../other_dir")
    'http://svn.server/svn/my_repo/other_dir'
    >>> # Outside the SVN Area might be OK
    >>> resolvedots(
    ...    "http://svn.server/svn/some_repo/../../other_svn_repo")
    'http://svn.server/other_svn_repo'
    >>> # Complex One
    >>> resolvedots(
    ...    "http://some.svn.server/svn/repo/../other/repo/../../other_repo")
    'http://some.svn.server/svn/other_repo'
    >>> # On another server is not a relative URL should give an exception
    >>> resolvedots(
    ...    "http://some.svn.server/svn/some_repo/../../../other_server")
    Traceback (most recent call last):
        ...
    RelativeSourceError: Relative URL cannot be to another server
    """
    orig = url.split('/')
    fixed = []
    for item in orig:
        if item != '..':
            fixed.append(item)
        elif len(fixed) > 3:  # Don't allow things to go out of domain
            # 'scheme:', '' and 'host' are the first three components
            fixed.pop()
        else:
            raise RelativeSourceError(
                'Relative URL cannot be to another server')
    return '/'.join(fixed)

def resolvesource(ui, svnroot, source):
    """ Resolve the source as either matching the scheme re or by resolving
    relative URLs which start with ^ and may include relative .. references.

    Return the resolved URL, or None (after warning through ui) when the
    source is neither fully qualified nor relative to the repository
    root. Raise RelativeSourceError when the source is root-relative and
    svnroot is None, or when it escapes the server.

    >>> root = 'http://some.svn.server/svn/some_repo'
    >>> resolvesource(None, root, 'http://other.svn.server')
    'http://other.svn.server'
    >>> resolvesource(None, root, 'ssh://other.svn.server')
    'ssh://other.svn.server'
    >>> resolvesource(None, root, '^/other_repo')
    'http://some.svn.server/svn/some_repo/other_repo'
    >>> resolvesource(None, root, '^/sub_repo')
    'http://some.svn.server/svn/some_repo/sub_repo'
    >>> resolvesource(None, root, '^/../other_repo')
    'http://some.svn.server/svn/other_repo'
    >>> resolvesource(None, root, '^/../../../server/other_repo')
    Traceback (most recent call last):
        ...
    RelativeSourceError: Relative URL cannot be to another server
    """
    if re_scheme.search(source):
        return source
    if source.startswith('^/'):
        if svnroot is None:
            raise RelativeSourceError()
        return resolvedots(svnroot + source[1:])
    # Format after the translation lookup so the message id can be found
    ui.warn(_('ignoring unsupported non-fully qualified external: %r\n')
            % source)
    return None

def parsedefinitions(ui, repo, svnroot, exts):
    """Return a sorted list of (targetdir, revision, source, pegrev,
    norevline, base) tuples for the definitions found in exts.

    Fail if nested targetdirs are detected. source is an svn project
    URL. Blank lines and comments are skipped; invalid or unresolvable
    definitions are reported through ui.warn() and skipped.
    """
    defs = []
    for base in sorted(exts):
        for line in exts[base]:
            if not line.strip() or line.lstrip().startswith('#'):
                # Ignore comments and blank lines
                continue
            try:
                path, rev, source, pegrev, norevline = parsedefinition(line)
            except BadDefinition:
                # Format after the translation lookup so the message id
                # can be found
                ui.warn(_('ignoring invalid external definition: %r\n')
                        % line)
                continue
            source = resolvesource(ui, svnroot, source)
            if source is None:
                continue
            wpath = hgutil.pconvert(os.path.join(base, path))
            wpath = canonpath(repo.root, '', wpath)
            defs.append((wpath, rev, source, pegrev, norevline, base))
    # Check target dirs are not nested
    defs.sort()
    for i, d in enumerate(defs):
        for d2 in defs[i+1:]:
            if d2[0].startswith(d[0] + '/'):
                raise hgutil.Abort(_('external directories cannot nest:\n%s\n%s')
                                   % (d[0], d2[0]))
    return defs

def computeactions(ui, repo, svnroot, ext1, ext2):
    """Yield the (action, definition) pairs turning state ext1 into ext2.

    ext1 and ext2 are serialized externals files. action is 'u' when the
    external has to be updated or fetched and 'd' when it has to be
    deleted; definition is a tuple as returned by parsedefinitions().
    """

    def listdefs(data):
        # Map target working directory paths to their parsed definition
        defs = {}
        exts = externalsfile()
        exts.read(data)
        for d in parsedefinitions(ui, repo, svnroot, exts):
            defs[d[0]] = d
        return defs

    ext1 = listdefs(ext1)
    ext2 = listdefs(ext2)
    for wp1 in ext1:
        if wp1 in ext2:
            yield 'u', ext2[wp1]
        else:
            yield 'd', ext1[wp1]
    for wp2 in ext2:
        if wp2 not in ext1:
            yield 'u', ext2[wp2]

def getsvninfo(svnurl):
    """Return a tuple (url, root, rev) for supplied svn URL or working
    directory path.

    The values are extracted from the output of "svn info --xml". Raise
    hgutil.Abort if the command fails or if any of the values cannot be
    found in its output.
    """
    # Yes, this is ugly, but good enough for now
    args = ['svn', 'info', '--xml', svnurl]
    shell = os.name == 'nt'
    p = subprocess.Popen(args, shell=shell,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout = p.communicate()[0]
    if p.returncode:
        raise hgutil.Abort(_('cannot get information about %s')
                           % svnurl)
    m = re.search(r'<root>(.*)</root>', stdout, re.S)
    if not m:
        raise hgutil.Abort(_('cannot find SVN repository root from %s')
                           % svnurl)
    root = m.group(1).rstrip('/')

    m = re.search(r'<url>(.*)</url>', stdout, re.S)
    if not m:
        raise hgutil.Abort(_('cannot find SVN repository URL from %s')
                           % svnurl)
    url = m.group(1)

    m = re.search(r'<entry[^>]+revision="([^"]+)"', stdout, re.S)
    if not m:
        raise hgutil.Abort(_('cannot find SVN revision from %s')
                           % svnurl)
    rev = m.group(1)
    return url, root, rev

class externalsupdater:
    """Fetch, update and delete externals checkouts in the working
    directory of repo by running the svn command line client."""

    def __init__(self, ui, repo):
        self.repo = repo
        self.ui = ui

    def update(self, wpath, rev, source, pegrev):
        """Make the external at wpath a checkout of source at rev.

        An existing checkout of the same URL is updated in place (or
        left alone when already at rev); anything else found at wpath is
        deleted and checked out again. rev and pegrev may be None.
        """
        path = self.repo.wjoin(wpath)
        revspec = []
        if rev:
            revspec = ['-r', rev]
        if os.path.isdir(path):
            exturl, _extroot, extrev = getsvninfo(path)
            # Comparing the source paths is not enough, but I don't
            # know how to compare path+pegrev. The following update
            # might fail if the path was replaced by another unrelated
            # one. It can be fixed manually by deleting the externals
            # and updating again.
            if source == exturl:
                if extrev != rev:
                    self.ui.status(_('updating external on %s@%s\n') %
                                   (wpath, rev or 'HEAD'))
                    cwd = os.path.join(self.repo.root, path)
                    self.svn(['update'] + revspec, cwd)
                return
            self.delete(wpath)
        cwd, dest = os.path.split(path)
        cwd = os.path.join(self.repo.root, cwd)
        if not os.path.isdir(cwd):
            os.makedirs(cwd)
        # Without an explicit peg revision, peg the URL at the requested
        # revision
        if not pegrev and rev:
            pegrev = rev
        if pegrev:
            source = '%s@%s' % (source, pegrev)
        self.ui.status(_('fetching external %s@%s\n') %
                       (wpath, rev or 'HEAD'))
        self.svn(['co'] + revspec + [source, dest], cwd)

    def delete(self, wpath):
        """Remove the external directory at wpath.

        Return 1 if a directory was removed, None otherwise.
        """
        path = self.repo.wjoin(wpath)
        if os.path.isdir(path):
            self.ui.status(_('removing external %s\n') % wpath)

            def onerror(function, path, excinfo):
                # Called by shutil.rmtree() while handling the failure,
                # so a bare raise re-raises the original error
                if function is not os.remove:
                    raise
                # read-only files cannot be unlinked under Windows
                s = os.stat(path)
                if (s.st_mode & stat.S_IWRITE) != 0:
                    raise
                os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE)
                os.remove(path)

            shutil.rmtree(path, onerror=onerror)
            return 1

    def svn(self, args, cwd):
        """Run "svn" with args in directory cwd.

        The command output is forwarded to ui.note(). Raise hgutil.Abort
        if the command exits with a non-zero status.
        """
        args = ['svn'] + args
        self.ui.debug(_('updating externals: %r, cwd=%s\n') % (args, cwd))
        shell = os.name == 'nt'
        p = subprocess.Popen(args, cwd=cwd, shell=shell,
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        for line in p.stdout:
            self.ui.note(line)
        p.wait()
        if p.returncode != 0:
            raise hgutil.Abort("subprocess '%s' failed" % ' '.join(args))

def updateexternals(ui, args, repo, **opts):
    """update repository externals
    """
    # args is either [] (default path, working directory parent),
    # [node] or [path, node].
    if len(args) > 2:
        raise hgutil.Abort(_('updateexternals expects at most one changeset'))
    node = None
    if len(args) == 2:
        svnurl = util.normalize_url(repo.ui.expandpath(args[0]))
        args = args[1:]
    else:
        svnurl = util.normalize_url(repo.ui.expandpath('default'))
    if args:
        node = args[0]

    svnroot = getsvninfo(svnurl)[1]

    # Retrieve current externals status. A missing or unreadable state
    # file means no externals have been checked out yet.
    try:
        fp = open(repo.join('svn/externals'), 'rb')
        try:
            oldext = fp.read()
        finally:
            fp.close()
    except IOError:
        oldext = ''
    newext = ''
    ctx = repo[node]
    if '.hgsvnexternals' in ctx:
        newext = ctx['.hgsvnexternals'].data()

    updater = externalsupdater(ui, repo)
    actions = computeactions(ui, repo, svnroot, oldext, newext)
    for action, ext in actions:
        if action == 'u':
            updater.update(ext[0], ext[1], ext[2], ext[3])
        elif action == 'd':
            updater.delete(ext[0])
        else:
            raise hgutil.Abort(_('unknown update actions: %r') % action)

    # Record the new state, closing the file explicitly so the content
    # is flushed regardless of when the object is collected
    fp = open(repo.join('svn/externals'), 'wb')
    try:
        fp.write(newext)
    finally:
        fp.close()

def getchanges(ui, repo, parentctx, exts):
    """Take a parent changectx and the new externals definitions as an
    externalsfile and return a dictionary mapping the special file
    hgsubversion needs for externals bookkeeping, to their new content
    as raw bytes or None if the file has to be removed.

    Which files are involved depends on the hgsubversion.externals
    setting ('svnexternals', 'subrepos' or 'ignore'); any other value
    raises hgutil.Abort. Files whose content is unchanged relative to
    parentctx are left out of the result.
    """
    mode = ui.config('hgsubversion', 'externals', 'svnexternals')
    if mode == 'svnexternals':
        files = {
            '.hgsvnexternals': None,
            }
        if exts:
            files['.hgsvnexternals'] = exts.write()
    elif mode == 'subrepos':
        # XXX: clobbering the subrepos files is good enough for now
        files = {
            '.hgsub': None,
            '.hgsubstate': None,
            }
        if exts:
            defs = parsedefinitions(ui, repo, '', exts)
            hgsub, hgsubstate = [], []
            for path, rev, _source, _pegrev, norevline, base in sorted(defs):
                hgsub.append('%s = [hgsubversion] %s:%s\n'
                             % (path, base, norevline))
                if rev is None:
                    rev = 'HEAD'
                hgsubstate.append('%s %s\n' % (rev, path))
            files['.hgsub'] = ''.join(hgsub)
            files['.hgsubstate'] = ''.join(hgsubstate)
    elif mode == 'ignore':
        files = {}
    else:
        raise hgutil.Abort(_('unknown externals modes: %s') % mode)

    # Should they really be updated?
    updates = {}
    for fn, data in files.iteritems():
        if data is not None:
            if fn not in parentctx or parentctx[fn].data() != data:
                updates[fn] = data
        else:
            if fn in parentctx:
                updates[fn] = None
    return updates

def parse(ui, ctx):
    """Return the externals definitions stored in ctx as a (possibly empty)
    externalsfile().

    Definitions are read from .hgsvnexternals or rebuilt from the
    subrepo state, depending on the hgsubversion.externals setting; an
    unknown setting raises hgutil.Abort.
    """
    external = externalsfile()
    mode = ui.config('hgsubversion', 'externals', 'svnexternals')
    if mode == 'svnexternals':
        if '.hgsvnexternals' in ctx:
            external.read(ctx['.hgsvnexternals'].data())
    elif mode == 'subrepos':
        for path in ctx.substate:
            src, rev = ctx.substate[path][:2]
            # src is "base:norevline", as written by getchanges()
            base, norevline = src.split(':', 1)
            base = base.strip()
            if rev is None:
                rev = 'HEAD'
            line = norevline.replace('{REV}', rev)
            external.setdefault(base, []).append(line)
    elif mode == 'ignore':
        pass
    else:
        raise hgutil.Abort(_('unknown externals modes: %s') % mode)
    return external

# Sentinel telling "allowcreate was not passed" apart from any real value
_notset = object()

class svnsubrepo(subrepo.svnsubrepo):
    """svn subrepository whose source is stored as "base:definition",
    with 'HEAD' used as the revision of unversioned externals."""

    def __init__(self, ctx, path, state, allowcreate=_notset):
        # Strip the "base:" prefix from the stored source
        state = (state[0].split(':', 1)[1], state[1])
        if allowcreate is _notset:
            # Mercurial 3.7 and earlier
            super(svnsubrepo, self).__init__(ctx, path, state)
        else:
            # Mercurial 3.8 and later
            super(svnsubrepo, self).__init__(ctx, path, state, allowcreate)
        # Mercurial 3.3+ set 'ui' rather than '_ui' -- set that and use 'ui'
        # everywhere to maintain compatibility across versions
        if not hgutil.safehasattr(self, 'ui'):
            self.ui = ctx._repo.ui

    def get(self, state, *args, **kwargs):
        """Resolve the external source URL, then delegate to the parent
        implementation."""
        # Resolve source first
        line = state[0].split(':', 1)[1]
        source = parsedefinition(line)[2]
        try:
            # Getting the root SVN repository URL is expensive.
            # Assume the externals is absolute.
            source = resolvesource(self.ui, None, source)
        except RelativeSourceError:
            svnurl = self._ctx._repo.ui.expandpath('default')
            svnroot = getsvninfo(util.normalize_url(svnurl))[1]
            source = resolvesource(self.ui, svnroot, source)
        # hg 1.9 and higher, append the rev as a peg revision to
        # the source URL, so we cannot add our own. We assume
        # that "-r10 url@2" will be similar to "url@10" most of
        # the time.
        state = (source, state[1])
        return super(svnsubrepo, self).get(state, *args, **kwargs)

    def dirty(self, ignoreupdate=False):
        """Return False when the checkout has no local changes and its
        revision is acceptable, True otherwise."""
        # You cannot compare anything with HEAD. Just accept it
        # can be anything.
        if hgutil.safehasattr(self, '_wcrevs'):
            wcrevs = self._wcrevs()
        else:
            wcrev = self._wcrev()
            wcrevs = (wcrev, wcrev)
        if (('HEAD' in wcrevs or self._state[1] == 'HEAD' or
             self._state[1] in wcrevs or ignoreupdate)
            and not self._wcchanged()[0]):
            return False
        return True

    def commit(self, text, user, date):
        """Commit through the parent implementation and return the new
        revision, or 'HEAD' for an unversioned external."""
        rev = super(svnsubrepo, self).commit(text, user, date)
        # Keep unversioned externals unversioned
        if self._state[1] == 'HEAD':
            rev = 'HEAD'
        return rev

    def basestate(self):
        """Return 'HEAD' for an unversioned external, the parent
        implementation's result otherwise."""
        # basestate() was introduced by bcb973abcc0b in 2.2
        if self._state[1] == 'HEAD':
            return 'HEAD'
        return super(svnsubrepo, self).basestate()

if __name__ == "__main__":
    import doctest
    doctest.testmod()