from . import auth, admin, portal, clients, returns, certificates import socket class MalwareScanError(Exception): pass def clamav_scan_bytes(host: str, port: int, payload: bytes) -> None: """ Uses clamd INSTREAM protocol. Raises MalwareScanError if malware is found or scan fails. """ try: s = socket.create_connection((host, port), timeout=10) s.sendall(b"zINSTREAM\0") chunk_size = 8192 for i in range(0, len(payload), chunk_size): chunk = payload[i:i+chunk_size] s.sendall(len(chunk).to_bytes(4, "big")) s.sendall(chunk) s.sendall((0).to_bytes(4, "big")) resp = s.recv(4096).decode("utf-8", errors="ignore") s.close() except Exception as e: raise MalwareScanError(f"Scan failed: {e}") # Example response: "stream: OK" or "stream: Eicar-Test-Signature FOUND" if "FOUND" in resp.upper(): raise MalwareScanError("Malware detected") if "OK" not in resp.upper(): raise MalwareScanError(f"Unexpected scan response: {resp}") taxhub/ app/ main.py settings.py db.py models.py security.py rbac.py audit.py malware.py integrations/ irs_mef.py state_efile.py bank_products.py tasks/ celery_app.py transmissions.py routers/ auth.py admin.py portal.py clients.py returns.py certificates.py docker-compose.yml requirements.txtfastapi==0.115.6 uvicorn[standard]==0.30.6 SQLAlchemy==2.0.36 psycopg2-binary==2.9.10 pydantic==2.10.3 pydantic-settings==2.6.1 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4 python-multipart==0.0.20 celery==5.4.0 redis==5.2.0 httpx==0.28.1 services: db: image: postgres:16 environment: POSTGRES_DB: taxhub POSTGRES_USER: taxhub POSTGRES_PASSWORD: taxhub ports: ["5432:5432"] redis: image: redis:7 ports: ["6379:6379"] clamav: image: clamav/clamav:stable ports: ["3310:3310"] from pydantic_settings import BaseSettings class Settings(BaseSettings): APP_NAME: str = "TaxHub" BRAND_NAME: str = "Cross Tax Prep + Bookkeeping" ENV: str = "dev" DATABASE_URL: str = "postgresql+psycopg2://taxhub:taxhub@localhost:5432/taxhub" JWT_SECRET: str = "CHANGE_ME_SUPER_SECRET" JWT_ALG: str = "HS256" JWT_TTL_MINUTES: int = 60 REDIS_URL: str = "redis://localhost:6379/0" # ClamAV (clamd) CLAMAV_HOST: str = "localhost" CLAMAV_PORT: int = 3310 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, DeclarativeBase from .settings import settings import enum from datetime import datetime from sqlalchemy import ( Column, Integer, String, DateTime, Boolean, Enum, ForeignKey, Text ) from sqlalchemy.orm import relationship from .db import Base class RoleName(str, enum.Enum): admin = "admin" manager = "manager" supervisor = "supervisor" team_lead = "team_lead" mentor = "mentor" tax_associate = "tax_associate" class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) email = Column(String(255), unique=True, index=True, nullable=False) password_hash = Column(String(255), nullable=False) is_active = Column(Boolean, default=True) role = Column(Enum(RoleName), default=RoleName.tax_associate, nullable=False) created_at = Column(DateTime, default=datetime.utcnow) class Client(Base): __tablename__ = "clients" id = Column(Integer, primary_key=True) full_name = Column(String(255), nullable=False) email = Column(String(255), index=True, nullable=True) phone = Column(String(50), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) returns = relationship("TaxReturn", back_populates="client") class ReturnStatus(str, enum.Enum): draft = "draft" ready_to_transmit = "ready_to_transmit" transmitting = "transmitting" accepted = "accepted" rejected = "rejected" needs_attention = "needs_attention" class TaxReturn(Base): __tablename__ = "tax_returns" id = Column(Integer, primary_key=True) client_id = Column(Integer, ForeignKey("clients.id"), nullable=False) tax_year = Column(Integer, nullable=False) # Identity & compliance tracking fields (store as data; validate in your business rules) ero_name = Column(String(255), nullable=True) ero_efin = Column(String(20), nullable=True) # EFIN preparer_ptin = Column(String(20), nullable=True) # PTIN etin = Column(String(20), nullable=True) # placeholder (if applicable to your workflow) # gating software_purchase_id = Column(String(64), nullable=True) # must exist before transmit bank_product_opt_in = Column(Boolean, default=False) status = Column(Enum(ReturnStatus), default=ReturnStatus.draft, nullable=False) notes = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) # refund tracking fields expected_refund_amount_cents = Column(Integer, nullable=True) refund_status = Column(String(64), nullable=True) # e.g., "pending", "sent", "funded" refund_last_checked_at = Column(DateTime, nullable=True) client = relationship("Client", back_populates="returns") transmissions = relationship("Transmission", back_populates="tax_return") class TransmissionChannel(str, enum.Enum): federal_irs_mef = "federal_irs_mef" state = "state" class Transmission(Base): __tablename__ = "transmissions" id = Column(Integer, primary_key=True) tax_return_id = Column(Integer, ForeignKey("tax_returns.id"), nullable=False) channel = Column(Enum(TransmissionChannel), nullable=False) external_submission_id = Column(String(128), nullable=True) # returned by IRS/state system ack_code = Column(String(64), nullable=True) ack_message = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow) tax_return = relationship("TaxReturn", back_populates="transmissions") class CertificateType(str, enum.Enum): security_training = "security_training" irs_compliance = "irs_compliance" bank_products = "bank_products" bookkeeping = "bookkeeping" class CertificateRecord(Base): __tablename__ = "certificate_records" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id"), nullable=False) cert_type = Column(Enum(CertificateType), nullable=False) completed = Column(Boolean, default=False) completed_at = Column(DateTime, nullable=True) evidence_url = Column(String(500), nullable=True) # link to PDF in your portal storage created_at = Column(DateTime, default=datetime.utcnow) class AuditLog(Base): __tablename__ = "audit_logs" id = Column(Integer, primary_key=True) actor_user_id = Column(Integer, nullable=True) action = Column(String(128), nullable=False) entity = Column(String(128), nullable=False) entity_id = Column(String(64), nullable=True) ip = Column(String(64), nullable=True) user_agent = Column(String(255), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) details = Column(Text, nullable=True) # Optional bookkeeping extension hook class LedgerEntry(Base): __tablename__ = "ledger_entries" id = Column(Integer, primary_key=True) client_id = Column(Integer, ForeignKey("clients.id"), nullable=False) entry_date = Column(DateTime, default=datetime.utcnow) description = Column(String(255), nullable=False) amount_cents = Column(Integer, nullable=False) category = Column(String(128), nullable=True) engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False) class Base(DeclarativeBase): pass def get_db(): db = SessionLocal() try: yield db finally: db.close() settings = Settings() from datetime import datetime, timedelta from jose import jwt from passlib.context import CryptContext from .settings import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(plain: str) -> str: return pwd_context.hash(plain) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token(sub: str) -> str: now = datetime.utcnow() exp = now + timedelta(minutes=settings.JWT_TTL_MINUTES) payload = {"sub": sub, "iat": int(now.timestamp()), "exp": int(exp.timestamp())} return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALG) def decode_token(token: str) -> dict: return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALG])