import bson
import copy
import datetime
import dateutil.parser
import os
import pymongo
import shutil
import zipfile

from . import config
from . import encoder
from . import files
from .jobs import rules
from . import tempdir as tempfile
from . import util
from . import validators
from .dao import containerutil, hierarchy

class Placer(object):
    """
    Interface for a placer, which knows how to process files and place them where they belong, both on disk and in the database.
    """

    def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
        self.container_type = container_type
        self.container      = container
        self.id_            = id_
        self.metadata       = metadata
        self.timestamp      = timestamp

        # An origin map for the caller
        self.origin         = origin
        # A placer-defined map for breaking the Placer abstraction layer.
        self.context        = context

        # Should the caller expect a normal map return, or a generator that gets mapped to Server-Sent Events?
        self.sse            = False

        # A list of files that have been saved via save_file(), usually returned by finalize()
        self.saved          = []

    def check(self):
        """
        Run any pre-processing checks. Expected to throw on error.
        """
        raise NotImplementedError()

    def process_file_field(self, field, info):
        """"
        Process a single file field.
        """
        raise NotImplementedError()

    def finalize(self):
        """
        Run any post-processing work. Expected to return output for the caller.
        """
        raise NotImplementedError()

    def requireTarget(self):
        """
        Helper function that throws unless a container was provided.
        """
        if self.id_ is None or self.container is None or self.container_type is None:
            raise Exception('Must specify a target')

    def requireMetadata(self):
        """
        Helper function that throws unless metadata was provided.
        """
        if self.metadata is None:
            raise Exception('Metadata required')

    def save_file(self, field=None, info=None):
        """
        Helper function that moves a file saved via a form field into our CAS.
        May trigger jobs, if applicable, so this should only be called once we're ready for that.

        Requires an augmented file field; see process_upload() for details.
        """

        # Save file
        if field is not None:
            files.move_form_file_field_into_cas(field)

        # Update the DB
        if info is not None:
            hierarchy.upsert_fileinfo(self.container_type, self.id_, info)
            # Queue any jobs as a result of this upload
            rules.create_jobs(config.db, self.container, self.container_type, info)


class TargetedPlacer(Placer):
    """
    A placer that can accept N files to a specific container (acquisition, etc).

    LIMITATION: To temporarily avoid messing with the JSON schema, this endpoint can only consume one file :(
    An exception is thrown in upload.process_upload() if you try. This could be fixed by making a better schema.
    """

    def check(self):
        self.requireTarget()
        validators.validate_data(self.metadata, 'file.json', 'input', 'POST', optional=True)

    def process_file_field(self, field, info):
        if self.metadata:
            info.update(self.metadata)
        self.save_file(field, info)
        self.saved.append(info)

    def finalize(self):
        return self.saved


class UIDPlacer(Placer):
    """
    A placer that can accept multiple files.

    It uses the method upsert_bottom_up_hierarchy to create its project/session/acquisition hierarchy.
    Sessions and acquisitions are identified by UID.
    """

    metadata_schema = 'uidupload.json'
    create_hierarchy = staticmethod(hierarchy.upsert_bottom_up_hierarchy)

    def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
        super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)
        self.metadata_for_file = {}

    def check(self):
        self.requireMetadata()

        payload_schema_uri = validators.schema_uri('input', self.metadata_schema)
        metadata_validator = validators.from_schema_path(payload_schema_uri)
        metadata_validator(self.metadata, 'POST')

        targets = self.create_hierarchy(self.metadata)
        self.metadata_for_file = {}

        for target in targets:
            for name in target[1]:
                self.metadata_for_file[name] = {
                    'container': target[0],
                    'metadata': target[1][name]
                }
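
        # metadata_for_file now maps each expected filename to its target
        # container and per-file metadata, e.g. (illustrative shape):
        #   { 'scan.dcm.zip': { 'container': <target>, 'metadata': {...} } }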

    def process_file_field(self, field, info):

        # For this file, choose a target from self.metadata_for_file (built in check)

        name        = field.filename
        target      = self.metadata_for_file.get(name)
        # If the file was not included in the metadata, skip it
        if not target:
            return
        container   = target['container']
        r_metadata  = target['metadata']
        info.update(r_metadata)
        if container.level != 'subject':
            self.container_type = container.level
            self.id_            = container._id
            self.container      = container.container
            self.save_file(field, info)
        else:
            if field is not None:
                files.move_form_file_field_into_cas(field)
            if info is not None:
                container.upsert_file(info)
                # # Queue any jobs as a result of this upload
                # rules.create_jobs(config.db, self.container, self.container_type, info)

        self.saved.append(info)

    def finalize(self):
        return self.saved


class LabelPlacer(UIDPlacer):
    """
    A placer that creates a hierarchy based on labels.

    It uses the method upsert_top_down_hierarchy to create its project/session/acquisition hierarchy.
    Sessions and acquisitions are identified by label.
    """

    metadata_schema = 'labelupload.json'
    create_hierarchy = staticmethod(hierarchy.upsert_top_down_hierarchy)


class UIDMatchPlacer(UIDPlacer):
    """
    A placer that uploads to an existing hierarchy it finds based on UID.
    """

    metadata_schema = 'uidmatchupload.json'
    create_hierarchy = staticmethod(hierarchy.find_existing_hierarchy)


class EnginePlacer(Placer):
    """
    A placer that can accept files and/or metadata sent to it from the engine.

    It uses update_container_hierarchy to update the container's and its parents' fields from the metadata.
    """

    def check(self):
        self.requireTarget()
        if self.metadata is not None:
            validators.validate_data(self.metadata, 'enginemetadata.json', 'input', 'POST', optional=True)

    def process_file_field(self, field, info):
        if self.metadata is not None:
            file_mds = self.metadata.get(self.container_type, {}).get('files', [])

            for file_md in file_mds:
                if file_md['name'] == info['name']:
                    info.update(file_md)
                    break

        self.save_file(field, info)
        self.saved.append(info)

    def finalize(self):
        if self.metadata is not None:
            bid = bson.ObjectId(self.id_)

            # Remove file metadata as it was already updated in process_file_field
            for k in self.metadata.keys():
                self.metadata[k].pop('files', {})
            hierarchy.update_container_hierarchy(self.metadata, bid, self.container_type)


class TokenPlacer(Placer):
    """
    A placer that can accept N files and save them to a persistent directory across multiple requests.
    Intended for use with a token that tracks where the files will be stored.
    """

    def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
        super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)

        self.paths  =   []
        self.folder =   None

    def check(self):
        token = self.context['token']

        if token is None:
            raise Exception('TokenPlacer requires a token')

        # This logic is used by:
        #   TokenPlacer.check
        #   PackfilePlacer.check
        #   upload.clean_packfile_tokens
        #
        # It must be kept in sync between each instance.
        base_path = config.get_item('persistent', 'data_path')
        self.folder = os.path.join(base_path, 'tokens', 'packfile', token)

        util.mkdir_p(self.folder)

    def process_file_field(self, field, info):
        self.saved.append(info)
        self.paths.append(field.path)

    def finalize(self):
        for path in self.paths:
            dest = os.path.join(self.folder, os.path.basename(path))
            shutil.move(path, dest)

        return self.saved


class PackfilePlacer(Placer):
    """
    A placer that can accept N files, save them into a zip archive, and place the result on an acquisition.
    """

    def __init__(self, container_type, container, id_, metadata, timestamp, origin, context):
        super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context)
        # This endpoint is an SSE endpoint
        self.sse            = True

        # Populated in check(), used in finalize()
        self.p_id           = None
        self.s_label        = None
        self.a_label        = None
        self.g_id           = None

        self.permissions    = {}
        self.folder         = None
        self.dir_           = None
        self.name           = None
        self.path           = None
        self.zip_           = None
        self.ziptime        = None
        self.tempdir        = None

    def check(self):
        token = self.context['token']

        if token is None:
            raise Exception('PackfilePlacer requires a token')

        # This logic is used by:
        #   TokenPlacer.check
        #   PackfilePlacer.check
        #   upload.clean_packfile_tokens
        #
        # It must be kept in sync between each instance.
        base_path = config.get_item('persistent', 'data_path')
        self.folder = os.path.join(base_path, 'tokens', 'packfile', token)

        if not os.path.isdir(self.folder):
            raise Exception('Packfile directory does not exist or has been deleted')

        self.requireMetadata()
        validators.validate_data(self.metadata, 'packfile.json', 'input', 'POST')

        # Save required fields
        self.p_id    = self.metadata['project']['_id']
        self.s_label = self.metadata['session']['label']
        self.a_label = self.metadata['acquisition']['label']

        # Get project info that we need later
        project = config.db['projects'].find_one({ '_id': bson.ObjectId(self.p_id)})
        self.permissions = project.get('permissions', {})
        self.g_id = project['group']

        # If a timestamp was provided, use that for the zip file. Otherwise use a set date.
        # Normally we'd use epoch, but zips cannot support years earlier than 1980, so use that instead.
        # Then, given the ISO string, convert it to an epoch integer.
        minimum = datetime.datetime(1980, 1, 1).isoformat()
        stamp   = self.metadata['acquisition'].get('timestamp', minimum)

        # If there was metadata sent back that predates the zip minimum, don't use it.
        #
        # Dateutil has overloaded the comparison operators, except it's totally useless:
        # > TypeError: can't compare offset-naive and offset-aware datetimes
        #
        # So instead, epoch-integer both and compare that way.
        if int(dateutil.parser.parse(stamp).strftime('%s')) < int(dateutil.parser.parse(minimum).strftime('%s')):
            stamp = minimum

        # Remember the timestamp integer for later use with os.utime.
        self.ziptime = int(dateutil.parser.parse(stamp).strftime('%s'))
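
        # For example (illustrative): '1970-01-01T00:00:00' parses below the
        # 1980 minimum and is clamped to it; '2016-07-01T12:00:00' is kept.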

        # The zip filename is a sanitized acquisition label
        self.dir_ = util.sanitize_string_to_filename(self.a_label)
        self.name = self.dir_ + '.zip'

        # Make a tempdir to store zip until moved
        # OPPORTUNITY: this is also called in files.py. Could be a util func.
        self.tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path'))

        # Create a zip in the tempdir that later gets moved into the CAS.
        self.path = os.path.join(self.tempdir.name, 'temp.zip')
        self.zip_  = zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)

        # OPPORTUNITY: add zip comment
        # self.zip.comment = json.dumps(metadata, default=metadata_encoder)

        # Bit of a silly hack: write our tempdir directory into the zip (not including its contents).
        # Creates an empty directory entry in the zip which will hold all the files inside.
        # This way, when you expand a zip, you'll get folder/things instead of a thousand dicoms splattered everywhere.
        self.zip_.write(self.tempdir.name, self.dir_)
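
        # Resulting archive layout when expanded (illustrative):
        #   acquisition-label/
        #   acquisition-label/file1.dcm
        #   acquisition-label/file2.dcm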

    def process_file_field(self, field, info):
        # Should not be called with any files
        raise Exception('Files must already be uploaded')

    def finalize(self):

        paths = os.listdir(self.folder)
        total = len(paths)

        # Write all files to zip
        complete = 0
        for path in paths:
            p = os.path.join(self.folder, path)

            # Set the file's mtime & atime.
            os.utime(p, (self.ziptime, self.ziptime))

            # Place file into the zip folder we created before
            self.zip_.write(p, os.path.join(self.dir_, os.path.basename(path)))

            # Report progress
            complete += 1
            yield encoder.json_sse_pack({
                'event': 'progress',
                'data': { 'done': complete, 'total': total, 'percent': (complete / float(total)) * 100 },
            })

        self.zip_.close()
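
        # Each 'progress' message above reaches the client as a server-sent
        # event; illustrative wire format (assuming json_sse_pack emits
        # standard SSE):
        #   event: progress
        #   data: {"done": 3, "total": 10, "percent": 30.0}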
        # Remove the folder created by TokenPlacer
        shutil.rmtree(self.folder)

        # Create an anonymous object in the style of our augmented file fields.
        # Not a great practice. See process_upload() for details.
        cgi_field = util.obj_from_map({
            'filename': self.name,
            'path':     self.path,
            'size':     os.path.getsize(self.path),
            'hash':     files.hash_file_formatted(self.path),
            'mimetype': util.guess_mimetype('lol.zip'),
            'modified': self.timestamp
        })

        # Similarly, create the info map that is consumed by helper funcs. Clear duplication :(
        # This could be coalesced into a single map thrown on file fields, for example.
        # Used in the API return.
        cgi_info = {
            'name':     cgi_field.filename,
            'modified': cgi_field.modified,
            'size':     cgi_field.size,
            'hash':     cgi_field.hash,

            'type': self.metadata['packfile']['type'],

            # OPPORTUNITY: packfile endpoint could be extended someday to take additional metadata.
            'instrument': None,
            'measurements': [],
            'tags': [],
            'metadata': {},

            # Manually add the file origin to the packfile metadata.
            # This is set by upload.process_upload on each file, but we're not storing those.
            'origin': self.origin
        }

        # Get or create a session based on the hierarchy and provided labels.
        query = {
            'project': bson.ObjectId(self.p_id),
            'label': self.s_label,
            'group': self.g_id
        }

        # Updates if existing
        updates = {}
        updates['permissions'] = self.permissions
        updates['modified']    = self.timestamp
        updates = util.mongo_dict(updates)

        # Extra properties on insert
        insert_map = copy.deepcopy(query)
        insert_map['created'] = self.timestamp
        insert_map.update(self.metadata['session'])
        insert_map['subject'] = containerutil.add_id_to_subject(insert_map.get('subject'), self.p_id)

        session = config.db['session' + 's'].find_one_and_update(
            query, {
                '$set': updates,
                '$setOnInsert': insert_map
            },
            upsert=True,
            return_document=pymongo.collection.ReturnDocument.AFTER
        )
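
        # With upsert=True and ReturnDocument.AFTER, find_one_and_update
        # returns the existing session updated with `updates`, or a freshly
        # inserted one built from the query plus insert_map when none matched.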

        # Get or create an acquisition based on the hierarchy and provided labels.
        query = {
            'session': session['_id'],
            'label': self.a_label,
        }

        # Updates if existing
        updates = {}
        updates['permissions'] = self.permissions
        updates['modified']    = self.timestamp
        updates = util.mongo_dict(updates)

        # Extra properties on insert
        insert_map = copy.deepcopy(query)
        insert_map['created'] = self.timestamp
        insert_map.update(self.metadata['acquisition'])

        acquisition = config.db['acquisition' + 's'].find_one_and_update(
            query, {
                '$set': updates,
                '$setOnInsert': insert_map
            },
            upsert=True,
            return_document=pymongo.collection.ReturnDocument.AFTER
        )

        # Set instance target for helper func
        self.container_type = 'acquisition'
        self.id_            = str(acquisition['_id'])
        self.container      = acquisition

        self.save_file(cgi_field, cgi_info)

        # Delete token
        token = self.context['token']
        config.db['tokens'].delete_one({ '_id': token })

        result = {
            'acquisition_id': str(acquisition['_id']),
            'session_id':     str(session['_id']),
            'info': cgi_info,
        }

        # Report result
        yield encoder.json_sse_pack({
            'event': 'result',
            'data': result,
        })


class AnalysisPlacer(Placer):
    """
    A placer that accepts files for an analysis, merging the upload info with
    the inputs/outputs declared in the payload metadata.
    """

    def check(self):
        self.requireMetadata()
        validators.validate_data(self.metadata, 'analysis.json', 'input', 'POST', optional=True)

    def process_file_field(self, field, info):
        self.save_file(field)
        self.saved.append(info)

    def finalize(self):
        # Merge the "hard" infos from the processed upload with the infos
        # from the payload.
        metadata_infos = {}
        for info in self.metadata.pop('inputs', []):
            info['input'] = True
            metadata_infos[info['name']] = info
        for info in self.metadata.pop('outputs', []):
            info['output'] = True
            metadata_infos[info['name']] = info
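
        # metadata_infos now maps filename -> payload info flagged as input or
        # output, e.g. (illustrative shape):
        #   { 'out.csv': {'name': 'out.csv', 'output': True, ...} }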
        self.metadata['files'] = []
        for info in self.saved:
            metadata_info = metadata_infos.get(info['name'], {})
            metadata_info.update(info)
            self.metadata['files'].append(metadata_info)
        return self.metadata


class AnalysisJobPlacer(Placer):
    """
    A placer that accepts output files produced by a job, attaching them to the target analysis.
    """

    def check(self):
        if self.id_ is None:
            raise Exception('Must specify a target analysis')

    def process_file_field(self, field, info):
        if self.metadata is not None:
            file_mds = self.metadata.get('acquisition', {}).get('files', [])

            for file_md in file_mds:
                if file_md['name'] == info['name']:
                    info.update(file_md)
                    break

        info['output'] = True
        self.save_file(field)
        self.saved.append(info)

    def finalize(self):
        # Search the sessions table for the analysis, and replace its file field
        if self.saved:
            q = {'analyses._id': str(self.id_)}
            u = {'$push': {'analyses.$.files': {'$each': self.saved}}}
            if self.context.get('job_id'):
                # If the original job failed, update the analysis with the job that succeeded
                u['$set'] = {'job': self.context['job_id']}
            config.db.sessions.update_one(q, u)