# hsman/hsman/app/views.py
#
# 205 lines
# 6.5 KiB
# Python

import logging
from .lib import remote_ip
import datetime
from flask import render_template, Blueprint, request
from flask import redirect, session, url_for
from app import auth
from flask import jsonify
from flask_pyoidc.user_session import UserSession
from hsapi import Node, User, Route, PreAuthKey
from hsapi.preauthkeys import (v1ListPreAuthKeyRequest,
v1CreatePreAuthKeyRequest,
v1ExpirePreAuthKeyRequest)
# Module logger. getLogger() is called without a name, so this is the
# root logger rather than a per-module one.
log = logging.getLogger()
# Blueprint that every view in this module is registered on. Its name,
# 'main', is the prefix used in url_for() calls such as "main.routes".
main_blueprint = Blueprint('main', __name__)
@main_blueprint.route('/', methods=['GET', 'POST'])
@auth.access_control('default')
def index():
    """Render the landing page listing the current user's own nodes.

    The user name is taken as the part of the session's ``email`` claim
    that precedes the first ``@``; only nodes whose ``user.name`` equals
    it are passed to ``index.html``.
    """
    current_session = UserSession(session)
    # Local part of the e-mail address, i.e. everything before the first '@'.
    local_part = current_session.userinfo['email'].partition('@')[0]
    own_nodes = [
        candidate
        for candidate in Node().list().nodes
        if candidate.user.name == local_part
    ]
    return render_template(
        'index.html',
        userNodeList=own_nodes,
        session=current_session,
    )
@main_blueprint.route('/token', methods=['GET', 'POST'])
@auth.access_control('default')
def token():
    """Return the ``userinfo`` of the current session as a JSON response."""
    userinfo = UserSession(session).userinfo
    return jsonify(userinfo)
@main_blueprint.route('/call', methods=['GET', 'POST'])
@auth.access_control('default')
def call():
    """Return the fixed text ``CALL OK``."""
    reply = "CALL OK"
    return reply
@main_blueprint.route('/logout')
@auth.oidc_logout
def logout():
    """Redirect to the site root; wrapped by ``auth.oidc_logout``."""
    landing = '/'
    return redirect(landing)
@main_blueprint.route('/nodes', methods=['GET'])
@auth.authorize_admins('default')
def nodes():
    """Render ``nodes.html`` with every node returned by ``Node().list()``."""
    all_nodes = Node().list().nodes
    return render_template('nodes.html', nodes=all_nodes)
@main_blueprint.route('/node/<int:nodeId>', methods=['GET'])
@auth.authorize_admins('default')
def node(nodeId: int):
    """Render the detail page of a single node.

    Args:
        nodeId: Numeric id of the node, taken from the URL.

    Returns:
        The rendered ``node.html`` template with the node, its routes and
        an ``isExitNode`` flag.
    """
    # The HS API has a bug when retrieving a single node. hsapi carries a
    # workaround, so Node().get() returns a v1Node object instead of a
    # v1NodeResponse; the result is therefore used directly rather than
    # through a `.node` attribute.
    node_info = Node().get(nodeId)
    node_routes = Node().routes(nodeId).routes
    # Test the predicate itself. The previous form fed the matching route
    # objects to any(), which relied on those objects being truthy.
    isExitNode = any(r.prefix.endswith('/0') for r in node_routes)
    return render_template("node.html",
                           routes=node_routes,
                           isExitNode=isExitNode,
                           node=node_info)
@main_blueprint.route('/users', methods=['GET'])
@auth.authorize_admins('default')
def users():
    """Render the user list together with a per-user online flag.

    A user counts as online when at least one node whose ``user.name``
    matches the user's name reports a truthy ``online`` attribute.
    Users without any node are reported as offline.
    """
    userList = User().list()
    nodeList = Node().list()
    # Collect the owners of online nodes in one pass instead of rescanning
    # the whole node list for every user.
    online_owners = {n.user.name for n in nodeList.nodes if n.online}
    online = {u.name: u.name in online_owners for u in userList.users}
    return render_template('users.html',
                           users=userList.users,
                           online=online)
@main_blueprint.route('/user/<userName>', methods=['GET'])
@auth.authorize_admins('default')
def user(userName):
    """Render the detail page of a single user.

    Args:
        userName: Name of the user, taken from the URL.

    Returns:
        The rendered ``user.html`` template with the user, the user's
        nodes, the user's pre-auth keys and a default expiry string
        (``YYYY-MM-DDTHH:MM``) seven days ahead.
    """
    user_resp = User().get(userName)
    userNodeList = [n for n in Node().list().nodes if n.user.name == userName]
    preauthKeys = PreAuthKey().list(v1ListPreAuthKeyRequest(user=userName))
    # createPKA appends ':00Z' to the submitted expiration, i.e. it is
    # interpreted as UTC. Compute the default in UTC as well so that it
    # really is seven days from now, whatever the server's local timezone.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    defaultExpiry = now_utc + datetime.timedelta(days=7)
    expStr = defaultExpiry.strftime('%Y-%m-%dT%H:%M')
    return render_template("user.html",
                           user=user_resp.user,
                           defaultExpiry=expStr,
                           preauthKeys=preauthKeys.preAuthKeys,
                           userNodeList=userNodeList)
@main_blueprint.route('/routes', methods=['GET'])
@auth.authorize_admins('default')
def routes():
    """Render the routes overview.

    Routes whose prefix does not end in ``/0`` are grouped by prefix, with
    primary routes first inside each group. The nodes of routes whose
    prefix ends in ``0/0`` are passed separately as ``exitNodes``.
    """
    all_routes = Route().list().routes
    # NOTE(review): only prefixes ending in '0/0' are used here, while the
    # grouping below skips everything ending in '/0'. Presumably this lists
    # each exit node once even if it advertises more than one default
    # route -- confirm.
    exitNodes = [r.node for r in all_routes if r.prefix.endswith('0/0')]
    # Group in a single pass. A dict keeps first-seen order, so the prefix
    # order is stable across requests, unlike iterating a set of strings.
    grouped = {}
    for r in all_routes:
        if r.prefix.endswith('/0'):
            continue
        grouped.setdefault(r.prefix, []).append(r)
    # sorted() is stable, so routes with equal isPrimary keep their order.
    final = {
        prefix: sorted(group, key=lambda x: x.isPrimary, reverse=True)
        for prefix, group in grouped.items()
    }
    return render_template("routes.html",
                           exitNodes=exitNodes,
                           routes=final)
@main_blueprint.route('/routeToggle/<int:routeId>', methods=['GET'])
@auth.authorize_admins('default')
def routeToggle(routeId: int):
    """Flip the enabled state of a route, then go back to the routes page.

    The route is looked up by id in the full route list. An enabled route
    is disabled and a disabled one is enabled. An unknown id changes
    nothing; the redirect happens either way.
    """
    overview = redirect(url_for("main.routes"))
    matches = [r for r in Route().list().routes if r.id == routeId]
    if not matches:
        return overview
    if matches[0].enabled:
        Route().disable(routeId)
    else:
        Route().enable(routeId)
    return overview
@main_blueprint.route('/node/<int:nodeId>/expire', methods=['GET'])
@auth.authorize_admins('default')
def expireNode(nodeId: int):
    """Expire the node and redirect to that node's detail page."""
    Node().expire(nodeId)
    detail_url = url_for("main.node", nodeId=nodeId)
    return redirect(detail_url)
@main_blueprint.route('/node/<int:nodeId>/list-expire', methods=['GET'])
@auth.authorize_admins('default')
def expireNodeList(nodeId: int):
    """Expire the node and redirect to the node list page."""
    Node().expire(nodeId)
    list_url = url_for("main.nodes")
    return redirect(list_url)
@main_blueprint.route('/node/<int:nodeId>/delete', methods=['GET'])
@auth.authorize_admins('default')
def deleteNode(nodeId: int):
    """Delete the node and redirect to the node list page."""
    Node().delete(nodeId)
    list_url = url_for("main.nodes")
    return redirect(list_url)
@main_blueprint.route('/node/<int:nodeId>/rename/<newName>', methods=['GET'])
@auth.authorize_admins('default')
def renameNode(nodeId: int, newName: str):
    """Rename the node and echo the new name back as JSON.

    Returns:
        A JSON object of the form ``{"newName": <newName>}``.
    """
    Node().rename(nodeId, newName)
    payload = {"newName": newName}
    return jsonify(payload)
@main_blueprint.route('/user/<userName>/delete', methods=['GET'])
@auth.authorize_admins('default')
def deleteUser(userName: str):
    """Remove a user together with all of the user's nodes.

    Each node returned by ``Node().byUser(userName)`` is first expired and
    then deleted. The user is deleted afterwards, and the browser is
    redirected to the user list.
    """
    for owned in Node().byUser(userName).nodes:
        Node().expire(owned.id)
        Node().delete(owned.id)
    User().delete(userName)
    users_url = url_for("main.users")
    return redirect(users_url)
@main_blueprint.route('/user/<userName>/pakcreate', methods=['POST'])
@auth.authorize_admins('default')
def createPKA(userName: str):
    """Create a pre-auth key for ``userName`` from the posted JSON body.

    The body must be a JSON object with the fields ``expiration``,
    ``reusable`` and ``ephemeral``.

    Args:
        userName: Name of the user the key is created for, from the URL.

    Returns:
        ``{"key": <new key>}`` as JSON on success. If the body is not a
        JSON object or a required field is missing, ``{"error": ...}``
        with HTTP status 400.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every bad request ends up in the same 400 path below.
    data = request.get_json(silent=True)
    log.debug(data)
    if not isinstance(data, dict):
        return jsonify(dict(error="request body must be a JSON object")), 400
    required = ('expiration', 'reusable', 'ephemeral')
    missing = [field for field in required if field not in data]
    if missing:
        message = "missing field(s): " + ", ".join(missing)
        return jsonify(dict(error=message)), 400
    # The submitted value carries no seconds and no timezone; ':00Z' adds
    # the seconds and marks the timestamp as UTC.
    expiration = f"{data['expiration']}:00Z"
    req = v1CreatePreAuthKeyRequest(user=userName,
                                    reusable=data['reusable'],
                                    ephemeral=data['ephemeral'],
                                    expiration=expiration)
    pak = PreAuthKey().create(req)
    return jsonify(dict(key=pak.preAuthKey.key))
@main_blueprint.route('/user/<userName>/expire/<key>', methods=['GET'])
@auth.authorize_admins('default')
def expirePKA(userName: str, key: str):
    """Expire one pre-auth key of a user and return to the user's page.

    Args:
        userName: Owner of the key, from the URL.
        key: The pre-auth key to expire, from the URL.
    """
    log.debug(key)
    expire_request = v1ExpirePreAuthKeyRequest(user=userName, key=key)
    PreAuthKey().expire(expire_request)
    user_page = url_for('main.user', userName=userName)
    return redirect(user_page)