- Timestamp:
- Jan 23, 2012, 8:21:23 PM (12 years ago)
- Children:
- 739d696
- Parents:
- ca1cfa9
- git-author:
- Steven Allen <steven@…> (01/23/12 19:35:01)
- git-committer:
- Steven Allen <steven@…> (01/23/12 20:21:23)
- Location:
- server/lib
- Files:
-
- 1 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
server/lib/gutenbach/server/job.py
rca1cfa9 r3c0760f 174 174 def is_playing(self): 175 175 """Whether the job is currently playing (regardless of whether 176 it's paused).176 it's Paused). 177 177 178 178 """ … … 190 190 return self.is_valid and \ 191 191 self.player is not None and \ 192 self.player.is_paused 192 self.player.paused and \ 193 self.player.is_playing 193 194 194 195 @property … … 201 202 return (self.is_valid and \ 202 203 self.player is not None and \ 203 self.player. is_done) or \204 self.player.done) or \ 204 205 (self._why_done == "canceled" or \ 205 206 self._why_done == "aborted") 207 208 def wait_done(self): 209 self.player.wait_done() 206 210 207 211 @property -
server/lib/gutenbach/server/player.py
rd42236e r3c0760f 19 19 self._callback = None 20 20 self._paused = False 21 self._paused_condition = threading.Condition(self.lock) 21 22 self._done = False 23 self._done_condition = threading.Condition(self.lock) 22 24 self._dryrun = False 23 25 self._dryrun_time = 0.5 24 self._lag = 0.0125 26 26 27 @property … … 30 31 return self.ident is not None and \ 31 32 self.isAlive() and \ 32 not self. is_done33 not self.done 33 34 else: 34 35 return self.ident is not None and \ 35 36 self.isAlive() and \ 36 not self. is_done and \37 not self.done and \ 37 38 self.player is not None and \ 38 39 self.player.poll() is None 39 40 41 42 # DONE 40 43 @property 41 44 @sync 42 def is_paused(self): 43 return self.is_playing and self._paused 45 def done(self): 46 return self._done 47 @done.setter 48 @sync 49 def done(self, val): 50 if (self._done != val): 51 self._done = val 52 self._done_condition.notifyAll() 53 @sync 54 def wait_done(self): 55 """Wait for the player to finish playing. 44 56 57 Requires that the main thread be started. 58 """ 59 while not self._done: 60 self._done_condition.wait() 61 62 # PAUSED 45 63 @property 46 def is_done(self): 47 return self._done 64 @sync 65 def paused(self): 66 return self._paused 67 @paused.setter 68 @sync 69 def paused(self, val): 70 if (self._paused != val): 71 self._paused = val 72 self._paused_condition.notifyAll() 73 74 @sync 75 def wait_unpaused(self): 76 """Wait for the player to finish playing. 77 78 Requires that the main thread be started. 79 """ 80 while self._paused: 81 self._paused_condition.wait() 82 48 83 49 84 @property … … 57 92 def start(self): 58 93 super(Player, self).start() 59 time.sleep(self._lag)60 94 61 95 def run(self): 62 if self.fh is None: 63 raise ValueError, "file handler is None" 64 65 logger.info("playing file '%s'" % self.fh.name) 96 try: 97 if self.fh is None: 98 raise ValueError, "file handler is None" 66 99 67 with self.lock: 68 self._paused = False 69 self._done = False 100 logger.info("playing file '%s'" % self.fh.name) 70 101 71 command = ["mplayer", "-really-quiet", "-slave", self.fh.name] 72 logger.info("running '%s'", " ".join(command)) 102 with self.lock: 103 self.paused = False 104 self.done = False 73 105 74 if self._dryrun: 75 step = 0.01 76 while self._dryrun_time > 0: 77 time.sleep(step) 78 self._dryrun_time -= step 79 while self.is_paused: 80 time.sleep(0.01) 106 command = ["mplayer", "-really-quiet", "-slave", self.fh.name] 107 logger.info("running '%s'", " ".join(command)) 81 108 82 else: 109 if self._dryrun: 110 step = 0.01 111 while self._dryrun_time > 0: 112 time.sleep(step) 113 self._dryrun_time -= step 114 self.wait_unpaused() 115 else: 116 with self.lock: 117 self.player = subprocess.Popen( 118 command, 119 stdin=subprocess.PIPE, 120 stderr=subprocess.PIPE, 121 stdout=subprocess.PIPE) 122 123 # wait for mplayer to finish 124 self.player.wait() 125 126 logger.info("mplayer finished with code %d" % self.player.returncode) 127 128 # get output from mplayer and log it 129 with self.lock: 130 stderr = self.player.stderr.read() 131 stdout = self.player.stdout.read() 132 133 if stderr.strip() != "": 134 logger.error(stderr) 135 if stdout.strip() != "": 136 logger.debug(stdout) 137 finally: 83 138 with self.lock: 84 self.player = subprocess.Popen( 85 command, 86 stdin=subprocess.PIPE, 87 stderr=subprocess.PIPE, 88 stdout=subprocess.PIPE) 89 90 # wait for mplayer to finish 91 while True: 92 with self.lock: 93 playing = self.is_playing 94 if not playing: 95 break 96 time.sleep(0.1) 97 98 logger.info("mplayer finished with code %d" % self.player.returncode) 99 100 # get output from mplayer and log it 101 with self.lock: 102 stderr = self.player.stderr.read() 103 stdout = self.player.stdout.read() 104 105 if stderr.strip() != "": 106 logger.error(stderr) 107 if stdout.strip() != "": 108 logger.debug(stdout) 109 110 with self.lock: 111 if self.callback: 112 self.callback() 113 self._done = True 139 if self.callback: 140 self.callback() 141 self.done = True 114 142 115 143 def mplayer_pause(self): … … 119 147 if not self._dryrun: 120 148 self.player.stdin.write("pause\n") 121 self. _paused = not(self._paused)122 logger.info("paused: %s", self. is_paused)149 self.paused = not(self.paused) 150 logger.info("paused: %s", self.paused) 123 151 else: 124 152 logger.warning("trying to pause non-playing job") 125 time.sleep(self._lag)126 153 127 154 def mplayer_stop(self): … … 133 160 else: 134 161 self._dryrun_time = 0.0 135 self. _paused = False162 self.paused = False 136 163 logger.info("stopped") 137 164 else: -
server/lib/gutenbach/server/printer.py
r97f20dd r3c0760f 11 11 import time 12 12 import traceback 13 from queue import PausingQueue 13 14 14 15 # initialize logger 15 16 logger = logging.getLogger(__name__) 17 16 18 17 19 class GutenbachPrinter(threading.Thread): … … 79 81 80 82 self.finished_jobs = [] 81 self.pending_jobs = [] 83 self.lock = threading.RLock() 84 self.pending_jobs = PausingQueue(self.lock) 82 85 self.current_job = None 83 86 self.jobs = {} 84 87 85 self.lock = threading.RLock()86 88 self._running = False 87 self.paused = False88 89 89 90 # CUPS ignores jobs with id 0, so we have to start at 1 … … 98 99 return "<Printer '%s'>" % self.name 99 100 101 @property 102 def paused(self): 103 return self.pending_jobs.paused 104 @paused.setter 105 @sync 106 def paused(self, val): 107 self.pending_jobs.paused = val 108 100 109 def run(self): 101 110 self._running = True 102 while self._running: 103 with self.lock: 104 try: 105 if self.current_job is None: 106 self.start_job() 107 elif self.current_job.is_done: 108 self.complete_job() 109 except: 110 self._running = False 111 logger.fatal(traceback.format_exc()) 112 break 113 time.sleep(0.1) 111 try: 112 while self._running: 113 with self.lock: 114 try: 115 self.current_job = self.get_job(self.pending_jobs.pop()) 116 except IndexError: 117 self.current_job = None 118 continue; 119 self.current_job.play() 120 self.current_job.wait_done() 121 with self.lock: 122 self.finished_jobs.append(self.current_job.id) 123 self.current_job = None 124 125 except: 126 self._running = False 127 logger.fatal(traceback.format_exc()) 114 128 115 129 def stop(self): … … 123 137 124 138 self._running = False 139 self.pending_jobs.interrupt() 140 125 141 if self.ident is not None and self.isAlive(): 126 142 self.join() … … 179 195 @sync 180 196 def active_jobs(self): 181 jobs = self.pending_jobs [:]197 jobs = self.pending_jobs.copy() 182 198 if self.current_job is not None: 183 199 jobs.insert(0, self.current_job.id) … … 197 213 if not self.is_running: 198 214 raise RuntimeError, "%s not started" % str(self) 199 200 @sync201 def start_job(self):202 self.assert_running()203 if not self.paused and self.current_job is None:204 try:205 job_id = self.pending_jobs.pop(0)206 self.current_job = self.get_job(job_id)207 self.current_job.play()208 except IndexError:209 self.current_job = None210 211 @sync212 def complete_job(self):213 self.assert_running()214 if not self.paused and self.current_job is not None:215 try:216 if not self.current_job.is_done:217 self.current_job.stop()218 finally:219 self.finished_jobs.append(self.current_job.id)220 self.current_job = None221 215 222 216 @sync … … 2008 2002 2009 2003 self.assert_running() 2010 job = self.get_job(job_id) 2011 job.priority = 1 # XXX we need to actually do something 2012 # correct here 2004 self.pending_jobs.promote(job_id) -
server/lib/gutenbach_test/server/job.py
r97f20dd r3c0760f 176 176 self.assertFalse(self.job.is_paused) 177 177 178 while self.job.is_playing: 179 time.sleep(0.1) 178 self.job.wait_done() 180 179 181 180 self.assertTrue(self.job.is_done) … … 225 224 226 225 if __name__ == "__main__": 227 logging.basicConfig(loglevel=logging. CRITICAL)226 logging.basicConfig(loglevel=logging.DEBUG) 228 227 unittest.main() -
server/lib/gutenbach_test/server/printer.py
r410ad69 r3c0760f 42 42 self.assertEqual(self.printer.finished_jobs, []) 43 43 def testPendingJobs(self): 44 self.assert Equal(self.printer.pending_jobs, [])44 self.assertTrue(self.printer.pending_jobs.empty()) 45 45 def testCurrentJob(self): 46 46 self.assertEqual(self.printer.current_job, None) … … 58 58 self.assertEqual(self.printer.state, States.STOPPED) 59 59 60 def testStartJob(self):61 self.assertRaises(RuntimeError, self.printer.start_job)62 def testCompleteJob(self):63 self.assertRaises(RuntimeError, self.printer.complete_job)64 60 def testGetJob(self): 65 61 self.assertRaises(RuntimeError, self.printer.get_job, 0)
Note: See TracChangeset
for help on using the changeset viewer.