source: server/lib/gutenbach_test/server/job.py @ 3c0760f

Last change on this file since 3c0760f was 3c0760f, checked in by Steven Allen <steven@…>, 12 years ago

Block instead of looping and sleeping.

  • Property mode set to 100644
File size: 6.8 KB
Line 
1__all__ = [
2    'TestEmptyGutenbachJob',
3    'TestBadGutenbachJob',
4    'TestGutenbachJobOperations',
5    ]
6
7from gutenbach.server import GutenbachJob
8from gutenbach.server import errors
9from gutenbach.ipp import JobStates as States
10import unittest
11import tempfile
12import sys
13import time
14import logging
15
16def make_tempfile():
17    fh = tempfile.NamedTemporaryFile()
18    fh.write("test\n")
19    fh.seek(0)
20    return fh
21
22class TestEmptyGutenbachJob(unittest.TestCase):
23
24    def setUp(self):
25        self.job = GutenbachJob()
26
27    def testPlayer(self):
28        self.assertEqual(self.job.player, None)
29    def testId(self):
30        self.assertEqual(self.job.id, -1)
31    def testCreator(self):
32        self.assertEqual(self.job.creator, "")
33    def testName(self):
34        self.assertEqual(self.job.name, "")
35    def testSize(self):
36        self.assertEqual(self.job.size, 0)
37    def testState(self):
38        self.assertEqual(self.job.state, States.PENDING_HELD)
39    def testPriority(self):
40        self.assertEqual(self.job.priority, 1)
41
42    def testStateProperties(self):
43        self.assertFalse(self.job.is_valid)
44        self.assertFalse(self.job.is_ready)
45        self.assertFalse(self.job.is_playing)
46        self.assertFalse(self.job.is_paused)
47        self.assertFalse(self.job.is_done)
48        self.assertFalse(self.job.is_completed)
49        self.assertFalse(self.job.is_canceled)
50        self.assertFalse(self.job.is_aborted)
51
52    def testSpool(self):
53        fh = tempfile.NamedTemporaryFile()
54        self.assertRaises(errors.InvalidJobStateException, self.job.spool, fh)
55        fh.close()
56    def testPlay(self):
57        self.assertRaises(errors.InvalidJobStateException, self.job.play)
58    def testPause(self):
59        self.assertRaises(errors.InvalidJobStateException, self.job.pause)
60    def testCancel(self):
61        self.job.cancel()
62        self.assertTrue(self.job.is_canceled)
63        self.assertEqual(self.job.state, States.CANCELED)
64    def testAbort(self):
65        self.job.abort()
66        self.assertTrue(self.job.is_aborted)
67        self.assertEqual(self.job.state, States.ABORTED)
68
69class TestBadGutenbachJob(unittest.TestCase):
70
71    def testBadJobId(self):
72        job = GutenbachJob(job_id=-2)
73        self.assertEqual(job.id, -1)
74        job.id = -2
75        self.assertEqual(job.id, -1)
76
77    def testBadCreator(self):
78        job = GutenbachJob(
79            job_id=1,
80            creator=12345)
81        self.assertEqual(job.creator, "12345")
82        job.creator = None
83        self.assertEqual(job.creator, "")
84        job.creator = []
85        self.assertEqual(job.creator, "[]")
86
87    def testBadName(self):
88        job = GutenbachJob(
89            job_id=1,
90            creator="foo",
91            name=12345)
92        self.assertEqual(job.name, "12345")
93        job.name = None
94        self.assertEqual(job.name, "")
95        job.name = []
96        self.assertEqual(job.name, "[]")
97
98    def testBadPriority(self):
99        job = GutenbachJob(
100            job_id=1,
101            creator="foo",
102            name="test",
103            priority=-1)
104        self.assertEqual(job.priority, 1)
105        job.priority = 0
106        self.assertEqual(job.priority, 1)
107        job.priority = 1
108        self.assertEqual(job.priority, 1)
109        job.priority = sys.maxint
110        self.assertEqual(job.priority, sys.maxint)
111        job.priority = "hello"
112        self.assertEqual(job.priority, 1)
113        job.priority = []
114        self.assertEqual(job.priority, 1)
115
116    def testBadDocument(self):
117        job = GutenbachJob(
118            job_id=1,
119            creator="foo",
120            name="test",
121            priority=1)
122        self.assertRaises(errors.InvalidDocument, job.spool, "hello")
123        self.assertRaises(errors.InvalidDocument, job.spool, [])
124        self.assertRaises(errors.InvalidDocument, job.spool, 12345)
125
126        fh = make_tempfile()
127        job = GutenbachJob(
128            job_id=1,
129            creator="foo",
130            name="test",
131            priority=1,
132            document=fh)
133        self.assertRaises(errors.InvalidJobStateException, job.spool, "hello")
134        self.assertRaises(errors.InvalidJobStateException, job.spool, [])
135        self.assertRaises(errors.InvalidJobStateException, job.spool, 12345)
136
137class TestGutenbachJobOperations(unittest.TestCase):
138
139    def setUp(self):
140        fh = make_tempfile()
141        self.job = GutenbachJob(
142            job_id=1,
143            creator="foo",
144            name="test",
145            priority=1,
146            document=fh)
147        self.job.player._dryrun = True
148
149    def testPlay(self):
150        self.job.play()
151        self.assertTrue(self.job.is_playing)
152
153        while self.job.is_playing:
154            time.sleep(0.1)
155
156        self.assertTrue(self.job.is_done)
157        self.assertTrue(self.job.is_completed)
158        self.assertFalse(self.job.is_aborted)
159        self.assertFalse(self.job.is_canceled)
160
161    def testPause(self):
162        self.job.play()
163        self.assertTrue(self.job.is_playing)
164        self.assertFalse(self.job.is_paused)
165
166        self.job.pause()
167        self.assertTrue(self.job.is_playing)
168        self.assertTrue(self.job.is_paused)
169
170        time.sleep(0.6)
171        self.assertTrue(self.job.is_playing)
172        self.assertTrue(self.job.is_paused)
173
174        self.job.pause()
175        self.assertTrue(self.job.is_playing)
176        self.assertFalse(self.job.is_paused)
177       
178        self.job.wait_done()
179           
180        self.assertTrue(self.job.is_done)
181        self.assertTrue(self.job.is_completed)
182        self.assertFalse(self.job.is_aborted)
183        self.assertFalse(self.job.is_canceled)
184
185    def testCancel(self):
186        self.job.play()
187        self.assertTrue(self.job.is_playing)
188        self.assertFalse(self.job.is_canceled)
189
190        self.job.cancel()
191        self.assertFalse(self.job.is_playing)
192        self.assertTrue(self.job.is_done)
193        self.assertTrue(self.job.is_canceled)
194        self.assertFalse(self.job.is_aborted)
195
196    def testAbort(self):
197        self.job.play()
198        self.assertTrue(self.job.is_playing)
199        self.assertFalse(self.job.is_aborted)
200
201        self.job.abort()
202        self.assertFalse(self.job.is_playing)
203        self.assertTrue(self.job.is_done)
204        self.assertFalse(self.job.is_canceled)
205        self.assertTrue(self.job.is_aborted)
206
207    def testRestart(self):
208        self.assertRaises(errors.InvalidJobStateException, self.job.restart)
209
210        self.job.play()
211        self.assertTrue(self.job.is_playing)
212        self.assertFalse(self.job.is_done)
213
214        self.assertRaises(errors.InvalidJobStateException, self.job.restart)
215
216        self.job.cancel()
217        self.assertFalse(self.job.is_playing)
218        self.assertTrue(self.job.is_done)
219        self.assertTrue(self.job.is_canceled)
220        self.assertFalse(self.job.is_aborted)
221
222        self.job.restart()
223        self.assertTrue(self.job.is_ready)
224
225if __name__ == "__main__":
226    logging.basicConfig(loglevel=logging.DEBUG)
227    unittest.main()
Note: See TracBrowser for help on using the repository browser.