Reorganize repos
This commit is contained in:
5
hsapi_client/__init__.py
Normal file
5
hsapi_client/__init__.py
Normal file
@ -0,0 +1,5 @@
|
||||
from .apikeys import APIKey
|
||||
from .nodes import Node
|
||||
from .users import User
|
||||
from .routes import Route
|
||||
from .preauthkeys import PreAuthKey, v1ListPreAuthKeyRequest
|
42
hsapi_client/apikeys.py
Normal file
42
hsapi_client/apikeys.py
Normal file
@ -0,0 +1,42 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, List
|
||||
from datetime import datetime
|
||||
|
||||
from .model import HSAPICall
|
||||
from .schemas import v1ApiKey
|
||||
|
||||
|
||||
class v1CreateApiKeyRequest(BaseModel):
|
||||
expiration: str = Field(alias="expiration", default=None)
|
||||
|
||||
|
||||
class v1ExpireApiKeyRequest(BaseModel):
|
||||
prefix: str = Field(alias="prefix", default=None)
|
||||
|
||||
|
||||
class v1ListApiKeysResponse(BaseModel):
|
||||
apiKeys: Optional[List[Optional[v1ApiKey]]] = Field(
|
||||
alias="apiKeys", default=None)
|
||||
|
||||
|
||||
class v1CreateApiKeyResponse(BaseModel):
|
||||
apiKey: str = Field(alias="apiKey", default=None)
|
||||
|
||||
|
||||
class APIKey(HSAPICall):
|
||||
|
||||
objectPath = "apikey"
|
||||
|
||||
def list(self) -> v1ListApiKeysResponse:
|
||||
response = self.call('get')
|
||||
return v1ListApiKeysResponse(**response.json())
|
||||
|
||||
def create(self, data: v1CreateApiKeyRequest) -> v1CreateApiKeyResponse:
|
||||
response = self.call('post', data=data)
|
||||
return v1CreateApiKeyResponse(**response.json())
|
||||
|
||||
def expire(self, data: v1ExpireApiKeyRequest) -> None:
|
||||
self.call('post', call_path='expire', data=data)
|
||||
|
||||
def delete(self, prefix: str) -> None:
|
||||
self.call('delete', call_path=prefix)
|
25
hsapi_client/config.py
Normal file
25
hsapi_client/config.py
Normal file
@ -0,0 +1,25 @@
|
||||
import os
|
||||
from typing import Union
|
||||
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class APISettings(BaseSettings):
|
||||
model_config = SettingsConfigDict(env_prefix='HSAPI_')
|
||||
server: str = "http://localhost:8080"
|
||||
api_path: str = "/api/v1"
|
||||
api_token: Union[str, None] = None
|
||||
ssl_verify: Union[bool, str] = True
|
||||
|
||||
def refresh_api_token(self):
|
||||
self.api_token = os.environ.get('HSAPI_API_TOKEN', 'default')
|
||||
|
||||
|
||||
class HTTPException(Exception):
|
||||
def __init__(self, status_code: int, message: str):
|
||||
self.status_code = status_code
|
||||
self.message = message
|
||||
super().__init__(f"{status_code} {message}")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.status_code} {self.message}"
|
64
hsapi_client/model.py
Normal file
64
hsapi_client/model.py
Normal file
@ -0,0 +1,64 @@
|
||||
from typing import Union, Optional, Dict, Any
|
||||
from .config import APISettings, HTTPException
|
||||
import requests
|
||||
|
||||
|
||||
def formatTags(tagList: Union[list, None] = []) -> list:
|
||||
"""
|
||||
Get a list of tags and prepend `tag:` to all the ones that
|
||||
do not start with `tag:`
|
||||
"""
|
||||
|
||||
formattedTags = []
|
||||
if tagList:
|
||||
for tag in tagList:
|
||||
formatted = f"tag:{tag}" if not tag.startswith('tag:') else tag
|
||||
formattedTags.append(formatted)
|
||||
return formattedTags
|
||||
|
||||
|
||||
class HSAPICall:
|
||||
"""
|
||||
Generic API call.
|
||||
It has a call() method that wants:
|
||||
- a method (GET, POST, DELETE);
|
||||
- a subpath, that is appended to <server>/api/v1/{self.objectPath}
|
||||
- optional `data` payload, the body of the request
|
||||
"""
|
||||
|
||||
objectPath: str = ""
|
||||
|
||||
def __init__(self, settings: Optional[APISettings] = None) -> None:
|
||||
self.api_settings = settings if settings else APISettings()
|
||||
self.base_path = f"{
|
||||
self.api_settings.server}{self.api_settings.api_path}/{self.objectPath}"
|
||||
|
||||
def call(self, method, call_path: Union[str, int] = "", data=None, query: dict = {}):
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Authorization": f"Bearer {self.api_settings.api_token}",
|
||||
}
|
||||
|
||||
json_ = data.dict() if data else dict()
|
||||
|
||||
query_params: Dict[str, Any] = {}
|
||||
query_params = {key: value for (
|
||||
key, value) in query.items() if value is not None}
|
||||
|
||||
path = '/'.join([self.base_path, str(call_path)]
|
||||
) if call_path else self.base_path
|
||||
|
||||
response = requests.request(
|
||||
method,
|
||||
path,
|
||||
headers=headers,
|
||||
params=query_params,
|
||||
verify=self.api_settings.ssl_verify,
|
||||
json=json_
|
||||
)
|
||||
if response.status_code != 200:
|
||||
raise HTTPException(response.status_code, f" failed. Error: {
|
||||
response.text}")
|
||||
|
||||
return response
|
98
hsapi_client/nodes.py
Normal file
98
hsapi_client/nodes.py
Normal file
@ -0,0 +1,98 @@
|
||||
from .model import HSAPICall, formatTags
|
||||
from .schemas import v1Route, v1Node
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class v1SetTagsNodeRequest(BaseModel):
|
||||
tags: Optional[List[str]] = Field(alias="tags", default=None)
|
||||
|
||||
def model_post_init(self, ctx):
|
||||
self.tags = formatTags(self.tags)
|
||||
|
||||
|
||||
class v1MoveNodeRequest(BaseModel):
|
||||
user: Optional[str] = Field(alias="user", default=None)
|
||||
|
||||
|
||||
class v1BackfillNodeIPsRequest(BaseModel):
|
||||
confirmed: Optional[bool] = Field(alias="confirmed", default=True)
|
||||
|
||||
|
||||
class v1ListNodesResponse(BaseModel):
|
||||
nodes: Optional[List[Optional[v1Node]]] = Field(
|
||||
alias="nodes", default=None)
|
||||
|
||||
|
||||
class v1NodeResponse(BaseModel):
|
||||
node: Optional[v1Node] = Field(alias="node", default=None)
|
||||
|
||||
|
||||
class v1GetNodeRoutesResponse(BaseModel):
|
||||
routes: Optional[List[Optional[v1Route]]] = Field(
|
||||
alias="routes", default=None)
|
||||
|
||||
|
||||
class v1BackfillNodeIPsResponse(BaseModel):
|
||||
changes: Optional[List[str]] = Field(alias="changes", default=None)
|
||||
|
||||
|
||||
class Node(HSAPICall):
|
||||
|
||||
objectPath = "node"
|
||||
|
||||
def list(self) -> v1ListNodesResponse:
|
||||
response = self.call('get')
|
||||
return v1ListNodesResponse(**response.json())
|
||||
|
||||
def get(self, nodeId: int) -> v1Node:
|
||||
# There is a bug in headscale API
|
||||
# retrieving a specific node does not return the tags
|
||||
# so we get the full list of nodes and extract the node with the
|
||||
# ID we want
|
||||
# response = self.call('get', call_path=nodeId)
|
||||
nodelist = self.list()
|
||||
node = next((n for n in nodelist.nodes if n.id == nodeId), v1Node())
|
||||
return node # type: ignore
|
||||
|
||||
def _get(self, nodeId: int) -> v1Node:
|
||||
# There is a bug in headscale API
|
||||
# retrieving a specific node does not return the tags
|
||||
# This does a real get
|
||||
node = self.call('get', call_path=nodeId)
|
||||
return v1Node(**node.json())
|
||||
|
||||
def byUser(self, username: str) -> v1ListNodesResponse:
|
||||
nodelist = self.list()
|
||||
|
||||
byUser = [n for n in nodelist.nodes if n.user.name == username]
|
||||
|
||||
return v1ListNodesResponse(nodes=byUser)
|
||||
|
||||
def delete(self, nodeId: int) -> None:
|
||||
self.call('delete', call_path=nodeId)
|
||||
|
||||
def expire(self, nodeId: int) -> None:
|
||||
self.call('post', f'{nodeId}/expire')
|
||||
|
||||
def rename(self, nodeId: int, newName: str) -> v1NodeResponse:
|
||||
response = self.call('post', f'{nodeId}/rename/{newName}')
|
||||
return v1NodeResponse(**response.json())
|
||||
|
||||
def move(self, nodeId: int, data: v1MoveNodeRequest) -> v1NodeResponse:
|
||||
response = self.call('post', f'{nodeId}/user', data)
|
||||
return v1NodeResponse(**response.json())
|
||||
|
||||
def routes(self, nodeId: int) -> v1GetNodeRoutesResponse:
|
||||
response = self.call('get', f'{nodeId}/routes')
|
||||
return v1GetNodeRoutesResponse(**response.json())
|
||||
|
||||
def setTags(self, nodeId: int, data: v1SetTagsNodeRequest) -> v1NodeResponse:
|
||||
response = self.call('post', f'{nodeId}/tags', data)
|
||||
return v1NodeResponse(**response.json())
|
||||
|
||||
# Broken on server
|
||||
def backfillips(self, confirmed: bool = True) -> v1BackfillNodeIPsResponse:
|
||||
response = self.call(
|
||||
'post', f'/backfillips?confirmed={confirmed}')
|
||||
return v1BackfillNodeIPsResponse(**response.json())
|
49
hsapi_client/preauthkeys.py
Normal file
49
hsapi_client/preauthkeys.py
Normal file
@ -0,0 +1,49 @@
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field
|
||||
from .schemas import v1PreAuthKey
|
||||
from .model import HSAPICall, formatTags
|
||||
|
||||
|
||||
class v1ListPreAuthKeysResponse(BaseModel):
|
||||
preAuthKeys: Optional[List[Optional[v1PreAuthKey]]
|
||||
] = Field(alias="preAuthKeys", default=None)
|
||||
|
||||
|
||||
class v1PreAuthKeyResponse(BaseModel):
|
||||
preAuthKey: Optional[v1PreAuthKey] = Field(
|
||||
alias="preAuthKey", default=None)
|
||||
|
||||
|
||||
class v1ExpirePreAuthKeyRequest(BaseModel):
|
||||
user: str = Field(alias="user", default=None)
|
||||
key: str = Field(alias="key", default=None)
|
||||
|
||||
|
||||
class v1CreatePreAuthKeyRequest(BaseModel):
|
||||
user: str = Field(alias="user", default=None)
|
||||
reusable: Optional[bool] = Field(alias="reusable", default=None)
|
||||
ephemeral: Optional[bool] = Field(alias="ephemeral", default=None)
|
||||
expiration: Optional[str] = Field(alias="expiration", default=None)
|
||||
aclTags: Optional[List[str]] = Field(alias="aclTags", default=None)
|
||||
|
||||
|
||||
class v1ListPreAuthKeyRequest(BaseModel):
|
||||
user: Optional[str] = Field(
|
||||
alias="user", default=None)
|
||||
|
||||
|
||||
class PreAuthKey(HSAPICall):
|
||||
|
||||
objectPath = "preauthkey"
|
||||
|
||||
def list(self, data: v1ListPreAuthKeyRequest) -> v1ListPreAuthKeysResponse:
|
||||
response = self.call('get', query=data.model_dump())
|
||||
return v1ListPreAuthKeysResponse(**response.json())
|
||||
|
||||
def create(self, data: v1CreatePreAuthKeyRequest) -> v1PreAuthKeyResponse:
|
||||
data.aclTags = formatTags(data.aclTags)
|
||||
response = self.call('post', data=data)
|
||||
return v1PreAuthKeyResponse(**response.json())
|
||||
|
||||
def expire(self, data: v1ExpirePreAuthKeyRequest) -> None:
|
||||
self.call('post', data=data)
|
27
hsapi_client/routes.py
Normal file
27
hsapi_client/routes.py
Normal file
@ -0,0 +1,27 @@
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field
|
||||
from .model import HSAPICall
|
||||
from .schemas import v1Route
|
||||
|
||||
|
||||
class v1ListRoutesResponse(BaseModel):
|
||||
routes: Optional[List[Optional[v1Route]]] = Field(
|
||||
alias="routes", default=None)
|
||||
|
||||
|
||||
class Route(HSAPICall):
|
||||
|
||||
objectPath = "routes"
|
||||
|
||||
def list(self) -> v1ListRoutesResponse:
|
||||
response = self.call('get')
|
||||
return v1ListRoutesResponse(**response.json())
|
||||
|
||||
def delete(self, routeId: str) -> None:
|
||||
self.call('delete', call_path=routeId)
|
||||
|
||||
def enable(self, routeId: int) -> None:
|
||||
self.call('post', f'{routeId}/enable')
|
||||
|
||||
def disable(self, routeId: int) -> None:
|
||||
self.call('post', f'{routeId}/disable')
|
118
hsapi_client/schemas.py
Normal file
118
hsapi_client/schemas.py
Normal file
@ -0,0 +1,118 @@
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field, computed_field
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class v1RegisterMethod(str, Enum):
|
||||
|
||||
REGISTER_METHOD_UNSPECIFIED = "REGISTER_METHOD_UNSPECIFIED"
|
||||
REGISTER_METHOD_AUTH_KEY = "REGISTER_METHOD_AUTH_KEY"
|
||||
REGISTER_METHOD_CLI = "REGISTER_METHOD_CLI"
|
||||
REGISTER_METHOD_OIDC = "REGISTER_METHOD_OIDC"
|
||||
|
||||
|
||||
class v1ApiKey(BaseModel):
|
||||
|
||||
id: int = Field(alias="id", default=None)
|
||||
prefix: str = Field(alias="prefix", default=None)
|
||||
expiration: datetime = Field(alias="expiration", default=None)
|
||||
createdAt: datetime = Field(alias="createdAt", default=None)
|
||||
lastSeen: Optional[datetime] = Field(alias="lastSeen", default=None)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def expired(self) -> bool:
|
||||
tzinfo = timezone(timedelta(hours=0)) # UTC
|
||||
now = datetime.now(tzinfo)
|
||||
return self.expiration < now
|
||||
|
||||
|
||||
class v1PreAuthKey(BaseModel):
|
||||
|
||||
id: int = Field(alias="id", default=None)
|
||||
user: str = Field(alias="user", default=None)
|
||||
key: str = Field(alias="key", default=None)
|
||||
reusable: bool = Field(alias="reusable", default=True)
|
||||
ephemeral: bool = Field(alias="ephemeral", default=False)
|
||||
used: bool = Field(alias="used", default=None)
|
||||
expiration: datetime = Field(alias="expiration", default=None)
|
||||
createdAt: datetime = Field(alias="createdAt", default=None)
|
||||
aclTags: Optional[List[str]] = Field(alias="aclTags", default=None)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def expired(self) -> bool:
|
||||
tzinfo = timezone(timedelta(hours=0)) # UTC
|
||||
now = datetime.now(tzinfo)
|
||||
exptime = self.expiration < now
|
||||
expused = not self.reusable and self.used
|
||||
expephemereal = self.ephemeral and self.used
|
||||
return exptime or expused or expephemereal
|
||||
|
||||
|
||||
class v1User(BaseModel):
|
||||
|
||||
id: int = Field(alias="id", default=None)
|
||||
name: str = Field(alias="name", default=None)
|
||||
createdAt: datetime = Field(alias="createdAt", default=None)
|
||||
|
||||
|
||||
class v1Node(BaseModel):
|
||||
"""
|
||||
None model
|
||||
|
||||
"""
|
||||
|
||||
id: int = Field(alias="id", default=None)
|
||||
machineKey: str = Field(alias="machineKey", default=None)
|
||||
nodeKey: str = Field(alias="nodeKey", default=None)
|
||||
discoKey: str = Field(alias="discoKey", default=None)
|
||||
ipAddresses: List[str] = Field(alias="ipAddresses", default=None)
|
||||
name: str = Field(alias="name", default=None)
|
||||
user: v1User = Field(alias="user", default=None)
|
||||
lastSeen: datetime = Field(alias="lastSeen", default=None)
|
||||
expiry: datetime = Field(alias="expiry", default=None)
|
||||
preAuthKey: Optional[v1PreAuthKey] = Field(
|
||||
alias="preAuthKey", default=None)
|
||||
|
||||
createdAt: datetime = Field(alias="createdAt", default=None)
|
||||
registerMethod: Optional[v1RegisterMethod] = Field(
|
||||
alias="registerMethod", default=None)
|
||||
|
||||
forcedTags: Optional[List[str]] = Field(alias="forcedTags", default=None)
|
||||
invalidTags: Optional[List[str]] = Field(alias="invalidTags", default=None)
|
||||
validTags: Optional[List[str]] = Field(alias="validTags", default=None)
|
||||
givenName: str = Field(alias="givenName", default=None)
|
||||
online: bool = Field(alias="online", default=None)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def expired(self) -> bool:
|
||||
tzinfo = timezone(timedelta(hours=0)) # UTC
|
||||
now = datetime.now(tzinfo)
|
||||
return self.expiry.year != 1 and self.expiry < now
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def expireDate(self) -> Optional[int | datetime]:
|
||||
if self.expiry.year == 1:
|
||||
return 0
|
||||
return self.expiry
|
||||
|
||||
|
||||
class v1Route(BaseModel):
|
||||
"""
|
||||
None model
|
||||
|
||||
"""
|
||||
|
||||
id: Optional[int] = Field(alias="id", default=None)
|
||||
node: v1Node = Field(alias="node", default=None)
|
||||
prefix: str = Field(alias="prefix", default=None)
|
||||
advertised: bool = Field(alias="advertised", default=None)
|
||||
enabled: bool = Field(alias="enabled", default=None)
|
||||
isPrimary: bool = Field(alias="isPrimary", default=None)
|
||||
createdAt: datetime = Field(alias="createdAt", default=None)
|
||||
updatedAt: datetime = Field(alias="updatedAt", default=None)
|
||||
deletedAt: Optional[datetime] = Field(alias="deletedAt", default=None)
|
41
hsapi_client/users.py
Normal file
41
hsapi_client/users.py
Normal file
@ -0,0 +1,41 @@
|
||||
from typing import Optional, List
|
||||
from pydantic import BaseModel, Field
|
||||
from .schemas import v1User
|
||||
from .model import HSAPICall
|
||||
|
||||
|
||||
class v1CreateUserRequest(BaseModel):
|
||||
name: str = Field(alias="name", default=None)
|
||||
|
||||
|
||||
class v1ListUsersResponse(BaseModel):
|
||||
users: Optional[List[Optional[v1User]]] = Field(
|
||||
alias="users", default=None)
|
||||
|
||||
|
||||
class v1UserResponse(BaseModel):
|
||||
user: Optional[v1User] = Field(alias="user", default=None)
|
||||
|
||||
|
||||
class User(HSAPICall):
|
||||
|
||||
objectPath = "user"
|
||||
|
||||
def list(self) -> v1ListUsersResponse:
|
||||
response = self.call('get')
|
||||
return v1ListUsersResponse(**response.json())
|
||||
|
||||
def get(self, name: str) -> v1UserResponse:
|
||||
response = self.call('get', call_path=name)
|
||||
return v1UserResponse(**response.json())
|
||||
|
||||
def create(self, data: v1CreateUserRequest) -> v1UserResponse:
|
||||
response = self.call('post', data=data)
|
||||
return v1UserResponse(**response.json())
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
self.call('delete', name)
|
||||
|
||||
def rename(self, oldName: str, newName: str) -> v1UserResponse:
|
||||
response = self.call('post', call_path=f"{oldName}/rename/{newName}")
|
||||
return v1UserResponse(**response.json())
|
Reference in New Issue
Block a user