Changeset 3c0760f


Ignore:
Timestamp:
Jan 23, 2012, 8:21:23 PM (12 years ago)
Author:
Steven Allen <steven@…>
Children:
739d696
Parents:
ca1cfa9
git-author:
Steven Allen <steven@…> (01/23/12 19:35:01)
git-committer:
Steven Allen <steven@…> (01/23/12 20:21:23)
Message:

Block instead of looping and sleeping.

Location:
server/lib
Files:
1 added
5 edited

Legend:

Unmodified
Added
Removed
  • server/lib/gutenbach/server/job.py

    rca1cfa9 r3c0760f  
    174174    def is_playing(self):
    175175        """Whether the job is currently playing (regardless of whether
    176         it's paused).
     176        it's Paused).
    177177
    178178        """
     
    190190        return self.is_valid and \
    191191               self.player is not None and \
    192                self.player.is_paused       
     192               self.player.paused and \
     193               self.player.is_playing
    193194
    194195    @property
     
    201202        return (self.is_valid and \
    202203                self.player is not None and \
    203                 self.player.is_done) or \
     204                self.player.done) or \
    204205                (self._why_done == "canceled" or \
    205206                 self._why_done == "aborted")
     207
     208    def wait_done(self):
     209        self.player.wait_done()
    206210
    207211    @property
  • server/lib/gutenbach/server/player.py

    rd42236e r3c0760f  
    1919            self._callback = None
    2020            self._paused = False
     21            self._paused_condition = threading.Condition(self.lock)
    2122            self._done = False
     23            self._done_condition = threading.Condition(self.lock)
    2224            self._dryrun = False
    2325            self._dryrun_time = 0.5
    24             self._lag = 0.01
    2526
    2627    @property
     
    3031            return self.ident is not None and \
    3132                   self.isAlive() and \
    32                    not self.is_done
     33                   not self.done
    3334        else:
    3435            return self.ident is not None and \
    3536                   self.isAlive() and \
    36                    not self.is_done and \
     37                   not self.done and \
    3738                   self.player is not None and \
    3839                   self.player.poll() is None
    39        
     40
     41
     42    # DONE
    4043    @property
    4144    @sync
    42     def is_paused(self):
    43         return self.is_playing and self._paused
     45    def done(self):
     46        return self._done
     47    @done.setter
     48    @sync
     49    def done(self, val):
     50        if (self._done != val):
     51            self._done = val
     52            self._done_condition.notifyAll()
     53    @sync
     54    def wait_done(self):
     55        """Wait for the player to finish playing.
    4456
     57        Requires that the main thread be started.
     58        """
     59        while not self._done:
     60            self._done_condition.wait()
     61
     62    # PAUSED
    4563    @property
    46     def is_done(self):
    47         return self._done
     64    @sync
     65    def paused(self):
     66        return self._paused
     67    @paused.setter
     68    @sync
     69    def paused(self, val):
     70        if (self._paused != val):
     71            self._paused = val
     72            self._paused_condition.notifyAll()
     73
     74    @sync
     75    def wait_unpaused(self):
     76        """Wait for the player to finish playing.
     77
     78        Requires that the main thread be started.
     79        """
     80        while self._paused:
     81            self._paused_condition.wait()
     82
    4883
    4984    @property
     
    5792    def start(self):
    5893        super(Player, self).start()
    59         time.sleep(self._lag)
    6094
    6195    def run(self):
    62         if self.fh is None:
    63             raise ValueError, "file handler is None"
    64        
    65         logger.info("playing file '%s'" % self.fh.name)
     96        try:
     97            if self.fh is None:
     98                raise ValueError, "file handler is None"
    6699
    67         with self.lock:
    68             self._paused = False
    69             self._done = False
     100            logger.info("playing file '%s'" % self.fh.name)
    70101
    71         command = ["mplayer", "-really-quiet", "-slave", self.fh.name]
    72         logger.info("running '%s'", " ".join(command))
     102            with self.lock:
     103                self.paused = False
     104                self.done = False
    73105
    74         if self._dryrun:
    75             step = 0.01
    76             while self._dryrun_time > 0:
    77                 time.sleep(step)
    78                 self._dryrun_time -= step
    79                 while self.is_paused:
    80                     time.sleep(0.01)
     106            command = ["mplayer", "-really-quiet", "-slave", self.fh.name]
     107            logger.info("running '%s'", " ".join(command))
    81108
    82         else:
     109            if self._dryrun:
     110                step = 0.01
     111                while self._dryrun_time > 0:
     112                    time.sleep(step)
     113                    self._dryrun_time -= step
     114                    self.wait_unpaused()
     115            else:
     116                with self.lock:
     117                    self.player = subprocess.Popen(
     118                        command,
     119                        stdin=subprocess.PIPE,
     120                        stderr=subprocess.PIPE,
     121                        stdout=subprocess.PIPE)
     122
     123                # wait for mplayer to finish
     124                self.player.wait()
     125
     126                logger.info("mplayer finished with code %d" % self.player.returncode)
     127
     128                # get output from mplayer and log it
     129                with self.lock:
     130                    stderr = self.player.stderr.read()
     131                    stdout = self.player.stdout.read()
     132
     133                if stderr.strip() != "":
     134                    logger.error(stderr)
     135                if stdout.strip() != "":
     136                    logger.debug(stdout)
     137        finally:
    83138            with self.lock:
    84                 self.player = subprocess.Popen(
    85                     command,
    86                     stdin=subprocess.PIPE,
    87                     stderr=subprocess.PIPE,
    88                     stdout=subprocess.PIPE)
    89 
    90             # wait for mplayer to finish
    91             while True:
    92                 with self.lock:
    93                     playing = self.is_playing
    94                 if not playing:
    95                     break
    96                 time.sleep(0.1)
    97 
    98             logger.info("mplayer finished with code %d" % self.player.returncode)
    99        
    100             # get output from mplayer and log it
    101             with self.lock:
    102                 stderr = self.player.stderr.read()
    103                 stdout = self.player.stdout.read()
    104            
    105             if stderr.strip() != "":
    106                 logger.error(stderr)
    107             if stdout.strip() != "":
    108                 logger.debug(stdout)
    109 
    110         with self.lock:
    111             if self.callback:
    112                 self.callback()
    113             self._done = True
     139                if self.callback:
     140                    self.callback()
     141                self.done = True
    114142
    115143    def mplayer_pause(self):
     
    119147                if not self._dryrun:
    120148                    self.player.stdin.write("pause\n")
    121                 self._paused = not(self._paused)
    122                 logger.info("paused: %s", self.is_paused)
     149                self.paused = not(self.paused)
     150                logger.info("paused: %s", self.paused)
    123151            else:
    124152                logger.warning("trying to pause non-playing job")
    125         time.sleep(self._lag)
    126153
    127154    def mplayer_stop(self):
     
    133160                else:
    134161                    self._dryrun_time = 0.0
    135                 self._paused = False
     162                self.paused = False
    136163                logger.info("stopped")
    137164            else:
  • server/lib/gutenbach/server/printer.py

    r97f20dd r3c0760f  
    1111import time
    1212import traceback
     13from queue import PausingQueue
    1314
    1415# initialize logger
    1516logger = logging.getLogger(__name__)
     17
    1618
    1719class GutenbachPrinter(threading.Thread):
     
    7981
    8082        self.finished_jobs = []
    81         self.pending_jobs = []
     83        self.lock = threading.RLock()
     84        self.pending_jobs = PausingQueue(self.lock)
    8285        self.current_job = None
    8386        self.jobs = {}
    8487
    85         self.lock = threading.RLock()
    8688        self._running = False
    87         self.paused = False
    8889
    8990        # CUPS ignores jobs with id 0, so we have to start at 1
     
    9899        return "<Printer '%s'>" % self.name
    99100
     101    @property
     102    def paused(self):
     103        return self.pending_jobs.paused
     104    @paused.setter
     105    @sync
     106    def paused(self, val):
     107        self.pending_jobs.paused = val
     108
    100109    def run(self):
    101110        self._running = True
    102         while self._running:
    103             with self.lock:
    104                 try:
    105                     if self.current_job is None:
    106                         self.start_job()
    107                     elif self.current_job.is_done:
    108                         self.complete_job()
    109                 except:
    110                     self._running = False
    111                     logger.fatal(traceback.format_exc())
    112                     break
    113             time.sleep(0.1)
     111        try:
     112            while self._running:
     113                with self.lock:
     114                    try:
     115                        self.current_job  = self.get_job(self.pending_jobs.pop())
     116                    except IndexError:
     117                        self.current_job = None
     118                        continue;
     119                    self.current_job.play()
     120                self.current_job.wait_done()
     121                with self.lock:
     122                    self.finished_jobs.append(self.current_job.id)
     123                    self.current_job = None
     124
     125        except:
     126            self._running = False
     127            logger.fatal(traceback.format_exc())
    114128
    115129    def stop(self):
     
    123137               
    124138            self._running = False
     139            self.pending_jobs.interrupt()
     140
    125141        if self.ident is not None and self.isAlive():
    126142            self.join()
     
    179195    @sync
    180196    def active_jobs(self):
    181         jobs = self.pending_jobs[:]
     197        jobs = self.pending_jobs.copy()
    182198        if self.current_job is not None:
    183199            jobs.insert(0, self.current_job.id)
     
    197213        if not self.is_running:
    198214            raise RuntimeError, "%s not started" % str(self)
    199 
    200     @sync
    201     def start_job(self):
    202         self.assert_running()
    203         if not self.paused and self.current_job is None:
    204             try:
    205                 job_id = self.pending_jobs.pop(0)
    206                 self.current_job = self.get_job(job_id)
    207                 self.current_job.play()
    208             except IndexError:
    209                 self.current_job = None
    210                    
    211     @sync
    212     def complete_job(self):
    213         self.assert_running()
    214         if not self.paused and self.current_job is not None:
    215             try:
    216                 if not self.current_job.is_done:
    217                     self.current_job.stop()
    218             finally:
    219                 self.finished_jobs.append(self.current_job.id)
    220                 self.current_job = None
    221215
    222216    @sync
     
    20082002       
    20092003        self.assert_running()
    2010         job = self.get_job(job_id)
    2011         job.priority = 1 # XXX we need to actually do something
    2012                          # correct here
     2004        self.pending_jobs.promote(job_id)
  • server/lib/gutenbach_test/server/job.py

    r97f20dd r3c0760f  
    176176        self.assertFalse(self.job.is_paused)
    177177       
    178         while self.job.is_playing:
    179             time.sleep(0.1)
     178        self.job.wait_done()
    180179           
    181180        self.assertTrue(self.job.is_done)
     
    225224
    226225if __name__ == "__main__":
    227     logging.basicConfig(loglevel=logging.CRITICAL)
     226    logging.basicConfig(loglevel=logging.DEBUG)
    228227    unittest.main()
  • server/lib/gutenbach_test/server/printer.py

    r410ad69 r3c0760f  
    4242        self.assertEqual(self.printer.finished_jobs, [])
    4343    def testPendingJobs(self):
    44         self.assertEqual(self.printer.pending_jobs, [])
     44        self.assertTrue(self.printer.pending_jobs.empty())
    4545    def testCurrentJob(self):
    4646        self.assertEqual(self.printer.current_job, None)
     
    5858        self.assertEqual(self.printer.state, States.STOPPED)
    5959
    60     def testStartJob(self):
    61         self.assertRaises(RuntimeError, self.printer.start_job)
    62     def testCompleteJob(self):
    63         self.assertRaises(RuntimeError, self.printer.complete_job)
    6460    def testGetJob(self):
    6561        self.assertRaises(RuntimeError, self.printer.get_job, 0)
Note: See TracChangeset for help on using the changeset viewer.