# hsman/app/lib.py
#
# 188 lines
# 6.2 KiB
# Python

import os
import functools
from flask import request, abort, current_app
from flask import session as flask_session
from flask_pyoidc import OIDCAuthentication as _OIDCAuth
from flask_pyoidc.user_session import UserSession
from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata
from typing import Callable, List
import logging
# Module-level logger, named after this module via ``__name__``; used by the
# authentication/authorization helpers below for debug output.
log = logging.getLogger(__name__)
def remote_ip() -> str:
    """Return the client IP address for the current request.

    If the WSGI environ carries an ``X-Forwarded-For`` header
    (``HTTP_X_FORWARDED_FOR``), the first entry of its comma-separated list
    is returned with surrounding whitespace removed. Otherwise the
    ``REMOTE_ADDR`` environ value is returned, converted with ``str``.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted reverse proxy overwrites it -- confirm the deployment does so
    before relying on this value for access decisions.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for is not None:
        # Entries are comma-separated and may be padded with spaces
        # ("client, proxy1, proxy2"); strip so callers get a clean address.
        return forwarded_for.split(',')[0].strip()
    return str(request.environ.get('REMOTE_ADDR'))
def username() -> str:
    """Return the part of the session user's email before the first '@'.

    The email is read from the ``userinfo`` mapping stored in the Flask
    session; a missing ``userinfo`` or ``email`` key raises ``KeyError``.
    """
    email = flask_session['userinfo']['email']
    local_part, _sep, _domain = email.partition('@')
    return local_part
def login_name() -> str:
    """Return the login name of the session user.

    Uses the ``preferred_username`` entry of the session's ``userinfo``
    mapping when that key is present, and falls back to ``username()``
    (the email local part) when it is not.
    """
    claims = flask_session['userinfo']
    if 'preferred_username' not in claims:
        return username()
    return claims['preferred_username']
def webMode() -> bool:
    """Tell whether the process environment looks like a web server run.

    Returns True when the ``SERVER_SOFTWARE`` environment variable contains
    "gunicorn", or when ``WERKZEUG_RUN_MAIN`` is exactly "true"; otherwise
    returns False.
    """
    env = os.environ
    if "gunicorn" in env.get('SERVER_SOFTWARE', ''):
        return True
    return env.get('WERKZEUG_RUN_MAIN', False) == "true"
class OIDCAuthentication(_OIDCAuth):
    """Flask-pyoidc authentication configured lazily from the Flask app config.

    The instance is created without arguments and wired to an application
    later through ``init_app``. On top of the authentication provided by the
    base class it offers authorization decorators (``authorize`` and the
    ``authorize_*`` helpers) that answer with HTTP 403 when an authenticated
    user does not satisfy the given rule.
    """

    def __init__(self):
        # Intentionally empty: the base-class initializer needs the provider
        # configuration, which is only known once ``init_app`` has access to
        # the application config.
        pass

    def init_app(self, app):
        """Configure the OIDC provider from ``app.config`` and attach to *app*.

        Reads ``OIDC_CLIENT_ID``, ``OIDC_CLIENT_SECRET`` and ``OIDC_URL``
        from the app config (``KeyError`` if one is missing), registers a
        single provider under the name ``'default'`` with a session refresh
        interval of 1800 seconds, runs the base-class initialization and
        stores this instance as ``app.auth``.
        """
        client_metadata = ClientMetadata(
            client_id=app.config['OIDC_CLIENT_ID'],
            client_secret=app.config['OIDC_CLIENT_SECRET'])
        provider_config = ProviderConfiguration(
            issuer=app.config['OIDC_URL'],
            client_metadata=client_metadata,
            auth_request_params={
                'scope': ['openid',
                          'profile',
                          'groups',
                          'email']},
            session_refresh_interval_seconds=1800)
        # Deferred base-class construction (see __init__).
        super().__init__({'default': provider_config})
        super().init_app(app)
        app.auth = self

    def authorize(self, provider_name: str, authz_fn: Callable, **kwargs):
        """Build a view decorator that authenticates, then authorizes.

        Args:
            provider_name: Name of a configured provider.
            authz_fn: Callable invoked as ``authz_fn(session, **kwargs)``
                with the current ``UserSession``; a truthy result grants
                access.
            **kwargs: Extra keyword arguments forwarded to ``authz_fn`` on
                every request.

        Returns:
            A decorator for Flask view functions. The decorated view starts
            (or silently refreshes) authentication when needed, and aborts
            with 403 when ``authz_fn`` returns a falsy value.

        Raises:
            ValueError: If ``provider_name`` is not a configured provider.
        """
        if provider_name not in self._provider_configurations:
            # Built on a single logical line per replacement field: a field
            # broken across lines inside a single-quoted f-string is a
            # SyntaxError on Python versions before 3.12.
            configured = self._provider_configurations.keys()
            raise ValueError(
                f"Provider name '{provider_name}' not in configured "
                f"providers: {configured}."
            )

        # Keep the arguments given at decoration time; the wrapper below has
        # its own ``kwargs`` (those of the view call).
        external_args = kwargs

        def oidc_decorator(view_func):
            @functools.wraps(view_func)
            def wrapper(*args, **kwargs):
                # Retrieve session and client
                session = UserSession(flask_session, provider_name)
                client = self.clients[session.current_provider]

                # Check session validity
                if session.should_refresh(
                        client.session_refresh_interval_seconds):
                    log.debug('user auth will be refreshed "silently"')
                    return self._authenticate(client, interactive=False)
                elif session.is_authenticated():
                    log.debug('user is already authenticated')
                else:
                    log.debug('user not authenticated, start flow')
                    return self._authenticate(client)

                # Only reached for an authenticated session: apply the
                # authorization rule, which must return true or false.
                if authz_fn(session, **external_args):
                    return view_func(*args, **kwargs)
                return abort(403)
            return wrapper
        return oidc_decorator

    def authorize_domains(self,
                          provider_name: str,
                          domains: List[str]):
        """
        Authorize a user if the email domain is in a list of domains.

        The domain is the text after the last '@' of the ``email`` userinfo
        entry. An address without '@' (or a missing email) is never
        authorized.
        """
        def _authz_fn(session, domains) -> bool:
            email = session.userinfo.get('email', "")
            # Require the separator: otherwise a malformed value such as
            # "example.com" would be treated as its own domain and pass.
            _local, at_sign, domain = email.rpartition('@')
            return bool(at_sign) and domain in domains

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              domains=domains)

    def authorize_users(self, provider_name: str, users: List[str]):
        """
        Authorize a user if the ``preferred_username`` or the user part of
        the email (text before the first '@') is in a list of usernames.
        """
        def _authz_fn(session, users) -> bool:
            username = session.userinfo.get('preferred_username', "")
            email = session.userinfo.get('email', "")
            email_user = email.split('@')[0]
            return username in users or email_user in users

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              users=users)

    def authorize_groups(self, provider_name: str, groups: List[str]):
        """
        Authorize members of a list of groups.

        Access is granted when the ``groups`` userinfo entry shares at least
        one element with *groups*.
        """
        def _authz_fn(session, groups) -> bool:
            user_groups = session.userinfo.get('groups', [])
            return not set(groups).isdisjoint(user_groups)

        return self.authorize(provider_name,
                              authz_fn=_authz_fn,
                              groups=groups)

    def authorize_admins(self, provider_name: str):
        """
        Authorize admins.
        Admins are taken from the app config:
        - members of groups in ADMIN_GROUPS
        - users in ADMIN_USERS

        Both settings default to an empty list when absent, and are read at
        request time rather than at decoration time.
        """
        def _authz_fn(session) -> bool:
            user_groups = session.userinfo.get('groups', [])
            username = session.userinfo.get('preferred_username', "")
            with current_app.app_context():
                admin_groups = current_app.config.get('ADMIN_GROUPS', [])
                admin_users = current_app.config.get('ADMIN_USERS', [])

            authorized_groups = set(admin_groups).intersection(user_groups)
            if authorized_groups:
                log.debug("'%s' is a member of %s",
                          username, authorized_groups)
                return True
            if username in admin_users:
                log.debug("'%s' is an admin user", username)
                return True
            return False

        return self.authorize(provider_name,
                              authz_fn=_authz_fn)