221 lines
7.2 KiB
Python
221 lines
7.2 KiB
Python
import functools
import logging
import os
from typing import Callable, List, Optional

from flask import request, abort, current_app
from flask import session as flask_session, jsonify
from flask_pyoidc import OIDCAuthentication as _OIDCAuth
from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata
from flask_pyoidc.user_session import UserSession
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
|
|
def remote_ip() -> str:
    """Return the client IP address for the current request.

    The first entry of the ``X-Forwarded-For`` header is used when the
    header is present and non-empty; otherwise the WSGI ``REMOTE_ADDR``
    value is returned, converted with ``str()`` (so a missing
    ``REMOTE_ADDR`` yields the string ``'None'``).

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted reverse proxy overwrites it -- confirm the deployment before
    relying on this value for anything security-sensitive.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # Entries are comma separated and usually followed by a space;
        # strip so the returned address carries no stray whitespace.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    return str(request.environ.get('REMOTE_ADDR'))
|
|
|
|
|
|
def webMode() -> bool:
    """Tell whether the process looks like it is serving web requests.

    Returns:
        True when the ``SERVER_SOFTWARE`` environment variable contains
        ``"gunicorn"`` or when ``WERKZEUG_RUN_MAIN`` is exactly
        ``"true"``; False otherwise.

    NOTE(review): ``WERKZEUG_RUN_MAIN`` is presumably the flag set by the
    Werkzeug reloader in its child process -- confirm that a dev server
    started without the reloader should indeed report False here.
    """
    server_software = os.environ.get('SERVER_SOFTWARE', '')
    werkzeug_flag = os.environ.get('WERKZEUG_RUN_MAIN')
    return 'gunicorn' in server_software or werkzeug_flag == 'true'
|
|
|
|
|
|
class OIDCAuthentication(_OIDCAuth):
    """Flask-pyoidc authentication configured from the Flask app config.

    On top of the base class this adds:

    * read-only properties exposing claims of the logged-in user, all read
      from ``flask_session['userinfo']`` (a ``KeyError`` is raised when
      that session entry is missing);
    * ``authorize*`` decorator factories that authenticate the user and
      then apply an authorization predicate, answering 403 on refusal.
    """

    def __init__(self):
        """Create an unconfigured instance.

        The base-class initialiser is deliberately not called here; it is
        invoked from ``init_app`` once the provider settings can be read
        from the application config.
        """
        pass

    def init_app(self, app):
        """Configure the single ``'default'`` provider and attach to *app*.

        Reads ``OIDC_CLIENT_ID``, ``OIDC_CLIENT_SECRET`` and ``OIDC_URL``
        from ``app.config`` (``KeyError`` if any is missing), initialises
        the base class with that provider, and stores this instance as
        ``app.auth``.
        """
        client_metadata = ClientMetadata(
            client_id=app.config['OIDC_CLIENT_ID'],
            client_secret=app.config['OIDC_CLIENT_SECRET'])

        provider_config = ProviderConfiguration(
            issuer=app.config['OIDC_URL'],
            client_metadata=client_metadata,
            auth_request_params={
                'scope': ['openid',
                          'profile',
                          'groups',
                          'email']},
            session_refresh_interval_seconds=1800)
        # Base-class construction is deferred to this point, see __init__.
        super().__init__({'default': provider_config})
        super().init_app(app)
        app.auth = self

    @property
    def username(self) -> str:
        """Local part (text before the first ``@``) of the user's email."""
        userinfo = flask_session['userinfo']
        return userinfo['email'].split('@')[0]

    @property
    def email(self) -> str:
        """The ``email`` claim of the logged-in user."""
        userinfo = flask_session['userinfo']
        return userinfo['email']

    @property
    def login_name(self) -> str:
        """The ``preferred_username`` claim, defaulting to ``username``."""
        userinfo = flask_session['userinfo']
        # NOTE: the fallback ``self.username`` is evaluated eagerly, so the
        # ``email`` claim must be present even when ``preferred_username``
        # is set.
        return userinfo.get('preferred_username', self.username)

    @property
    def full_name(self) -> Optional[str]:
        """The ``name`` claim, or ``None`` when the claim is absent."""
        userinfo = flask_session['userinfo']
        return userinfo.get('name')

    @property
    def groups(self) -> list:
        """The ``groups`` claim, or an empty list when absent or falsy."""
        userinfo = flask_session['userinfo']
        return userinfo.get('groups') or []

    @property
    def isAdmin(self) -> bool:
        """Whether the logged-in user is an administrator.

        A user is an admin when one of their groups is listed in the
        ``ADMIN_GROUPS`` app setting, or when ``username`` is listed in
        the ``ADMIN_USERS`` app setting. Both settings default to empty.
        """
        # Use the ``groups`` property so a ``None`` claim is treated as
        # "no groups" instead of raising inside ``set.intersection``.
        user_groups = self.groups

        # ``current_app`` is only usable while an application context is
        # already active, so no nested ``app_context()`` is pushed here:
        # it would be redundant, and leaving it could run the
        # application's appcontext teardown handlers mid-request.
        admin_groups = current_app.config.get('ADMIN_GROUPS', []) or []
        admin_users = current_app.config.get('ADMIN_USERS', []) or []

        authorized_groups = set(admin_groups).intersection(user_groups)
        if authorized_groups:
            log.debug("'%s' is a member of %s. isAdmin == True",
                      self.username, authorized_groups)
            return True

        if self.username in admin_users:
            log.debug("'%s' is an admin user", self.username)
            return True
        return False

    @property
    def unathorized(self):
        """Build the "not authorized" answer and log the attempt.

        Returns:
            A ``(response, 403)`` tuple where ``response`` is the JSON
            body produced by ``jsonify``. A warning with the user name and
            remote IP is logged as a side effect.

        The property name is misspelled ("unathorized"); it is kept as-is
        because callers reference it by this name.
        """
        username = self.username
        client_ip = remote_ip()
        response = jsonify(
            {'message': "not authorized",
             'comment': 'nice try, info logged',
             # NOTE(review): the opening quote is never closed; the text
             # is left untouched in case clients depend on it.
             'logged': f"'{username}@{client_ip}",
             'result': 'GO AWAY!'})
        log.warning("user '%s' attempted denied operation from %s",
                    username, client_ip)
        return response, 403

    def userOrAdmin(self, username: str):
        """
        Check if the current user is an admin OR is the user whose
        username is passed as argument
        """
        return self.isAdmin or self.username == username

    def authorize(self, provider_name: str, authz_fn: Callable, **kwargs):
        """Build a view decorator that authenticates, then authorizes.

        Args:
            provider_name: key of a configured provider.
            authz_fn: callable invoked as ``authz_fn(session, **kwargs)``
                with the ``UserSession``; a truthy result grants access.
            **kwargs: extra keyword arguments forwarded to ``authz_fn``.

        Returns:
            A decorator for Flask view functions. The wrapped view starts
            (or silently refreshes) the authentication flow when needed,
            and aborts with 403 when ``authz_fn`` refuses the user.

        Raises:
            ValueError: if ``provider_name`` is not a configured provider.

        NOTE(review): ``_provider_configurations`` is not set by this
        class's ``__init__``; presumably the base-class initialiser called
        from ``init_app`` sets it, in which case this method must only be
        used after ``init_app`` -- confirm.
        """
        if provider_name not in self._provider_configurations:
            configured = self._provider_configurations.keys()
            raise ValueError(
                f"Provider name '{provider_name}' not in configured "
                f"providers: {configured}."
            )

        # We save args with which we have been called
        external_args = kwargs

        # Decorator
        def oidc_decorator(view_func):
            @functools.wraps(view_func)
            def wrapper(*args, **kwargs):
                # Retrieve session and client
                session = UserSession(flask_session, provider_name)
                client = self.clients[session.current_provider]

                # Check session validity
                if session.should_refresh(
                        client.session_refresh_interval_seconds):
                    log.debug('user auth will be refreshed "silently"')
                    return self._authenticate(client, interactive=False)
                elif session.is_authenticated():
                    log.debug('user is already authenticated')
                else:
                    log.debug('user not authenticated, start flow')
                    return self._authenticate(client)

                # Call authorization function that must return true or
                # false
                authorized = authz_fn(session, **external_args)
                if authorized:
                    return view_func(*args, **kwargs)
                else:
                    return abort(403)
            return wrapper

        return oidc_decorator

    def authorize_domains(self,
                          provider_name: str,
                          domains: List[str]):
        """
        Authorize a user if the email domain is in a list of domains
        """

        def _authz_fn(session, domains) -> bool:
            email = session.userinfo.get('email') or ""
            # Require an actual '@': without it the whole claim would be
            # mistaken for the domain part.
            _, at_sign, domain = email.rpartition('@')
            return bool(at_sign) and domain in domains

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              domains=domains)

    def authorize_users(self, provider_name: str, users: List[str]):
        """
        Authorize a user if the ``preferred_username`` claim or the user
        part of the email is in a list of usernames
        """

        def _authz_fn(session, users) -> bool:
            # ``or ""`` also covers claims that are present but None.
            username = session.userinfo.get('preferred_username') or ""
            email = session.userinfo.get('email') or ""
            email_user = email.split('@')[0]
            return username in users or email_user in users

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              users=users)

    def authorize_groups(self, provider_name: str, groups: List[str]):
        """
        Authorize members of a list of groups
        """

        def _authz_fn(session, groups) -> bool:
            # Treat a missing or None ``groups`` claim as "no groups".
            user_groups = session.userinfo.get('groups') or []
            return bool(set(groups).intersection(user_groups))

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              groups=groups)

    def authorize_admins(self, provider_name: str):
        """
        Authorize admins.
        Admins are taken from the app config:
        - members of groups in ADMIN_GROUPS
        - users in ADMIN_USERS
        """

        def _authz_fn(session) -> bool:
            # The decision is delegated to ``isAdmin``, which reads the
            # user info from the Flask session rather than from
            # ``session``.
            return self.isAdmin

        return self.authorize(provider_name,
                              authz_fn=_authz_fn)
|