# HG changeset patch # User Patrick Mezard # Date 1226275715 21600 # Node ID 072010a271c6b3fd566fd3ccbaf1fb5125615670 # Parent ed3dd5bf45daf6ea3c9edfeedd6331b4f6a01ded Fix basic issues with tests on Windows - shutil.rmtree() fails if there are any read-only files (svn store) - Fix files:// URLs - os.spawnvp()/Popen4() do not exist under Windows, use subprocess diff --git a/tests/test_fetch_command.py b/tests/test_fetch_command.py --- a/tests/test_fetch_command.py +++ b/tests/test_fetch_command.py @@ -1,5 +1,4 @@ import os -import shutil import tempfile import unittest @@ -19,7 +18,7 @@ class TestBasicRepoLayout(unittest.TestC self.wc_path = '%s/testrepo_wc' % self.tmpdir def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def _load_fixture_and_fetch(self, fixture_name): @@ -129,16 +128,14 @@ class TestStupidPull(unittest.TestCase): self.wc_path = '%s/testrepo_wc' % self.tmpdir def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def test_stupid(self): - test_util.load_svndump_fixture(self.repo_path, 'two_heads.svndump') - fetch_command.fetch_revisions(ui.ui(), - svn_url='file://%s' % self.repo_path, - hg_repo_path=self.wc_path, - stupid=True) - repo = hg.repository(ui.ui(), self.wc_path) + repo = test_util.load_fixture_and_fetch('two_heads.svndump', + self.repo_path, + self.wc_path, + True) # TODO there must be a better way than repo[0] for this check self.assertEqual(node.hex(repo[0].node()), 'a47d0ce778660a91c31bf2c21c448e9ee296ac90') @@ -153,12 +150,11 @@ class TestStupidPull(unittest.TestCase): self.assertEqual(len(repo.heads()), 2) def test_oldest_not_trunk_and_tag_vendor_branch(self): - test_util.load_svndump_fixture(self.repo_path, - 'tagged_vendor_and_oldest_not_trunk.svndump') - fetch_command.fetch_revisions(ui.ui(), - svn_url='file://%s' % self.repo_path, - hg_repo_path=self.wc_path, - stupid=True) + repo = test_util.load_fixture_and_fetch( + 'tagged_vendor_and_oldest_not_trunk.svndump', + self.repo_path, + self.wc_path, + True) repo = hg.repository(ui.ui(), self.wc_path) self.assertEqual(node.hex(repo['oldest'].node()), 'd73002bcdeffe389a8df81ee43303d36e79e8ca4') diff --git a/tests/test_fetch_renames.py b/tests/test_fetch_renames.py --- a/tests/test_fetch_renames.py +++ b/tests/test_fetch_renames.py @@ -20,7 +20,7 @@ class TestFetchRenames(unittest.TestCase self.wc_path = '%s/testrepo_wc' % self.tmpdir def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def _load_fixture_and_fetch(self, fixture_name, stupid): diff --git a/tests/test_push_command.py b/tests/test_push_command.py --- a/tests/test_push_command.py +++ b/tests/test_push_command.py @@ -1,6 +1,6 @@ import os -import shutil import socket +import subprocess import tempfile import unittest @@ -41,14 +41,14 @@ class PushOverSvnserveTests(unittest.Tes assert False, ('You appear to be running your own svnserve!' ' You can probably ignore this test failure.') args = ['svnserve', '-d', '--foreground', '-r', self.repo_path] - self.svnserve_pid = os.spawnvp(os.P_NOWAIT, 'svnserve', args) + self.svnserve_pid = subprocess.Popen(args).pid time.sleep(2) fetch_command.fetch_revisions(ui.ui(), svn_url='svn://localhost/', hg_repo_path=self.wc_path) def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) os.system('kill -9 %d' % self.svnserve_pid) @@ -97,18 +97,22 @@ class PushTests(unittest.TestCase): self.tmpdir = tempfile.mkdtemp('svnwrap_test') self.repo_path = '%s/testrepo' % self.tmpdir self.wc_path = '%s/testrepo_wc' % self.tmpdir - test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump') - fetch_command.fetch_revisions(ui.ui(), - svn_url='file://%s' % self.repo_path, - hg_repo_path=self.wc_path) + test_util.load_fixture_and_fetch('simple_branch.svndump', + self.repo_path, + self.wc_path) # define this as a property so that it reloads anytime we need it @property def repo(self): return hg.repository(ui.ui(), self.wc_path) + def pushrevisions(self): + push_cmd.push_revisions_to_subversion( + ui.ui(), repo=self.repo, hg_repo_path=self.wc_path, + svn_url=test_util.fileurl(self.repo_path)) + def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def test_push_to_default(self, commit=True): @@ -135,9 +139,7 @@ class PushTests(unittest.TestCase): if not commit: return # some tests use this test as an extended setup. hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://'+self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) self.assertEqual(tip.parents()[0].node(), expected_parent) @@ -168,9 +170,7 @@ class PushTests(unittest.TestCase): {'branch': 'default',}) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://'+self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), old_tip) self.assertNotEqual(tip.parents()[0].node(), old_tip) @@ -206,9 +206,7 @@ class PushTests(unittest.TestCase): new_hash = repo.commitctx(ctx) #commands.update(ui.ui(), self.repo, node='tip') hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://'+self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['adding_file'].data(), 'foo') @@ -229,9 +227,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertEqual(old_files, set(tip.manifest().keys() + ['alpha'])) @@ -258,9 +254,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assert_('@' in tip.user()) @@ -290,9 +284,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['gamma'].flags(), 'l') @@ -321,9 +313,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['newdir/gamma'].data(), 'foo') @@ -382,9 +372,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'foo') @@ -409,9 +397,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'bar') @@ -435,9 +421,7 @@ class PushTests(unittest.TestCase): {'branch': 'default', }) new_hash = repo.commitctx(ctx) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://' + self.repo_path) + self.pushrevisions() tip = self.repo['tip'] self.assertNotEqual(tip.node(), new_hash) self.assertEqual(tip['alpha'].data(), 'bar') diff --git a/tests/test_push_renames.py b/tests/test_push_renames.py --- a/tests/test_push_renames.py +++ b/tests/test_push_renames.py @@ -1,5 +1,4 @@ import os -import shutil import sys import tempfile import unittest @@ -21,10 +20,10 @@ class TestPushRenames(unittest.TestCase) self.tmpdir = tempfile.mkdtemp('svnwrap_test') self.repo_path = '%s/testrepo' % self.tmpdir self.wc_path = '%s/testrepo_wc' % self.tmpdir - test_util.load_svndump_fixture(self.repo_path, 'pushrenames.svndump') - fetch_command.fetch_revisions(ui.ui(), - svn_url='file://%s' % self.repo_path, - hg_repo_path=self.wc_path) + test_util.load_fixture_and_fetch('pushrenames.svndump', + self.repo_path, + self.wc_path, + True) # define this as a property so that it reloads anytime we need it @property @@ -32,7 +31,7 @@ class TestPushRenames(unittest.TestCase) return hg.repository(ui.ui(), self.wc_path) def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def _commitchanges(self, repo, changes): @@ -118,9 +117,9 @@ class TestPushRenames(unittest.TestCase) self._commitchanges(repo, changes) hg.update(repo, repo['tip'].node()) - push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, - hg_repo_path=self.wc_path, - svn_url='file://'+self.repo_path) + push_cmd.push_revisions_to_subversion( + ui.ui(), repo=self.repo, hg_repo_path=self.wc_path, + svn_url=test_util.fileurl(self.repo_path)) tip = self.repo['tip'] # self._debug_print_copies(tip) self.assertchanges(changes, tip) diff --git a/tests/test_tags.py b/tests/test_tags.py --- a/tests/test_tags.py +++ b/tests/test_tags.py @@ -1,5 +1,4 @@ import os -import shutil import tempfile import unittest @@ -19,7 +18,7 @@ class TestTags(unittest.TestCase): self.wc_path = '%s/testrepo_wc' % self.tmpdir def tearDown(self): - shutil.rmtree(self.tmpdir) + test_util.rmtree(self.tmpdir) os.chdir(self.oldwd) def _load_fixture_and_fetch(self, fixture_name, stupid=False): diff --git a/tests/test_util.py b/tests/test_util.py --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,5 +1,9 @@ +import errno import os -import popen2 +import subprocess +import shutil +import stat +import urllib from mercurial import ui from mercurial import hg @@ -9,22 +13,47 @@ import fetch_command FIXTURES = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'fixtures') +def fileurl(path): + path = os.path.abspath(path) + drive, path = os.path.splitdrive(path) + path = urllib.pathname2url(path) + if drive: + drive = '/' + drive + url = 'file://%s%s' % (drive, path) + return url + def load_svndump_fixture(path, fixture_name): '''Loads an svnadmin dump into a fresh repo at path, which should not already exist. ''' - os.spawnvp(os.P_WAIT, 'svnadmin', ['svnadmin', 'create', path,]) - proc = popen2.Popen4(['svnadmin', 'load', path,]) + subprocess.call(['svnadmin', 'create', path,]) + proc = subprocess.Popen(['svnadmin', 'load', path,], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) inp = open(os.path.join(FIXTURES, fixture_name)) - proc.tochild.write(inp.read()) - proc.tochild.close() - proc.wait() + proc.stdin.write(inp.read()) + proc.stdin.flush() + proc.communicate() def load_fixture_and_fetch(fixture_name, repo_path, wc_path, stupid=False): load_svndump_fixture(repo_path, fixture_name) fetch_command.fetch_revisions(ui.ui(), - svn_url='file://%s' % repo_path, + svn_url=fileurl(repo_path), hg_repo_path=wc_path, stupid=stupid) repo = hg.repository(ui.ui(), wc_path) return repo + +def rmtree(path): + # Read-only files cannot be removed under Windows + for root, dirs, files in os.walk(path): + for f in files: + f = os.path.join(root, f) + try: + s = os.stat(f) + except OSError, e: + if e.errno == errno.ENOENT: + continue + raise + if (s.st_mode & stat.S_IWRITE) == 0: + os.chmod(f, s.st_mode | stat.S_IWRITE) + shutil.rmtree(path)