Mercurial > hgsubversion
view tests/test_push_command.py @ 1229:46523cdfd3b0 stable 1.6.3
pushmod: prepend "link " to base text for links
http://svn.apache.org/viewvc?view=revision&revision=1223036 exposes
what is arguably a bug in hgsubversion push code. Specifically, when
we are receiving text from the server in an editor, we prepend a "link
" to the text of symlinks when opening a file and strip it when
closing a file. We don't, however, prepend "link " to the base we use
when sending text changes to the server.
This was working before because prior to that revision, the first
thing subversion did was to check whether the entirety of the before
text or the entirety of the after text was less than 64 bytes. In
that case, it just sent the entirety of the after text as a single
insert operation. I'd expect most, but not all symlinks to fit under
the 64 byte limit, including the leading "link " text on the
subversion end.
After the change, the first thing subversion does is check for a
leading match that is more than 4 bytes long, or that is the full
length of the after text. If it finds one, it sends a copy operation
for the leading match, and then falls into the "if fewer than 64 bytes
remain, send the whole thing" behavior. It also looks for trailing
matches of more than 4 bytes even in the < 64 byte case, but that's
not what breaks the tests.
Incidentally, changing the destination of long symlinks was broken
even before this subversion change. This diff includes test additions
that cover that breakage.
author | David Schleimer <dschleimer@gmail.com> |
---|---|
date | Thu, 07 Aug 2014 19:30:26 -0700 |
parents | afd047a7df45 |
children | 5c2917375961 |
line wrap: on
line source
import test_util import atexit import errno import os import sys import random import shutil import socket import subprocess import unittest from mercurial import context from mercurial import commands from mercurial import hg from mercurial import node from mercurial import revlog from mercurial import util as hgutil from hgsubversion import util from hgsubversion import compathacks import time class PushTests(test_util.TestBase): obsolete_mode_tests = True def setUp(self): test_util.TestBase.setUp(self) self.repo_path = self.load_and_fetch('simple_branch.svndump')[1] def test_cant_push_empty_ctx(self): repo = self.repo def file_callback(repo, memctx, path): if path == 'adding_file': return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=False, copied=False) raise IOError() ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', [], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) old_tip = repo['tip'].node() self.pushrevisions() tip = self.repo['tip'] self.assertEqual(tip.node(), old_tip) def test_push_add_of_added_upstream_gives_sane_error(self): repo = self.repo def file_callback(repo, memctx, path): if path == 'adding_file': return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=False, copied=False) raise IOError() p1 = repo['default'].node() ctx = context.memctx(repo, (p1, node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) old_tip = repo['tip'].node() self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) # This node adds the same file as the first one we added, and # will be refused by the server for adding a file that already # exists. We should respond with an error suggesting the user # rebase. 
ctx = context.memctx(repo, (p1, node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) old_tip = repo['tip'].node() try: self.pushrevisions() except hgutil.Abort, e: assert "pull again and rebase" in str(e) tip = self.repo['tip'] self.assertEqual(tip.node(), old_tip) def test_cant_push_with_changes(self): repo = self.repo def file_callback(repo, memctx, path): return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=False, copied=False) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) # Touch an existing file repo.wwrite('beta', 'something else', '') try: self.pushrevisions() except hgutil.Abort: pass tip = self.repo['tip'] self.assertEqual(new_hash, tip.node()) def internal_push_over_svnserve(self, subdir='', commit=True): repo_path = self.load_svndump('simple_branch.svndump') open(os.path.join(repo_path, 'conf', 'svnserve.conf'), 'w').write('[general]\nanon-access=write\n[sasl]\n') self.port = random.randint(socket.IPPORT_USERRESERVED, 65535) self.host = 'localhost' args = ['svnserve', '--daemon', '--foreground', '--listen-port=%d' % self.port, '--listen-host=%s' % self.host, '--root=%s' % repo_path] svnserve = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) self.svnserve_pid = svnserve.pid try: time.sleep(2) import shutil shutil.rmtree(self.wc_path) commands.clone(self.ui(), 'svn://%s:%d/%s' % (self.host, self.port, subdir), self.wc_path, noupdate=True) repo = self.repo old_tip = repo['tip'].node() expected_parent = repo['default'].node() def file_callback(repo, memctx, path): if path == 'adding_file': return compathacks.makememfilectx(repo, path=path, data='foo', 
islink=False, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, parents=(repo['default'].node(), node.nullid), text='automated test', files=['adding_file'], filectxfn=file_callback, user='an_author', date='2008-10-07 20:59:48 -0500', extra={'branch': 'default', }) new_hash = repo.commitctx(ctx) if not commit: return # some tests use this test as an extended setup. hg.update(repo, repo['tip'].node()) oldauthor = repo['tip'].user() commands.push(repo.ui, repo) tip = self.repo['tip'] self.assertNotEqual(oldauthor, tip.user()) self.assertNotEqual(tip.node(), old_tip) self.assertEqual(tip.parents()[0].node(), expected_parent) self.assertEqual(tip['adding_file'].data(), 'foo') self.assertEqual(tip.branch(), 'default') # unintended behaviour: self.assertNotEqual('an_author', tip.user()) self.assertEqual('(no author)', tip.user().rsplit('@', 1)[0]) finally: if sys.version_info >= (2,6): svnserve.kill() else: test_util.kill_process(svnserve) def test_push_over_svnserve(self): self.internal_push_over_svnserve() def test_push_over_svnserve_with_subdir(self): self.internal_push_over_svnserve(subdir='///branches////the_branch/////') def test_push_to_default(self, commit=True): repo = self.repo old_tip = repo['tip'].node() expected_parent = repo['default'].node() def file_callback(repo, memctx, path): if path == 'adding_file': return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) if not commit: return # some tests use this test as an extended setup. 
hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) self.assertEqual(node.hex(tip.parents()[0].node()), node.hex(expected_parent)) self.assertEqual(tip['adding_file'].data(), 'foo') self.assertEqual(tip.branch(), 'default') def test_push_two_revs_different_local_branch(self): def filectxfn(repo, memctx, path): return compathacks.makememfilectx(repo, path=path, data=path, islink=False, isexec=False, copied=False) oldtiphash = self.repo['default'].node() ctx = context.memctx(self.repo, (self.repo[0].node(), revlog.nullid,), 'automated test', ['gamma', ], filectxfn, 'testy', '2008-12-21 16:32:00 -0500', {'branch': 'localbranch', }) newhash = self.repo.commitctx(ctx) ctx = context.memctx(self.repo, (newhash, revlog.nullid), 'automated test2', ['delta', ], filectxfn, 'testy', '2008-12-21 16:32:00 -0500', {'branch': 'localbranch', }) newhash = self.repo.commitctx(ctx) repo = self.repo hg.update(repo, newhash) commands.push(repo.ui, repo) self.assertEqual(self.repo['tip'].parents()[0].parents()[0].node(), oldtiphash) self.assertEqual(self.repo['tip'].files(), ['delta', ]) self.assertEqual(self.repo['tip'].manifest().keys(), ['alpha', 'beta', 'gamma', 'delta']) def test_push_two_revs(self): # set up some work for us self.test_push_to_default(commit=False) repo = self.repo old_tip = repo['tip'].node() expected_parent = repo['tip'].parents()[0].node() def file_callback(repo, memctx, path): if path == 'adding_file2': return compathacks.makememfilectx(repo, path=path, data='foo2', islink=False, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['adding_file2'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) 
self.assertNotEqual(tip.parents()[0].node(), old_tip) self.assertEqual(tip.parents()[0].parents()[0].node(), expected_parent) self.assertEqual(tip['adding_file2'].data(), 'foo2') self.assertEqual(tip['adding_file'].data(), 'foo') self.assertEqual(tip.parents()[0]['adding_file'].data(), 'foo') try: self.assertEqual(tip.parents()[0]['adding_file2'].data(), 'foo') assert False, "this is impossible, adding_file2 should not be in this manifest." except revlog.LookupError, e: pass self.assertEqual(tip.branch(), 'default') def test_push_to_branch(self, push=True): repo = self.repo def file_callback(repo, memctx, path): if path == 'adding_file': return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['the_branch'].node(), node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2008-10-07 20:59:48 -0500', {'branch': 'the_branch', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) if push: self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['adding_file'].data(), 'foo') self.assertEqual(tip.branch(), 'the_branch') def test_push_to_non_tip(self): self.test_push_to_branch(push=False) wc2path = self.wc_path + '_clone' u = self.repo.ui test_util.hgclone(self.repo.ui, self.wc_path, wc2path, update=False) res = self.pushrevisions() self.assertEqual(0, res) oldf = open(os.path.join(self.wc_path, '.hg', 'hgrc')) hgrc = oldf.read() oldf.close() shutil.rmtree(self.wc_path) test_util.hgclone(u, wc2path, self.wc_path, update=False) oldf = open(os.path.join(self.wc_path, '.hg', 'hgrc'), 'w') oldf.write(hgrc) oldf.close() # do a commit here self.commitchanges([('foobaz', 'foobaz', 'This file is added on default.',), ], parent='default', message='commit to default') from hgsubversion import svncommands svncommands.rebuildmeta(u, self.repo, 
args=[test_util.fileurl(self.repo_path)]) hg.update(self.repo, self.repo['tip'].node()) oldnode = self.repo['tip'].hex() self.pushrevisions(expected_extra_back=1) self.assertNotEqual(oldnode, self.repo['tip'].hex(), 'Revision was not pushed.') def test_delete_file(self): repo = self.repo def file_callback(repo, memctx, path): raise IOError(errno.ENOENT, '%s is deleted' % path) old_files = set(repo['default'].manifest().keys()) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['alpha'], file_callback, 'an author', '2008-10-29 21:26:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertEqual(old_files, set(tip.manifest().keys() + ['alpha'])) self.assert_('alpha' not in tip.manifest()) def test_push_executable_file(self): self.test_push_to_default(commit=True) repo = self.repo def file_callback(repo, memctx, path): if path == 'gamma': return compathacks.makememfilectx(repo, path=path, data='foo', islink=False, isexec=True, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['tip'].node(), node.nullid), 'message', ['gamma', ], file_callback, 'author', '2008-10-29 21:26:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.clean(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assert_('@' in self.repo['tip'].user()) self.assertEqual(tip['gamma'].flags(), 'x') self.assertEqual(tip['gamma'].data(), 'foo') self.assertEqual([x for x in tip.manifest().keys() if 'x' not in tip[x].flags()], ['alpha', 'beta', 'adding_file', ]) def test_push_symlink_file(self): self.test_push_to_default(commit=True) repo = self.repo def file_callback(repo, memctx, path): if path == 'gamma': return compathacks.makememfilectx(repo, path=path, data='foo', islink=True, isexec=False, copied=False) raise IOError(errno.EINVAL, 
'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['tip'].node(), node.nullid), 'message', ['gamma', ], file_callback, 'author', '2008-10-29 21:26:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() # grab a new repo instance (self.repo is an @property functions) repo = self.repo tip = repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['gamma'].flags(), 'l') self.assertEqual(tip['gamma'].data(), 'foo') self.assertEqual([x for x in tip.manifest().keys() if 'l' not in tip[x].flags()], ['alpha', 'beta', 'adding_file', ]) def file_callback2(repo, memctx, path): if path == 'gamma': return compathacks.makememfilectx(repo, path=path, data='a' * 129, islink=True, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['tip'].node(), node.nullid), 'message', ['gamma', ], file_callback2, 'author', '2014-08-08 20:11:41 -0700', {'branch': 'default', }) repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() # grab a new repo instance (self.repo is an @property functions) repo = self.repo tip = repo['tip'] self.assertEqual(tip['gamma'].flags(), 'l') self.assertEqual(tip['gamma'].data(), 'a'*129) def file_callback3(repo, memctx, path): if path == 'gamma': return compathacks.makememfilectx(repo, path=path, data='a' * 64 + 'b' * 65, islink=True, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['tip'].node(), node.nullid), 'message', ['gamma', ], file_callback3, 'author', '2014-08-08 20:16:25 -0700', {'branch': 'default', }) repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() repo = self.repo tip = repo['tip'] self.assertEqual(tip['gamma'].flags(), 'l') self.assertEqual(tip['gamma'].data(), 'a' * 64 + 'b' * 65) def test_push_existing_file_newly_symlink(self): 
self.test_push_existing_file_newly_execute(execute=False, link=True, expected_flags='l') def test_push_existing_file_newly_execute(self, execute=True, link=False, expected_flags='x'): self.test_push_to_default() repo = self.repo def file_callback(repo, memctx, path): return compathacks.makememfilectx(repo, path=path, data='foo', islink=link, isexec=execute, copied=False) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'message', ['alpha', ], file_callback, 'author', '2008-1-1 00:00:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'foo') self.assertEqual(tip.parents()[0]['alpha'].flags(), '') self.assertEqual(tip['alpha'].flags(), expected_flags) # while we're here, double check pushing an already-executable file # works repo = self.repo def file_callback2(repo, memctx, path): return compathacks.makememfilectx(repo, path=path, data='bar', islink=link, isexec=execute, copied=False) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'message', ['alpha', ], file_callback2, 'author', '2008-1-1 00:00:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'bar') self.assertEqual(tip.parents()[0]['alpha'].flags(), expected_flags) self.assertEqual(tip['alpha'].flags(), expected_flags) # now test removing the property entirely repo = self.repo def file_callback3(repo, memctx, path): return compathacks.makememfilectx(repo, path=path, data='bar', islink=False, isexec=False, copied=False) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'message', ['alpha', ], file_callback3, 'author', '2008-01-01 00:00:00 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, 
repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'bar') self.assertEqual(tip.parents()[0]['alpha'].flags(), expected_flags) self.assertEqual(tip['alpha'].flags(), '') def test_push_outdated_base_text(self): self.test_push_two_revs() changes = [('adding_file', 'adding_file', 'different_content',), ] par = self.repo['tip'].rev() self.commitchanges(changes, parent=par) self.pushrevisions() changes = [('adding_file', 'adding_file', 'even_more different_content',), ] self.commitchanges(changes, parent=par) try: self.pushrevisions() assert False, 'This should have aborted!' except hgutil.Abort, e: self.assertEqual(e.args[0], 'Outgoing changesets parent is not at subversion ' 'HEAD\n' '(pull again and rebase on a newer revision)') # verify that any pending transactions on the server got cleaned up self.assertEqual([], os.listdir( os.path.join(self.tmpdir, 'testrepo-1', 'db', 'transactions'))) def test_push_encoding(self): self.test_push_two_revs() # Writing then rebasing UTF-8 filenames in a cp1252 windows console # used to fail because hg internal encoding was being changed during # the interactions with subversion, *and during the rebase*, which # confused the dirstate and made it believe the file was deleted. 
fn = 'pi\xc3\xa8ce/test' changes = [(fn, fn, 'a')] par = self.repo['tip'].rev() self.commitchanges(changes, parent=par) self.pushrevisions() def test_push_emptying_changeset(self): r = self.repo['tip'] changes = [ ('alpha', None, None), ('beta', None, None), ] parent = self.repo['tip'].rev() self.commitchanges(changes, parent=parent) self.pushrevisions() self.assertEqual({}, self.repo['tip'].manifest()) # Try to re-add a file after emptying the branch changes = [ ('alpha', 'alpha', 'alpha'), ] self.commitchanges(changes, parent=self.repo['tip'].rev()) self.pushrevisions() self.assertEqual(['alpha'], list(self.repo['tip'].manifest())) def test_push_without_pushing_children(self): ''' Verify that a push of a nontip node, keeps the tip child on top of the pushed commit. ''' oldlen = test_util.repolen(self.repo) oldtiphash = self.repo['default'].node() changes = [('gamma', 'gamma', 'sometext')] newhash1 = self.commitchanges(changes) changes = [('delta', 'delta', 'sometext')] newhash2 = self.commitchanges(changes) # push only the first commit repo = self.repo hg.update(repo, newhash1) commands.push(repo.ui, repo) self.assertEqual(test_util.repolen(self.repo), oldlen + 2) # verify that the first commit is pushed, and the second is not commit2 = self.repo['tip'] self.assertEqual(commit2.files(), ['delta', ]) self.assertEqual(util.getsvnrev(commit2), None) commit1 = commit2.parents()[0] self.assertEqual(commit1.files(), ['gamma', ]) prefix = 'svn:' + self.repo.svnmeta().uuid self.assertEqual(util.getsvnrev(commit1), prefix + '/branches/the_branch@5') def test_push_two_that_modify_same_file(self): ''' Push performs a rebase if two commits touch the same file. This test verifies that code path works. 
''' oldlen = test_util.repolen(self.repo) oldtiphash = self.repo['default'].node() changes = [('gamma', 'gamma', 'sometext')] newhash = self.commitchanges(changes) changes = [('gamma', 'gamma', 'sometext\n moretext'), ('delta', 'delta', 'sometext\n moretext'), ] newhash = self.commitchanges(changes) repo = self.repo hg.update(repo, newhash) commands.push(repo.ui, repo) self.assertEqual(test_util.repolen(self.repo), oldlen + 2) # verify that both commits are pushed commit1 = self.repo['tip'] self.assertEqual(commit1.files(), ['delta', 'gamma']) prefix = 'svn:' + self.repo.svnmeta().uuid self.assertEqual(util.getsvnrev(commit1), prefix + '/branches/the_branch@6') commit2 = commit1.parents()[0] self.assertEqual(commit2.files(), ['gamma']) self.assertEqual(util.getsvnrev(commit2), prefix + '/branches/the_branch@5') def test_push_in_subdir(self, commit=True): repo = self.repo old_tip = repo['tip'].node() def file_callback(repo, memctx, path): if path == 'adding_file' or path == 'newdir/new_file': testData = 'fooFirstFile' if path == 'newdir/new_file': testData = 'fooNewFile' return compathacks.makememfilectx(repo, path=path, data=testData, islink=False, isexec=False, copied=False) raise IOError(errno.EINVAL, 'Invalid operation: ' + path) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['adding_file'], file_callback, 'an_author', '2012-12-13 20:59:48 -0500', {'branch': 'default', }) new_hash = repo.commitctx(ctx) p = os.path.join(repo.root, "newdir") os.mkdir(p) ctx = context.memctx(repo, (repo['default'].node(), node.nullid), 'automated test', ['newdir/new_file'], file_callback, 'an_author', '2012-12-13 20:59:48 -0500', {'branch': 'default', }) os.chdir(p) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) self.assertEqual(p, os.getcwd()) self.assertEqual(tip['adding_file'].data(), 'fooFirstFile') 
self.assertEqual(tip['newdir/new_file'].data(), 'fooNewFile') self.assertEqual(tip.branch(), 'default')