source: server/test/server/job.py @ adf8cf0

no-cups
Last change on this file since adf8cf0 was adf8cf0, checked in by Jessica B. Hamrick <jhamrick@…>, 12 years ago

Remove lag times from job test cases and put them in the actual player.

  • Property mode set to 100644
File size: 6.2 KB
Line 
1from gutenbach.server import GutenbachJob
2from gutenbach.server import errors
3from gutenbach.ipp import JobStates as States
4import unittest
5import tempfile
6import sys
7import time
8
def make_tempfile():
    """Create a small named temporary file to use as a job document.

    Returns:
        An open ``tempfile.NamedTemporaryFile`` holding the bytes
        ``test\\n``, with its position rewound to the start.  The caller
        owns the handle and is responsible for closing it.
    """
    fh = tempfile.NamedTemporaryFile()
    # NamedTemporaryFile opens in binary mode by default, so write a bytes
    # literal: on Python 2 this is the same as the plain str literal, and on
    # Python 3 it avoids a TypeError.
    fh.write(b"test\n")
    # Push the data to disk explicitly so that anything opening the file by
    # name sees the contents, rather than relying on seek() to flush.
    fh.flush()
    fh.seek(0)
    return fh
14
class TestEmptyGutenbachJob(unittest.TestCase):
    """Checks on a ``GutenbachJob`` constructed with no arguments.

    The attribute tests pin the values such a job reports; the operation
    tests check which operations it rejects and which it accepts.
    """

    def setUp(self):
        self.job = GutenbachJob()

    # --- attribute values of an argument-less job ---

    def testPlayer(self):
        self.assertEqual(self.job.player, None)

    def testId(self):
        self.assertEqual(self.job.id, -1)

    def testCreator(self):
        self.assertEqual(self.job.creator, "")

    def testName(self):
        self.assertEqual(self.job.name, "")

    def testSize(self):
        self.assertEqual(self.job.size, 0)

    def testState(self):
        self.assertEqual(self.job.state, States.HELD)

    def testPriority(self):
        self.assertEqual(self.job.priority, 1)

    def testStateProperties(self):
        # Every state predicate is expected to be False for this job.
        self.assertFalse(self.job.is_valid)
        self.assertFalse(self.job.is_ready)
        self.assertFalse(self.job.is_playing)
        self.assertFalse(self.job.is_paused)
        self.assertFalse(self.job.is_done)
        self.assertFalse(self.job.is_completed)
        self.assertFalse(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

    # --- operations ---

    def testSpool(self):
        # The context manager closes the temporary file even when the
        # assertion fails; a trailing close() call would be skipped then.
        with tempfile.NamedTemporaryFile() as fh:
            self.assertRaises(
                errors.InvalidJobStateException, self.job.spool, fh)

    def testPlay(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.play)

    def testPause(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.pause)

    def testCancel(self):
        # Unlike spool/play/pause, cancel is expected to succeed here.
        self.job.cancel()
        self.assertTrue(self.job.is_cancelled)
        self.assertEqual(self.job.state, States.CANCELLED)

    def testAbort(self):
        # Abort is likewise expected to succeed.
        self.job.abort()
        self.assertTrue(self.job.is_aborted)
        self.assertEqual(self.job.state, States.ABORTED)
61
class TestBadGutenbachJob(unittest.TestCase):
    """Checks how ``GutenbachJob`` reacts to invalid attribute values.

    Each test passes bad values through the constructor and through
    attribute assignment, then asserts the value the job reports.
    """

    def testBadJobId(self):
        # A job id of -2 is expected to read back as -1, whether given to
        # the constructor or assigned afterwards.
        job = GutenbachJob(job_id=-2)
        self.assertEqual(job.id, -1)
        job.id = -2
        self.assertEqual(job.id, -1)

    def testBadCreator(self):
        # Non-string creators are expected to read back as their string
        # form, and None as the empty string.
        job = GutenbachJob(
            job_id=1,
            creator=12345)
        self.assertEqual(job.creator, "12345")
        job.creator = None
        self.assertEqual(job.creator, "")
        job.creator = []
        self.assertEqual(job.creator, "[]")

    def testBadName(self):
        # Same expectations as for the creator attribute.
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name=12345)
        self.assertEqual(job.name, "12345")
        job.name = None
        self.assertEqual(job.name, "")
        job.name = []
        self.assertEqual(job.name, "[]")

    def testBadPriority(self):
        # sys.maxint exists only on Python 2; fall back to sys.maxsize so
        # the test can still run where maxint has been removed.
        max_int = getattr(sys, "maxint", sys.maxsize)

        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=-1)
        self.assertEqual(job.priority, 1)
        job.priority = 0
        self.assertEqual(job.priority, 1)
        job.priority = 1
        self.assertEqual(job.priority, 1)
        job.priority = max_int
        self.assertEqual(job.priority, max_int)
        job.priority = "hello"
        self.assertEqual(job.priority, 1)
        job.priority = []
        self.assertEqual(job.priority, 1)

    def testBadDocument(self):
        # Without a document, spooling a non-file object is expected to
        # raise InvalidDocument.
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=1)
        self.assertRaises(errors.InvalidDocument, job.spool, "hello")
        self.assertRaises(errors.InvalidDocument, job.spool, [])
        self.assertRaises(errors.InvalidDocument, job.spool, 12345)

        # With a document already supplied, any further spool attempt is
        # expected to raise InvalidJobStateException instead.
        fh = make_tempfile()
        try:
            job = GutenbachJob(
                job_id=1,
                creator="foo",
                name="test",
                priority=1,
                document=fh)
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, "hello")
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, [])
            self.assertRaises(
                errors.InvalidJobStateException, job.spool, 12345)
        finally:
            # Release the temporary file even if an assertion fails.
            fh.close()
129
class TestOperations(unittest.TestCase):
    """Play, pause, cancel and abort on a job that has a document.

    Each test gets a fresh job whose player has ``_dryrun`` set to True.
    """

    # Upper bound, in seconds, on how long a test waits for playback to end
    # before failing.  NOTE(review): chosen generously; tune to the actual
    # dry-run duration of the player.
    PLAYBACK_TIMEOUT = 30.0

    def setUp(self):
        # Keep the handle so tearDown can close the temporary file.
        self._document = make_tempfile()
        try:
            self.job = GutenbachJob(
                job_id=1,
                creator="foo",
                name="test",
                priority=1,
                document=self._document)
            self.job.player._dryrun = True
        except Exception:
            # tearDown is not run when setUp raises, so clean up here.
            self._document.close()
            raise

    def tearDown(self):
        self._document.close()

    def _wait_for_playback(self):
        """Poll until the job stops playing; fail the test on timeout."""
        deadline = time.time() + self.PLAYBACK_TIMEOUT
        while self.job.is_playing:
            if time.time() > deadline:
                self.fail(
                    "job was still playing after %.1f seconds"
                    % self.PLAYBACK_TIMEOUT)
            time.sleep(0.1)

    def testPlay(self):
        self.job.play()
        self.assertTrue(self.job.is_playing)

        self._wait_for_playback()

        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_completed)
        self.assertFalse(self.job.is_aborted)
        self.assertFalse(self.job.is_cancelled)

    def testPause(self):
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_paused)

        # First pause() call: the job stays "playing" but reports paused.
        self.job.pause()
        self.assertTrue(self.job.is_playing)
        self.assertTrue(self.job.is_paused)

        # The paused state is expected to persist over time.
        time.sleep(0.6)
        self.assertTrue(self.job.is_playing)
        self.assertTrue(self.job.is_paused)

        # Second pause() call: the job is expected to resume.
        self.job.pause()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_paused)

        self._wait_for_playback()

        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_completed)
        self.assertFalse(self.job.is_aborted)
        self.assertFalse(self.job.is_cancelled)

    def testCancel(self):
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_cancelled)

        self.job.cancel()
        self.assertFalse(self.job.is_playing)
        self.assertTrue(self.job.is_done)
        self.assertTrue(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

    def testAbort(self):
        self.job.play()
        self.assertTrue(self.job.is_playing)
        self.assertFalse(self.job.is_aborted)

        self.job.abort()
        self.assertFalse(self.job.is_playing)
        self.assertTrue(self.job.is_done)
        self.assertFalse(self.job.is_cancelled)
        self.assertTrue(self.job.is_aborted)

    def testRestart(self):
        # XXX: Todo
        pass
204
if __name__ == "__main__":
    # Run every TestCase defined in this module when executed as a script.
    unittest.main()
Note: See TracBrowser for help on using the repository browser.