comparison tests/test_push_command.py @ 48:d87b57c719f0

Add a test that verifies push works to svn:// servers. This test will fail if you are running an svnserve on localhost, but it checks and will not touch your data in that case. Also fixed the push tests, which had been largely broken.
author Augie Fackler <durin42@gmail.com>
date Wed, 29 Oct 2008 21:15:36 -0500
parents b66ed66c82e4
children 2bc4999a89d3
comparison
equal deleted inserted replaced
47:9738a7c4e3dd 48:d87b57c719f0
1 import os 1 import os
2 import shutil 2 import shutil
3 import socket
3 import tempfile 4 import tempfile
4 import unittest 5 import unittest
5 6
6 from mercurial import context 7 from mercurial import context
8 from mercurial import commands
7 from mercurial import hg 9 from mercurial import hg
8 from mercurial import node 10 from mercurial import node
9 from mercurial import ui 11 from mercurial import ui
10 from mercurial import revlog 12 from mercurial import revlog
11 13
12 import fetch_command 14 import fetch_command
13 import push_cmd 15 import push_cmd
14 import test_util 16 import test_util
17 import time
18
15 # push fails in 1.4-SWIG-land. 19 # push fails in 1.4-SWIG-land.
16 push_works = False 20 push_works = False
17 try: 21 try:
18 import csvn 22 import csvn
19 push_works = True 23 push_works = True
21 from svn import core 25 from svn import core
22 if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) >= (1, 5, 0): 26 if (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) >= (1, 5, 0):
23 push_works = True 27 push_works = True
24 28
25 if push_works: 29 if push_works:
30 class PushOverSvnserveTests(unittest.TestCase):
31 def setUp(self):
32 self.oldwd = os.getcwd()
33 self.tmpdir = tempfile.mkdtemp('svnwrap_test')
34 self.repo_path = '%s/testrepo' % self.tmpdir
35 self.wc_path = '%s/testrepo_wc' % self.tmpdir
36 test_util.load_svndump_fixture(self.repo_path, 'simple_branch.svndump')
37 open(os.path.join(self.repo_path, 'conf', 'svnserve.conf'),
38 'w').write('[general]\nanon-access=write\n[sasl]\n')
39 # Paranoia: we try to connect to localhost on 3690 before we start
40 # svnserve. If it is running, we force the test to fail early.
41 user_has_own_svnserve = False
42 try:
43 s = socket.socket()
44 s.settimeout(0.3)
45 s.connect(('localhost', 3690))
46 s.close()
47 user_has_own_svnserve = True
48 except:
49 pass
50 if user_has_own_svnserve:
51 assert False, ('You appear to be running your own svnserve!'
52 ' You can probably ignore this test failure.')
53 args = ['svnserve', '-d', '--foreground', '-r', self.repo_path]
54 self.svnserve_pid = os.spawnvp(os.P_NOWAIT, 'svnserve', args)
55 time.sleep(2)
56 fetch_command.fetch_revisions(ui.ui(),
57 svn_url='svn://localhost/',
58 hg_repo_path=self.wc_path)
59
60 def tearDown(self):
61 shutil.rmtree(self.tmpdir)
62 os.chdir(self.oldwd)
63 os.system('kill -9 %d' % self.svnserve_pid)
64
65 # define this as a property so that it reloads anytime we need it
66 @property
67 def repo(self):
68 return hg.repository(ui.ui(), self.wc_path)
69
70 def test_push_to_default(self, commit=True):
71 repo = self.repo
72 old_tip = repo['tip'].node()
73 expected_parent = repo['default'].node()
74 def file_callback(repo, memctx, path):
75 if path == 'adding_file':
76 return context.memfilectx(path=path,
77 data='foo',
78 islink=False,
79 isexec=False,
80 copied=False)
81 raise IOError()
82 ctx = context.memctx(repo,
83 (repo['default'].node(), node.nullid),
84 'automated test',
85 ['adding_file'],
86 file_callback,
87 'an_author',
88 '2008-10-07 20:59:48 -0500',
89 {'branch': 'default',})
90 new_hash = repo.commitctx(ctx)
91 if not commit:
92 return # some tests use this test as an extended setup.
93 hg.update(repo, repo['tip'].node())
94 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
95 hg_repo_path=self.wc_path,
96 svn_url='svn://localhost/')
97 tip = self.repo['tip']
98 self.assertNotEqual(tip.node(), old_tip)
99 self.assertEqual(tip.parents()[0].node(), expected_parent)
100 self.assertEqual(tip['adding_file'].data(), 'foo')
101 self.assertEqual(tip.branch(), 'default')
102
103
26 class PushTests(unittest.TestCase): 104 class PushTests(unittest.TestCase):
27 def setUp(self): 105 def setUp(self):
28 self.oldwd = os.getcwd() 106 self.oldwd = os.getcwd()
29 self.tmpdir = tempfile.mkdtemp('svnwrap_test') 107 self.tmpdir = tempfile.mkdtemp('svnwrap_test')
30 self.repo_path = '%s/testrepo' % self.tmpdir 108 self.repo_path = '%s/testrepo' % self.tmpdir
134 file_callback, 212 file_callback,
135 'an_author', 213 'an_author',
136 '2008-10-07 20:59:48 -0500', 214 '2008-10-07 20:59:48 -0500',
137 {'branch': 'the_branch',}) 215 {'branch': 'the_branch',})
138 new_hash = repo.commitctx(ctx) 216 new_hash = repo.commitctx(ctx)
217 #commands.update(ui.ui(), self.repo, node='tip')
218 hg.update(repo, repo['tip'].node())
139 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo, 219 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
140 hg_repo_path=self.wc_path, 220 hg_repo_path=self.wc_path,
141 svn_url='file://'+self.repo_path) 221 svn_url='file://'+self.repo_path)
142 tip = self.repo['tip'] 222 tip = self.repo['tip']
223 self.assertNotEqual(tip.node(), new_hash)
143 self.assertEqual(tip['adding_file'].data(), 'foo') 224 self.assertEqual(tip['adding_file'].data(), 'foo')
144 self.assertEqual(tip.branch(), 'the_branch') 225 self.assertEqual(tip.branch(), 'the_branch')
145 226
146 # 227 def test_delete_file(self):
147 # def test_delete_file(self): 228 repo = self.repo
148 # assert False 229 def file_callback(repo, memctx, path):
149 # 230 raise IOError()
150 # def test_push_executable_file(self): 231 old_files = set(repo['default'].manifest().keys())
151 # assert False 232 ctx = context.memctx(repo,
152 # 233 (repo['default'].node(), node.nullid),
153 # def test_push_symlink_file(self): 234 'automated test',
154 # assert False 235 ['alpha'],
236 file_callback,
237 'an author',
238 '2008-10-29 21:26:00 -0500',
239 {'branch': 'default', })
240 new_hash = repo.commitctx(ctx)
241 hg.update(repo, repo['tip'].node())
242 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
243 hg_repo_path=self.wc_path,
244 svn_url='file://' + self.repo_path)
245 tip = self.repo['tip']
246 self.assertEqual(old_files,
247 set(tip.manifest().keys() + ['alpha']))
248 self.assert_('alpha' not in tip.manifest())
249
250 def test_push_executable_file(self):
251 self.test_push_to_default(commit=True)
252 repo = self.repo
253 def file_callback(repo, memctx, path):
254 if path == 'gamma':
255 return context.memfilectx(path=path,
256 data='foo',
257 islink=False,
258 isexec=True,
259 copied=False)
260 raise IOError()
261 ctx = context.memctx(repo,
262 (repo['tip'].node(), node.nullid),
263 'message',
264 ['gamma', ],
265 file_callback,
266 'author',
267 '2008-10-29 21:26:00 -0500',
268 {'branch': 'default', })
269 new_hash = repo.commitctx(ctx)
270 hg.update(repo, repo['tip'].node())
271 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
272 hg_repo_path=self.wc_path,
273 svn_url='file://' + self.repo_path)
274 tip = self.repo['tip']
275 self.assertNotEqual(tip.node(), new_hash)
276 self.assertEqual(tip['gamma'].flags(), 'x')
277 self.assertEqual(tip['gamma'].data(), 'foo')
278 self.assertEqual([x for x in tip.manifest().keys() if 'x' not in
279 tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
280
281 def test_push_symlink_file(self):
282 self.test_push_to_default(commit=True)
283 repo = self.repo
284 def file_callback(repo, memctx, path):
285 if path == 'gamma':
286 return context.memfilectx(path=path,
287 data='foo',
288 islink=True,
289 isexec=False,
290 copied=False)
291 raise IOError()
292 ctx = context.memctx(repo,
293 (repo['tip'].node(), node.nullid),
294 'message',
295 ['gamma', ],
296 file_callback,
297 'author',
298 '2008-10-29 21:26:00 -0500',
299 {'branch': 'default', })
300 new_hash = repo.commitctx(ctx)
301 hg.update(repo, repo['tip'].node())
302 push_cmd.push_revisions_to_subversion(ui.ui(), repo=self.repo,
303 hg_repo_path=self.wc_path,
304 svn_url='file://' + self.repo_path)
305 tip = self.repo['tip']
306 self.assertNotEqual(tip.node(), new_hash)
307 self.assertEqual(tip['gamma'].flags(), 'l')
308 self.assertEqual(tip['gamma'].data(), 'foo')
309 self.assertEqual([x for x in tip.manifest().keys() if 'l' not in
310 tip[x].flags()], ['alpha', 'beta', 'adding_file', ])
311
155 else: 312 else:
156 class PushTests(unittest.TestCase): 313 class PushTests(unittest.TestCase):
157 """Dummy so the test runner doesn't get upset. 314 """Dummy so the test runner doesn't get upset.
158 """ 315 """
159 pass 316 pass
160 317
161 def suite(): 318 def suite():
162 return unittest.TestLoader().loadTestsFromTestCase(PushTests) 319 test_classes = [PushTests, PushOverSvnserveTests]
163 320 tests = []
321 # This is the quickest hack I could come up with to load all the tests from
322 # both classes. Would love a patch that simplifies this without adding
323 # dependencies.
324 for tc in test_classes:
325 for attr in dir(tc):
326 if attr.startswith('test_'):
327 tests.append(tc(attr))
328 return unittest.TestSuite(tests)