Skip to content
Snippets Groups Projects
Commit 0a6c1a83 authored by Ambrus Simon's avatar Ambrus Simon Committed by GitHub
Browse files

Increase coverage low hanging fruit 5 (#780)

* remove unused test bootstrap json

* create api accessor for unit tests - mock mongo

* add google auth unit tests
parent 59c88436
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ class AuthProvider(object):
self.auth_type = auth_type
if set_config:
try:
self.config = config.get_auth(auth_type)
self.config = config.get_auth(auth_type)
except KeyError:
raise NotImplementedError('Auth type {} is not supported by this instance'.format(auth_type))
......@@ -112,7 +112,7 @@ class JWTAuthProvider(AuthProvider):
class GoogleOAuthProvider(AuthProvider):
def __init__(self):
super(GoogleOAuthProvider,self).__init__('google')
super(GoogleOAuthProvider, self).__init__('google')
def validate_code(self, code, **kwargs):
payload = {
......@@ -293,7 +293,7 @@ class APIKeyAuthProvider(AuthProvider):
"""
Does not need to be supported in config.
"""
super(APIKeyAuthProvider,self).__init__('api-key', set_config=False)
super(APIKeyAuthProvider, self).__init__('api-key', set_config=False)
@staticmethod
def _preprocess_key(key):
......
......@@ -37,7 +37,7 @@ class GroupHandler(base.RequestHandler):
return result
def get_all(self, uid=None):
projection = {'name': 1, 'created': 1, 'modified': 1, 'roles': [], 'tags': []}
projection = {'name': 1, 'created': 1, 'modified': 1}
permchecker = groupauth.list_permission_checker(self, uid)
results = permchecker(self.storage.exec_op)('GET', projection=projection)
if not self.superuser_request and not self.is_true('join_avatars'):
......
......@@ -25,8 +25,6 @@ if [ "$SCITRAN_RUN_LINT" == "true" ]; then
./test/bin/lint.sh api
fi
./test/bin/run-unit-tests.sh
clean_up () {
kill $API_PID || true
wait 2> /dev/null
......@@ -50,6 +48,9 @@ SCITRAN_PERSISTENT_PATH=`mktemp -d`
SCITRAN_PERSISTENT_DATA_PATH="$SCITRAN_PERSISTENT_PATH/data"
SCITRAN_CORE_DRONE_SECRET=${SCITRAN_CORE_DRONE_SECRET:-$( openssl rand -base64 32 )}
SCITRAN_CORE_DRONE_SECRET=$SCITRAN_CORE_DRONE_SECRET \
./test/bin/run-unit-tests.sh
uwsgi --http "localhost:8081" --master --http-keepalive \
--so-keepalive --add-header "Connection: Keep-Alive" \
--processes 1 --threads 1 \
......
{
"groups": [
{
"_id": "scitran",
"name": "Scientific Transparency",
"roles": [
{
"_id": "admin@user.com",
"access": "admin"
}
]
}
],
"users": [
{
"_id": "admin@user.com",
"email": "admin@user.com",
"firstname": "Admin",
"lastname": "User",
"root": true
},
{
"_id": "test@user.com",
"email": "test@user.com",
"firstname": "Test",
"lastname": "User",
"root": true
}
]
}
attrdict==2.0.0
coverage==4.0.3
coveralls==1.1
mock==2.0.0
mongomock==3.8.0
pdbpp==0.8.3
pylint==1.5.3
pytest-cov==2.2.0
pytest-watch==3.8.0
pytest==2.8.5
mock==2.0.0
requests_mock==1.3.0
testfixtures==4.10.1
import logging
import os
import attrdict
import mock
import mongomock
import pytest
import webapp2
import api.config
import api.web.start
SCITRAN_CORE_DRONE_SECRET = os.environ['SCITRAN_CORE_DRONE_SECRET']
@pytest.fixture(scope='session')
def as_drone(app):
"""Return ApiAccessor with drone access"""
return ApiAccessor(app, headers={
'X-SciTran-Method': 'bootstrapper',
'X-SciTran-Name': 'Bootstrapper',
'X-SciTran-Auth': SCITRAN_CORE_DRONE_SECRET,
})
@pytest.fixture(scope='session')
def as_public(app):
"""Return ApiAccessor without authentication"""
return ApiAccessor(app)
@pytest.fixture(scope='session')
def api_db(app):
"""Return mongo client mock for the api db"""
return api.config.db
@pytest.fixture(scope='session')
def log_db(app):
"""Return mongo client mock for the log db"""
return api.config.log_db
@pytest.yield_fixture(scope='session')
def app():
"""Return api instance that uses mocked MongoClient"""
mongo_patch = mock.patch('pymongo.MongoClient', new=mongomock.MongoClient)
mongo_patch.start()
# NOTE db and log_db is created at import time in api.config
# reloading the module is needed to use the mocked MongoClient
reload(api.config)
yield api.web.start.app_factory()
mongo_patch.stop()
@pytest.fixture(scope='session')
def config(app):
"""Return app config accessor"""
# NOTE depends on the app fixture as it's reloading the config module
# NOTE the config fixture is session scoped (consider parallel tests)
# NOTE use dict notation for assignment (eg `config['key'] = 'v'` - AttrDict limitation)
return attrdict.AttrDict(api.config.__config)
@pytest.fixture(scope='module')
def log(request):
"""Return logger for the test module for easy logging from tests"""
log = logging.getLogger(request.module.__name__)
log.addHandler(logging.StreamHandler())
return log
class ApiAccessor(object):
def __init__(self, app, **defaults):
self.app = app
self.defaults = defaults
def __getattr__(self, name):
"""Return convenience HTTP method for `name`"""
if name in ('head', 'get', 'post', 'put', 'delete'):
def http_method(path, **kwargs):
# NOTE using WebOb requests in unit tests is fundamentally different
# to using a requests.Session in integration tests. See also:
# http://webapp2.readthedocs.io/en/latest/guide/testing.html#app-get-response
# https://github.com/Pylons/webob/blob/master/webob/request.py
for key, value in self.defaults.items():
kwargs.setdefault(key, value)
kwargs['method'] = name.upper()
response = self.app.get_response('/api' + path, **kwargs)
response.ok = response.status_code == 200
return response
return http_method
raise AttributeError
import datetime
import re
import pytest
import requests_mock
def test_google_auth(config, as_drone, as_public, api_db):
# inject google auth client_secret into config
config['auth']['google']['client_secret'] = 'test'
# try to access api w/ invalid session token
r = as_public.get('', headers={'Authorization': 'test'})
assert r.status_code == 401
# try to login w/o code/auth_type
r = as_public.post('/login', json={})
assert r.status_code == 400
# try to login w/ invalid auth_type
r = as_public.post('/login', json={'auth_type': 'test', 'code': 'test'})
assert r.status_code == 400
with requests_mock.Mocker() as m:
# try to log in w/ google and invalid code
m.post(config.auth.google.token_endpoint, status_code=400)
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 401
# try to log in w/ google and invalid token
m.post(config.auth.google.token_endpoint, json={'access_token': 'test'})
m.get(config.auth.google.id_endpoint, status_code=400)
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 401
# try to log in w/ google - pretend provider id endpoint doesn't return email
m.get(config.auth.google.id_endpoint, json={})
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 401
# try to log in w/ google - user not in db (yet)
m.get(config.auth.google.id_endpoint, json={'email': 'test@gmail.com'})
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 402
# try to log in w/ google - user added but disabled
as_drone.post('/users', json={
'_id': 'test@gmail.com', 'disabled': True, 'firstname': 'test', 'lastname': 'test'}).ok
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 402
# try to log in w/ google - invalid refresh token (also mock gravatar 404)
as_drone.put('/users/test@gmail.com', json={'disabled': False})
m.head(re.compile('https://gravatar.com/avatar'), status_code=404)
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.status_code == 401
assert 'gravatar' not in api_db.users.find_one({'_id': 'test@gmail.com'})['avatars']
# log in (now w/ existing gravatar)
m.head(re.compile('https://gravatar.com/avatar'))
m.post(config.auth.google.token_endpoint, json={
'access_token': 'test', 'expires_in': 60, 'refresh_token': 'test'})
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
assert r.ok
assert 'gravatar' in api_db.users.find_one({'_id': 'test@gmail.com'})['avatars']
token_1 = r.json['token']
# access api w/ valid token
r = as_public.get('', headers={'Authorization': token_1})
assert r.ok
# try to access api w/ expired token - provider fails to refresh token
api_db.authtokens.update_one({'_id': token_1}, {'$set':
{'expires': datetime.datetime.now() - datetime.timedelta(seconds=1)}})
m.post(config.auth.google.refresh_endpoint, status_code=400)
r = as_public.get('', headers={'Authorization': token_1})
assert r.status_code == 401
assert not api_db.authtokens.find({'_id': token_1}).count()
# access api w/ expired token - test refresh
m.post(config.auth.google.token_endpoint, json={
'access_token': 'test', 'expires_in': -1, 'refresh_token': 'test'})
r = as_public.post('/login', json={'auth_type': 'google', 'code': 'test'})
token_2 = r.json['token']
m.post(config.auth.google.refresh_endpoint, json={'access_token': 'test', 'expires_in': 60})
r = as_public.get('', headers={'Authorization': token_2})
assert r.ok
# try to access api w/ expired token but w/o persisted refresh_token
api_db.authtokens.update_one({'_id': token_2}, {'$set':
{'expires': datetime.datetime.now() - datetime.timedelta(seconds=1)}})
api_db.refreshtokens.delete_one({'uid': 'test@gmail.com'})
r = as_public.get('', headers={'Authorization': token_2})
assert r.status_code == 401
# try to logout w/o auth headers
r = as_public.post('/logout')
assert r.status_code == 401
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment