Newer
Older
import hashlib
import pymongo
import jsonschema
import util
ROLES = [
{
'rid': 'view',
'name': 'View-Only',
},
{
'rid': 'download',
'name': 'Download',
},
{
'rid': 'modify',
'name': 'Modify',
},
{
'rid': 'admin',
'name': 'Admin',
},
]
INTEGER_ROLES = {r['rid']: i for i, r in enumerate(ROLES)}
class Users(base.RequestHandler):
def __init__(self, request=None, response=None):
super(Users, self).__init__(request, response)
self.dbc = self.app.db.users
def count(self):
"""Return the number of Users."""
self.response.write(self.dbc.count())
if self.public_request: # FIXME: who is allowed to create a new user?
self.abort(403, 'must be logged in to create new user')
json_body = self.request.json_body
jsonschema.validate(json_body, User.json_schema)
json_body.setdefault('email', json_body['_id'])
json_body['email_hash'] = hashlib.md5(json_body['email']).hexdigest()
self.dbc.insert(json_body)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
except pymongo.errors.DuplicateKeyError as e:
self.abort(400, 'User ID %s already exists' % json_body['_id'])
def get(self):
"""Return the list of Users."""
self.abort(403, 'must be logged in to retrieve User list')
users = list(self.dbc.find({}, ['firstname', 'lastname', 'email_hash', 'wheel']))
user['details'] = self.uri_for('user', _id=str(user['_id']), _full=True) + '?' + self.request.query_string
class User(base.RequestHandler):
json_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'User',
'type': 'object',
'properties': {
'_id': {
'type': 'string',
},
'firstname': {
'title': 'First Name',
'type': 'string',
},
'lastname': {
'title': 'Last Name',
'type': 'string',
},
'email': {
'title': 'Email',
'type': 'string',
'format': 'email',
},
'email_hash': {
'type': 'string',
},
'root': {
'type': 'boolean',
},
'wheel': {
'preferences': {
'title': 'Preferences',
'type': 'object',
'properties': {
def __init__(self, request=None, response=None):
super(User, self).__init__(request, response)
self.dbc = self.app.db.users
def self(self):
"""Return details for the current User."""
user = self.dbc.find_one({'_id': self.uid}, ['firstname', 'lastname', 'root', 'wheel', 'preferences', 'email_hash'])
if not user:
self.abort(400, 'no user is logged in')
user.setdefault('preferences', {})
return user
def roles(self):
"""Return the list of user roles."""
return ROLES
self.abort(403, 'must be logged in to retrieve User info')
projection = []
if self.request.get('remotes') in ('1', 'true'):
projection += ['remotes']
if self.request.get('status') in ('1', 'true'):
projection += ['status']
user = self.dbc.find_one({'_id': _id}, projection or None)
if self.debug and (self.superuser_request or _id == self.uid):
user['groups'] = self.uri_for('groups', _id=_id, _full=True) + '?' + self.request.query_string
user = self.dbc.find_one({'_id': _id})
self.abort(404, 'no such User')
if not self.superuser_request and _id != self.uid:
self.abort(403, 'must be superuser to update another User')
schema = copy.deepcopy(self.json_schema)
del schema['required']
try:
json_body = self.request.json_body
jsonschema.validate(json_body, schema)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
if 'wheel' in json_body and _id == self.uid:
self.abort(400, 'user cannot alter own superuser privilege')
self.dbc.update({'_id': _id}, {'$set': util.mongo_dict(json_body)})
if not self.superuser_request:
self.abort(403, 'must be superuser to delete a User')
self.dbc.remove({'_id': _id})
class Groups(base.RequestHandler):
def __init__(self, request=None, response=None):
super(Groups, self).__init__(request, response)
self.dbc = self.app.db.groups
def count(self):
"""Return the number of Groups."""
self.response.write(self.app.db.groups.count())
def post(self):
"""Create a new Group"""
if not self.superuser_request:
self.abort(403, 'must be superuser to create new group')
try:
json_body = self.request.json_body
jsonschema.validate(json_body, Group.json_schema)
self.dbc.insert(json_body)
except (ValueError, jsonschema.ValidationError) as e:
self.abort(400, str(e))
except pymongo.errors.DuplicateKeyError as e:
self.abort(400, 'Groups ID %s already exists' % json_body['_id'])
if _id != self.uid and not self.superuser_request:
self.abort(403, 'User ' + self.uid + ' may not see the Groups of User ' + _id)
if not self.superuser_request:
if self.request.get('admin').lower() in ('1', 'true'):
query = {'roles': {'$elemMatch': {'_id': self.uid, 'access': 'admin'}}}
groups = list(self.app.db.groups.find(query, ['name']))
if self.debug:
for group in groups:
group['details'] = self.uri_for('group', _id=str(group['_id']), _full=True) + '?' + self.request.query_string
class Group(base.RequestHandler):
json_schema = {
'$schema': 'http://json-schema.org/draft-04/schema#',
'title': 'Group',
'type': 'object',
'properties': {
'_id': {
'type': 'string',
},
'name': {
'title': 'Name',
'type': 'string',
'maxLength': 32,
},
'type': 'array',
'default': [],
'items': {
'enum': [role['rid'] for role in ROLES],
'required': ['access', '_id'],
'additionalProperties': False,
},
'uniqueItems': True,
},
},
'required': ['_id'],
}
def __init__(self, request=None, response=None):
super(Group, self).__init__(request, response)
self.dbc = self.app.db.groups
group = self.app.db.groups.find_one({'_id': _id})
self.abort(404, 'no such Group: ' + _id)
if not self.superuser_request:
group = self.app.db.groups.find_one({'_id': _id, 'roles': {'$elemMatch': {'_id': self.uid, 'access': 'admin'}}})
self.abort(403, 'User ' + self.uid + ' is not an admin of Group ' + _id)
self.response.write('group %s put, %s\n' % (_id, self.request.params))
if not self.superuser_request:
self.abort(403, 'must be superuser to delete a Group')
# TODO: block deletion, if group is referenced by any projects
self.dbc.remove({'_id': _id})