Mercurial > hgsubversion
comparison tests/test_push_command.py @ 48:d87b57c719f0
Add a test that verifies push works to svn:// servers.
This test will fail if you are already running an svnserve on localhost, but it checks for that first and will not touch your data in that case.
Also fixed the push tests, which had been largely broken.
author | Augie Fackler <durin42@gmail.com> |
---|---|
date | Wed, 29 Oct 2008 21:15:36 -0500 |
parents | b66ed66c82e4 |
children | 2bc4999a89d3 |
comparison
equal
deleted
inserted
replaced
47:9738a7c4e3dd | 48:d87b57c719f0 |
---|---|
1 import os | 1 import os |
2 import shutil | 2 import shutil |
3 import socket | |
3 import tempfile | 4 import tempfile |
4 import unittest | 5 import unittest |
5 | 6 |
6 from mercurial import context | 7 from mercurial import context |
8 from mercurial import commands | |
7 from mercurial import hg | 9 from mercurial import hg |
8 from mercurial import node | 10 from mercurial import node |
9 from mercurial import ui | 11 from mercurial import ui |
10 from mercurial import revlog | 12 from mercurial import revlog |
11 | 13 |
12 import fetch_command | 14 import fetch_command |
13 import push_cmd | 15 import push_cmd |
14 import test_util | 16 import test_util |
17 import time | |
18 | |
15 # push fails in 1.4-SWIG-land. | 19 # push fails in 1.4-SWIG-land. |
16 push_works = False | 20 push_works = False |
17 try: | 21 try: |
18 import csvn | 22 import csvn |
19 push_works = True | 23 push_works = True |
21 from svn import core | 25 from svn import core |
22 if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) >= (1, 5, 0): | 26 if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) >= (1, 5, 0): |
23 push_works = True | 27 push_works = True |
24 | 28 |
25 if push_works: | 29 if push_works: |
30 class PushOverSvnserveTests(unittest.TestCase): | |
31 def setUp(self): | |
32 self.oldwd = os.getcwd() | |
33 self.tmpdir = tempfile.mkdtemp('svnwrap_test') | |
34 self.repo_path = '%s/testrepo' % self.tmpdir | |
35 self.wc_path = '%s/testrepo_wc' % self.tmpdir | |
36 test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump') | |
37 open(os.path.join(self.repo_path, 'conf', 'svnserve.conf'), | |
38 'w').write('[general]\nanon-access=write\n[sasl]\n') | |
39 # Paranoia: we try to connect to localhost on 3690 before we start | |
40 # svnserve. If it is running, we force the test to fail early. | |
41 user_has_own_svnserve = False | |
42 try: | |
43 s = socket.socket() | |
44 s.settimeout(0.3) | |
45 s.connect(('localhost', 3690)) | |
46 s.close() | |
47 user_has_own_svnserve = True | |
48 except: | |
49 pass | |
50 if user_has_own_svnserve: | |
51 assert False, ('You appear to be running your own svnserve!' | |
52 ' You can probably ignore this test failure.') | |
53 args = ['svnserve', '-d', '--foreground', '-r', self.repo_path] | |
54 self.svnserve_pid = os.spawnvp(os.P_NOWAIT, 'svnserve', args) | |
55 time.sleep(2) | |
56 fetch_command.fetch_revisions(ui.ui(), | |
57 svn_url='svn://localhost/', | |
58 hg_repo_path=self.wc_path) | |
59 | |
60 def tearDown(self): | |
61 shutil.rmtree(self.tmpdir) | |
62 os.chdir(self.oldwd) | |
63 os.system('kill -9 %d' % self.svnserve_pid) | |
64 | |
65 # define this as a property so that it reloads anytime we need it | |
66 @property | |
67 def repo(self): | |
68 return hg.repository(ui.ui(), self.wc_path) | |
69 | |
70 def test_push_to_default(self, commit=True): | |
71 repo = self.repo | |
72 old_tip = repo['tip'].node() | |
73 expected_parent = repo['default'].node() | |
74 def file_callback(repo, memctx, path): | |
75 if path == 'adding_file': | |
76 return context.memfilectx(path=path, | |
77 data='foo', | |
78 islink=False, | |
79 isexec=False, | |
80 copied=False) | |
81 raise IOError() | |
82 ctx = context.memctx(repo, | |
83 (repo['default'].node(), node.nullid), | |
84 'automated test', | |
85 ['adding_file'], | |
86 file_callback, | |
87 'an_author', | |
88 '2008-10-07 20:59:48 -0500', | |
89 {'branch': 'default',}) | |
90 new_hash = repo.commitctx(ctx) | |
91 if not commit: | |
92 return # some tests use this test as an extended setup. | |
93 hg.update(repo, repo['tip'].node()) | |
94 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, | |
95 hg_repo_path=self.wc_path, | |
96 svn_url='svn://localhost/') | |
97 tip = self.repo['tip'] | |
98 self.assertNotEqual(tip.node(), old_tip) | |
99 self.assertEqual(tip.parents()[0].node(), expected_parent) | |
100 self.assertEqual(tip['adding_file'].data(), 'foo') | |
101 self.assertEqual(tip.branch(), 'default') | |
102 | |
103 | |
26 class PushTests(unittest.TestCase): | 104 class PushTests(unittest.TestCase): |
27 def setUp(self): | 105 def setUp(self): |
28 self.oldwd = os.getcwd() | 106 self.oldwd = os.getcwd() |
29 self.tmpdir = tempfile.mkdtemp('svnwrap_test') | 107 self.tmpdir = tempfile.mkdtemp('svnwrap_test') |
30 self.repo_path = '%s/testrepo' % self.tmpdir | 108 self.repo_path = '%s/testrepo' % self.tmpdir |
134 file_callback, | 212 file_callback, |
135 'an_author', | 213 'an_author', |
136 '2008-10-07 20:59:48 -0500', | 214 '2008-10-07 20:59:48 -0500', |
137 {'branch': 'the_branch',}) | 215 {'branch': 'the_branch',}) |
138 new_hash = repo.commitctx(ctx) | 216 new_hash = repo.commitctx(ctx) |
217 #commands.update(ui.ui(), self.repo, node='tip') | |
218 hg.update(repo, repo['tip'].node()) | |
139 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, | 219 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, |
140 hg_repo_path=self.wc_path, | 220 hg_repo_path=self.wc_path, |
141 svn_url='file://'+self.repo_path) | 221 svn_url='file://'+self.repo_path) |
142 tip = self.repo['tip'] | 222 tip = self.repo['tip'] |
223 self.assertNotEqual(tip.node(), new_hash) | |
143 self.assertEqual(tip['adding_file'].data(), 'foo') | 224 self.assertEqual(tip['adding_file'].data(), 'foo') |
144 self.assertEqual(tip.branch(), 'the_branch') | 225 self.assertEqual(tip.branch(), 'the_branch') |
145 | 226 |
146 # | 227 def test_delete_file(self): |
147 # def test_delete_file(self): | 228 repo = self.repo |
148 # assert False | 229 def file_callback(repo, memctx, path): |
149 # | 230 raise IOError() |
150 # def test_push_executable_file(self): | 231 old_files = set(repo['default'].manifest().keys()) |
151 # assert False | 232 ctx = context.memctx(repo, |
152 # | 233 (repo['default'].node(), node.nullid), |
153 # def test_push_symlink_file(self): | 234 'automated test', |
154 # assert False | 235 ['alpha'], |
236 file_callback, | |
237 'an author', | |
238 '2008-10-29 21:26:00 -0500', | |
239 {'branch': 'default', }) | |
240 new_hash = repo.commitctx(ctx) | |
241 hg.update(repo, repo['tip'].node()) | |
242 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, | |
243 hg_repo_path=self.wc_path, | |
244 svn_url='file://' + self.repo_path) | |
245 tip = self.repo['tip'] | |
246 self.assertEqual(old_files, | |
247 set(tip.manifest().keys() + ['alpha'])) | |
248 self.assert_('alpha' not in tip.manifest()) | |
249 | |
250 def test_push_executable_file(self): | |
251 self.test_push_to_default(commit=True) | |
252 repo = self.repo | |
253 def file_callback(repo, memctx, path): | |
254 if path == 'gamma': | |
255 return context.memfilectx(path=path, | |
256 data='foo', | |
257 islink=False, | |
258 isexec=True, | |
259 copied=False) | |
260 raise IOError() | |
261 ctx = context.memctx(repo, | |
262 (repo['tip'].node(), node.nullid), | |
263 'message', | |
264 ['gamma', ], | |
265 file_callback, | |
266 'author', | |
267 '2008-10-29 21:26:00 -0500', | |
268 {'branch': 'default', }) | |
269 new_hash = repo.commitctx(ctx) | |
270 hg.update(repo, repo['tip'].node()) | |
271 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, | |
272 hg_repo_path=self.wc_path, | |
273 svn_url='file://' + self.repo_path) | |
274 tip = self.repo['tip'] | |
275 self.assertNotEqual(tip.node(), new_hash) | |
276 self.assertEqual(tip['gamma'].flags(), 'x') | |
277 self.assertEqual(tip['gamma'].data(), 'foo') | |
278 self.assertEqual([x for x in tip.manifest().keys() if 'x' not in | |
279 tip[x].flags()], ['alpha', 'beta', 'adding_file', ]) | |
280 | |
281 def test_push_symlink_file(self): | |
282 self.test_push_to_default(commit=True) | |
283 repo = self.repo | |
284 def file_callback(repo, memctx, path): | |
285 if path == 'gamma': | |
286 return context.memfilectx(path=path, | |
287 data='foo', | |
288 islink=True, | |
289 isexec=False, | |
290 copied=False) | |
291 raise IOError() | |
292 ctx = context.memctx(repo, | |
293 (repo['tip'].node(), node.nullid), | |
294 'message', | |
295 ['gamma', ], | |
296 file_callback, | |
297 'author', | |
298 '2008-10-29 21:26:00 -0500', | |
299 {'branch': 'default', }) | |
300 new_hash = repo.commitctx(ctx) | |
301 hg.update(repo, repo['tip'].node()) | |
302 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, | |
303 hg_repo_path=self.wc_path, | |
304 svn_url='file://' + self.repo_path) | |
305 tip = self.repo['tip'] | |
306 self.assertNotEqual(tip.node(), new_hash) | |
307 self.assertEqual(tip['gamma'].flags(), 'l') | |
308 self.assertEqual(tip['gamma'].data(), 'foo') | |
309 self.assertEqual([x for x in tip.manifest().keys() if 'l' not in | |
310 tip[x].flags()], ['alpha', 'beta', 'adding_file', ]) | |
311 | |
155 else: | 312 else: |
156 class PushTests(unittest.TestCase): | 313 class PushTests(unittest.TestCase): |
157 """Dummy so the test runner doesn't get upset. | 314 """Dummy so the test runner doesn't get upset. |
158 """ | 315 """ |
159 pass | 316 pass |
160 | 317 |
161 def suite(): | 318 def suite(): |
162 return unittest.TestLoader().loadTestsFromTestCase(PushTests) | 319 test_classes = [PushTests, PushOverSvnserveTests] |
163 | 320 tests = [] |
321 # This is the quickest hack I could come up with to load all the tests from | |
322 # both classes. Would love a patch that simplifies this without adding | |
323 # dependencies. | |
324 for tc in test_classes: | |
325 for attr in dir(tc): | |
326 if attr.startswith('test_'): | |
327 tests.append(tc(attr)) | |
328 return unittest.TestSuite(tests) |