from . import auth, admin, portal, clients, returns, certificates
import socket
class MalwareScanError(Exception):
    """Signals that a payload did not pass malware scanning.

    Used both when malware is reported and when the scan itself could not
    be completed or returned an unexpected reply.
    """
def clamav_scan_bytes(host: str, port: int, payload: bytes) -> None:
    """Scan ``payload`` with a clamd daemon using the INSTREAM protocol.

    The payload is streamed as length-prefixed chunks (4-byte big-endian
    length followed by the data) and terminated by a zero-length chunk.

    Args:
        host: Hostname or address of the clamd daemon.
        port: TCP port of the clamd daemon.
        payload: Raw bytes to scan.

    Returns:
        None when the daemon reports the stream as clean.

    Raises:
        MalwareScanError: If malware is found, the connection or transfer
            fails, or the daemon's reply is not recognised.
    """
    chunk_size = 8192
    received = bytearray()
    try:
        # The context manager closes the socket even when a send or recv
        # raises, so a failed scan no longer leaks the connection.
        with socket.create_connection((host, port), timeout=10) as conn:
            conn.sendall(b"zINSTREAM\0")
            for start in range(0, len(payload), chunk_size):
                chunk = payload[start:start + chunk_size]
                conn.sendall(len(chunk).to_bytes(4, "big"))
                conn.sendall(chunk)
            # A zero-length chunk marks the end of the stream.
            conn.sendall((0).to_bytes(4, "big"))
            # The "z" command prefix asks for a NUL-terminated reply; keep
            # reading until the terminator (or EOF) so a reply split across
            # TCP segments is not truncated.
            while b"\0" not in received:
                data = conn.recv(4096)
                if not data:
                    break
                received += data
    except Exception as e:
        raise MalwareScanError(f"Scan failed: {e}") from e

    reply = bytes(received).partition(b"\0")[0]
    resp = reply.decode("utf-8", errors="ignore").strip()
    verdict = resp.upper()
    # Example response: "stream: OK" or "stream: Eicar-Test-Signature FOUND"
    if "FOUND" in verdict:
        raise MalwareScanError("Malware detected")
    # Fail closed: only a reply that ends with OK counts as clean, so an
    # error message that merely contains the letters "OK" is rejected.
    if not verdict.endswith("OK"):
        raise MalwareScanError(f"Unexpected scan response: {resp}")
taxhub/
  app/
    main.py
    settings.py
    db.py
    models.py
    security.py
    rbac.py
    audit.py
    malware.py
    integrations/
      irs_mef.py
      state_efile.py
      bank_products.py
    tasks/
      celery_app.py
      transmissions.py
    routers/
      auth.py
      admin.py
      portal.py
      clients.py
      returns.py
      certificates.py
  docker-compose.yml
  requirements.txt
fastapi==0.115.6
uvicorn[standard]==0.30.6
SQLAlchemy==2.0.36
psycopg2-binary==2.9.10
pydantic==2.10.3
pydantic-settings==2.6.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.20
celery==5.4.0
redis==5.2.0
httpx==0.28.1
services:
  db:
    image: postgres:16
    environment:
      POSTGRES_DB: taxhub
      POSTGRES_USER: taxhub
      POSTGRES_PASSWORD: taxhub
    ports: ["5432:5432"]
  redis:
    image: redis:7
    ports: ["6379:6379"]
  clamav:
    image: clamav/clamav:stable
    ports: ["3310:3310"]
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Typed application configuration with built-in default values."""

    # Application identity
    APP_NAME: str = "TaxHub"
    BRAND_NAME: str = "Cross Tax Prep + Bookkeeping"
    ENV: str = "dev"

    # Database connection URL
    DATABASE_URL: str = "postgresql+psycopg2://taxhub:taxhub@localhost:5432/taxhub"

    # JWT signing parameters
    # NOTE(review): the default secret is a placeholder; presumably it is
    # overridden outside development -- confirm.
    JWT_SECRET: str = "CHANGE_ME_SUPER_SECRET"
    JWT_ALG: str = "HS256"
    JWT_TTL_MINUTES: int = 60

    # Redis connection URL
    REDIS_URL: str = "redis://localhost:6379/0"

    # ClamAV (clamd)
    CLAMAV_HOST: str = "localhost"
    CLAMAV_PORT: int = 3310
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from .settings import settings
import enum
from datetime import datetime
from sqlalchemy import (
Column, Integer, String, DateTime, Boolean, Enum, ForeignKey, Text
)
from sqlalchemy.orm import relationship
from .db import Base
class RoleName(str, enum.Enum):
    """Closed set of user role names; each member's value is its own name."""

    admin = "admin"
    manager = "manager"
    supervisor = "supervisor"
    team_lead = "team_lead"
    mentor = "mentor"
    tax_associate = "tax_associate"
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)

    # Login identity: unique, indexed e-mail plus the stored password hash.
    email = Column(
        String(255),
        unique=True,
        index=True,
        nullable=False,
    )
    password_hash = Column(String(255), nullable=False)

    # Account state and role; new rows default to an active tax associate.
    is_active = Column(Boolean, default=True)
    role = Column(
        Enum(RoleName),
        default=RoleName.tax_associate,
        nullable=False,
    )

    # Filled from datetime.utcnow when no value is supplied on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
class Client(Base):
    """ORM model mapped to the ``clients`` table."""

    __tablename__ = "clients"

    id = Column(Integer, primary_key=True)
    full_name = Column(String(255), nullable=False)

    # Optional contact details.
    email = Column(String(255), index=True, nullable=True)
    phone = Column(String(50), nullable=True)

    # Filled from datetime.utcnow when no value is supplied on insert.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Paired with TaxReturn.client through back_populates.
    returns = relationship("TaxReturn", back_populates="client")
class ReturnStatus(str, enum.Enum):
    """Closed set of status values stored on a tax return."""

    draft = "draft"
    ready_to_transmit = "ready_to_transmit"
    transmitting = "transmitting"
    accepted = "accepted"
    rejected = "rejected"
    needs_attention = "needs_attention"
class TaxReturn(Base):
    """ORM model mapped to the ``tax_returns`` table."""

    __tablename__ = "tax_returns"

    id = Column(Integer, primary_key=True)
    client_id = Column(
        Integer,
        ForeignKey("clients.id"),
        nullable=False,
    )
    tax_year = Column(Integer, nullable=False)

    # Identity & compliance tracking fields (store as data; validate in your business rules)
    ero_name = Column(String(255), nullable=True)
    ero_efin = Column(String(20), nullable=True)  # EFIN
    preparer_ptin = Column(String(20), nullable=True)  # PTIN
    etin = Column(String(20), nullable=True)  # placeholder (if applicable to your workflow)

    # gating
    software_purchase_id = Column(String(64), nullable=True)  # must exist before transmit
    bank_product_opt_in = Column(Boolean, default=False)

    # Workflow state; new rows start as drafts.
    status = Column(
        Enum(ReturnStatus),
        default=ReturnStatus.draft,
        nullable=False,
    )
    notes = Column(Text, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # refund tracking fields
    expected_refund_amount_cents = Column(Integer, nullable=True)
    refund_status = Column(String(64), nullable=True)  # e.g., "pending", "sent", "funded"
    refund_last_checked_at = Column(DateTime, nullable=True)

    # Relationships, each paired with its counterpart through back_populates.
    client = relationship("Client", back_populates="returns")
    transmissions = relationship("Transmission", back_populates="tax_return")
class TransmissionChannel(str, enum.Enum):
    """Closed set of channels a transmission can be recorded against."""

    federal_irs_mef = "federal_irs_mef"
    state = "state"
class Transmission(Base):
    """ORM model mapped to the ``transmissions`` table.

    Each row belongs to one tax return and records the channel used plus
    the submission identifier and acknowledgement details.
    """

    __tablename__ = "transmissions"

    id = Column(Integer, primary_key=True)
    tax_return_id = Column(Integer, ForeignKey("tax_returns.id"), nullable=False)
    channel = Column(Enum(TransmissionChannel), nullable=False)

    external_submission_id = Column(String(128), nullable=True)  # returned by IRS/state system
    ack_code = Column(String(64), nullable=True)
    ack_message = Column(Text, nullable=True)

    created_at = Column(DateTime, default=datetime.utcnow)
    # onupdate refreshes the timestamp on every UPDATE; with only an insert
    # default the column would keep its creation-time value forever.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Paired with TaxReturn.transmissions through back_populates.
    tax_return = relationship("TaxReturn", back_populates="transmissions")
class CertificateType(str, enum.Enum):
    """Closed set of certificate categories that can be recorded for a user."""

    security_training = "security_training"
    irs_compliance = "irs_compliance"
    bank_products = "bank_products"
    bookkeeping = "bookkeeping"
class CertificateRecord(Base):
    """ORM model mapped to the ``certificate_records`` table."""

    __tablename__ = "certificate_records"

    id = Column(Integer, primary_key=True)
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
    )
    cert_type = Column(Enum(CertificateType), nullable=False)

    # Completion tracking; records start as not completed.
    completed = Column(Boolean, default=False)
    completed_at = Column(DateTime, nullable=True)
    evidence_url = Column(String(500), nullable=True)  # link to PDF in your portal storage

    # Filled from datetime.utcnow when no value is supplied on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
class AuditLog(Base):
    """ORM model mapped to the ``audit_logs`` table."""

    __tablename__ = "audit_logs"

    id = Column(Integer, primary_key=True)

    # Acting user id; a plain nullable integer with no foreign-key constraint.
    actor_user_id = Column(Integer, nullable=True)

    # What was done, and to which entity.
    action = Column(String(128), nullable=False)
    entity = Column(String(128), nullable=False)
    entity_id = Column(String(64), nullable=True)

    # Optional request metadata.
    ip = Column(String(64), nullable=True)
    user_agent = Column(String(255), nullable=True)

    # Filled from datetime.utcnow when no value is supplied on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
    details = Column(Text, nullable=True)
# Optional bookkeeping extension hook
class LedgerEntry(Base):
    """ORM model mapped to the ``ledger_entries`` table."""

    __tablename__ = "ledger_entries"

    id = Column(Integer, primary_key=True)
    client_id = Column(
        Integer,
        ForeignKey("clients.id"),
        nullable=False,
    )

    # Defaults to datetime.utcnow when no entry date is supplied.
    entry_date = Column(DateTime, default=datetime.utcnow)
    description = Column(String(255), nullable=False)

    # Amount is stored as an integer number of cents.
    amount_cents = Column(Integer, nullable=False)
    category = Column(String(128), nullable=True)
# Engine built from the configured database URL, with pool_pre_ping enabled.
engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,
)

# Session factory bound to the engine; autoflush and autocommit are disabled.
SessionLocal = sessionmaker(
    autoflush=False,
    autocommit=False,
    bind=engine,
)
class Base(DeclarativeBase):
    """Declarative base class that the ORM models inherit from."""
def get_db():
    """Yield a new ``SessionLocal`` session and close it afterwards.

    The session is closed in a ``finally`` clause, so it is released even
    when the consumer of the generator raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Module-level Settings instance, created once at import time.
settings = Settings()
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext
from .settings import settings
# Shared passlib context used for hashing and verifying passwords (bcrypt scheme).
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def hash_password(plain: str) -> str:
    """Return the hash of ``plain`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(plain)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Return the result of checking ``plain`` against ``hashed`` via ``pwd_context``."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(sub: str) -> str:
    """Create a signed JWT for ``sub``.

    The token carries ``sub``, ``iat`` (issue time) and ``exp`` (issue time
    plus ``settings.JWT_TTL_MINUTES``) as integer Unix timestamps, and is
    signed with ``settings.JWT_SECRET`` using ``settings.JWT_ALG``.

    Args:
        sub: Value stored in the token's ``sub`` claim.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import keeps this block self-contained; the module-level import
    # only brings in datetime and timedelta.
    from datetime import timezone

    # Use an aware UTC datetime: .timestamp() on a naive utcnow() value is
    # interpreted as local time, which shifts iat/exp by the host's UTC
    # offset on any server not running in UTC.
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=settings.JWT_TTL_MINUTES)
    claims = {
        "sub": sub,
        "iat": int(issued_at.timestamp()),
        "exp": int(expires_at.timestamp()),
    }
    return jwt.encode(claims, settings.JWT_SECRET, algorithm=settings.JWT_ALG)
def decode_token(token: str) -> dict:
    """Decode ``token`` using the configured secret and algorithm.

    Only ``settings.JWT_ALG`` is passed as an allowed algorithm. Exceptions
    raised by ``jwt.decode`` are not caught here and propagate to the caller.
    """
    allowed_algorithms = [settings.JWT_ALG]
    claims = jwt.decode(token, settings.JWT_SECRET, algorithms=allowed_algorithms)
    return claims