Skip to content
Snippets Groups Projects
Commit 9eec5f52 authored by Megan Henning's avatar Megan Henning
Browse files

Add additional info to file delete perm check

parent 827a8ade
No related branches found
No related tags found
No related merge requests found
...@@ -7,6 +7,7 @@ import sys ...@@ -7,6 +7,7 @@ import sys
from .. import config from .. import config
from ..types import Origin from ..types import Origin
from ..web.errors import APIPermissionException
from . import _get_access, INTEGER_PERMISSIONS from . import _get_access, INTEGER_PERMISSIONS
log = config.log log = config.log
...@@ -45,6 +46,7 @@ def files_sublist(handler, container): ...@@ -45,6 +46,7 @@ def files_sublist(handler, container):
access = _get_access(handler.uid, container) access = _get_access(handler.uid, container)
def g(exec_op): def g(exec_op):
def f(method, _id, query_params=None, payload=None, exclude_params=None): def f(method, _id, query_params=None, payload=None, exclude_params=None):
errors = None
if method == 'GET' and container.get('public', False): if method == 'GET' and container.get('public', False):
min_access = -1 min_access = -1
elif method == 'GET': elif method == 'GET':
...@@ -52,16 +54,27 @@ def files_sublist(handler, container): ...@@ -52,16 +54,27 @@ def files_sublist(handler, container):
elif method in ['POST', 'PUT']: elif method in ['POST', 'PUT']:
min_access = INTEGER_PERMISSIONS['rw'] min_access = INTEGER_PERMISSIONS['rw']
elif method =='DELETE': elif method =='DELETE':
min_access = INTEGER_PERMISSIONS['admin'] min_access = INTEGER_PERMISSIONS['rw']
if container.get('origin',{}).get('type') in [str(Origin.user), str(Origin.job)]: file_is_original_data = bool(container.get('origin',{}).get('type') not in [str(Origin.user), str(Origin.job)])
min_access = INTEGER_PERMISSIONS['rw']
if file_is_original_data:
min_access = INTEGER_PERMISSIONS['admin']
if file_is_original_data and access == INTEGER_PERMISSIONS['rw']:
# The user was not granted access because the container had original data
errors = {'reason': 'original_data_present'}
else:
errors = {'reason': 'permission_denied'}
else: else:
min_access = sys.maxint min_access = sys.maxint
log.warning('the user access is {} and the min access is {}'.format(access, min_access))
if access >= min_access: if access >= min_access:
return exec_op(method, _id, query_params, payload, exclude_params) return exec_op(method, _id, query_params, payload, exclude_params)
else: else:
handler.abort(403, 'user not authorized to perform a {} operation on the list'.format(method)) raise APIPermissionException('user not authorized to perform a {} operation on the list'.format(method), errors=errors)
return f return f
return g return g
......
...@@ -1267,11 +1267,16 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin, ...@@ -1267,11 +1267,16 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin,
assert r.status_code == 403 assert r.status_code == 403
assert r.json()['reason'] == 'permission_denied' assert r.json()['reason'] == 'permission_denied'
# try to delete acquisition with perms # try to delete acquisition without perms
r = as_user.delete('/acquisitions/' + acquisition) r = as_user.delete('/acquisitions/' + acquisition)
assert r.status_code == 403 assert r.status_code == 403
assert r.json()['reason'] == 'permission_denied' assert r.json()['reason'] == 'permission_denied'
# try to delete file without perms
r = as_user.delete('/acquisitions/' + acquisition + '/files/test2.csv')
assert r.status_code == 403
assert r.json()['reason'] == 'permission_denied'
# Add user as rw # Add user as rw
r = as_user.get('/users/self') r = as_user.get('/users/self')
assert r.ok assert r.ok
...@@ -1298,6 +1303,11 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin, ...@@ -1298,6 +1303,11 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin,
assert r.status_code == 403 assert r.status_code == 403
assert r.json()['reason'] == 'original_data_present' assert r.json()['reason'] == 'original_data_present'
# try to delete "original data" file without admin perms
r = as_user.delete('/acquisitions/' + acquisition + '/files/test2.csv')
assert r.status_code == 403
assert r.json()['reason'] == 'original_data_present'
# Add session level analysis # Add session level analysis
r = as_admin.post('/sessions/' + session + '/analyses', params={'job': 'true'}, json={ r = as_admin.post('/sessions/' + session + '/analyses', params={'job': 'true'}, json={
'analysis': {'label': 'with-job'}, 'analysis': {'label': 'with-job'},
...@@ -1319,8 +1329,7 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin, ...@@ -1319,8 +1329,7 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin,
assert r.status_code == 400 assert r.status_code == 400
# verify that a non-referenced file _can_ be deleted from the same acquisition # verify that a non-referenced file _can_ be deleted from the same acquisition
assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('unrelated.csv')).ok assert as_admin.delete('/acquisitions/' + acquisition + '/files/test2.csv').ok
assert as_admin.delete('/acquisitions/' + acquisition + '/files/unrelated.csv').ok
# delete collection # delete collection
assert collection in as_admin.get('/acquisitions/' + acquisition).json()['collections'] assert collection in as_admin.get('/acquisitions/' + acquisition).json()['collections']
...@@ -1336,10 +1345,6 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin, ...@@ -1336,10 +1345,6 @@ def test_container_delete_tag(data_builder, default_payload, as_root, as_admin,
assert as_admin.get('/sessions/' + session + '/analyses/' + analysis).status_code == 404 assert as_admin.get('/sessions/' + session + '/analyses/' + analysis).status_code == 404
assert as_admin.get('/analyses/' + analysis).status_code == 404 assert as_admin.get('/analyses/' + analysis).status_code == 404
# try to delete acquisition without admin perms
r = as_user.delete('/acquisitions/' + acquisition)
assert r.status_code == 403
# delete acquisition # delete acquisition
assert as_admin.delete('/acquisitions/' + acquisition).ok assert as_admin.delete('/acquisitions/' + acquisition).ok
assert 'deleted' in api_db.acquisitions.find_one({'_id': bson.ObjectId(acquisition)}) assert 'deleted' in api_db.acquisitions.find_one({'_id': bson.ObjectId(acquisition)})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment