source: server/test/server/job.py @ 609a9b0

no-cups
Last change on this file since 609a9b0 was 609a9b0, checked in by Jessica B. Hamrick <jhamrick@…>, 12 years ago

Add basic configuration file support

  • Property mode set to 100644
File size: 6.3 KB
Line 
1from gutenbach.server import GutenbachJob
2from gutenbach.server import errors
3from gutenbach.ipp import JobStates as States
4import unittest
5import tempfile
6import sys
7import time
8
def make_tempfile():
    """Return an open temporary file holding one line, ``test``, rewound to 0.

    ``tempfile.NamedTemporaryFile`` opens in binary mode (``w+b``) by
    default, so the payload is written as a bytes literal.  On Python 2 a
    bytes literal is an ordinary ``str``, so the bytes written are unchanged
    there, while on Python 3 this avoids a ``TypeError``.

    The caller owns the returned handle and is responsible for closing it.
    """
    fh = tempfile.NamedTemporaryFile()
    try:
        fh.write(b"test\n")
        # Rewind so a consumer reading from the handle starts at the payload.
        fh.seek(0)
    except Exception:
        # Do not leak the open handle if preparing the file fails.
        fh.close()
        raise
    return fh
14
class TestEmptyGutenbachJob(unittest.TestCase):
    """Checks on a ``GutenbachJob`` that was constructed with no arguments.

    Covers the expected default attribute values, the state predicates, and
    how each job operation is expected to react on such a job.
    """

    def setUp(self):
        # Every test gets its own argument-less job.
        self.job = GutenbachJob()

    # Expected default attribute values.
    def testPlayer(self):
        self.assertEqual(self.job.player, None)
    def testId(self):
        self.assertEqual(self.job.id, -1)
    def testCreator(self):
        self.assertEqual(self.job.creator, "")
    def testName(self):
        self.assertEqual(self.job.name, "")
    def testSize(self):
        self.assertEqual(self.job.size, 0)
    def testState(self):
        self.assertEqual(self.job.state, States.HELD)
    def testPriority(self):
        self.assertEqual(self.job.priority, 1)

    def testStateProperties(self):
        # None of the state predicates is expected to be true yet.
        self.assertFalse(self.job.is_valid)
        self.assertFalse(self.job.is_ready)
        self.assertFalse(self.job.is_playing)
        self.assertFalse(self.job.is_paused)
        self.assertFalse(self.job.is_done)
        self.assertFalse(self.job.is_completed)
        self.assertFalse(self.job.is_cancelled)
        self.assertFalse(self.job.is_aborted)

    # spool/play/pause are expected to be rejected with
    # InvalidJobStateException on this job.
    def testSpool(self):
        fh = tempfile.NamedTemporaryFile()
        try:
            self.assertRaises(errors.InvalidJobStateException, self.job.spool, fh)
        finally:
            # Close the temporary file even when the assertion fails, so a
            # failing test does not leak the handle.
            fh.close()
    def testPlay(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.play)
    def testPause(self):
        self.assertRaises(errors.InvalidJobStateException, self.job.pause)

    # cancel/abort are expected to succeed and move the job to the
    # matching terminal state.
    def testCancel(self):
        self.job.cancel()
        self.assertTrue(self.job.is_cancelled)
        self.assertEqual(self.job.state, States.CANCELLED)
    def testAbort(self):
        self.job.abort()
        self.assertTrue(self.job.is_aborted)
        self.assertEqual(self.job.state, States.ABORTED)
61
class TestBadGutenbachJob(unittest.TestCase):
    """Checks how ``GutenbachJob`` is expected to treat invalid field values.

    Each test feeds unsuitable values through the constructor and through
    attribute assignment, then asserts the value the job reports afterwards.
    """

    def testBadJobId(self):
        # A job id of -2 is expected to be reported as -1, whether it is
        # given to the constructor or assigned afterwards.
        job = GutenbachJob(job_id=-2)
        self.assertEqual(job.id, -1)
        job.id = -2
        self.assertEqual(job.id, -1)

    def testBadCreator(self):
        # Non-string creators are expected to come back as their string
        # form, and None as the empty string.
        job = GutenbachJob(
            job_id=1,
            creator=12345)
        self.assertEqual(job.creator, "12345")
        job.creator = None
        self.assertEqual(job.creator, "")
        job.creator = []
        self.assertEqual(job.creator, "[]")

    def testBadName(self):
        # Same expectations as for the creator, applied to the job name.
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name=12345)
        self.assertEqual(job.name, "12345")
        job.name = None
        self.assertEqual(job.name, "")
        job.name = []
        self.assertEqual(job.name, "[]")

    def testBadPriority(self):
        # sys.maxint only exists on Python 2; fall back to sys.maxsize so
        # the test module also runs where maxint has been removed.
        largest = getattr(sys, "maxint", sys.maxsize)

        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=-1)
        # Priorities below 1 and non-integer priorities are expected to be
        # reported as 1; a very large integer is expected to be kept.
        self.assertEqual(job.priority, 1)
        job.priority = 0
        self.assertEqual(job.priority, 1)
        job.priority = 1
        self.assertEqual(job.priority, 1)
        job.priority = largest
        self.assertEqual(job.priority, largest)
        job.priority = "hello"
        self.assertEqual(job.priority, 1)
        job.priority = []
        self.assertEqual(job.priority, 1)

    def testBadDocument(self):
        # A job built without a document is expected to reject non-file
        # arguments to spool() with InvalidDocument.
        job = GutenbachJob(
            job_id=1,
            creator="foo",
            name="test",
            priority=1)
        self.assertRaises(errors.InvalidDocument, job.spool, "hello")
        self.assertRaises(errors.InvalidDocument, job.spool, [])
        self.assertRaises(errors.InvalidDocument, job.spool, 12345)

        fh = make_tempfile()
        try:
            # A job built with a document is expected to reject any further
            # spool() call with InvalidJobStateException instead.
            job = GutenbachJob(
                job_id=1,
                creator="foo",
                name="test",
                priority=1,
                document=fh)
            self.assertRaises(errors.InvalidJobStateException, job.spool, "hello")
            self.assertRaises(errors.InvalidJobStateException, job.spool, [])
            self.assertRaises(errors.InvalidJobStateException, job.spool, 12345)
        finally:
            # Release the temporary file whether or not the assertions pass.
            fh.close()
129
class TestOperations(unittest.TestCase):
    """Exercises play, pause, cancel and abort on a job that has a document.

    The helper methods below only factor out steps shared by several tests;
    each test performs the same calls, sleeps and assertions in the same
    order as if they were written inline.
    """

    def setUp(self):
        document = make_tempfile()
        self.job = GutenbachJob(job_id=1, creator="foo", name="test",
                                priority=1, document=document)
        # NOTE(review): presumably makes the player simulate playback
        # instead of producing audio -- confirm against the player code.
        self.job.player._dryrun = True

    # -- shared steps ------------------------------------------------------

    def _start_playing(self):
        # Start the job, give it a moment, and check that it is playing.
        self.job.play()
        time.sleep(0.01)
        self.assertTrue(self.job.is_playing)

    def _wait_until_not_playing(self):
        # Poll until the job stops reporting that it is playing.
        while self.job.is_playing:
            time.sleep(0.1)

    def _assert_completed_normally(self):
        job = self.job
        self.assertTrue(job.is_done)
        self.assertTrue(job.is_completed)
        self.assertFalse(job.is_aborted)
        self.assertFalse(job.is_cancelled)

    def _assert_playing_with_pause_state(self, paused):
        # The job must still count as playing; is_paused must match `paused`.
        self.assertTrue(self.job.is_playing)
        if paused:
            self.assertTrue(self.job.is_paused)
        else:
            self.assertFalse(self.job.is_paused)

    # -- tests -------------------------------------------------------------

    def testPlay(self):
        self._start_playing()
        self._wait_until_not_playing()
        self._assert_completed_normally()

    def testPause(self):
        self._start_playing()
        self.assertFalse(self.job.is_paused)

        # First pause() call: the job is expected to report paused.
        self.job.pause()
        self._assert_playing_with_pause_state(True)

        # The paused state is expected to persist while we wait.
        time.sleep(0.6)
        self._assert_playing_with_pause_state(True)

        # Second pause() call: the job is expected to report not paused.
        self.job.pause()
        self._assert_playing_with_pause_state(False)

        self._wait_until_not_playing()
        self._assert_completed_normally()

    def testCancel(self):
        self._start_playing()
        self.assertFalse(self.job.is_cancelled)

        self.job.cancel()
        time.sleep(0.01)

        job = self.job
        self.assertFalse(job.is_playing)
        self.assertTrue(job.is_done)
        self.assertTrue(job.is_cancelled)
        self.assertFalse(job.is_aborted)

    def testAbort(self):
        self._start_playing()
        self.assertFalse(self.job.is_aborted)

        self.job.abort()
        time.sleep(0.01)

        job = self.job
        self.assertFalse(job.is_playing)
        self.assertTrue(job.is_done)
        self.assertFalse(job.is_cancelled)
        self.assertTrue(job.is_aborted)

    def testRestart(self):
        # XXX: Todo
        pass
210
# Allow this test module to be executed directly as a script:
# unittest.main() runs the TestCase classes defined in this module.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.