133 lines
4.0 KiB
Python
133 lines
4.0 KiB
Python
import os
|
|
import logging
|
|
from typing import List, Optional, Any
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import create_engine, func
|
|
from sqlalchemy.orm import sessionmaker, Session, joinedload
|
|
|
|
from models import User, Printer, Job, Stats, Base
|
|
from config.settings import DATABASE_PATH, ensure_database_directory
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseManager:
|
|
"""Database manager class to handle database operations."""
|
|
|
|
def __init__(self):
|
|
"""Initialize the database manager."""
|
|
ensure_database_directory()
|
|
self.engine = create_engine(f"sqlite:///{DATABASE_PATH}")
|
|
self.Session = sessionmaker(bind=self.engine)
|
|
|
|
def get_session(self) -> Session:
|
|
"""Get a new database session.
|
|
|
|
Returns:
|
|
Session: A new SQLAlchemy session.
|
|
"""
|
|
return self.Session()
|
|
|
|
def test_connection(self) -> bool:
|
|
"""Test the database connection.
|
|
|
|
Returns:
|
|
bool: True if the connection is successful, False otherwise.
|
|
"""
|
|
try:
|
|
session = self.get_session()
|
|
session.execute("SELECT 1")
|
|
session.close()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Database connection test failed: {str(e)}")
|
|
return False
|
|
|
|
def get_all_jobs(self) -> List[Job]:
|
|
"""Get all jobs with eager loading of relationships.
|
|
|
|
Returns:
|
|
List[Job]: A list of all jobs.
|
|
"""
|
|
session = self.get_session()
|
|
try:
|
|
jobs = session.query(Job).options(
|
|
joinedload(Job.user),
|
|
joinedload(Job.printer)
|
|
).all()
|
|
return jobs
|
|
finally:
|
|
session.close()
|
|
|
|
def get_jobs_by_status(self, status: str) -> List[Job]:
|
|
"""Get jobs by status with eager loading of relationships.
|
|
|
|
Args:
|
|
status: The job status to filter by.
|
|
|
|
Returns:
|
|
List[Job]: A list of jobs with the specified status.
|
|
"""
|
|
session = self.get_session()
|
|
try:
|
|
jobs = session.query(Job).options(
|
|
joinedload(Job.user),
|
|
joinedload(Job.printer)
|
|
).filter(Job.status == status).all()
|
|
return jobs
|
|
finally:
|
|
session.close()
|
|
|
|
def get_job_by_id(self, job_id: int) -> Optional[Job]:
|
|
"""Get a job by ID with eager loading of relationships.
|
|
|
|
Args:
|
|
job_id: The job ID to find.
|
|
|
|
Returns:
|
|
Optional[Job]: The job if found, None otherwise.
|
|
"""
|
|
session = self.get_session()
|
|
try:
|
|
job = session.query(Job).options(
|
|
joinedload(Job.user),
|
|
joinedload(Job.printer)
|
|
).filter(Job.id == job_id).first()
|
|
return job
|
|
finally:
|
|
session.close()
|
|
|
|
def get_available_printers(self) -> List[Printer]:
|
|
"""Get all available printers.
|
|
|
|
Returns:
|
|
List[Printer]: A list of available printers.
|
|
"""
|
|
session = self.get_session()
|
|
try:
|
|
printers = session.query(Printer).filter(
|
|
Printer.active == True,
|
|
Printer.status != "busy"
|
|
).all()
|
|
return printers
|
|
finally:
|
|
session.close()
|
|
|
|
def get_jobs_since(self, since_date: datetime) -> List[Job]:
|
|
"""Get jobs created since a specific date.
|
|
|
|
Args:
|
|
since_date: The date to filter jobs from.
|
|
|
|
Returns:
|
|
List[Job]: A list of jobs created since the specified date.
|
|
"""
|
|
session = self.get_session()
|
|
try:
|
|
jobs = session.query(Job).options(
|
|
joinedload(Job.user),
|
|
joinedload(Job.printer)
|
|
).filter(Job.created_at >= since_date).all()
|
|
return jobs
|
|
finally:
|
|
session.close() |