168 lines
5.5 KiB
Python

import time
import sqlite3 as sql
import json
from app._tools.models.user import User
from app._tools.models.session import Session
from app._tools import passhash as Hash
from app.config import _USER_DATABASE as U_DB_PATH
from app.config import _TOKEN_DATABASE as T_DB_PATH
from app.config import _SERVER_CONFIG as S_CFG_PATH
from app.config import _SCHEMA_USERS as SCHEMA_USERS
from app.config import _SCHEMA_TOKENS as SCHEMA_TOKENS
from app.config import _SUPER_ADMINS as SUPER_ADMINS
from app.config import _SALT as SALT
class ConnectManager:
    """
    Thin wrapper around a single sqlite3 connection and its cursor.

    By default the connection is single-use: `execute` closes it after the
    first statement unless `close_connection=False` is passed.
    """

    def __init__(self, db_file: str):
        """
        Open `db_file` with a 10 second lock timeout.

        `check_same_thread=False` lets the connection be used from a thread
        other than the one that created it.
        """
        self._conn = sql.connect(
            db_file,
            timeout=10,
            check_same_thread=False
        )
        self._curs = self._conn.cursor()
        # Set once the underlying connection has been closed.
        self._is_destroyed = False

    def _save_and_close(self, mode: int):
        """Commit when `mode` is 0, then close the connection in every case."""
        try:
            if mode == 0:
                self._conn.commit()
        finally:
            # Close even if the commit fails, so the handle is never leaked.
            self._conn.close()

    def execute(self, request, mode: int = 0, close_connection: bool = True):
        """
        Run one SQL statement.

        request:
            - a SQL string, or
            - a tuple `(sql, parameters)` that is unpacked into
              `cursor.execute`.

        mode:
            - 0 if just execute (with commit);
            - 1 if fetch one (no commit);
            - 2 if fetch many (no commit);

        close_connection:
            when True (default) the connection is closed after this call and
            the manager can not be used again.

        Returns the cursor (mode 0), one row or None (mode 1), or a list of
        rows (mode 2). If the connection was already closed, the string
        "Connection was destroyed." is returned instead of raising
        (kept for backward compatibility).

        Any exception raised by sqlite3 is propagated; when
        `close_connection` is True the connection is closed first, without
        committing.
        """
        if self._is_destroyed:
            return "Connection was destroyed."
        try:
            if isinstance(request, tuple):
                response = self._curs.execute(*request)
            else:
                response = self._curs.execute(request)
            if mode == 1:
                response = self._curs.fetchone()
            if mode == 2:
                response = self._curs.fetchall()
        except Exception:
            # The caller asked for a single-use connection: release it even
            # though the statement failed. Closing without a commit discards
            # any pending changes.
            if close_connection:
                self._is_destroyed = True
                self._conn.close()
            raise
        if close_connection:
            self._is_destroyed = True
            self._save_and_close(mode)
        return response
class SessionManager:
    """
    Helpers for the `tokens` table in the token database (T_DB_PATH).

    Static methods only;
    No need to create class sample.
    Every call opens its own ConnectManager.
    """

    def __init__(self):
        pass

    @staticmethod
    def init_table():
        """
        Execute SCHEMA_TOKENS (presumably the CREATE TABLE statement for
        `tokens` - confirm in app.config), then VACUUM the database.
        """
        c_man = ConnectManager(T_DB_PATH)
        # Keep the connection open so VACUUM can run on the same one.
        c_man.execute(SCHEMA_TOKENS, close_connection=False)
        c_man.execute("VACUUM;")

    @staticmethod
    def add_session(token: str, user_id: int, expires: int = None):
        """
        Insert a new row (token, user_id, 0) into `tokens`.

        Expires feature is not realized yet :(
        The `expires` argument is accepted but ignored; 0 is always stored
        in the third column.
        """
        c_man = ConnectManager(T_DB_PATH)
        c_man.execute(
            ("INSERT INTO tokens VALUES (?, ?, ?)", (token, user_id, 0))
        )

    @staticmethod
    def get_session(token: str):
        """
        Return a Session built from the row whose token matches,
        or None when there is no such row.
        """
        c_man = ConnectManager(T_DB_PATH)
        data = c_man.execute(
            ("SELECT * FROM tokens WHERE token=?", (token,)),
            1
        )
        if data is None:
            return None
        return Session(*data)

    @staticmethod
    def remove_session(token: str):
        """Delete the row with the given token."""
        c_man = ConnectManager(T_DB_PATH)
        c_man.execute(
            ("DELETE FROM tokens WHERE token=?", (token,))
        )

    @staticmethod
    def remove_session_by_user(id: int):
        """Delete every row whose user_id equals `id`."""
        c_man = ConnectManager(T_DB_PATH)
        c_man.execute(
            ("DELETE FROM tokens WHERE user_id=?", (id,))
        )
class DataManager:
    """
    Helpers for the `users` table in the user database (U_DB_PATH).

    Static methods only;
    No need to create class sample.
    Every call opens its own ConnectManager.
    """

    def __init__(self):
        pass

    @staticmethod
    def init_table():
        """
        Execute SCHEMA_USERS (presumably the CREATE TABLE statement for
        `users` - confirm in app.config), then VACUUM the database.
        """
        c_man = ConnectManager(U_DB_PATH)
        # Keep the connection open so VACUUM can run on the same one.
        c_man.execute(SCHEMA_USERS, close_connection=False)
        c_man.execute("VACUUM;")

    @staticmethod
    def check_admins():
        """
        Make sure every name in SUPER_ADMINS exists as a user.

        A missing admin is created with team "$root$", loc "Super Admin"
        and a passkey hashed from the user name itself.
        """
        for name in SUPER_ADMINS:
            if DataManager.get_user_by_name(name) is None:
                DataManager.add_user(
                    name,
                    "$root$",
                    "Super Admin",
                    Hash.hashPassword(name, SALT)
                )
                # NOTE(review): the initial password equals the user name
                # and is written to stdout - it should be changed after the
                # first login.
                print(f"Create super admin {name} with password {name}")

    @staticmethod
    def get_user_by_name(username: str) -> "User | None":
        """Return the User with this username, or None if there is none."""
        c_man = ConnectManager(U_DB_PATH)
        data = c_man.execute(
            ("SELECT * FROM users WHERE username=?", (username,)),
            1
        )
        if data is None:
            return None
        return User(*data)

    @staticmethod
    def get_user(id: int) -> "User | None":
        """Return the User with this id, or None if there is none."""
        c_man = ConnectManager(U_DB_PATH)
        data = c_man.execute(
            ("SELECT * FROM users WHERE id=?", (id,)),
            1
        )
        if data is None:
            return None
        return User(*data)

    @staticmethod
    def get_all_users() -> list:
        """Return a list with one User per row of `users`."""
        c_man = ConnectManager(U_DB_PATH)
        data = c_man.execute(
            "SELECT * FROM users",
            2
        )
        return [User(*row) for row in data]

    @staticmethod
    def add_user(username: str, team: str, loc: str, passkey: str):
        """
        Insert a new user; `reg_ts` is set to the current Unix time
        in whole seconds.
        """
        c_man = ConnectManager(U_DB_PATH)
        c_man.execute(
            (
                "INSERT INTO users (username, team, loc, passkey, reg_ts) VALUES (?, ?, ?, ?, ?)",
                (username, team, loc, passkey, int(time.time()))
            )
        )

    @staticmethod
    def edit_user(id: int, field: str, value):
        """
        Set column `field` to `value` for the user with this id.

        Raises ValueError when `field` is not a plain identifier.
        """
        # A column name can not be bound as a SQL parameter, so it is
        # interpolated into the statement. Reject anything that is not a
        # bare identifier to keep arbitrary SQL out of the query.
        if not isinstance(field, str) or not field.isidentifier():
            raise ValueError(f"Invalid column name: {field!r}")
        c_man = ConnectManager(U_DB_PATH)
        c_man.execute(
            (f"UPDATE users SET {field} = ? WHERE id = ?", (value, id))
        )

    @staticmethod
    def del_user(id: int):
        """Delete the user with this id."""
        c_man = ConnectManager(U_DB_PATH)
        c_man.execute(
            ("DELETE FROM users WHERE id = ?", (id,))
        )
class ConfigManager:
    """
    Read and write the JSON server config stored at S_CFG_PATH.

    Static methods only;
    No need to create class sample.
    """

    def __init__(self):
        pass

    @staticmethod
    def get_config():
        """Load the config file (UTF-8) and return the parsed JSON value."""
        with open(S_CFG_PATH, 'r', encoding="utf-8") as file:
            return json.load(file)

    @staticmethod
    def set_config(config):
        """
        Overwrite the config file with `config` as JSON
        (4-space indent, non-ASCII characters kept as-is).

        Raises TypeError/ValueError from json if `config` can not be
        serialised; in that case the existing file is left untouched.
        """
        # Serialise before opening: mode "w" truncates the file, so a
        # failure in the middle of a dump would destroy the old config.
        payload = json.dumps(config, indent=4, ensure_ascii=False)
        with open(S_CFG_PATH, "w", encoding="utf-8") as file:
            file.write(payload)