Reorganize repos
This commit is contained in:
180
app/lib.py
Normal file
180
app/lib.py
Normal file
@ -0,0 +1,180 @@
|
||||
import os
|
||||
import functools
|
||||
|
||||
from flask import request, abort, current_app
|
||||
from flask import session as flask_session
|
||||
from flask_pyoidc import OIDCAuthentication as _OIDCAuth
|
||||
from flask_pyoidc.user_session import UserSession
|
||||
from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata
|
||||
|
||||
from typing import Callable, List
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def remote_ip() -> str:
|
||||
if 'HTTP_X_FORWARDED_FOR' in request.environ:
|
||||
xff_parts = request.environ.get('HTTP_X_FORWARDED_FOR').split(',')
|
||||
return xff_parts[0]
|
||||
else:
|
||||
return str(request.environ.get('REMOTE_ADDR'))
|
||||
|
||||
|
||||
def username() -> str:
|
||||
userinfo = flask_session['userinfo']
|
||||
if 'preferred_username' in userinfo:
|
||||
return userinfo['preferred_username']
|
||||
return userinfo['email']
|
||||
|
||||
|
||||
def webMode() -> bool:
|
||||
is_gunicorn = "gunicorn" in os.environ.get('SERVER_SOFTWARE', '')
|
||||
is_werkzeug = os.environ.get('WERKZEUG_RUN_MAIN', False) == "true"
|
||||
|
||||
return is_gunicorn or is_werkzeug
|
||||
|
||||
|
||||
class OIDCAuthentication(_OIDCAuth):
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def init_app(self, app):
|
||||
client_metadata = ClientMetadata(
|
||||
client_id=app.config['OIDC_CLIENT_ID'],
|
||||
client_secret=app.config['OIDC_CLIENT_SECRET'])
|
||||
|
||||
provider_config = ProviderConfiguration(
|
||||
issuer=app.config['OIDC_URL'],
|
||||
client_metadata=client_metadata,
|
||||
auth_request_params={
|
||||
'scope': ['openid',
|
||||
'profile',
|
||||
'groups',
|
||||
'email']},
|
||||
session_refresh_interval_seconds=1800)
|
||||
super().__init__({'default': provider_config})
|
||||
super().init_app(app)
|
||||
|
||||
def authorize(self, provider_name: str, authz_fn: Callable, **kwargs):
|
||||
if provider_name not in self._provider_configurations:
|
||||
raise ValueError(
|
||||
f"Provider name '{provider_name}' not in configured providers: {
|
||||
self._provider_configurations.keys()}."
|
||||
)
|
||||
|
||||
# We save args with which we have been called
|
||||
external_args = kwargs
|
||||
|
||||
# Decorator
|
||||
def oidc_decorator(view_func):
|
||||
@ functools.wraps(view_func)
|
||||
def wrapper(*args, **kwargs):
|
||||
# Retrieve session and client
|
||||
session = UserSession(flask_session, provider_name)
|
||||
client = self.clients[session.current_provider]
|
||||
|
||||
# Check session validity
|
||||
if session.should_refresh(client.session_refresh_interval_seconds):
|
||||
log.debug('user auth will be refreshed "silently"')
|
||||
return self._authenticate(client, interactive=False)
|
||||
elif session.is_authenticated():
|
||||
log.debug('user is already authenticated')
|
||||
else:
|
||||
log.debug('user not authenticated, start flow')
|
||||
return self._authenticate(client)
|
||||
|
||||
# Call authorization function that must return true or false
|
||||
authorized = authz_fn(session, **external_args)
|
||||
if authorized:
|
||||
return view_func(*args, **kwargs)
|
||||
else:
|
||||
return abort(403)
|
||||
return wrapper
|
||||
|
||||
return oidc_decorator
|
||||
|
||||
def authorize_domains(self,
|
||||
provider_name: str,
|
||||
domains: List[str]):
|
||||
"""
|
||||
Authorize a user if the email domain is in a list of domains
|
||||
"""
|
||||
|
||||
def _authz_fn(session, domains) -> bool:
|
||||
email = session.userinfo.get('email', "")
|
||||
domain = email.split('@')[-1]
|
||||
|
||||
if domain in domains:
|
||||
return True
|
||||
return False
|
||||
|
||||
return self.authorize(provider_name,
|
||||
authz_fn=_authz_fn,
|
||||
domains=domains)
|
||||
|
||||
def authorize_users(self, provider_name: str, users: List[str]):
|
||||
"""
|
||||
Authorize a user if the username of the user part of the email
|
||||
is in a list of usernames
|
||||
"""
|
||||
|
||||
def _authz_fn(session, users) -> bool:
|
||||
username = session.userinfo.get('preferred_username', "")
|
||||
email = session.userinfo.get('email', "")
|
||||
email_user = email.split('@')[0]
|
||||
|
||||
if username in users or email_user in users:
|
||||
return True
|
||||
return False
|
||||
|
||||
return self.authorize(provider_name,
|
||||
authz_fn=_authz_fn,
|
||||
users=users)
|
||||
|
||||
def authorize_groups(self, provider_name: str, groups: List[str]):
|
||||
"""
|
||||
Authorize members of a list of groups
|
||||
"""
|
||||
|
||||
def _authz_fn(session, groups) -> bool:
|
||||
user_groups = session.userinfo.get('groups', [])
|
||||
|
||||
if len(set(groups).intersection(user_groups)):
|
||||
return True
|
||||
return False
|
||||
|
||||
return self.authorize(provider_name,
|
||||
authz_fn=_authz_fn,
|
||||
groups=groups)
|
||||
|
||||
def authorize_admins(self, provider_name: str):
|
||||
"""
|
||||
Authorize admins.
|
||||
Admins are taken from the app config:
|
||||
- members of groups in ADMIN_GROUPS
|
||||
- users in ADMIN_USERS
|
||||
"""
|
||||
|
||||
def _authz_fn(session) -> bool:
|
||||
user_groups = session.userinfo.get('groups', [])
|
||||
username = session.userinfo.get('preferred_username', "")
|
||||
with current_app.app_context():
|
||||
admin_groups = current_app.config.get('ADMIN_GROUPS', [])
|
||||
admin_users = current_app.config.get('ADMIN_USERS', [])
|
||||
|
||||
authorized_groups = set(admin_groups).intersection(user_groups)
|
||||
|
||||
if len(authorized_groups):
|
||||
log.debug(f"'{username}' is a member of {
|
||||
authorized_groups}")
|
||||
return True
|
||||
|
||||
if username in admin_users:
|
||||
log.debug(f"'{username}' is an admin user")
|
||||
return True
|
||||
return False
|
||||
|
||||
return self.authorize(provider_name,
|
||||
authz_fn=_authz_fn)
|
Reference in New Issue
Block a user