view push_cmd.py @ 16:48a44546c12f

Add a basic system for running the hgsubversion tests (although not the svnwrap ones) without requiring Nose. Nose is still the recommended way to run the tests. Also added some tests for pushing.
author Augie Fackler <durin42@gmail.com>
date Tue, 07 Oct 2008 22:13:14 -0500
parents c5039390332f
children b3c7b844b782
line wrap: on
line source

from mercurial import util as merc_util
from mercurial import hg
from svn import core

import util
import hg_delta_editor
import svnwrap
import fetch_command
import utility_commands


def _invert_revmap(hge):
    """Return ``hge.revmap`` turned inside out (values become the keys)."""
    revmap = hge.revmap
    return dict(zip(revmap.itervalues(), revmap.iterkeys()))


@util.register_subcommand('push')
@util.register_subcommand('dcommit') # for git expats
def push_revisions_to_subversion(ui, repo, hg_repo_path, svn_url, **opts):
    """Push revisions starting at a specified head back to Subversion.

    Outgoing revisions are sent one at a time.  After each commit the
    Subversion history is fetched again and the remaining local work is
    rebased onto what was fetched before the next revision is pushed.

    Returns 0 when everything was pushed (or there was nothing to push) and
    1 when the push is refused because a merge is involved.  Extra keyword
    options are forwarded to ``utility_commands.rebase_commits``.
    """
    hge = hg_delta_editor.HgChangeReceiver(hg_repo_path, ui_=ui)
    svn_commit_hashes = _invert_revmap(hge)

    # Strategy:
    # 1. Find all outgoing commits from this head
    pending = utility_commands.outgoing_revisions(ui, repo, hge,
                                                  svn_commit_hashes)
    if not pending:
        ui.status('No revisions to push.')
        return 0
    if len(repo.parents()) != 1:
        ui.status('Cowardly refusing to push branch merge')
        return 1

    while pending:
        # The last entry of the list is the one treated as the oldest.
        oldest = pending.pop()
        old_ctx = repo[oldest]
        ctx_parents = old_ctx.parents()
        if len(ctx_parents) != 1:
            ui.status('Found a branch merge, this needs discussion and '
                      'implementation.')
            return 1
        base_n = ctx_parents[0].node()
        # Remember the base's children *before* committing so the freshly
        # fetched replacement can be told apart from them afterwards.
        old_children = repo[base_n].children()

        # 2. Commit oldest revision that needs to be pushed
        base_revision = svn_commit_hashes[base_n][0]
        commit_from_rev(ui, repo, old_ctx, hge, svn_url, base_revision)

        # 3. Fetch revisions from svn
        fetch_status = fetch_command.fetch_revisions(ui, svn_url,
                                                     hg_repo_path)
        assert not fetch_status

        # 4. Find the new head of the target branch
        repo = hg.repository(ui, hge.path)
        pushed_branch = old_ctx.branch()
        replacement = [child for child in repo[base_n].children()
                       if not (child in old_children
                               or child.branch() != pushed_branch)]
        assert len(replacement) == 1
        replacement = replacement[0]

        # 5. Rebase all children of the currently-pushing rev to the new branch
        for needs_transplant in repo.heads(old_ctx.node()):
            hg.clean(repo, needs_transplant)
            utility_commands.rebase_commits(ui, repo, hg_repo_path, **opts)
            # Re-open the repository to pick up what the rebase changed.
            repo = hg.repository(ui, hge.path)
            if needs_transplant not in pending:
                continue
            # The rebased head was itself awaiting a push: move to tip and
            # recompute the mapping and the outgoing list from scratch.
            hg.clean(repo, repo['tip'].node())
            hge = hg_delta_editor.HgChangeReceiver(hg_repo_path, ui_=ui)
            svn_commit_hashes = _invert_revmap(hge)
            pending = utility_commands.outgoing_revisions(ui, repo, hge,
                                                          svn_commit_hashes)
    return 0

class PrefixMatch(object):
    """Callable matcher that accepts any file name starting with a prefix.

    Instances are handed to a changectx ``walk`` call in this module.
    """

    def __init__(self, prefix):
        # Kept under the short name ``p``; __call__ reads it back.
        self.p = prefix

    def __call__(self, fn):
        """Return True when ``fn`` begins with the stored prefix."""
        wanted = self.p
        return fn.startswith(wanted)

    def files(self):
        """Return an empty list: this matcher names no explicit files."""
        return list()

def commit_from_rev(ui, repo, rev_ctx, hg_editor, svn_url, base_revision):
    """Build and send a commit from Mercurial to Subversion.
    """
    target_files = []
    file_data = {}
    svn = svnwrap.SubversionRepo(svn_url, username=merc_util.getuser())
    parent = rev_ctx.parents()[0]
    parent_branch = rev_ctx.parents()[0].branch()
    branch_path = 'trunk'

    if parent_branch and parent_branch != 'default':
        branch_path = 'branches/%s' % parent_branch

    added_dirs = []
    props = {}
    for file in rev_ctx.files():
        new_data = base_data = ''
        action = ''
        if file in rev_ctx:
            new_data = rev_ctx.filectx(file).data()

            if 'x' in rev_ctx.filectx(file).flags():
                props.setdefault(file, {})['svn:executable'] = '*'
            if 'l' in rev_ctx.filectx(file).flags():
                props.setdefault(file, {})['svn:special'] = '*'

            if file not in parent:
                target_files.append(file)
                action = 'add'
                dirname = '/'.join(file.split('/')[:-1] + [''])
                # check for new directories
                if not list(parent.walk(PrefixMatch(dirname))):
                    # check and see if the dir exists svn-side.
                    try:
                        assert svn.list_dir('%s/%s' % (branch_path, dirname))
                    except core.SubversionException, e:
                        # dir must not exist
                        added_dirs.append(dirname[:-1])
            else:
                target_files.append(file)
                base_data = parent.filectx(file).data()
                if 'x' in parent.filectx(file).flags():
                    if 'svn:executable' in props.setdefault(file, {}):
                        del props[file]['svn:executable']
                    else:
                        props.setdefault(file, {})['svn:executable'] = None
                if 'l' in parent.filectx(file).flags():
                    if props.setdefault(file, {})['svn:special']:
                        del props[file]['svn:special']
                    else:
                        props.setdefault(file, {})['svn:special'] = None
                action = 'modify'
        else:
            target_files.append(file)
            base_data = parent.filectx(file).data()
            action = 'delete'
        file_data[file] = base_data, new_data, action

    # TODO check for directory deletes here
    new_target_files = ['%s/%s' % (branch_path, f) for f in target_files]
    for tf, ntf in zip(target_files, new_target_files):
        if tf in file_data:
            file_data[ntf] = file_data[tf]
            if tf in props:
                props[ntf] = props[tf]
                del props[tf]
            if merc_util.binary(file_data[ntf][1]):
                props.setdefault(ntf, {}).update(props.get(ntf, {}))
                props.setdefault(ntf, {})['svn:mime-type'] = 'application/octet-stream'
            del file_data[tf]
    added_dirs = ['%s/%s' % (branch_path, f) for f in added_dirs]
    new_target_files += added_dirs
    try:
        svn.commit(new_target_files, rev_ctx.description(), file_data,
                   base_revision, set(added_dirs), props)
    except core.SubversionException, e:
        if hasattr(e, 'apr_err') and e.apr_err == 160028:
            raise merc_util.Abort('Base text was out of date, maybe rebase?')
        else:
            raise