From ac71cf97a0515e8c50c2997fe4853bb49c5c73b8 Mon Sep 17 00:00:00 2001
From: Nathaniel Kofalt <nathaniel@kofalt.com>
Date: Fri, 6 May 2016 15:59:28 -0500
Subject: [PATCH] Add some sanity to jobs logic

---
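Notes:

This patch splits the old api/jobs.py into an api/jobs package: gears.py
(gear lookup), jobs.py (the Job class), queue.py (queue and state-machine
logic), and handlers.py (the webapp2 handlers), with rules.py moved under
the package as well. A rough sketch of how the pieces fit together follows;
the module, class, and method names are taken from this patch, while the
gear name, ids, and filename are made up for illustration:

    from api.jobs.jobs import Job
    from api.jobs.queue import Queue
    from api.dao.containerutil import FileReference

    # Build and enqueue a job, the same way JobsHandler.add() does.
    inputs = {
        'dicom': FileReference(container_type='acquisition',
                               container_id='aaaaaaaaaaaaaaaaaaaaaaaa',  # hypothetical id
                               filename='scan.dcm'),                     # hypothetical file
    }
    job = Job('dcm_convert', inputs, tags=['batch'])  # hypothetical gear name; starts 'pending'
    job_id = job.insert()                             # writes to db.jobs, returns the new ObjectId

    # Engine side: atomically claim the oldest pending job and store its formula.
    started = Queue.start_job()                       # returns None when the queue is empty
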
 api/api.py                    |  36 ++-
 api/handlers/listhandler.py   |   2 +-
 api/jobs.py                   | 443 ----------------------------------
 api/jobs/__init__.py          |   0
 api/jobs/gears.py             |  44 ++++
 api/jobs/handlers.py          | 130 ++++++++++
 api/jobs/jobs.py              | 185 ++++++++++++++
 api/jobs/queue.py             | 166 +++++++++++++
 api/{ => jobs}/rules.py       |  31 ++-
 api/placer.py                 |   2 +-
 api/upload.py                 |   2 +-
 api/util.py                   |   9 -
 test/unit_tests/test_rules.py |   2 +-
 13 files changed, 579 insertions(+), 473 deletions(-)
 delete mode 100644 api/jobs.py
 create mode 100644 api/jobs/__init__.py
 create mode 100644 api/jobs/gears.py
 create mode 100644 api/jobs/handlers.py
 create mode 100644 api/jobs/jobs.py
 create mode 100644 api/jobs/queue.py
 rename api/{ => jobs}/rules.py (86%)
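
The queue's state machine now lives in queue.py: jobs may only move
pending --> running, running --> failed, or running --> complete, and a
failed job is re-queued by Queue.retry() until it has been attempted
MAX_ATTEMPTS (3) times. A minimal sketch of the update path, assuming a
job that is currently 'running' (the job id below is made up):

    from api.jobs.jobs import Job
    from api.jobs.queue import Queue

    job = Job.get('aaaaaaaaaaaaaaaaaaaaaaaa')  # hypothetical job id
    Queue.mutate(job, {'state': 'complete'})   # validated transition; also bumps 'modified'
    # A {'state': 'failed'} mutation instead triggers Queue.retry(job), which
    # re-inserts a copy of the job with attempt + 1, up to MAX_ATTEMPTS.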

diff --git a/api/api.py b/api/api.py
index 3778f23b..2ec4764b 100644
--- a/api/api.py
+++ b/api/api.py
@@ -1,11 +1,16 @@
-import sys
+import bson.objectid
+import datetime
 import json
-import webapp2
+import pytz
+import sys
 import traceback
+import webapp2
 import webapp2_extras.routes
 
 from . import base
-from . import jobs
+from .jobs.jobs import Job
+from .jobs.handlers import JobsHandler, JobHandler
+from .dao.containerutil import FileReference, ContainerReference
 from . import root
 from . import util
 from . import config
@@ -196,15 +201,13 @@ routes = [
         webapp2.Route(_format(r'/<uid:{user_id_re}>/groups'),   grouphandler.GroupHandler, handler_method='get_all', methods=['GET'], name='groups'),
         webapp2.Route(_format(r'/<uid:{user_id_re}>/avatar'),   userhandler.UserHandler, handler_method='avatar', methods=['GET'], name='avatar'),
     ]),
-    webapp2.Route(r'/api/jobs',             jobs.Jobs),
+    webapp2.Route(r'/api/jobs',             JobsHandler),
     webapp2_extras.routes.PathPrefixRoute(r'/api/jobs', [
-        webapp2.Route(r'/next',             jobs.Jobs, handler_method='next', methods=['GET']),
-        webapp2.Route(r'/count',            jobs.Jobs, handler_method='count', methods=['GET']),
-        webapp2.Route(r'/stats',            jobs.Jobs, handler_method='stats', methods=['GET']),
-        webapp2.Route(r'/reap',             jobs.Jobs, handler_method='reap_stale', methods=['POST']),
-        webapp2.Route(r'/add',              jobs.Jobs, handler_method='add', methods=['POST']),
-        webapp2.Route(r'/add-raw',          jobs.Jobs, handler_method='add_raw', methods=['POST']),
-        webapp2.Route(r'/<:[^/]+>',         jobs.Job,  name='job'),
+        webapp2.Route(r'/next',             JobsHandler, handler_method='next', methods=['GET']),
+        webapp2.Route(r'/stats',            JobsHandler, handler_method='stats', methods=['GET']),
+        webapp2.Route(r'/reap',             JobsHandler, handler_method='reap_stale', methods=['POST']),
+        webapp2.Route(r'/add',              JobsHandler, handler_method='add', methods=['POST']),
+        webapp2.Route(r'/<:[^/]+>',         JobHandler,  name='job'),
     ]),
     webapp2.Route(r'/api/groups',                                   grouphandler.GroupHandler, handler_method='get_all', methods=['GET']),
     webapp2.Route(r'/api/groups',                                   grouphandler.GroupHandler, methods=['POST']),
@@ -258,11 +261,20 @@ routes = [
 ]
 
 
+def custom_json_serializer(obj):
+    if isinstance(obj, bson.objectid.ObjectId):
+        return str(obj)
+    elif isinstance(obj, datetime.datetime):
+        return pytz.timezone('UTC').localize(obj).isoformat()
+    elif isinstance(obj, Job):
+        return obj.map()
+    raise TypeError(repr(obj) + " is not JSON serializable")
+
 def dispatcher(router, request, response):
     try:
         rv = router.default_dispatcher(request, response)
         if rv is not None:
-            response.write(json.dumps(rv, default=util.custom_json_serializer))
+            response.write(json.dumps(rv, default=custom_json_serializer))
             response.headers['Content-Type'] = 'application/json; charset=utf-8'
     except webapp2.exc.HTTPException as e:
         util.send_json_http_exception(response, str(e), e.code)
diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py
index eeecda8a..86709334 100644
--- a/api/handlers/listhandler.py
+++ b/api/handlers/listhandler.py
@@ -10,7 +10,7 @@ import zipfile
 from .. import base
 from .. import config
 from .. import files
-from .. import rules
+from ..jobs import rules
 from .. import tempdir as tempfile
 from .. import upload
 from .. import util
diff --git a/api/jobs.py b/api/jobs.py
deleted file mode 100644
index ab00d390..00000000
--- a/api/jobs.py
+++ /dev/null
@@ -1,443 +0,0 @@
-"""
-API request handlers for process-job-handling.
-"""
-
-# We shadow the standard library; this is a workaround.
-from __future__ import absolute_import
-
-import bson
-import pymongo
-import datetime
-
-from collections import namedtuple
-from .dao.containerutil import FileReference, create_filereference_from_dictionary, ContainerReference, create_containerreference_from_dictionary, create_containerreference_from_filereference
-
-from . import base
-from . import config
-from . import util
-
-log = config.log
-
-# How many times a job should be retried
-MAX_ATTEMPTS = 3
-
-JOB_STATES = [
-    'pending',  # Job is queued
-    'running',  # Job has been handed to an engine and is being processed
-    'failed',   # Job has an expired heartbeat (orphaned) or has suffered an error
-    'complete', # Job has successfully completed
-]
-
-JOB_STATES_ALLOWED_MUTATE = [
-    'pending',
-    'running',
-]
-
-JOB_TRANSITIONS = [
-    'pending --> running',
-    'running --> failed',
-    'running --> complete',
-]
-
-def valid_transition(from_state, to_state):
-    return (from_state + ' --> ' + to_state) in JOB_TRANSITIONS or from_state == to_state
-
-def get_gears():
-    """
-    Fetch the install-global gears from the database
-    """
-
-    gear_doc  = config.db.static.find_one({'_id': 'gears'})
-    return gear_doc['gear_list']
-
-def get_gear_by_name(name):
-
-    # Find a gear from the list by name
-    gear_doc = config.db.static.find_one(
-        {'_id': 'gears'},
-        {'gear_list': { '$elemMatch': {
-            'name': name
-        }}
-    })
-
-    if gear_doc is None:
-        raise Exception('Unknown gear ' + name)
-
-    # Mongo returns the full document: { '_id' : 'gears', 'gear_list' : [ { .. } ] }, so strip that out
-    return gear_doc['gear_list'][0]
-
-def queue_job_legacy(db, algorithm_id, input, tags=None, attempt_n=1, previous_job_id=None):
-    """
-    Tie together logic used from the no-manifest, single-file era.
-    Takes a single FileReference instead of a map.
-    """
-
-    if tags is None:
-        tags = []
-
-    gear = get_gear_by_name(algorithm_id)
-
-    if len(gear['manifest']['inputs']) != 1:
-        raise Exception("Legacy gear enqueue attempt of " + algorithm_id + " failed: must have exactly 1 input in manifest")
-
-    input_name = gear['manifest']['inputs'].keys()[0]
-
-    inputs = {
-        input_name: input
-    }
-
-    return queue_job(db, algorithm_id, inputs, tags=tags, attempt_n=attempt_n, previous_job_id=previous_job_id)
-
-def queue_job(db, name, inputs, destination=None, tags=None, attempt_n=1, previous_job_id=None):
-    """
-    Enqueues a job for execution.
-
-    Parameters
-    ----------
-    db: pymongo.database.Database
-        Reference to the database instance
-    name: string
-        Unique name of the algorithm
-    inputs: string -> FileReference map
-        The inputs to be used by this job
-    destination: ContainerReference (optional)
-        Where to place the gear's output. Defaults to one of the input's containers.
-    tags: string array (optional)
-        Tags that this job should be marked with.
-    attempt_n: integer (optional)
-        If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
-    previous_job_id: string (optional)
-        If an equivalent job has tried & failed before, pass the last job attempt. Defaults to None (no previous attempts).
-    """
-
-    if tags is None:
-        tags = []
-
-    now = datetime.datetime.utcnow()
-    gear = get_gear_by_name(name)
-
-    # A job is always tagged with the name of the gear, and if present, any gear-configured tags
-    tags.append(name)
-    if gear.get('tags', None):
-        tags.extend(gear['tags'])
-
-    job = {
-        'state': 'pending',
-
-        'created':  now,
-        'modified': now,
-
-        'algorithm_id': name,
-        'inputs': {},
-
-        'attempt': attempt_n,
-        'tags': tags
-    }
-
-    # Save input FileReferences
-    for i in gear['manifest']['inputs'].keys():
-        if inputs.get(i, None) is None:
-            raise Exception('Gear ' + name + ' requires input ' + i + ' but was not provided')
-
-        job['inputs'][i] = inputs[i]._asdict()
-
-    if destination is not None:
-        # Use configured destination container
-        job['destination'] = destination._asdict()
-    else:
-        # Grab an arbitrary input's container
-        key = inputs.keys()[0]
-        cr = create_containerreference_from_filereference(inputs[key])
-        job['destination'] = cr._asdict()
-
-    if previous_job_id is not None:
-        job['previous_job_id'] = previous_job_id
-
-    result = db.jobs.insert_one(job)
-    _id = result.inserted_id
-
-    return _id
-
-def retry_job(db, j, force=False):
-    """
-    Given a failed job, either retry the job or fail it permanently, based on the attempt number.
-    Can override the attempt limit by passing force=True.
-    """
-
-    if j['attempt'] < MAX_ATTEMPTS or force:
-        # Translate job out from db to type
-        for x in j['inputs'].keys():
-            j['inputs'][x] = create_filereference_from_dictionary(j['inputs'][x])
-
-        destination = None
-        if j.get('destination', None) is not None:
-            destination = create_containerreference_from_dictionary(j['destination'])
-
-        job_id = queue_job(db, j['algorithm_id'], j['inputs'], attempt_n=j['attempt']+1, previous_job_id=j['_id'], destination=destination)
-        log.info('respawned job %s as %s (attempt %d)' % (j['_id'], job_id, j['attempt']+1))
-    else:
-        log.info('permanently failed job %s (after %d attempts)' % (j['_id'], j['attempt']))
-
-def generate_formula(algorithm_id, inputs, destination, job_id=None):
-    """
-    Given an intent, generates a formula to execute a job.
-
-    Parameters
-    ----------
-    algorithm_id: string
-        Human-friendly unique name of the algorithm
-    inputs: string -> FileReference map
-        The inputs to be used by this job
-    destination: ContainerReference
-        Where to place the gear's output. Defaults to one of the input's containers.
-    job_id: string (optional)
-        The job ID this will be placed on. Enhances the file origin by adding the job ID to the upload URL.
-    """
-
-    f = {
-        'inputs': [ ],
-        'target': {
-            'command': ['bash', '-c', 'rm -rf output; mkdir -p output; ./run; echo "Exit was $?"'],
-            'env': {
-                'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
-            },
-            'dir': "/flywheel/v0",
-        },
-        'outputs': [
-            {
-                'type': 'scitran',
-                'uri': '',
-                'location': '/flywheel/v0/output',
-            },
-        ],
-    }
-
-    gear = get_gear_by_name(algorithm_id)
-
-    # Add the gear
-    f['inputs'].append(gear['input'])
-
-    # Map destination to upload URI
-    f['outputs'][0]['uri'] = '/engine?level=' + destination['container_type'] + '&id=' + destination['container_id']
-
-    # Add the files
-    for input_name in inputs.keys():
-        i = inputs[input_name]
-
-        f['inputs'].append({
-            'type': 'scitran',
-            'uri': '/' + i['container_type'] + 's/' + i['container_id'] + '/files/' + i['filename'],
-            'location': '/flywheel/v0/input/' + input_name,
-        })
-
-    # Log job origin if provided
-    if job_id:
-        f['outputs'][0]['uri'] += '&job=' + job_id
-
-    return f
-
-
-class Jobs(base.RequestHandler):
-
-    """Provide /jobs API routes."""
-
-    def get(self):
-        """
-        List all jobs. Not used by engine.
-        """
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        results = list(config.db.jobs.find())
-
-        return results
-
-    def add(self):
-        """
-        Add a manifest-aware job to the queue.
-        """
-
-        # TODO: Check each input container for R, check dest container for RW
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        # TODO: json schema
-
-        submit = self.request.json
-        gear_name = submit['gear']
-
-        # Translate maps to FileReferences
-        inputs = {}
-        for x in submit['inputs'].keys():
-            input_map = submit['inputs'][x]
-            inputs[x] = create_filereference_from_dictionary(input_map)
-
-        # Add job tags, attempt number, and/or previous job ID, if present
-        tags            = submit.get('tags', None)
-        attempt_n       = submit.get('attempt_n', 1)
-        previous_job_id = submit.get('previous_job_id', None)
-
-        # Add destination container, if present
-        destination = None
-        if submit.get('destination', None) is not None:
-            destination = create_containerreference_from_dictionary(submit['destination'])
-
-        # Return enqueued job ID
-        job_id = queue_job(config.db, gear_name, inputs, destination=destination, tags=tags, attempt_n=attempt_n, previous_job_id=previous_job_id)
-        return { '_id': job_id }
-
-    def add_raw(self):
-        """
-        Add a blob of JSON to the jobs table. Absolutely no validation.
-        """
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        return { "_id": config.db.jobs.insert_one(self.request.json).inserted_id }
-
-    def count(self):
-        """Return the total number of jobs. Not used by engine."""
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        return config.db.jobs.count()
-
-    def stats(self):
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        # Count jobs by state
-        result = config.db.jobs.aggregate([{"$group": {"_id": "$state", "count": {"$sum": 1}}}])
-        # Map mongo result to a useful object
-        by_state = {s: 0 for s in JOB_STATES}
-        by_state.update({r['_id']: r['count'] for r in result})
-
-        # Count jobs by tag grouping
-        result = list(config.db.jobs.aggregate([{"$group": {"_id": "$tags", "count": {"$sum": 1}}}]))
-        by_tag = []
-        for r in result:
-            by_tag.append({'tags': r['_id'], 'count': r['count']})
-
-        # Count jobs that will not be retried
-        permafailed = config.db.jobs.count({"attempt": {"$gte": MAX_ATTEMPTS}, "state":"failed"})
-
-        return {
-            'by-state': by_state,
-            'by-tag': by_tag,
-            'permafailed': permafailed
-        }
-
-    def next(self):
-        """
-        Atomically change a 'pending' job to 'running' and returns it. Updates timestamp.
-        Will return empty if there are no jobs to offer.
-        Engine will poll this endpoint whenever there are free processing slots.
-        """
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        # First, atomically mark document as running.
-        result = config.db.jobs.find_one_and_update(
-            {
-                'state': 'pending'
-            },
-            { '$set': {
-                'state': 'running',
-                'modified': datetime.datetime.utcnow()}
-            },
-            sort=[('modified', 1)],
-            return_document=pymongo.collection.ReturnDocument.AFTER
-        )
-
-        if result is None:
-            self.abort(400, 'No jobs to process')
-
-        str_id = str(result['_id'])
-
-        # Second, update document to store formula request.
-        result = config.db.jobs.find_one_and_update(
-            {
-                '_id': result['_id']
-            },
-            { '$set': {
-                'request': generate_formula(result['algorithm_id'], result['inputs'], result['destination'], job_id=str_id)}
-            },
-            return_document=pymongo.collection.ReturnDocument.AFTER
-        )
-
-        if result is None:
-            self.abort(500, 'Marked job as running but could not generate and save formula')
-
-        return result
-
-    def reap_stale(self):
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        while True:
-            j = config.db.jobs.find_one_and_update(
-                {
-                    'state': 'running',
-                    'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)},
-                },
-                {
-                    '$set': {
-                        'state': 'failed',
-                    },
-                },
-                )
-            if j is None:
-                break
-            else:
-                retry_job(config.db, j)
-
-
-class Job(base.RequestHandler):
-
-    """Provides /Jobs/<jid> routes."""
-
-    def get(self, _id):
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        result = config.db.jobs.find_one({'_id': bson.ObjectId(_id)})
-        return result
-
-    def put(self, _id):
-        """
-        Update a job. Updates timestamp.
-        Enforces a valid state machine transition, if any.
-        Rejects any change to a job that is not currently in 'pending' or 'running' state.
-        """
-        if not self.superuser_request:
-            self.abort(403, 'Request requires superuser')
-
-        mutation = self.request.json
-        job = config.db.jobs.find_one({'_id': bson.ObjectId(_id)})
-
-        if job is None:
-            self.abort(404, 'Job not found')
-
-        if job['state'] not in JOB_STATES_ALLOWED_MUTATE:
-            self.abort(404, 'Cannot mutate a job that is ' + job['state'] + '.')
-
-        if 'state' in mutation and not valid_transition(job['state'], mutation['state']):
-            self.abort(404, 'Mutating job from ' + job['state'] + ' to ' + mutation['state'] + ' not allowed.')
-
-        # Any modification must be a timestamp update
-        mutation['modified'] = datetime.datetime.utcnow()
-
-        # Create an object with all the fields that must not have changed concurrently.
-        job_query =  {
-            '_id': job['_id'],
-            'state': job['state'],
-        }
-
-        result = config.db.jobs.update_one(job_query, {'$set': mutation})
-        if result.modified_count != 1:
-            self.abort(500, 'Job modification not saved')
-
-        # If the job did not succeed, check to see if job should be retried.
-        if 'state' in mutation and mutation['state'] == 'failed':
-            job = config.db.jobs.find_one({'_id': bson.ObjectId(_id)})
-            retry_job(config.db, job)
diff --git a/api/jobs/__init__.py b/api/jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/api/jobs/gears.py b/api/jobs/gears.py
new file mode 100644
index 00000000..4874fd12
--- /dev/null
+++ b/api/jobs/gears.py
@@ -0,0 +1,44 @@
+"""
+Gears
+"""
+
+# We shadow the standard library; this is a workaround.
+from __future__ import absolute_import
+
+import bson
+import pymongo
+import datetime
+
+from collections import namedtuple
+from ..dao.containerutil import FileReference, create_filereference_from_dictionary, ContainerReference, create_containerreference_from_dictionary, create_containerreference_from_filereference
+
+from .. import base
+from .. import config
+from .. import util
+
+log = config.log
+
+
+def get_gears():
+    """
+    Fetch the install-global gears from the database
+    """
+
+    gear_doc  = config.db.static.find_one({'_id': 'gears'})
+    return gear_doc['gear_list']
+
+def get_gear_by_name(name):
+
+    # Find a gear from the list by name
+    gear_doc = config.db.static.find_one(
+        {'_id': 'gears'},
+        {'gear_list': { '$elemMatch': {
+            'name': name
+        }}
+    })
+
+    if gear_doc is None:
+        raise Exception('Unknown gear ' + name)
+
+    # Mongo returns the full document: { '_id' : 'gears', 'gear_list' : [ { .. } ] }, so strip that out
+    return gear_doc['gear_list'][0]
diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py
new file mode 100644
index 00000000..f35a9e5c
--- /dev/null
+++ b/api/jobs/handlers.py
@@ -0,0 +1,130 @@
+"""
+API request handlers for the jobs module
+"""
+
+# We shadow the standard library; this is a workaround.
+from __future__ import absolute_import
+
+import bson
+import pymongo
+import datetime
+
+from collections import namedtuple
+from ..dao.containerutil import FileReference, create_filereference_from_dictionary, ContainerReference, create_containerreference_from_dictionary, create_containerreference_from_filereference
+
+from .. import base
+from .. import config
+from .. import util
+from .jobs import Job
+from .queue import Queue, JOB_STATES, MAX_ATTEMPTS
+
+log = config.log
+
+
+class JobsHandler(base.RequestHandler):
+
+    """Provide /jobs API routes."""
+
+    def get(self):
+        """
+        List all jobs.
+        """
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        results = list(config.db.jobs.find())
+
+        return results
+
+    def add(self):
+        """
+        Add a job to the queue.
+        """
+
+        # TODO: Check each input container for R, check dest container for RW
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        submit = self.request.json
+        gear_name = submit['gear']
+
+        # Translate maps to FileReferences
+        inputs = {}
+        for x in submit['inputs'].keys():
+            input_map = submit['inputs'][x]
+            inputs[x] = create_filereference_from_dictionary(input_map)
+
+        # Add job tags, attempt number, and/or previous job ID, if present
+        tags            = submit.get('tags', None)
+        attempt_n       = submit.get('attempt_n', 1)
+        previous_job_id = submit.get('previous_job_id', None)
+
+        # Add destination container, if present
+        destination = None
+        if submit.get('destination', None) is not None:
+            destination = create_containerreference_from_dictionary(submit['destination'])
+
+        job = Job(gear_name, inputs, destination=destination, tags=tags, attempt=attempt_n, previous_job_id=previous_job_id)
+        return job.insert()
+
+    def stats(self):
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        return Queue.get_statistics()
+
+    def next(self):
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        job = Queue.start_job()
+
+        if job is None:
+            self.abort(400, 'No jobs to process')
+        else:
+            return job
+
+    def reap_stale(self):
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        while True:
+            doc = config.db.jobs.find_one_and_update(
+                {
+                    'state': 'running',
+                    'modified': {'$lt': datetime.datetime.utcnow() - datetime.timedelta(seconds=100)},
+                },
+                {
+                    '$set': {
+                        'state': 'failed',
+                    },
+                },
+                )
+            if doc is None:
+                break
+            else:
+                j = Job.load(doc)
+                Queue.retry(j)
+
+
+class JobHandler(base.RequestHandler):
+
+    """Provides /Jobs/<jid> routes."""
+
+    def get(self, _id):
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        return Job.get(_id)
+
+    def put(self, _id):
+        """
+        Update a job. Updates timestamp.
+        Enforces a valid state machine transition, if any.
+        Rejects any change to a job that is not currently in 'pending' or 'running' state.
+        """
+        if not self.superuser_request:
+            self.abort(403, 'Request requires superuser')
+
+        j = Job.get(_id)
+        Queue.mutate(j, self.request.json)
diff --git a/api/jobs/jobs.py b/api/jobs/jobs.py
new file mode 100644
index 00000000..fe211231
--- /dev/null
+++ b/api/jobs/jobs.py
@@ -0,0 +1,185 @@
+"""
+Jobs
+"""
+
+# We shadow the standard library; this is a workaround.
+from __future__ import absolute_import
+
+import bson
+import pymongo
+import datetime
+
+from collections import namedtuple
+from ..dao.containerutil import FileReference, create_filereference_from_dictionary, ContainerReference, create_containerreference_from_dictionary, create_containerreference_from_filereference
+
+from .. import base
+from .. import config
+from .. import util
+from . import gears
+
+log = config.log
+
+
+class Job(object):
+    def __init__(self, algorithm_id, inputs, destination=None, tags=None, attempt=1, previous_job_id=None, created=None, modified=None, state='pending', request=None, _id=None):
+        """
+        Creates a job.
+
+        Parameters
+        ----------
+        algorithm_id: string
+            Unique name of the algorithm
+        inputs: string -> FileReference map
+            The inputs to be used by this job
+        destination: ContainerReference (optional)
+        Where to place the gear's output. Defaults to one of the inputs' containers.
+        tags: string array (optional)
+            Tags that this job should be marked with.
+        attempt: integer (optional)
+            If an equivalent job has tried & failed before, pass which attempt number we're at. Defaults to 1 (no previous attempts).
+        previous_job_id: string (optional)
+            If an equivalent job has tried & failed before, pass the last job attempt. Defaults to None (no previous attempts).
+        ...
+        ...
+        """
+
+        # TODO: validate inputs against the manifest
+
+        now = datetime.datetime.utcnow()
+
+        if tags is None:
+            tags = []
+        if created is None:
+            created = now
+        if modified is None:
+            modified = now
+
+        if destination is None:
+            # Grab an arbitrary input's container
+            key = inputs.keys()[0]
+            destination = create_containerreference_from_filereference(inputs[key])
+
+        # A job is always tagged with the name of the gear
+        tags.append(algorithm_id)
+
+        # Trim tags array to unique members...
+        tags = list(set(tags))
+
+        self.algorithm_id    = algorithm_id
+        self.inputs          = inputs
+        self.destination     = destination
+        self.tags            = tags
+        self.attempt         = attempt
+        self.previous_job_id = previous_job_id
+        self.created         = created
+        self.modified        = modified
+        self.state           = state
+        self.request         = request
+        self._id             = _id
+
+    @classmethod
+    def load(cls, d):
+        # TODO: validate
+
+        inputs = d['inputs']
+        for x in inputs.keys():
+            inputs[x] = create_filereference_from_dictionary(inputs[x])
+
+        d['destination'] = create_containerreference_from_dictionary(d['destination'])
+
+        d['_id'] = str(d['_id'])
+
+        return cls(d['algorithm_id'], d['inputs'], destination=d['destination'], tags=d['tags'], attempt=d['attempt'], previous_job_id=d.get('previous_job_id', None), created=d['created'], modified=d['modified'], state=d['state'], request=d.get('request', None), _id=d['_id'])
+
+    @classmethod
+    def get(cls, _id):
+        doc = config.db.jobs.find_one({'_id': bson.ObjectId(_id)})
+        return cls.load(doc)
+
+    def map(self):
+        """
+        Flatten this job to a plain map (dict), suitable for JSON or Mongo.
+        """
+
+        # Copy before flattening so repeated calls do not mutate this Job
+        # instance (inputs and destination stay FileReference / ContainerReference).
+        d = self.__dict__.copy()
+        d['destination'] = self.destination.__dict__
+        d['inputs'] = {x: f.__dict__ for x, f in self.inputs.items()}
+
+        if d['_id'] is None:
+            d.pop('_id')
+        if d['previous_job_id'] is None:
+            d.pop('previous_job_id')
+        if d['request'] is None:
+            d.pop('request')
+
+        return d
+
+    def mongo(self):
+        d = self.map()
+        if d.get('_id', None):
+            d['_id'] = bson.ObjectId(d['_id'])
+
+        return d
+
+    def insert(self):
+        if self._id is not None:
+            raise Exception('Cannot insert job that has already been inserted')
+
+        result = config.db.jobs.insert_one(self.mongo())
+        return result.inserted_id
+
+    def generate_request(self, gear=None):
+        """
+        Generate the job's request, save it on this job instance, and return it.
+
+        Parameters
+        ----------
+        gear: map (optional)
+            A gear_list map from the static.gears table. If not provided, it is looked up by the job's algorithm_id.
+        """
+
+        r = {
+            'inputs': [ ],
+            'target': {
+                'command': ['bash', '-c', 'rm -rf output; mkdir -p output; ./run; echo "Exit was $?"'],
+                'env': {
+                    'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'
+                },
+                'dir': "/flywheel/v0",
+            },
+            'outputs': [
+                {
+                    'type': 'scitran',
+                    'uri': '',
+                    'location': '/flywheel/v0/output',
+                },
+            ],
+        }
+
+        if gear is None:
+            gear = gears.get_gear_by_name(self.algorithm_id)
+
+        # Add the gear
+        r['inputs'].append(gear['input'])
+
+        # Map destination to upload URI
+        r['outputs'][0]['uri'] = '/engine?level=' + self.destination.container_type + '&id=' + self.destination.container_id
+
+        # Add the files
+        for input_name in self.inputs.keys():
+            i = self.inputs[input_name]
+
+            r['inputs'].append({
+                'type': 'scitran',
+                'uri': '/' + i.container_type + 's/' + i.container_id + '/files/' + i.filename,
+                'location': '/flywheel/v0/input/' + input_name,
+            })
+
+        # Tag the upload URI with the job origin, if this job already has an ID
+        if self._id:
+            r['outputs'][0]['uri'] += '&job=' + self._id
+
+        self.request = r
+        return self.request
diff --git a/api/jobs/queue.py b/api/jobs/queue.py
new file mode 100644
index 00000000..f480aadf
--- /dev/null
+++ b/api/jobs/queue.py
@@ -0,0 +1,166 @@
+"""
+A simple FIFO queue for jobs.
+"""
+
+# We shadow the standard library; this is a workaround.
+from __future__ import absolute_import
+
+import bson
+import copy
+import pymongo
+import datetime
+
+from collections import namedtuple
+from ..dao.containerutil import FileReference, create_filereference_from_dictionary, ContainerReference, create_containerreference_from_dictionary, create_containerreference_from_filereference
+
+from .. import base
+from .. import config
+from .. import util
+from .jobs import Job
+
+log = config.log
+
+
+# How many times a job should be retried
+MAX_ATTEMPTS = 3
+
+JOB_STATES = [
+    'pending',  # Job is queued
+    'running',  # Job has been handed to an engine and is being processed
+    'failed',   # Job has an expired heartbeat (orphaned) or has suffered an error
+    'complete', # Job has successfully completed
+]
+
+JOB_STATES_ALLOWED_MUTATE = [
+    'pending',
+    'running',
+]
+
+JOB_TRANSITIONS = [
+    'pending --> running',
+    'running --> failed',
+    'running --> complete',
+]
+
+def valid_transition(from_state, to_state):
+    return (from_state + ' --> ' + to_state) in JOB_TRANSITIONS or from_state == to_state
+
+class Queue(object):
+
+    @staticmethod
+    def mutate(job, mutation):
+        """
+        Validate and save a job mutation
+        """
+
+        if job.state not in JOB_STATES_ALLOWED_MUTATE:
+            raise Exception('Cannot mutate a job that is ' + job.state + '.')
+
+        if 'state' in mutation and not valid_transition(job.state, mutation['state']):
+            raise Exception('Mutating job from ' + job.state + ' to ' + mutation['state'] + ' not allowed.')
+
+        # Every modification must also bump the modified timestamp
+        mutation['modified'] = datetime.datetime.utcnow()
+
+        # Create an object with all the fields that must not have changed concurrently.
+        job_query =  {
+            '_id': bson.ObjectId(job._id),
+            'state': job.state,
+        }
+
+        result = config.db.jobs.update_one(job_query, {'$set': mutation})
+        if result.modified_count != 1:
+            raise Exception('Job modification not saved')
+
+        # If the job did not succeed, check to see if job should be retried.
+        if 'state' in mutation and mutation['state'] == 'failed':
+            Queue.retry(job)
+
+    @staticmethod
+    def retry(job, force=False):
+        """
+        Given a failed job, either retry the job or fail it permanently, based on the attempt number.
+        Can override the attempt limit by passing force=True.
+        """
+
+        if job.attempt >= MAX_ATTEMPTS and not force:
+            log.info('Permanently failed job %s (after %d attempts)' % (job._id, job.attempt))
+            return
+
+        new_job = copy.deepcopy(job)
+        new_job._id = None
+        new_job.state = 'pending'
+        new_job.attempt += 1
+        new_job.previous_job_id = job._id
+
+        new_id = new_job.insert()
+        log.info('respawned job %s as %s (attempt %d)' % (job._id, new_id, new_job.attempt))
+
+    @staticmethod
+    def start_job():
+        """
+        Atomically changes the oldest-modified 'pending' job to 'running', updates its timestamp, and returns it.
+        Returns None if there are no jobs to offer.
+        """
+
+        # First, atomically mark document as running.
+        result = config.db.jobs.find_one_and_update(
+            {
+                'state': 'pending'
+            },
+            { '$set': {
+                'state': 'running',
+                'modified': datetime.datetime.utcnow()}
+            },
+            sort=[('modified', 1)],
+            return_document=pymongo.collection.ReturnDocument.AFTER
+        )
+
+        if result is None:
+            return None
+
+        job = Job.load(result)
+        request = job.generate_request()
+
+        # Second, update document to store formula request.
+        result = config.db.jobs.find_one_and_update(
+            {
+                '_id': bson.ObjectId(job._id)
+            },
+            { '$set': {
+                'request': request }
+            },
+            return_document=pymongo.collection.ReturnDocument.AFTER
+        )
+
+        if result is None:
+            raise Exception('Marked job as running but could not generate and save formula')
+
+        return result
+
+    @staticmethod
+    def get_statistics():
+        """
+        Return statistics about the job queue: job counts by state, by tag, and the number of permanently-failed jobs.
+        """
+
+        # Count jobs by state
+        result = config.db.jobs.aggregate([{"$group": {"_id": "$state", "count": {"$sum": 1}}}])
+        # Map mongo result to a useful object
+        by_state = {s: 0 for s in JOB_STATES}
+        by_state.update({r['_id']: r['count'] for r in result})
+
+        # Count jobs by tag grouping
+        result = list(config.db.jobs.aggregate([{"$group": {"_id": "$tags", "count": {"$sum": 1}}}]))
+        by_tag = []
+        for r in result:
+            by_tag.append({'tags': r['_id'], 'count': r['count']})
+
+        # Count jobs that will not be retried
+        permafailed = config.db.jobs.count({"attempt": {"$gte": MAX_ATTEMPTS}, "state":"failed"})
+
+        return {
+            'by-state': by_state,
+            'by-tag': by_tag,
+            'permafailed': permafailed
+        }
diff --git a/api/rules.py b/api/jobs/rules.py
similarity index 86%
rename from api/rules.py
rename to api/jobs/rules.py
index e8f1b69e..92949c95 100644
--- a/api/rules.py
+++ b/api/jobs/rules.py
@@ -1,9 +1,11 @@
 import fnmatch
 import json
 
-from . import jobs
-from . import config
-from .dao.containerutil import FileReference
+from .. import config
+from ..dao.containerutil import FileReference
+
+from . import gears
+from .jobs import Job
 
 log = config.log
 
@@ -85,7 +87,6 @@ def eval_match(match_type, match_param, file_, container):
 
     raise Exception('Unimplemented match type ' + match_type)
 
-
 def eval_rule(rule, file_, container):
     """
     Decide if a rule should spawn a job.
@@ -111,6 +112,25 @@ def eval_rule(rule, file_, container):
 
     return True
 
+def queue_job_legacy(db, algorithm_id, input):
+    """
+    Ties together logic left over from the no-manifest, single-file era.
+    Takes a single FileReference instead of an input map.
+    """
+
+    gear = gears.get_gear_by_name(algorithm_id)
+
+    if len(gear['manifest']['inputs']) != 1:
+        raise Exception("Legacy gear enqueue attempt of " + algorithm_id + " failed: must have exactly 1 input in manifest")
+
+    input_name = gear['manifest']['inputs'].keys()[0]
+
+    inputs = {
+        input_name: input
+    }
+
+    job = Job(algorithm_id, inputs)
+    return job.insert()
 
 def create_jobs(db, container, container_type, file_):
     """
@@ -131,7 +151,8 @@ def create_jobs(db, container, container_type, file_):
         if eval_rule(rule, file_, container):
             alg_name = rule['alg']
             input = FileReference(container_type=container_type, container_id=str(container['_id']), filename=file_['name'])
-            jobs.queue_job_legacy(db, alg_name, input)
+
+            queue_job_legacy(db, alg_name, input)
             job_list.append(alg_name)
 
     return job_list
diff --git a/api/placer.py b/api/placer.py
index 9cc4aed3..a6884084 100644
--- a/api/placer.py
+++ b/api/placer.py
@@ -10,7 +10,7 @@ import zipfile
 from . import base
 from . import config
 from . import files
-from . import rules
+from .jobs import rules
 from . import tempdir as tempfile
 from . import util
 from . import validators
diff --git a/api/upload.py b/api/upload.py
index 6c5949cc..f6db4050 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -7,7 +7,7 @@ import shutil
 from . import base
 from . import config
 from . import files
-from . import rules
+from .jobs import rules
 from . import tempdir as tempfile
 from . import placer as pl
 from . import util
diff --git a/api/util.py b/api/util.py
index 153337f7..d01ed013 100644
--- a/api/util.py
+++ b/api/util.py
@@ -1,11 +1,9 @@
-import bson.objectid
 import datetime
 import enum as baseEnum
 import errno
 import json
 import mimetypes
 import os
-import pytz
 import tempdir as tempfile
 import uuid
 import requests
@@ -156,13 +154,6 @@ def format_hash(hash_alg, hash_):
     return '-'.join(('v0', hash_alg, hash_))
 
 
-def custom_json_serializer(obj):
-    if isinstance(obj, bson.objectid.ObjectId):
-        return str(obj)
-    elif isinstance(obj, datetime.datetime):
-        return pytz.timezone('UTC').localize(obj).isoformat()
-    raise TypeError(repr(obj) + " is not JSON serializable")
-
 def send_json_http_exception(response, message, code):
     response.set_status(code)
     content = json.dumps({
diff --git a/test/unit_tests/test_rules.py b/test/unit_tests/test_rules.py
index 42820f09..d37a1497 100644
--- a/test/unit_tests/test_rules.py
+++ b/test/unit_tests/test_rules.py
@@ -1,6 +1,6 @@
 
 import pytest
-from api import rules
+from api.jobs import rules
 
 # Statefully holds onto some construction args and can return tuples to unroll for calling rules.eval_match.
 # Might indicate a need for a match tuple in rules.py.
-- 
GitLab