Changeset eee389a for server


Ignore:
Timestamp:
Jan 11, 2012, 3:13:30 PM (12 years ago)
Author:
Jessica B. Hamrick <jhamrick@…>
Branches:
no-cups
Children:
d21198f
Parents:
be6ff03
git-author:
Jessica B. Hamrick <jhamrick@…> (01/11/12 15:13:30)
git-committer:
Jessica B. Hamrick <jhamrick@…> (01/11/12 15:13:30)
Message:

Better threading model

Location:
server/lib/gutenbach/server
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • server/lib/gutenbach/server/__init__.py

    re58af05 reee389a  
    1414__all__.append('GutenbachRequestHandler')
    1515
    16 from server import GutenbachServer, IPPServer
    17 __all__.append('GutenbachServer')
     16from server import IPPServer
    1817__all__.append('IPPServer')
    1918
     
    2423
    2524# configure and initialize logging
    26 logging.basicConfig(level=logging.DEBUG)
     25logging.basicConfig(level=logging.INFO)
    2726logger = logging.getLogger(__name__)
    2827
    2928def error(self, request=None, client_address=None):
    3029    logger.fatal(traceback.format_exc())
    31     self.gutenbach_server.request_stop = True
     30    self.gutenbach_printer.running = False
    3231    sys.exit(1)
    3332
    3433def start():
    3534    logger.info("Starting Gutenbach server...")
    36     gutenbach = GutenbachServer()
     35    gutenbach = GutenbachPrinter("test")
    3736    gutenbach.start()
    3837
     
    4140    httpd = BaseHTTPServer.HTTPServer(server_address, IPPServer)
    4241    httpd.handle_error = error.__get__(httpd)
    43     httpd.gutenbach_server = gutenbach
     42    httpd.gutenbach_printer = gutenbach
    4443    while gutenbach.isAlive():
    4544        try:
  • server/lib/gutenbach/server/errors.py

    rb01b6d1 reee389a  
    33    'InvalidPrinterStateException',
    44    'InvalidJobStateException',
     5    'MissingDataException'
    56    ]
    67
     
    1819
    1920class InvalidJobStateException(Exception):
     21    errstr = {
     22        3: "pending",
     23        4: "held",
     24        5: "processing",
     25        6: "stopped",
     26        7: "cancelled",
     27        8: "aborted",
     28        9: "complete"
     29        }
     30   
    2031    def __init__(self, state):
    21         self.state = hex(state)
     32        self.state = int(state)
    2233    def __str__(self):
    23         return "Invalid job state: %s" % self.state
     34        return "Invalid job state: %s (%s)" % \
     35               (self.errstr[self.state], hex(self.state))
     36
     37class MissingDataException(Exception):
     38    pass
  • server/lib/gutenbach/server/job.py

    rbe6ff03 reee389a  
    1 from . import InvalidJobException, InvalidPrinterStateException
     1from . import InvalidJobStateException, MissingDataException
     2from .player import Player
    23from gutenbach.ipp import JobStates as States
    34import os
    45import gutenbach.ipp as ipp
    56import logging
    6 import subprocess
    7 import time
    87
    98# initialize logger
     
    3433        self.name = name
    3534        self.size = size
    36         self.status = States.HELD
    37 
    38         self.document = None
     35        self.state = States.HELD
     36        self.priority = 1
     37
    3938        self.document_name = None
    4039        self.document_format = None
     
    4847        return "<Job %d '%s'>" % (self.id, self.name)
    4948
     49    def __cmp__(self, other):
     50        return cmp(self.priority, other.priority)
     51
    5052    ######################################################################
    5153    ###                          Properties                            ###
     
    98100
    99101        """
    100         if self.document:
    101             size = os.path.getsize(self.document.name)
    102         else:
    103             size = self._size
    104         return size
     102        return self._size
    105103    @size.setter
    106104    def size(self, val):
     
    110108            self._size = 0
    111109
     110    @property
     111    def is_playing(self):
     112        return self.state == States.PROCESSING
     113
     114    @property
     115    def is_ready(self):
     116        return self.state == States.PENDING
     117
     118    @property
     119    def is_finished(self):
     120        return self.state != States.PENDING and self.state != States.PROCESSING
     121       
    112122    ######################################################################
    113123    ###                            Methods                             ###
     
    115125
    116126    def play(self):
     127        """Non-blocking play function.
     128
     129        """
     130       
     131        # make sure the job is waiting to be played and that it's
     132        # valid
     133        if self.state != States.PENDING:
     134            raise InvalidJobStateException(self.state)
     135       
     136        # and set the state to processing if we're good to go
    117137        logger.info("playing job %s" % str(self))
    118         self.status = States.PROCESSING
    119         self.player = subprocess.Popen(
    120             "/usr/bin/mplayer -really-quiet -slave %s" % self.document.name,
    121             shell=True,
    122             stderr=subprocess.PIPE,
    123             stdout=subprocess.PIPE)
    124         while self.player.poll() is None:
    125             time.sleep(0.1)
    126         logger.info("mplayer finished with code %d" % self.player.returncode)
    127         stderr = self.player.stderr.read()
    128         stdout = self.player.stdout.read()
    129         if stderr.strip() != "":
    130             logger.error(stderr)
    131         logger.debug(stdout)
    132         self.player = None
    133         self.printer.complete_job(self.id)
    134 
    135     def finish(self):
    136         logger.info("finished job %s" % str(self))
    137         self.status = States.COMPLETE
     138        self.state = States.PROCESSING
     139        self.player.callback = self._completed
     140        self.player.run()
     141
     142    def pause(self):
     143        if self.player:
     144            self.player.mplayer_pause()
     145
     146    def stop(self):
     147        if self.player:
     148            self.player.callback = self._stopped
     149            self.player.mplayer_stop()
     150
     151    def _completed(self):
     152        if self.state != States.PROCESSING:
     153            raise InvalidJobStateException(self.state)
     154        logger.info("completed job %s" % str(self))
     155        self.state = States.COMPLETE
     156        self.player = None
     157
     158    def _canceled(self):
     159        if self.state != States.PROCESSING:
     160            raise InvalidJobStateException(self.state)
     161        logger.info("canceled job %s" % str(self))
     162        self.state = States.CANCELLED
     163        self.player = None
     164
     165    def _stopped(self):
     166        if self.state != States.PROCESSING:
     167            raise InvalidJobStateException(self.state)
     168        logger.info("stopped job %s" % str(self))
     169        self.state = States.STOPPED
     170        self.player = None
     171
     172    def _aborted(self):
     173        if self.state != States.PROCESSING:
     174            raise InvalidJobStateException(self.state)
     175        logger.info("aborted job %s" % str(self))
     176        self.state = States.ABORTED
     177        self.player = None
    138178
    139179    ######################################################################
     
    149189        return ipp.JobName(self.name)
    150190
    151     # XXX: we need to actually calculate this!
    152191    @property
    153192    def job_originating_user_name(self):
    154193        return ipp.JobOriginatingUserName(self.creator)
    155194
    156     # XXX: we need to actually calculate this!
    157195    @property
    158196    def job_k_octets(self):
     
    161199    @property
    162200    def job_state(self):
    163         return ipp.JobState(self.status)
     201        return ipp.JobState(self.state)
    164202
    165203    @property
     
    175213        pass
    176214
    177     def send_document(self,
    178                       document,
    179                       document_name=None,
    180                       document_format=None,
    181                       document_natural_language=None,
    182                       requesting_user_name=None,
    183                       compression=None,
     215    def send_document(self, document, document_name=None,
     216                      document_format=None, document_natural_language=None,
     217                      requesting_user_name=None, compression=None,
    184218                      last_document=None):
    185219
    186         if self.status != States.HELD:
    187             raise InvalidJobStateException(self.status)
    188        
    189         self.document = document
     220        if self.state != States.HELD:
     221            raise InvalidJobStateException(self.state)
     222
     223        self.player = Player(document)
     224
     225        if self.size == 0:
     226            self.size = os.path.getsize(document.name)
     227       
    190228        self.document_name = str(document_name)
    191229        self.document_format = str(document_format)
     
    193231        self.creator = str(requesting_user_name)
    194232        self.compression = str(compression)
    195         self.status = States.PENDING
     233        self.state = States.PENDING
    196234
    197235        logger.debug("document for job %d is '%s'" % (self.id, self.document_name))
  • server/lib/gutenbach/server/printer.py

    rb01b6d1 reee389a  
    1 from . import InvalidJobException, InvalidPrinterStateException
     1from . import InvalidJobException, InvalidPrinterStateException, InvalidJobStateException
    22from . import Job
    33from gutenbach.ipp import PrinterStates as States
     
    55import logging
    66import time
     7import threading
     8import heapq
     9import traceback
     10import sys
     11
    712
    813# initialize logger
    914logger = logging.getLogger(__name__)
    1015
    11 class GutenbachPrinter(object):
     16class GutenbachPrinter(threading.Thread):
    1217
    1318    # for IPP
     
    4449    ]
    4550       
    46     def __init__(self, name):
    47 
    48         self.name = name
    49         self.time_created = int(time.time())
    50         self.state = States.IDLE
    51 
    52         self.finished_jobs = []
    53         self.active_jobs = []
    54         self.jobs = {}
    55 
    56         # cups ignores jobs with id 0, so we have to start at 1
    57         self._next_jobid = 1
     51    def __init__(self, name, *args, **kwargs):
     52        self.lock = threading.RLock()
     53
     54        with self.lock:
     55            super(GutenbachPrinter, self).__init__(*args, **kwargs)
     56           
     57            self.name = name
     58            self.time_created = int(time.time())
     59
     60            self.finished_jobs = []
     61            self.pending_jobs = []
     62            self.current_job = None
     63            self.jobs = {}
     64
     65            self.running = False
     66            self.paused = False
     67
     68            # CUPS ignores jobs with id 0, so we have to start at 1
     69            self._next_job_id = 1
    5870
    5971    def __repr__(self):
     
    7890
    7991    @property
    80     def next_job(self):
    81         if len(self.active_jobs) == 0:
    82             job = None
    83         else:
    84             job = self.active_jobs[0]
     92    def state(self):
     93        with self.lock:
     94            if self.current_job is not None:
     95                val = States.PROCESSING
     96            elif len(self.pending_jobs) == 0:
     97                val = States.IDLE
     98            else:
     99                val = States.STOPPED
     100        return val
     101
     102    @property
     103    def active_jobs(self):
     104        with self.lock:
     105            jobs = self.pending_jobs[:]
     106            if self.current_job is not None:
     107                jobs.insert(0, self.current_job.id)
     108        return jobs
     109
     110    ######################################################################
     111    ###                            Methods                             ###
     112    ######################################################################
     113
     114    def run(self):
     115        self.running = True
     116        while self.running:
     117            with self.lock:
     118                try:
     119                    if self.current_job is None:
     120                        self.start_job()
     121                    elif self.current_job.is_finished:
     122                        self.complete_job()
     123                except:
     124                    logger.fatal(traceback.format_exc())
     125                    sys.exit(1)
     126            time.sleep(0.1)
     127
     128    def start_job(self):
     129        with self.lock:
     130            if self.current_job is None:
     131                try:
     132                    job_id = heapq.heappop(self.pending_jobs)
     133                    self.current_job = self.get_job(job_id)
     134                    self.current_job.play()
     135                except IndexError:
     136                    pass
     137                except InvalidJobStateException:
     138                    heapq.heappush(self.pending_jobs, self.current_job.id)
     139                finally:
     140                    self.current_job = None
     141                   
     142    def complete_job(self):
     143        with self.lock:
     144            if self.current_job is None:
     145                return
     146
     147            try:
     148                if not self.current_job.is_finished:
     149                    self.current_job.stop()
     150            finally:
     151                self.finished_jobs.append(self.current_job)
     152                self.current_job = None
     153
     154    def stop(self):
     155        pass
     156
     157    def get_job(self, job_id):
     158        with self.lock:
     159            if job_id not in self.jobs:
     160                raise InvalidJobException(job_id)
     161            job = self.jobs[job_id]
    85162        return job
    86 
    87     ######################################################################
    88     ###                            Methods                             ###
    89     ######################################################################
    90 
    91     def complete_job(self, jobid):
    92         job = self.jobs[self.active_jobs.pop(0)]
    93         self.finished_jobs.append(job)
    94         job.finish()
    95         return job.id
    96 
    97     def start_job(self, jobid):
    98         job = self.jobs[self.active_jobs[0]]
    99         if job.status != ipp.JobStates.PENDING:
    100             raise InvalidPrinterStateException(job.status)
    101         job.play()
    102 
    103     def stop(self):
    104         if len(self.active_jobs) == 0:
    105             return
    106         job = self.jobs[self.active_jobs[0]]
    107         if job.player is not None:
    108             logger.info("stopping printer %s" % self.name)
    109             job.player.terminate()
    110 
    111     def get_job(self, jobid):
    112         if jobid not in self.jobs:
    113             raise InvalidJobException(jobid)
    114         return self.jobs[jobid]
    115163
    116164    ######################################################################
     
    238286
    239287    def create_job(self, requesting_user_name="", job_name="", job_k_octets=0):
    240         job_id = self._next_jobid
    241         self._next_jobid += 1
     288        job_id = self._next_job_id
     289        self._next_job_id += 1
    242290       
    243291        job = Job(job_id,
     
    248296       
    249297        self.jobs[job_id] = job
    250         self.active_jobs.append(job_id)
    251         self.state = States.PROCESSING
     298        self.pending_jobs.append(job_id)
    252299       
    253300        return job
  • server/lib/gutenbach/server/requests.py

    rb01b6d1 reee389a  
    4747class GutenbachRequestHandler(object):
    4848
    49     def __init__(self, gutenbach_server):
    50         self.gutenbach_server = gutenbach_server
    51         self.printer = gutenbach_server.printer
     49    def __init__(self, gutenbach_printer):
     50        self.printer = gutenbach_printer
    5251
    5352    def generic_handle(self, request):
  • server/lib/gutenbach/server/server.py

    rffbe41d reee389a  
    88import logging
    99import sys
    10 import traceback
    1110import tempfile
    12 import threading
    13 import time
    1411
    1512# initialize logger
    1613logger = logging.getLogger(__name__)
    17 
    18 class GutenbachServer(threading.Thread):
    19 
    20     def run(self):
    21         self.printer = GutenbachPrinter(name="test")
    22         self.request_stop = False
    23 
    24         while not self.request_stop:
    25             job = self.printer.next_job
    26             if job is not None:
    27                 try:
    28                     self.printer.start_job(job)
    29                 except InvalidPrinterStateException:
    30                     pass
    31                 except:
    32                     logger.fatal(traceback.format_exc())
    33                     sys.exit(1)
    34             time.sleep(0.1)
    3514
    3615class IPPServer(BaseHTTPServer.BaseHTTPRequestHandler):
     
    10786        # throw a fatal error.
    10887        logger.debug("request: %s" % repr(request))
    109         response = GutenbachRequestHandler(self.server.gutenbach_server).handle(request)
     88        response = GutenbachRequestHandler(self.server.gutenbach_printer).handle(request)
    11089        self.send_ok(response)
Note: See TracChangeset for help on using the changeset viewer.