diff --git a/api/dao/containerutil.py b/api/dao/containerutil.py index ee97093cc2ff8d6f820d190257792ce1d62bb333..3002df4c3b36f50e0ddfeb6f43def090281b80c2 100644 --- a/api/dao/containerutil.py +++ b/api/dao/containerutil.py @@ -1,7 +1,9 @@ import bson.objectid +from . import APIPermissionException from .. import config -from ..auth import INTEGER_PERMISSIONS +from ..auth import has_access + CONT_TYPES = ['acquisition', 'analysis', 'collection', 'group', 'project', 'session'] SINGULAR_TO_PLURAL = { @@ -14,8 +16,6 @@ SINGULAR_TO_PLURAL = { } PLURAL_TO_SINGULAR = {p: s for s, p in SINGULAR_TO_PLURAL.iteritems()} -def get_perm(name): - return INTEGER_PERMISSIONS[name] def add_id_to_subject(subject, pid): """ @@ -167,11 +167,11 @@ class ContainerReference(object): return '/{}/{}/files/{}'.format(collection, self.id, filename) def check_access(self, uid, perm_name): - perm = get_perm(perm_name) - for p in self.get()['permissions']: - if p['_id'] == uid and get_perm(p['access']) > perm: - return - raise Exception('User {} does not have {} access to {} {}'.format(uid, perm_name, self.type, self.id)) + cont = self.get() + if has_access(uid, cont, perm_name): + return + else: + raise APIPermissionException('User {} does not have {} access to {} {}'.format(uid, perm_name, self.type, self.id)) class FileReference(ContainerReference): diff --git a/test/integration_tests/python/test_jobs.py b/test/integration_tests/python/test_jobs.py index eeb64e44774363bed5908016274a1e1458991969..535ac16938549cd9ab87d975455a8b61afbf6a34 100644 --- a/test/integration_tests/python/test_jobs.py +++ b/test/integration_tests/python/test_jobs.py @@ -21,6 +21,8 @@ def test_jobs_access(as_user): def test_jobs(data_builder, as_user, as_admin, as_root): gear = data_builder.create_gear() invalid_gear = data_builder.create_gear(gear={'custom': {'flywheel': {'invalid': True}}}) + project = data_builder.create_project() + session = data_builder.create_session() acquisition = data_builder.create_acquisition() job_data = { @@ -138,3 +140,34 @@ def test_jobs(data_builder, as_user, as_admin, as_root): # retry failed job w/o root r = as_admin.post('/jobs/' + next_job_id + '/retry') assert r.ok + + # set as_user perms to ro + r = as_user.get('/users/self') + assert r.ok + uid = r.json()['_id'] + + r = as_admin.post('/projects/' + project + '/permissions', json={ + '_id': uid, + 'access': 'ro' + }) + assert r.ok + + # try to add job without rw + r = as_user.post('/jobs/add', json=job_data) + assert r.status_code == 403 + + # set as_user perms to rw + r = as_admin.put('/projects/' + project + '/permissions/' + uid, json={ + 'access': 'rw' + }) + assert r.ok + + # add job with rw + r = as_user.post('/jobs/add', json=job_data) + assert r.ok + job_rw_id = r.json()['_id'] + + # get next job as admin + r = as_admin.get('/jobs/next', params={'tags': 'test-tag'}) + assert r.ok + job_rw_id = r.json()['id']