view hgsubversion/verify.py @ 1600:6f5b296c01dd

verify: use ui.write() instead of ui.warn() It looks like ui.warn() writes to the output directly instead of pushing to the buffer that ui objects maintain. Let's use ui.write() instead, which is also used for the other instances of the same error messages. This fixes test_utility_commands.py, which was failing for me on the tip of the default branch.
author Pulkit Goyal <pulkit@yandex-team.ru>
date Wed, 26 Dec 2018 18:40:12 +0300
parents ae572c9be4e6
children 5d8603f080c5
line wrap: on
line source

import difflib
import posixpath

from mercurial import error
from mercurial import worker

import svnwrap
import svnrepo
import util

def verify(ui, repo, args=None, **opts):
    '''verify current revision against Subversion repository
    '''
    # NOTE(review): the docstring above is presumably surfaced as command
    # help text, so it is kept verbatim; details live in comments instead.
    #
    # Parameters:
    #   ui    -- Mercurial ui object used for output, progress and config.
    #   repo  -- local repository; must expose ``svnmeta``.
    #   args  -- optional sequence whose first item is the path/URL to verify
    #            against; the 'default' path is used when it is empty.
    #   opts  -- recognised keys: 'rev' (revision to verify, defaults to '.')
    #            and 'stupid' (selects the file-by-file verification mode,
    #            defaulting to the ``hgsubversion.stupid`` config value).
    #
    # Returns 0 when the revision matches Subversion (or the revision closes
    # a branch and cannot be verified), 1 when any difference was found.
    #
    # Raises error.RepoError when there is no repository, and error.Abort
    # when the revision does not originate from Subversion.

    if repo is None:
        raise error.RepoError("There is no Mercurial repository"
                              " here (.hg not found)")

    ctx = repo[opts.get('rev', '.')]
    if 'close' in ctx.extra():
        ui.write('cannot verify closed branch')
        return 0
    convert_revision = ctx.extra().get('convert_revision')
    if convert_revision is None or not convert_revision.startswith('svn:'):
        raise error.Abort('revision %s not from SVN' % ctx)

    if args:
        url = repo.ui.expandpath(args[0])
    else:
        url = repo.ui.expandpath('default')

    svn = svnrepo.svnremoterepo(ui, url).svn
    meta = repo.svnmeta(svn.uuid, svn.subdir)
    srev, branch, branchpath = meta.get_source_rev(ctx=ctx)

    # Make the branch path relative to the configured subdirectory so it can
    # be appended to the repository URL.
    branchpath = branchpath[len(svn.subdir.lstrip('/')):]
    branchurl = ('%s/%s' % (url, branchpath)).strip('/')

    ui.write('verifying %s against %s@%i\n' % (ctx, branchurl, srev))

    def diff_file(path, svndata):
        # In verbose mode, emit a unified diff between the Subversion
        # contents (svndata) and the Mercurial contents of a text file.
        fctx = ctx[path]

        if ui.verbose and not fctx.isbinary():
            svndesc = '%s/%s/%s@%d' % (svn.svn_url, branchpath, path, srev)
            hgdesc = '%s@%s' % (path, ctx)

            for c in difflib.unified_diff(svndata.splitlines(True),
                                          fctx.data().splitlines(True),
                                          svndesc, hgdesc):
                ui.note(c)

    if opts.get('stupid', ui.configbool('hgsubversion', 'stupid')):
        # "Stupid" mode: list every file in Subversion and fetch and compare
        # each one individually, spreading the work through worker.worker().
        svnfiles = set()
        result = 0

        hgfiles = set(ctx) - util.ignoredfiles

        def verifydata(svndata):
            # Worker body: yields (index, "<filename>\0<repr of bool>") for
            # every file entry; the bool is False when contents or flags of
            # that file differ from Subversion.
            svnworker = svnrepo.svnremoterepo(ui, url).svn

            i = 0
            for fn, kind in svndata:
                i += 1
                # Only entries of kind 'f' are compared.
                if kind != 'f':
                    continue

                fp = fn
                if branchpath:
                    fp = branchpath + '/' + fn
                data, mode = svnworker.get_file(posixpath.normpath(fp), srev)
                try:
                    fctx = ctx[fn]
                except error.LookupError:
                    # Not present in Mercurial; the caller reports it as a
                    # missing file through the set comparison below.
                    yield i, "%s\0%r" % (fn, True)
                    continue

                ok = True
                if not fctx.data() == data:
                    ui.write('difference in: %s\n' % fn)
                    diff_file(fn, data)
                    ok = False
                if not fctx.flags() == mode:
                    ui.write('wrong flags for: %s\n' % fn)
                    ok = False
                yield i, "%s\0%r" % (fn, ok)

        # Per-item cost estimate handed to worker.worker(); local file://
        # repositories get a higher value than remote ones.
        if url.startswith('file://'):
            perarg = 0.00001
        else:
            perarg = 0.000001

        svndata = svn.list_files(branchpath, srev)
        w = worker.worker(repo.ui, perarg, verifydata, (), tuple(svndata))
        i = 0
        for _, t in w:
            ui.progress('verify', i, total=len(hgfiles))
            i += 1
            # The status travels as text, so it must be compared against the
            # repr of True: bool() of any non-empty string (including
            # 'False') is True and would hide every reported failure.
            fn, status = t.rsplit('\0', 1)
            if status != repr(True):
                result = 1
            svnfiles.add(fn)

        if hgfiles != svnfiles:
            unexpected = hgfiles - svnfiles
            for f in sorted(unexpected):
                ui.write('unexpected file: %s\n' % f)
            missing = svnfiles - hgfiles
            for f in sorted(missing):
                ui.write('missing file: %s\n' % f)
            result = 1

        ui.progress('verify', None, total=len(hgfiles))

    else:
        class VerifyEditor(svnwrap.Editor):
            """editor that verifies a repository against the given context."""
            def __init__(self, ui, ctx):
                self.ui = ui
                self.ctx = ctx
                # Files in Mercurial not yet seen in Subversion; entries are
                # removed as add_file() encounters them.
                self.unexpected = set(ctx) - util.ignoredfiles
                # Files seen in Subversion that Mercurial does not have.
                self.missing = set()
                self.failed = False

                self.total = len(self.unexpected)
                self.seen = 0

                # Path and properties of the file currently being replayed;
                # None while no comparable file is open.
                self.file = None
                self.props = None

            def open_root(self, base_revnum, pool=None):
                pass

            def add_directory(self, path, parent_baton, copyfrom_path,
                              copyfrom_revision, pool=None):
                self.file = None
                self.props = None

            def open_directory(self, path, parent_baton, base_revision, pool=None):
                self.file = None
                self.props = None

            def add_file(self, path, parent_baton=None, copyfrom_path=None,
                         copyfrom_revision=None, file_pool=None):

                if path in self.unexpected:
                    self.unexpected.remove(path)
                    self.file = path
                    self.props = {}
                else:
                    # Unknown to Mercurial: record it and skip comparing
                    # its contents.
                    self.total += 1
                    self.missing.add(path)
                    self.failed = True
                    self.file = None
                    self.props = None

                self.seen += 1
                self.ui.progress('verify', self.seen, total=self.total)

            def open_file(self, path, base_revnum):
                raise NotImplementedError()

            def apply_textdelta(self, file_baton, base_checksum, pool=None):
                stream = svnwrap.SimpleStringIO(closing=False)
                handler = svnwrap.apply_txdelta('', stream)
                if not callable(handler):
                    raise error.Abort('Error in Subversion bindings: '
                                      'cannot call handler!')
                def txdelt_window(window):
                    handler(window)
                    # window being None means we're done
                    if window:
                        return

                    fctx = self.ctx[self.file]
                    hgdata = fctx.data()
                    svndata = stream.getvalue()

                    if 'svn:executable' in self.props:
                        if fctx.flags() != 'x':
                            self.ui.write('wrong flags for: %s\n' % self.file)
                            self.failed = True
                    elif 'svn:special' in self.props:
                        # Prefix the Mercurial data with 'link ' before
                        # comparing it with the svn:special file contents.
                        hgdata = 'link ' + hgdata
                        if fctx.flags() != 'l':
                            self.ui.write('wrong flags for: %s\n' % self.file)
                            self.failed = True
                    elif fctx.flags():
                        self.ui.write('wrong flags for: %s\n' % self.file)
                        self.failed = True

                    if hgdata != svndata:
                        self.ui.write('difference in: %s\n' % self.file)
                        diff_file(self.file, svndata)
                        self.failed = True

                # No handler is returned for files add_file() flagged as
                # missing, so their contents are not compared.
                if self.file is not None:
                    return txdelt_window

            def change_dir_prop(self, dir_baton, name, value, pool=None):
                pass

            def change_file_prop(self, file_baton, name, value, pool=None):
                if self.props is not None:
                    self.props[name] = value

            def close_file(self, file_baton, checksum, pool=None):
                pass

            def close_directory(self, dir_baton, pool=None):
                pass

            def delete_entry(self, path, revnum, pool=None):
                raise NotImplementedError()

            def check(self):
                # Report leftover discrepancies; returns True when the
                # replay found no problem at all.
                self.ui.progress('verify', None, total=self.total)

                # Sorted so the report is deterministic, matching the
                # ordering used by the file-by-file verification mode.
                for f in sorted(self.unexpected):
                    self.ui.write('unexpected file: %s\n' % f)
                    self.failed = True
                for f in sorted(self.missing):
                    self.ui.write('missing file: %s\n' % f)
                    self.failed = True
                return not self.failed

        v = VerifyEditor(ui, ctx)
        svnrepo.svnremoterepo(ui, branchurl).svn.get_revision(srev, v)
        if v.check():
            result = 0
        else:
            result = 1

    return result