import os import functools from flask import request, abort, current_app from flask import session as flask_session, jsonify from flask_pyoidc import OIDCAuthentication as _OIDCAuth from flask_pyoidc.user_session import UserSession from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata, ProviderMetadata from typing import Callable, List import logging log = logging.getLogger(__name__) def remote_ip() -> str: if 'HTTP_X_FORWARDED_FOR' in request.environ: xff_parts = request.environ.get('HTTP_X_FORWARDED_FOR').split(',') return xff_parts[0] else: return str(request.environ.get('REMOTE_ADDR')) def webMode() -> bool: is_gunicorn = "gunicorn" in os.environ.get('SERVER_SOFTWARE', '') is_werkzeug = os.environ.get('WERKZEUG_RUN_MAIN', False) == "true" return is_gunicorn or is_werkzeug class OIDCAuthentication(_OIDCAuth): def __init__(self): pass def init_app(self, app): client_metadata = ClientMetadata( client_id=app.config['OIDC_CLIENT_ID'], client_secret=app.config['OIDC_CLIENT_SECRET']) provider_config = ProviderConfiguration( issuer=app.config['OIDC_URL'], client_metadata=client_metadata, auth_request_params={ 'scope': ['openid', 'profile', 'groups', 'email']}, session_refresh_interval_seconds=1800) super().__init__({'default': provider_config}) super().init_app(app) app.auth = self @property def userinfo(self) -> dict: return flask_session.get('userinfo', {}) @property def username(self) -> str: return self.userinfo.get('preferred_username', 'unknown') @property def email(self) -> str: return self.userinfo.get('email', 'unknown') @property def login_name(self) -> str: return self.userinfo.get('preferred_username', self.username) @property def full_name(self) -> str: return self.userinfo.get('name', self.username) @property def groups(self) -> list: return self.userinfo.get('groups', []) @property def isAdmin(self) -> bool: user_groups = self.userinfo.get('groups', []) with current_app.app_context(): admin_groups = current_app.config.get('ADMIN_GROUPS', []) admin_users = current_app.config.get('ADMIN_USERS', []) authorized_groups = set(admin_groups).intersection(user_groups) if len(authorized_groups): log.debug(f"'{self.username}' is a member of { authorized_groups}. isAdmin == True") return True if self.username in admin_users: log.debug(f"'{self.username}' is an admin user") return True return False @property def unathorized(self): response = jsonify( {'message': f"not authorized", 'comment': 'nice try, info logged', 'logged': f"'{self.username}@{remote_ip()}", 'result': 'GO AWAY!'}) log.warning( f"user '{self.username}' attempted denied operation from {remote_ip()}") return response, 403 def userOrAdmin(self, username: str): """ Check is the current user is an admin OR the username passed as argument """ return self.isAdmin or self.username == username def authorize(self, provider_name: str, authz_fn: Callable, **kwargs): if provider_name not in self._provider_configurations: raise ValueError( f"Provider name '{provider_name}' not in configured providers: { self._provider_configurations.keys()}." ) # We save args with which we have been called external_args = kwargs # Decorator def oidc_decorator(view_func): @functools.wraps(view_func) def wrapper(*args, **kwargs): # Retrieve session and client session = UserSession(flask_session, provider_name) client = self.clients[session.current_provider] # Check session validity if session.should_refresh(client.session_refresh_interval_seconds): log.debug('user auth will be refreshed "silently"') return self._authenticate(client, interactive=False) elif session.is_authenticated(): log.debug('user is already authenticated') else: log.debug('user not authenticated, start flow') return self._authenticate(client) # Call authorization function that must return true or false authorized = authz_fn(session, **external_args) if authorized: return view_func(*args, **kwargs) else: return abort(403) return wrapper return oidc_decorator def authorize_domains(self, provider_name: str, domains: List[str]): """ Authorize a user if the email domain is in a list of domains """ def _authz_fn(session, domains) -> bool: email = session.userinfo.get('email', "") domain = email.split('@')[-1] if domain in domains: return True return False return self.authorize(provider_name, authz_fn=_authz_fn, domains=domains) def authorize_users(self, provider_name: str, users: List[str]): """ Authorize a user if the username of the user part of the email is in a list of usernames """ def _authz_fn(session, users) -> bool: username = session.userinfo.get('preferred_username', "") email = session.userinfo.get('email', "") email_user = email.split('@')[0] if username in users or email_user in users: return True return False return self.authorize(provider_name, authz_fn=_authz_fn, users=users) def authorize_groups(self, provider_name: str, groups: List[str]): """ Authorize members of a list of groups """ def _authz_fn(session, groups) -> bool: user_groups = session.userinfo.get('groups', []) if len(set(groups).intersection(user_groups)): return True return False return self.authorize(provider_name, authz_fn=_authz_fn, groups=groups) def authorize_admins(self, provider_name: str): """ Authorize admins. Admins are taken from the app config: - members of groups in ADMIN_GROUPS - users in ADMIN_USERS """ def _authz_fn(session) -> bool: return self.isAdmin return self.authorize(provider_name, authz_fn=_authz_fn)