📝 Commit Details:

This commit is contained in:
2025-05-31 22:40:29 +02:00
parent 91b1886dde
commit df8fb197c0
14061 changed files with 997277 additions and 103548 deletions

View File

@@ -0,0 +1,2 @@
# Database package initialization file
# Makes the directory a proper Python package

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,133 @@
import os
import logging
from typing import List, Optional, Any
from datetime import datetime
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker, Session, joinedload
from models import User, Printer, Job, Stats, Base
from config.settings import DATABASE_PATH, ensure_database_directory
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Database manager class to handle database operations."""
def __init__(self):
"""Initialize the database manager."""
ensure_database_directory()
self.engine = create_engine(f"sqlite:///{DATABASE_PATH}")
self.Session = sessionmaker(bind=self.engine)
def get_session(self) -> Session:
"""Get a new database session.
Returns:
Session: A new SQLAlchemy session.
"""
return self.Session()
def test_connection(self) -> bool:
"""Test the database connection.
Returns:
bool: True if the connection is successful, False otherwise.
"""
try:
session = self.get_session()
session.execute("SELECT 1")
session.close()
return True
except Exception as e:
logger.error(f"Database connection test failed: {str(e)}")
return False
def get_all_jobs(self) -> List[Job]:
"""Get all jobs with eager loading of relationships.
Returns:
List[Job]: A list of all jobs.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).all()
return jobs
finally:
session.close()
def get_jobs_by_status(self, status: str) -> List[Job]:
"""Get jobs by status with eager loading of relationships.
Args:
status: The job status to filter by.
Returns:
List[Job]: A list of jobs with the specified status.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.status == status).all()
return jobs
finally:
session.close()
def get_job_by_id(self, job_id: int) -> Optional[Job]:
"""Get a job by ID with eager loading of relationships.
Args:
job_id: The job ID to find.
Returns:
Optional[Job]: The job if found, None otherwise.
"""
session = self.get_session()
try:
job = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.id == job_id).first()
return job
finally:
session.close()
def get_available_printers(self) -> List[Printer]:
"""Get all available printers.
Returns:
List[Printer]: A list of available printers.
"""
session = self.get_session()
try:
printers = session.query(Printer).filter(
Printer.active == True,
Printer.status != "busy"
).all()
return printers
finally:
session.close()
def get_jobs_since(self, since_date: datetime) -> List[Job]:
"""Get jobs created since a specific date.
Args:
since_date: The date to filter jobs from.
Returns:
List[Job]: A list of jobs created since the specified date.
"""
session = self.get_session()
try:
jobs = session.query(Job).options(
joinedload(Job.user),
joinedload(Job.printer)
).filter(Job.created_at >= since_date).all()
return jobs
finally:
session.close()

BIN
backend/database/myp.db Normal file

Binary file not shown.