source: server/lib/gutenbach_test/server/job.py @ 5ffffe0

no-cups
Last change on this file since 5ffffe0 was 5ffffe0, checked in by Jessica B. Hamrick <jhamrick@…>, 12 years ago

Reorganize test suite

  • Property mode set to 100644
File size: 6.8 KB
Line 
# Public names of this module: the three TestCase classes it defines.
__all__ = [
    'TestEmptyGutenbachJob',
    'TestBadGutenbachJob',
    'TestGutenbachJobOperations',
    ]
6
import logging
import sys
import tempfile
import time
import unittest

from gutenbach.server import GutenbachJob
from gutenbach.server import errors
from gutenbach.ipp import JobStates as States
15
def make_tempfile():
    """Return an open named temporary file holding ``test\\n``, rewound to 0.

    The caller owns the returned handle and is responsible for closing it;
    the file is removed from disk when the handle is closed.
    """
    fh = tempfile.NamedTemporaryFile()
    # NamedTemporaryFile opens in binary mode by default, so write bytes.
    # On Python 2 the bytes literal is the same value as the old str literal;
    # on Python 3 a str here would raise TypeError.
    fh.write(b"test\n")
    # Rewind so a consumer reads the content from the beginning.
    fh.seek(0)
    return fh
21
class TestEmptyGutenbachJob(unittest.TestCase):
    """Checks on a ``GutenbachJob`` constructed with no arguments.

    The tests pin the attribute defaults of such a job, verify that every
    state flag is false, and verify which operations are rejected
    (spool/play/pause) and which are accepted (cancel/abort).
    """

    def setUp(self):
        # A fresh, argument-less job for every test.
        self.job = GutenbachJob()

    # -- attribute defaults -------------------------------------------------

    def testPlayer(self):
        self.assertEqual(self.job.player, None)

    def testId(self):
        self.assertEqual(self.job.id, -1)

    def testCreator(self):
        self.assertEqual(self.job.creator, "")

    def testName(self):
        self.assertEqual(self.job.name, "")

    def testSize(self):
        self.assertEqual(self.job.size, 0)

    def testState(self):
        self.assertEqual(self.job.state, States.HELD)

    def testPriority(self):
        self.assertEqual(self.job.priority, 1)

    def testStateProperties(self):
        """None of the boolean state flags is set on an empty job."""
        self.assertFalse(self.job.is_valid)
        self.assertFalse(self.job.is_ready)
        self.assertFalse(self.job.is_playing)
        self.assertFalse(self.job.is_paused)
        self.assertFalse(self.job.is_done)
        self.assertFalse(self.job.is_completed)
        self.assertFalse(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

    # -- operations ---------------------------------------------------------

    def testSpool(self):
        """Spooling a document into an empty job is rejected."""
        fh = tempfile.NamedTemporaryFile()
        try:
            self.assertRaises(
                errors.InvalidJobStateException, self.job.spool, fh)
        finally:
            # Close (and thereby delete) the temp file even when the
            # assertion above fails.
            fh.close()

    def testPlay(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.play)

    def testPause(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.pause)

    def testCancel(self):
        """An empty job can be cancelled and then reports CANCELLED."""
        self.job.cancel()
        self.assertTrue(self.job.is_cancelled)
        self.assertEqual(self.job.state, States.CANCELLED)

    def testAbort(self):
        """An empty job can be aborted and then reports ABORTED."""
        self.job.abort()
        self.assertTrue(self.job.is_aborted)
        self.assertEqual(self.job.state, States.ABORTED)
68
class TestBadGutenbachJob(unittest.TestCase):
    """Checks on how ``GutenbachJob`` treats invalid attribute values.

    Each test feeds out-of-range or wrongly typed values through the
    constructor and through attribute assignment, and asserts the value the
    job reports afterwards.
    """

    def testBadJobId(self):
        """A job id of -2 is reported back as -1."""
        job = GutenbachJob(job_id=-2)
        self.assertEqual(job.id, -1)
        job.id = -2
        self.assertEqual(job.id, -1)

    def testBadCreator(self):
        """Non-string creators come back as strings; None becomes ""."""
        job = GutenbachJob(
            job_id=1,
            creator=12345)
        self.assertEqual(job.creator, "12345")
        job.creator = None
        self.assertEqual(job.creator, "")
        job.creator = []
        self.assertEqual(job.creator, "[]")

    def testBadName(self):
        """Non-string names come back as strings; None becomes ""."""
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name=12345)
        self.assertEqual(job.name, "12345")
        job.name = None
        self.assertEqual(job.name, "")
        job.name = []
        self.assertEqual(job.name, "[]")

    def testBadPriority(self):
        """Invalid priorities are reported as 1; valid ones are kept."""
        # sys.maxint only exists on Python 2; fall back to sys.maxsize so
        # the test can also be loaded on interpreters without it.
        largest = getattr(sys, "maxint", None)
        if largest is None:
            largest = sys.maxsize

        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=-1)
        self.assertEqual(job.priority, 1)
        job.priority = 0
        self.assertEqual(job.priority, 1)
        job.priority = 1
        self.assertEqual(job.priority, 1)
        job.priority = largest
        self.assertEqual(job.priority, largest)
        job.priority = "hello"
        self.assertEqual(job.priority, 1)
        job.priority = []
        self.assertEqual(job.priority, 1)

    def testBadDocument(self):
        """Spooling non-file objects is rejected, with or without a document."""
        # No document yet: spooling something that is not a file raises
        # InvalidDocument.
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=1)
        self.assertRaises(errors.InvalidDocument, job.spool, "hello")
        self.assertRaises(errors.InvalidDocument, job.spool, [])
        self.assertRaises(errors.InvalidDocument, job.spool, 12345)

        # Document already supplied: any further spool attempt raises
        # InvalidJobStateException instead.
        fh = make_tempfile()
        try:
            job = GutenbachJob(
                job_id=1,
                creator="foo",
                name="test",
                priority=1,
                document=fh)
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, "hello")
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, [])
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, 12345)
        finally:
            # Release the temp file whether or not the assertions passed.
            fh.close()
136
class TestGutenbachJobOperations(unittest.TestCase):
    """Checks on play/pause/cancel/abort/restart for a job with a document.

    Every test starts from a job built with id 1, creator "foo", name "test",
    priority 1 and a temp-file document, whose player has ``_dryrun`` set.
    """

    # Upper bound, in seconds, on how long a test waits for playback to end.
    # NOTE(review): the duration of a dry-run playback is not visible from
    # this file; 30 seconds is meant to be generous -- adjust if needed.
    PLAYBACK_TIMEOUT = 30.0

    def setUp(self):
        fh = make_tempfile()
        self.job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=1,
            document=fh)
        # Presumably keeps the player from producing real output during the
        # tests -- confirm against the player implementation.
        self.job.player._dryrun = True

    def _wait_for_playback_to_end(self):
        """Poll until the job stops playing; fail the test on timeout.

        Replaces an unbounded busy-wait so that a player which never stops
        produces a test failure instead of hanging the whole run.
        """
        deadline = time.time() + self.PLAYBACK_TIMEOUT
        while self.job.is_playing:
            if time.time() >= deadline:
                self.fail(
                    "job was still playing after %.1f seconds"
                    % self.PLAYBACK_TIMEOUT)
            time.sleep(0.1)

    def _assert_completed(self):
        """Assert the job finished normally (not aborted, not cancelled)."""
        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_completed)
        self.assertFalse(self.job.is_aborted)
        self.assertFalse(self.job.is_cancelled)

    def testPlay(self):
        """A played job runs to completion."""
        self.job.play()
        self.assertTrue(self.job.is_playing)

        self._wait_for_playback_to_end()
        self._assert_completed()

    def testPause(self):
        """pause() toggles the paused flag and the job still completes."""
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_paused)

        # First call pauses; the job still counts as playing.
        self.job.pause()
        self.assertTrue(self.job.is_playing)
        self.assertTrue(self.job.is_paused)

        # The paused state must persist over time.
        time.sleep(0.6)
        self.assertTrue(self.job.is_playing)
        self.assertTrue(self.job.is_paused)

        # Second call resumes.
        self.job.pause()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_paused)

        self._wait_for_playback_to_end()
        self._assert_completed()

    def testCancel(self):
        """Cancelling a playing job stops it and marks it cancelled."""
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_cancelled)

        self.job.cancel()
        self.assertFalse(self.job.is_playing)
        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

    def testAbort(self):
        """Aborting a playing job stops it and marks it aborted."""
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_aborted)

        self.job.abort()
        self.assertFalse(self.job.is_playing)
        self.assertTrue(self.job.is_done)
        self.assertFalse(self.job.is_cancelled)
        self.assertTrue(self.job.is_aborted)

    def testRestart(self):
        """restart() is rejected until the job is done, then makes it ready."""
        # Not yet played: restart is rejected.
        self.assertRaises(errors.InvalidJobStateException, self.job.restart)

        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_done)

        # Still playing: restart is rejected.
        self.assertRaises(errors.InvalidJobStateException, self.job.restart)

        self.job.cancel()
        self.assertFalse(self.job.is_playing)
        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

        # Done (cancelled): restart is accepted and the job is ready again.
        self.job.restart()
        self.assertTrue(self.job.is_ready)
225
if __name__ == "__main__":
    # Only show CRITICAL log records while the tests run. The keyword is
    # ``level``; the misspelt ``loglevel`` is silently ignored by Python 2's
    # basicConfig and rejected with ValueError by Python 3's.
    logging.basicConfig(level=logging.CRITICAL)
    unittest.main()
Note: See TracBrowser for help on using the repository browser.