import base64
import codecs
import json
import logging
import os
import re
import urllib.parse as urlparse
import uuid
from typing import List, Tuple, Type
import pkg_resources
import tornado
from bokeh.server.auth_provider import AuthProvider
from tornado.auth import OAuth2Mixin
from tornado.httpclient import HTTPError, HTTPRequest
from tornado.httputil import url_concat
from tornado.web import RequestHandler
from .config import config
from .io import state
from .io.resources import ERROR_TEMPLATE, _env
from .util import base64url_decode, base64url_encode
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Name of the secure cookie that carries the serialized OAuth state between
# the authorize redirect and the callback request.
STATE_COOKIE_NAME = 'panel-oauth-state'
def decode_response_body(response):
    """
    Decodes the JSON-format response body

    The body is parsed as JSON as-is first. Only when that fails are
    single quotes swapped for double quotes and the parse retried, so
    valid JSON containing apostrophes (e.g. ``"O'Brien"``) is no longer
    corrupted by the repair step.

    Arguments
    ---------
    response: tornado.httpclient.HTTPResponse
        Any object exposing the raw payload as ``response.body`` (bytes).

    Returns
    -------
    Decoded response content

    Raises
    ------
    json.JSONDecodeError
        If the body is not valid JSON even after the quote repair.
    """
    # UTF-8 is a superset of ASCII, so a single decode covers both of the
    # encodings the previous two-step decode attempted.
    body = codecs.decode(response.body, 'utf-8')
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        # Fallback kept from the original "Fix GitHub response" handling:
        # treat single-quoted strings as double-quoted and retry.
        return json.loads(body.replace("'", '"'))
def decode_id_token(id_token):
    """
    Decodes the payload of a signed ID JWT token.

    The token is cut at its last dot to drop the signature segment and
    at its first dot to drop the header segment; what lies in between is
    base64url-decoded and parsed as JSON. The signature itself is
    discarded without being checked.

    Arguments
    ---------
    id_token: str
        The dot-separated token string.

    Returns
    -------
    The JSON-decoded payload segment.
    """
    token_bytes = id_token.encode('utf-8')
    # Unpacking raises ValueError when a separator is missing.
    header_and_payload, _signature = token_bytes.rsplit(b".", 1)
    _header, payload = header_and_payload.split(b".", 1)
    payload_json = base64url_decode(payload).decode('utf-8')
    return json.loads(payload_json)
def _serialize_state(state):
    """
    Serialize OAuth state into URL-safe base64 text.

    The state is dumped to JSON, encoded as UTF-8 and base64-encoded
    with the URL-safe alphabet; the result is returned as an ASCII str.
    """
    raw = json.dumps(state).encode('utf8')
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.decode('ascii')
def _deserialize_state(b64_state):
    """
    Deserialize OAuth state as serialized in _serialize_state

    Every malformed input is logged and mapped to an empty dict instead
    of raising, so callers can always use ``.get`` on the result.

    Arguments
    ---------
    b64_state: str or bytes
        URL-safe base64 text wrapping a JSON object.

    Returns
    -------
    dict
        The decoded state, or ``{}`` when the value cannot be decoded or
        does not decode to a JSON object.
    """
    try:
        # The ASCII encode lives inside the try block: a non-ASCII str
        # raises UnicodeEncodeError (a ValueError) and must count as a
        # decoding failure like bad base64 or bad UTF-8.
        if isinstance(b64_state, str):
            b64_state = b64_state.encode('ascii')
        json_state = base64.urlsafe_b64decode(b64_state).decode('utf8')
    except (TypeError, ValueError):
        # TypeError covers non-str, non-bytes input such as None.
        log.error("Failed to b64-decode state: %r", b64_state)
        return {}
    try:
        decoded = json.loads(json_state)
    except ValueError:
        log.error("Failed to json-decode state: %r", json_state)
        return {}
    if not isinstance(decoded, dict):
        # Valid JSON that is not an object (list, number, null, ...).
        log.error("Decoded state is not a JSON object: %r", decoded)
        return {}
    return decoded
class OAuthLoginHandler(tornado.web.RequestHandler):
    """
    Request handler implementing an OAuth2 authorization-code login.

    The handler relies on attributes and methods it does not define
    itself: ``_OAUTH_ACCESS_TOKEN_URL``, ``_OAUTH_USER_URL``,
    ``_USER_KEY``, ``authorize_redirect`` and ``get_auth_http_client``.
    The concrete handlers in this module supply them through class
    attributes or properties and by mixing in
    ``tornado.auth.OAuth2Mixin``.

    HTTP errors meant for the browser are raised as
    ``tornado.web.HTTPError``; the ``HTTPError`` imported at module
    level comes from ``tornado.httpclient`` and is only caught around
    outgoing requests.
    """

    # Headers sent with the token request and the user-info request.
    _API_BASE_HEADERS = {
        'Accept': 'application/json',
        'User-Agent': 'Tornado OAuth'
    }

    # Format string for the Authorization header of the user-info request;
    # when None the access token is appended to the user URL instead.
    _access_token_header = None

    # Additional form fields merged into the access token request.
    _EXTRA_TOKEN_PARAMS = {}

    # Scope handed to the authorize redirect; None sends no scope.
    _SCOPE = None

    # Cached value of the state cookie, filled by get_state_cookie.
    _state_cookie = None

    # Template rendered when the provider reports an authentication error.
    _error_template = ERROR_TEMPLATE

    async def get_authenticated_user(self, redirect_uri, client_id, state,
                                     client_secret=None, code=None):
        """
        Fetches the authenticated user

        With a ``code`` the access token and user are fetched and the
        user is returned. Without one the browser is redirected to the
        provider's authorize endpoint and None is returned.

        Arguments
        ---------
        redirect_uri: (str)
          The OAuth redirect URI
        client_id: (str)
          The OAuth client ID
        state: (str)
          The unguessable random string to protect against
          cross-site request forgery attacks
        client_secret: (str, optional)
          The client secret
        code: (str, optional)
          The response code from the server
        """
        if code:
            return await self._fetch_access_token(
                code,
                redirect_uri,
                client_id,
                client_secret
            )

        params = {
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            'response_type': 'code',
            'extra_params': {
                'state': state,
            },
        }
        if self._SCOPE is not None:
            params['scope'] = self._SCOPE
        # A scope configured through oauth_extra_params takes precedence.
        if 'scope' in config.oauth_extra_params:
            params['scope'] = config.oauth_extra_params['scope']
        log.debug("%s making authorize request", type(self).__name__)
        self.authorize_redirect(**params)

    async def _fetch_access_token(self, code, redirect_uri, client_id, client_secret):
        """
        Fetches the access token.

        Arguments
        ---------
        code:
          The response code from the server
        redirect_uri:
          The redirect URI
        client_id:
          The client ID
        client_secret:
          The client secret

        Returns
        -------
        The value returned by ``_on_auth``, or None when the token
        response or the user response decodes to an empty body.

        Raises
        ------
        ValueError
          If no client secret is supplied.
        """
        if not client_secret:
            raise ValueError('The client secret is undefined.')

        log.debug("%s making access token request.", type(self).__name__)

        params = {
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            **self._EXTRA_TOKEN_PARAMS
        }

        http = self.get_auth_http_client()

        # Request the access token.
        req = HTTPRequest(
            self._OAUTH_ACCESS_TOKEN_URL,
            method='POST',
            body=urlparse.urlencode(params),
            headers=self._API_BASE_HEADERS
        )
        try:
            response = await http.fetch(req)
        except HTTPError as e:
            # _on_error always raises; the return only ends this branch.
            return self._on_error(e.response)

        body = decode_response_body(response)

        if not body:
            return

        if 'access_token' not in body:
            return self._on_error(response, body)

        user_headers = dict(self._API_BASE_HEADERS)
        if self._access_token_header:
            user_url = self._OAUTH_USER_URL
            user_headers['Authorization'] = self._access_token_header.format(
                body['access_token']
            )
        else:
            user_url = '{}{}'.format(self._OAUTH_USER_URL, body['access_token'])

        user_response = await http.fetch(user_url, headers=user_headers)
        user = decode_response_body(user_response)

        if not user:
            return

        log.debug("%s received user information.", type(self).__name__)
        return self._on_auth(user, body['access_token'])

    def get_state_cookie(self):
        """Get OAuth state from cookies

        To be compared with the value in redirect URL. The cookie is
        read once, cached on the handler and cleared from the response.
        """
        if self._state_cookie is None:
            self._state_cookie = (
                self.get_secure_cookie(STATE_COOKIE_NAME, max_age_days=config.oauth_expiry) or b''
            ).decode('utf8', 'replace')
            self.clear_cookie(STATE_COOKIE_NAME)
        return self._state_cookie

    def set_state_cookie(self, state):
        """Store the serialized OAuth state in an HTTP-only secure cookie."""
        self.set_secure_cookie(
            STATE_COOKIE_NAME, state, expires_days=config.oauth_expiry, httponly=True
        )

    def get_state(self):
        """
        Build the serialized OAuth state for a new login attempt.

        The state holds a random ``state_id`` and the sanitized ``next``
        request argument (None when the argument is absent or empty).
        """
        next_url = original_next_url = self.get_argument('next', None)
        if next_url:
            # avoid browsers treating \ as /
            next_url = next_url.replace('\\', urlparse.quote('\\'))
            # disallow hostname-having urls,
            # force absolute path redirect
            urlinfo = urlparse.urlparse(next_url)
            next_url = urlinfo._replace(
                scheme='', netloc='', path='/' + urlinfo.path.lstrip('/')
            ).geturl()
            if next_url != original_next_url:
                log.warning(
                    "Ignoring next_url %r, using %r", original_next_url, next_url
                )
        return _serialize_state(
            {'state_id': uuid.uuid4().hex, 'next_url': next_url}
        )

    async def get(self):
        """
        Handle both legs of the login flow.

        Without a ``code`` a fresh state is stored in a cookie and the
        browser is sent to the provider. With a ``code`` the state is
        checked against the cookie, the user is fetched and the browser
        is redirected to the stored ``next_url`` (``/`` when none).

        Raises
        ------
        tornado.web.HTTPError
          400 on a state mismatch, 403 when no user could be fetched.
        """
        log.debug("%s received login request", type(self).__name__)
        if config.oauth_redirect_uri:
            redirect_uri = config.oauth_redirect_uri
        else:
            redirect_uri = "{0}://{1}".format(
                self.request.protocol,
                self.request.host
            )
        params = {
            'redirect_uri': redirect_uri,
            'client_id': config.oauth_key,
        }

        # Some OAuth2 backends do not correctly return code
        next_arg = self.get_argument('next', {})
        if next_arg:
            next_arg = urlparse.parse_qs(next_arg)
            next_arg = {arg.split('?')[-1]: value for arg, value in next_arg.items()}
        # NOTE(review): extract_urlparam is neither defined nor imported in
        # the visible part of this module - confirm it is in scope.
        code = self.get_argument('code', extract_urlparam(next_arg, 'code'))
        url_state = self.get_argument('state', extract_urlparam(next_arg, 'state'))

        # Handle authentication error
        error = self.get_argument('error', extract_urlparam(next_arg, 'error'))
        if error is not None:
            error_msg = self.get_argument(
                'error_description', extract_urlparam(next_arg, 'error_description'))
            if not error_msg:
                error_msg = error
            log.error(
                "%s failed to authenticate with following error: %s",
                type(self).__name__, error
            )
            self.set_header("Content-Type", 'text/html')
            # NOTE(review): the original read ``config.npn_cdn``, which looks
            # like a typo for the ``npm_cdn`` setting named by the template
            # keyword - confirm against the config definition.
            self.write(self._error_template.render(
                npm_cdn=config.npm_cdn,
                title='Panel: Authentication Error',
                error_type='Authentication Error',
                error=error,
                error_msg=error_msg
            ))
            return

        # Seek the authorization
        cookie_state = self.get_state_cookie()
        if code:
            if cookie_state != url_state:
                log.warning("OAuth state mismatch: %s != %s", cookie_state, url_state)
                raise tornado.web.HTTPError(400, "OAuth state mismatch")

            decoded_state = _deserialize_state(url_state)
            # For security reason, the state value (cross-site token) will be
            # retrieved from the query string.
            params.update({
                'client_secret': config.oauth_secret,
                'code': code,
                'state': url_state
            })
            user = await self.get_authenticated_user(**params)
            if user is None:
                raise tornado.web.HTTPError(403)
            log.debug("%s authorized user, redirecting to app.", type(self).__name__)
            # get_state stores next_url=None when login was requested
            # without a ``next`` argument, so a plain dict default would
            # never apply; fall back to the root explicitly.
            self.redirect(decoded_state.get('next_url') or '/')
        else:
            # Redirect for user authentication
            params['state'] = new_state = self.get_state()
            self.set_state_cookie(new_state)
            await self.get_authenticated_user(**params)

    def _on_auth(self, user_info, access_token):
        """
        Store the authenticated user and tokens in secure cookies.

        Arguments
        ---------
        user_info:
          Decoded user information returned by the provider
        access_token: (str)
          The access token granted by the provider

        Returns
        -------
        The value found under the configured user key.

        Raises
        ------
        tornado.web.HTTPError
          400 when the user information lacks the configured user key.
        """
        user_key = config.oauth_jwt_user or self._USER_KEY
        if user_key not in user_info:
            log.error("%s user information did not contain expected %r.",
                      type(self).__name__, user_key)
            raise tornado.web.HTTPError(400, "OAuth user information missing user key")
        user = user_info[user_key]
        self.set_secure_cookie('user', user, expires_days=config.oauth_expiry)
        id_token = base64url_encode(json.dumps(user_info))
        # ``state`` is the module-level import from .io here.
        if state.encryption:
            access_token = state.encryption.encrypt(access_token.encode('utf-8'))
            id_token = state.encryption.encrypt(id_token.encode('utf-8'))
        self.set_secure_cookie('access_token', access_token, expires_days=config.oauth_expiry)
        self.set_secure_cookie('id_token', id_token, expires_days=config.oauth_expiry)
        return user

    def _on_error(self, response, body=None):
        """
        Clear all cookies, log the provider failure and abort the request.

        Arguments
        ---------
        response:
          The provider response; may be None when the failed request
          produced no response at all.
        body: (optional)
          An already decoded response body.

        Raises
        ------
        tornado.web.HTTPError
          Always, with status 500.
        """
        self.clear_all_cookies()
        if not body and response is not None:
            try:
                body = decode_response_body(response)
            except (TypeError, ValueError):
                # Error bodies need not be JSON (JSONDecodeError and
                # UnicodeDecodeError are ValueErrors) or may be missing;
                # keep whatever body was passed in.
                pass
        provider = self.__class__.__name__.replace('LoginHandler', '')
        response_error = getattr(response, 'error', None)
        if response_error:
            log.error(f"{provider} OAuth provider returned a {response_error} "
                      f"error. The full response was: {body}")
        else:
            log.warning(f"{provider} OAuth provider failed to fully "
                        f"authenticate returning the following response:"
                        f"{body}.")
        raise tornado.web.HTTPError(500, f"{provider} authentication failed")
class GenericLoginHandler(OAuthLoginHandler, OAuth2Mixin):
    """
    Login handler whose endpoints come from configuration.

    Each setting is looked up in ``config.oauth_extra_params`` first and
    falls back to an environment variable.
    """

    _EXTRA_TOKEN_PARAMS = {
        'grant_type': 'authorization_code'
    }

    _access_token_header = 'Bearer {}'

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        env_default = os.environ.get('PANEL_OAUTH_TOKEN_URL')
        return config.oauth_extra_params.get('TOKEN_URL', env_default)

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        env_default = os.environ.get('PANEL_OAUTH_AUTHORIZE_URL')
        return config.oauth_extra_params.get('AUTHORIZE_URL', env_default)

    @property
    def _OAUTH_USER_URL(self):
        env_default = os.environ.get('PANEL_OAUTH_USER_URL')
        return config.oauth_extra_params.get('USER_URL', env_default)

    @property
    def _SCOPE(self):
        # Comma-separated scopes from the environment, else the defaults.
        env_scope = os.environ.get('PANEL_OAUTH_SCOPE')
        if env_scope is None:
            return ['openid', 'email']
        return env_scope.split(',')

    @property
    def _USER_KEY(self):
        env_default = os.environ.get('PANEL_USER_KEY', 'email')
        return config.oauth_extra_params.get('USER_KEY', env_default)
class GithubLoginHandler(OAuthLoginHandler, OAuth2Mixin):
    """GitHub OAuth2 Authentication

    To authenticate with GitHub, first register your application at
    https://github.com/settings/applications/new to get the client ID and
    secret.
    """

    # Key of the user-info response that identifies the user.
    _USER_KEY = 'login'

    # The access token is sent in the Authorization header.
    _access_token_header = 'token {}'

    _OAUTH_AUTHORIZE_URL = 'https://github.com/login/oauth/authorize'

    _OAUTH_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token'

    _OAUTH_USER_URL = 'https://api.github.com/user'

    _EXTRA_AUTHORIZE_PARAMS = {}
class BitbucketLoginHandler(OAuthLoginHandler, OAuth2Mixin):
    """Login handler configured with the Bitbucket OAuth2 endpoints."""

    # Key of the user-info response that identifies the user.
    _USER_KEY = 'username'

    _OAUTH_AUTHORIZE_URL = "https://bitbucket.org/site/oauth2/authorize"

    _OAUTH_ACCESS_TOKEN_URL = "https://bitbucket.org/site/oauth2/access_token"

    # No _access_token_header is set, so the inherited token fetch
    # appends the access token to this URL.
    _OAUTH_USER_URL = "https://api.bitbucket.org/2.0/user?access_token="

    _EXTRA_TOKEN_PARAMS = {
        'grant_type': 'authorization_code'
    }

    _API_BASE_HEADERS = {
        "Accept": "application/json",
    }
class Auth0Handler(OAuthLoginHandler, OAuth2Mixin):
    """
    Login handler for Auth0.

    The endpoint URLs are built from the ``subdomain`` entry of
    ``config.oauth_extra_params`` (``'example'`` when unset).
    """

    _USER_KEY = 'email'

    _EXTRA_TOKEN_PARAMS = {
        'grant_type': 'authorization_code'
    }

    _EXTRA_AUTHORIZE_PARAMS = {
        'subdomain'
    }

    # URL templates; ``{0}`` is replaced by the configured subdomain.
    _OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}.auth0.com/oauth/token'
    _OAUTH_AUTHORIZE_URL_ = 'https://{0}.auth0.com/authorize'
    _OAUTH_USER_URL_ = 'https://{0}.auth0.com/userinfo?access_token='

    def _for_subdomain(self, template):
        """Fill a URL template with the configured Auth0 subdomain."""
        subdomain = config.oauth_extra_params.get('subdomain', 'example')
        return template.format(subdomain)

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        return self._for_subdomain(self._OAUTH_ACCESS_TOKEN_URL_)

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        return self._for_subdomain(self._OAUTH_AUTHORIZE_URL_)

    @property
    def _OAUTH_USER_URL(self):
        return self._for_subdomain(self._OAUTH_USER_URL_)
class GitLabLoginHandler(OAuthLoginHandler, OAuth2Mixin):
    """
    Login handler for GitLab.

    The host is taken from the ``url`` entry of
    ``config.oauth_extra_params`` (``'gitlab.com'`` when unset).
    """

    _USER_KEY = 'username'

    _API_BASE_HEADERS = {
        'Accept': 'application/json',
    }

    _EXTRA_TOKEN_PARAMS = {
        'grant_type': 'authorization_code'
    }

    # URL templates; ``{0}`` is replaced by the configured host.
    _OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}/oauth/token'
    _OAUTH_AUTHORIZE_URL_ = 'https://{0}/oauth/authorize'
    _OAUTH_USER_URL_ = 'https://{0}/api/v4/user'

    def _for_host(self, template):
        """Fill a URL template with the configured GitLab host."""
        host = config.oauth_extra_params.get('url', 'gitlab.com')
        return template.format(host)

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        return self._for_host(self._OAUTH_ACCESS_TOKEN_URL_)

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        return self._for_host(self._OAUTH_AUTHORIZE_URL_)

    @property
    def _OAUTH_USER_URL(self):
        return self._for_host(self._OAUTH_USER_URL_)

    async def _fetch_access_token(self, code, redirect_uri, client_id, client_secret):
        """
        Fetches the access token.

        The token parameters are sent in the query string of a POST
        request with an empty body, and the user is then requested with
        a ``Bearer`` Authorization header.

        Arguments
        ----------
        code:
          The response code from the server
        redirect_uri:
          The redirect URI
        client_id:
          The client ID
        client_secret:
          The client secret

        Returns
        -------
        The value returned by ``_on_auth``, or None when the token
        response or the user response decodes to an empty body.
        """
        if not client_secret:
            raise ValueError('The client secret is undefined.')

        handler_name = type(self).__name__
        log.debug("%s making access token request.", handler_name)

        client = self.get_auth_http_client()

        query = {
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
        }
        query.update(self._EXTRA_TOKEN_PARAMS)
        token_url = url_concat(self._OAUTH_ACCESS_TOKEN_URL, query)

        # Request the access token.
        token_request = HTTPRequest(
            token_url,
            method="POST",
            headers=self._API_BASE_HEADERS,
            body=''
        )
        try:
            token_response = await client.fetch(token_request)
        except HTTPError as exc:
            return self._on_error(exc.response)

        token_body = decode_response_body(token_response)
        if not token_body:
            return
        if 'access_token' not in token_body:
            return self._on_error(token_response, token_body)

        log.debug("%s granted access_token.", handler_name)

        access_token = token_body['access_token']
        auth_headers = {
            **self._API_BASE_HEADERS,
            "Authorization": "Bearer {}".format(access_token),
        }
        user_response = await client.fetch(
            self._OAUTH_USER_URL,
            method="GET",
            headers=auth_headers
        )
        user_info = decode_response_body(user_response)
        if not user_info:
            return

        log.debug("%s received user information.", handler_name)
        return self._on_auth(user_info, access_token)
class OAuthIDTokenLoginHandler(OAuthLoginHandler):
    """
    Login handler that takes the user from the ID token.

    No user-info request is made: the user is read from the payload of
    the ``id_token`` returned alongside the access token.
    """

    _API_BASE_HEADERS = {
        'Content-Type':
        'application/x-www-form-urlencoded; charset=UTF-8'
    }

    # Extra form fields merged into the token request made below.
    _EXTRA_AUTHORIZE_PARAMS = {
        'grant_type': 'authorization_code'
    }

    async def _fetch_access_token(self, code, redirect_uri, client_id, client_secret):
        """
        Fetches the access token.

        Arguments
        ----------
        code:
          The response code from the server
        redirect_uri:
          The redirect URI
        client_id:
          The client ID
        client_secret:
          The client secret

        Returns
        -------
        The value returned by ``_on_auth``, or None when the token
        response decodes to an empty body.

        Raises
        ------
        ValueError
          If no client secret is supplied.
        """
        if not client_secret:
            raise ValueError('The client secret is undefined.')

        log.debug("%s making access token request.", type(self).__name__)

        http = self.get_auth_http_client()

        params = {
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            **self._EXTRA_AUTHORIZE_PARAMS
        }

        data = urlparse.urlencode(
            params, doseq=True, encoding='utf-8', safe='=')

        # Request the access token.
        req = HTTPRequest(
            self._OAUTH_ACCESS_TOKEN_URL,
            method="POST",
            headers=self._API_BASE_HEADERS,
            body=data
        )

        try:
            response = await http.fetch(req)
        except HTTPError as e:
            # _on_error always raises; the return only ends this branch.
            return self._on_error(e.response)

        body = decode_response_body(response)

        # Same empty-body handling as the sibling handlers in this module.
        if not body:
            return

        # Both tokens are required: a response without an id_token is
        # reported as a provider failure rather than a bare KeyError.
        if 'access_token' not in body or 'id_token' not in body:
            return self._on_error(response, body)

        log.debug("%s granted access_token.", type(self).__name__)

        access_token = body['access_token']
        id_token = body['id_token']
        return self._on_auth(id_token, access_token)

    def _on_auth(self, id_token, access_token):
        """
        Store the user from the ID token and both tokens in cookies.

        Arguments
        ---------
        id_token: (str)
          The ID token returned by the provider
        access_token: (str)
          The access token returned by the provider

        Returns
        -------
        The value found under the configured user key of the payload.

        Raises
        ------
        tornado.web.HTTPError
          400 when the token payload lacks the configured user key.
        """
        decoded = decode_id_token(id_token)
        user_key = config.oauth_jwt_user or self._USER_KEY
        if user_key in decoded:
            user = decoded[user_key]
        else:
            log.error("%s token payload did not contain expected %r.",
                      type(self).__name__, user_key)
            # tornado.web.HTTPError is the class a RequestHandler turns
            # into an HTTP status; the module-level HTTPError belongs to
            # tornado.httpclient.
            raise tornado.web.HTTPError(400, "OAuth token payload missing user information")
        self.set_secure_cookie('user', user, expires_days=config.oauth_expiry)
        # ``state`` is the module-level import from .io here.
        if state.encryption:
            access_token = state.encryption.encrypt(access_token.encode('utf-8'))
            id_token = state.encryption.encrypt(id_token.encode('utf-8'))
        self.set_secure_cookie('access_token', access_token, expires_days=config.oauth_expiry)
        self.set_secure_cookie('id_token', id_token, expires_days=config.oauth_expiry)
        return user
class AzureAdLoginHandler(OAuthIDTokenLoginHandler, OAuth2Mixin):
    """
    Login handler for the Azure AD ``oauth2`` endpoints.

    The tenant is read from the ``AAD_TENANT_ID`` environment variable,
    then from the ``tenant`` entry of ``config.oauth_extra_params``, and
    defaults to ``'common'``.
    """

    _USER_KEY = 'unique_name'

    _API_BASE_HEADERS = {
        'Accept': 'application/json',
        'User-Agent': 'Tornado OAuth'
    }

    # URL templates; ``{tenant}`` is replaced by the resolved tenant.
    _OAUTH_ACCESS_TOKEN_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/token'
    _OAUTH_AUTHORIZE_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/authorize'
    _OAUTH_USER_URL_ = ''

    def _tenant(self):
        """Resolve the tenant from the environment or the config."""
        configured = config.oauth_extra_params.get('tenant', 'common')
        return os.environ.get('AAD_TENANT_ID', configured)

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        return self._OAUTH_ACCESS_TOKEN_URL_.format(tenant=self._tenant())

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        return self._OAUTH_AUTHORIZE_URL_.format(tenant=self._tenant())

    @property
    def _OAUTH_USER_URL(self):
        extra = config.oauth_extra_params
        return self._OAUTH_USER_URL_.format(**extra)
class AzureAdV2LoginHandler(OAuthIDTokenLoginHandler, OAuth2Mixin):
    """
    Login handler for the Azure AD ``oauth2/v2.0`` endpoints.

    The tenant is read from the ``AAD_TENANT_ID`` environment variable,
    then from the ``tenant`` entry of ``config.oauth_extra_params``, and
    defaults to ``'common'``.
    """

    _USER_KEY = 'email'

    _SCOPE = ['openid', 'email', 'profile']

    _API_BASE_HEADERS = {
        'Accept': 'application/json',
        'User-Agent': 'Tornado OAuth'
    }

    # URL templates; ``{tenant}`` is replaced by the resolved tenant.
    _OAUTH_ACCESS_TOKEN_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token'
    _OAUTH_AUTHORIZE_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize'
    _OAUTH_USER_URL_ = ''

    def _tenant(self):
        """Resolve the tenant from the environment or the config."""
        configured = config.oauth_extra_params.get('tenant', 'common')
        return os.environ.get('AAD_TENANT_ID', configured)

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        return self._OAUTH_ACCESS_TOKEN_URL_.format(tenant=self._tenant())

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        return self._OAUTH_AUTHORIZE_URL_.format(tenant=self._tenant())

    @property
    def _OAUTH_USER_URL(self):
        extra = config.oauth_extra_params
        return self._OAUTH_USER_URL_.format(**extra)
class OktaLoginHandler(OAuthIDTokenLoginHandler, OAuth2Mixin):
    """Okta OAuth2 Authentication

    To authenticate with Okta you first need to set up and configure
    in the Okta developer console.

    The host comes from the ``url`` entry of
    ``config.oauth_extra_params`` (``'okta.com'`` when unset) and the
    authorization server from its ``server`` entry (``'default'`` when
    unset). A falsy ``server`` selects the URL templates that carry no
    server segment.
    """

    _USER_KEY = 'email'

    _SCOPE = ['openid', 'email', 'profile']

    # NOTE(review): the inherited ID-token _fetch_access_token merges
    # _EXTRA_AUTHORIZE_PARAMS, not this mapping - confirm it is needed.
    _EXTRA_TOKEN_PARAMS = {
        'grant_type': 'authorization_code',
        'response_type': 'code,token,id_token'
    }

    # Single-underscore templates take (host, server); the
    # double-underscore variants take the host only.
    _OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}/oauth2/{1}/v1/token'
    _OAUTH_ACCESS_TOKEN_URL__ = 'https://{0}/oauth2/v1/token'
    _OAUTH_AUTHORIZE_URL_ = 'https://{0}/oauth2/{1}/v1/authorize'
    _OAUTH_AUTHORIZE_URL__ = 'https://{0}/oauth2/v1/authorize'
    _OAUTH_USER_URL_ = 'https://{0}/oauth2/{1}/v1/userinfo?access_token='
    _OAUTH_USER_URL__ = 'https://{0}/oauth2/v1/userinfo?access_token='

    def _endpoint(self, with_server, without_server):
        """Fill whichever template matches the configured server."""
        host = config.oauth_extra_params.get('url', 'okta.com')
        server = config.oauth_extra_params.get('server', 'default')
        if not server:
            return without_server.format(host)
        return with_server.format(host, server)

    @property
    def _OAUTH_ACCESS_TOKEN_URL(self):
        return self._endpoint(
            self._OAUTH_ACCESS_TOKEN_URL_, self._OAUTH_ACCESS_TOKEN_URL__
        )

    @property
    def _OAUTH_AUTHORIZE_URL(self):
        return self._endpoint(
            self._OAUTH_AUTHORIZE_URL_, self._OAUTH_AUTHORIZE_URL__
        )

    @property
    def _OAUTH_USER_URL(self):
        return self._endpoint(
            self._OAUTH_USER_URL_, self._OAUTH_USER_URL__
        )
class GoogleLoginHandler(OAuthIDTokenLoginHandler, OAuth2Mixin):
    """Login handler configured with the Google OAuth2 endpoints."""

    # Key of the ID token payload that identifies the user.
    _USER_KEY = 'email'

    _SCOPE = ['profile', 'email']

    _OAUTH_ACCESS_TOKEN_URL = "https://accounts.google.com/o/oauth2/token"

    _OAUTH_AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"

    _API_BASE_HEADERS = {
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
    }
class LogoutHandler(tornado.web.RequestHandler):
    """Clears the login cookies and redirects to the root URL."""

    def get(self):
        # Same cookies, same order as they are cleared one by one.
        for cookie_name in ("user", "id_token", "access_token", STATE_COOKIE_NAME):
            self.clear_cookie(cookie_name)
        self.redirect("/")
class OAuthProvider(AuthProvider):
    """
    Auth provider exposing the OAuth login and logout handlers.

    Arguments
    ---------
    error_template: (str, optional)
      Path of a template file rendered on authentication errors; the
      default ERROR_TEMPLATE is used when omitted.
    """

    def __init__(self, error_template=None):
        if error_template is None:
            self._error_template = ERROR_TEMPLATE
        else:
            # Read the template as UTF-8 explicitly so the result does
            # not depend on the platform's default locale encoding.
            with open(error_template, encoding='utf-8') as f:
                self._error_template = _env.from_string(f.read())
        super().__init__()

    @property
    def get_user(self):
        """Callable returning the ``user`` secure cookie of a handler."""
        def get_user(request_handler):
            return request_handler.get_secure_cookie("user", max_age_days=config.oauth_expiry)
        return get_user

    @property
    def endpoints(self) -> List[Tuple[str, Type[RequestHandler]]]:
        ''' URL patterns for login/logout endpoints.

        '''
        endpoints: List[Tuple[str, Type[RequestHandler]]] = []
        if self.login_handler:
            assert self.login_url is not None
            endpoints.append(('/login', self.login_handler))
        if self.logout_handler:
            assert self.logout_url is not None
            endpoints.append(('/logout', self.logout_handler))
        return endpoints

    @property
    def login_url(self):
        """``/login``, prefixed by the path of the configured redirect URI."""
        if config.oauth_redirect_uri is None:
            return '/login'
        else:
            return urlparse.urlparse(config.oauth_redirect_uri).path + '/login'

    @property
    def login_handler(self):
        """Handler class registered for the configured OAuth provider."""
        handler = AUTH_PROVIDERS[config.oauth_provider]
        # The template is assigned on the handler class itself.
        if self._error_template:
            handler._error_template = self._error_template
        return handler

    @property
    def logout_url(self):
        return "/logout"

    @property
    def logout_handler(self):
        return LogoutHandler
# Registry mapping each provider name to its login handler class.
AUTH_PROVIDERS = dict(
    auth0=Auth0Handler,
    azure=AzureAdLoginHandler,
    azurev2=AzureAdV2LoginHandler,
    bitbucket=BitbucketLoginHandler,
    generic=GenericLoginHandler,
    google=GoogleLoginHandler,
    github=GithubLoginHandler,
    gitlab=GitLabLoginHandler,
    okta=OktaLoginHandler,
)

# Populate AUTH Providers from external extensions
for entry_point in pkg_resources.iter_entry_points('panel.auth'):
    AUTH_PROVIDERS.update({entry_point.name: entry_point.resolve()})

# Expose every registered provider name as an allowed config value.
config.param.objects(False)['_oauth_provider'].objects = list(AUTH_PROVIDERS)