pyUpload/app/main.py
Adam Skotarczak (Linux-Workstation) 17353b3000 v1.2.1
- Test auf Linux Workstation
  - Siehe CHANGELOG.md
2025-05-07 19:16:10 +02:00

350 lines
14 KiB
Python

#!/usr/bin/env python3
# © 2025 Adam Skotarczak (adam@skotarczak.net)
#
# Version 1.2.1 (07.05.2025 - Linux Test und Anpassungen im Bootstrap)
# Manuell in z.B. VS-Code: .\app\.venv\Scripts\activate
#
# Original unter:
# <https://github.com/realAscot/pyUpload/blob/main/app/main.py>
import os
import sys
import subprocess
# Define paths.
# The script location is resolved BEFORE the working directory changes: if
# __file__ is a relative path, resolving it after os.chdir() would point at
# the wrong place.
_SCRIPT_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(_SCRIPT_PATH)
os.chdir(BASE_DIR)
VENV_DIR = os.path.join(BASE_DIR, ".venv")
VENV_PYTHON = (
    os.path.join(VENV_DIR, "Scripts", "python.exe")
    if os.name == "nt"
    else os.path.join(VENV_DIR, "bin", "python")
)
REQUIREMENTS_FILE = os.path.join(BASE_DIR, "requirements.txt")

# Bootstrap: when this process is NOT running inside the project venv (or the
# venv interpreter does not exist yet), set the venv up and restart inside it.
if (sys.prefix == sys.base_prefix and sys.executable != VENV_PYTHON) or not os.path.exists(VENV_PYTHON):
    # Create the venv if needed. The check is on the interpreter rather than
    # the directory so that a half-created/damaged .venv directory is repaired
    # instead of making the restart below fail.
    if not os.path.exists(VENV_PYTHON):
        print("[Setup] Virtuelle Umgebung wird erstellt...")
        subprocess.run([sys.executable, "-m", "venv", "--copies", VENV_DIR], check=True)
        print("[Setup] requirements.txt wird installiert...")
        subprocess.run([VENV_PYTHON, "-m", "pip", "install", "--upgrade", "pip"], check=True)
        subprocess.run([VENV_PYTHON, "-m", "pip", "install", "-r", REQUIREMENTS_FILE], check=True)

    # Regardless of the above: restart inside the venv.
    print("[Setup] Starte erneut mit aktivierter Umgebung...")
    # Always pass the absolute script path; a relative sys.argv[0] would no
    # longer resolve after the os.chdir() above.
    _restart_cmd = [VENV_PYTHON, _SCRIPT_PATH] + sys.argv[1:]
    if os.name == "nt":
        subprocess.Popen(_restart_cmd, creationflags=subprocess.CREATE_NEW_CONSOLE)
        sys.exit(0)
    else:
        # os.execv replaces the current process and only returns by raising.
        os.execv(VENV_PYTHON, _restart_cmd)
# Wir sind jetzt sicher in der richtigen Umgebung → Rest des Programms geht hier weiter:
import ssl
import logging
import socket
import argparse
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime, timedelta, timezone
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
# -------------------------------------------------------------------
# Virtuelle Umgebung aktivieren (wenn nicht aktiv)
# -------------------------------------------------------------------
def activate_venv():
    """Run the venv activation script unless the interpreter already uses VENV_DIR.

    Prints an error message and exits with status 1 when the activation
    script cannot be found.
    """
    # Guard: already running from the virtual environment.
    if sys.prefix == os.path.abspath(VENV_DIR):
        return

    venv_root = os.path.join(os.path.dirname(__file__), VENV_DIR)
    script_parts = ("Scripts", "activate.bat") if os.name == "nt" else ("bin", "activate")
    activate_script = os.path.join(venv_root, *script_parts)

    if not os.path.exists(activate_script):
        print(f"FEHLER: Virtuelle Umgebung nicht gefunden ({VENV_DIR}). Bitte zuerst install.cmd ausführen.")
        sys.exit(1)

    print(f"Aktiviere virtuelle Umgebung: {VENV_DIR}")
    # NOTE(review): os.system runs the script in a child shell, so this
    # presumably does not change the environment of the running interpreter
    # - confirm whether this function is still needed.
    os.system(f'"{activate_script}"')
# -------------------------------------------------------------------
# Globale Konfigurationen & Logging
# -------------------------------------------------------------------
# Directory and file names; all of them are relative to the current working directory.
UPLOAD_DIR = 'upload'
LOG_DIR = 'logs'
CERT_FILE = 'cert.pem'
KEY_FILE = 'key.pem'
CENTRAL_LOG_FILE = os.path.join(LOG_DIR, 'pyupload.log')

# Make sure the working directories exist before any handler opens a file in them.
for _directory in (UPLOAD_DIR, LOG_DIR):
    os.makedirs(_directory, exist_ok=True)

# Root logging: every record goes to the central log file and to the console.
_root_handlers = [logging.FileHandler(CENTRAL_LOG_FILE), logging.StreamHandler()]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    handlers=_root_handlers,
)

central_logger = logging.getLogger("central_logger")
# Cache of per-client loggers, keyed by client IP address.
client_loggers = {}
# -------------------------------------------------------------------
# HTTP-Handler für Upload und Logging
# -------------------------------------------------------------------
class SecureHTTPRequestHandler(BaseHTTPRequestHandler):
    """Serve the upload form and store uploaded files per client.

    GET "/" answers with template.html. POST accepts a multipart/form-data
    body and writes every uploaded file to UPLOAD_DIR/<client-ip>/. Log lines
    are written to a per-client file in LOG_DIR and, prefixed with the client
    IP, to the central logger.
    """

    def log_message(self, format, *args):
        """Log ``format % args`` to the client's own log file and the central log."""
        client_ip = self.client_address[0]
        message = format % args
        logger = client_loggers.get(client_ip)
        if logger is None:
            logger = logging.getLogger(f'client_{client_ip}')
            logger.setLevel(logging.INFO)
            # Without this the record would also bubble up to the root
            # handlers and show up a second time (without the IP prefix)
            # next to the central_logger line below.
            logger.propagate = False
            client_log_file = os.path.join(LOG_DIR, f"{client_ip}.log")
            handler = logging.FileHandler(client_log_file)
            handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
            logger.addHandler(handler)
            client_loggers[client_ip] = logger
        logger.info(message)
        central_logger.info(f"{client_ip} - {message}")

    def send_html_response(self, filename):
        """Send the UTF-8 HTML file *filename* with status 200, or a 500 if it is missing."""
        try:
            with open(filename, "r", encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            self.send_error(500, "HTML-Template nicht gefunden")
            return
        body = content.encode("utf-8")
        self.send_response(200)
        # The body is always encoded as UTF-8 above, so say so explicitly.
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve the upload form on "/"; every other path yields a 404."""
        if self.path == '/':
            self.send_html_response("template.html")
        else:
            self.send_error(404, "Seite nicht gefunden")
            self.log_message('404 Not Found: %s', self.path)

    @staticmethod
    def _extract_boundary(content_type):
        """Return the multipart boundary of a Content-Type value as bytes, or None.

        Handles a quoted boundary and parameters following the boundary.
        """
        for param in content_type.split(";")[1:]:
            key, _, value = param.strip().partition("=")
            value = value.strip().strip('"')
            if key.strip().lower() == "boundary" and value:
                return value.encode()
        return None

    @staticmethod
    def _parse_uploaded_files(raw_data, boundary):
        """Return ``[(filename, payload), ...]`` for the file parts of a multipart body.

        Parts without a ``filename="..."`` attribute (ordinary form fields) and
        parts without a header/body separator are skipped.
        """
        marker = b'filename="'
        files = []
        for part in raw_data.split(b"--" + boundary):
            head, separator, payload = part.partition(b"\r\n\r\n")
            if not separator or b"content-disposition" not in head.lower():
                continue
            start = head.find(marker)
            if start == -1:
                continue  # plain form field, not a file
            start += len(marker)
            end = head.find(b'"', start)
            if end == -1:
                continue  # malformed header: unterminated filename
            filename = head[start:end].decode("utf-8", errors="replace")
            # Exactly one CRLF separates the payload from the next delimiter.
            # Stripping a character set here would eat payload bytes of files
            # that end in CR, LF or "-".
            if payload.endswith(b"\r\n"):
                payload = payload[:-2]
            files.append((filename, payload))
        return files

    def do_POST(self):
        """Store every uploaded file of a multipart/form-data request.

        Answers 400 for a wrong content type, a missing boundary, an
        empty/invalid Content-Length or a body without any file, 500 on
        unexpected errors and success.html otherwise.
        """
        try:
            content_type = self.headers.get('Content-Type')
            try:
                content_length = int(self.headers.get('Content-Length', 0))
            except ValueError:
                content_length = 0  # malformed header: treat as an empty request
            if not content_type or 'multipart/form-data' not in content_type:
                self.send_error(400, "Ungültiger Content-Type")
                self.log_message('400 Bad Request: Ungültiger Content-Type')
                return
            # "<= 0" also rejects negative values, which would make
            # rfile.read() wait for end-of-stream.
            if content_length <= 0:
                self.send_error(400, "Leere Anfrage erhalten")
                self.log_message('400 Bad Request: Leere Anfrage')
                return
            boundary = self._extract_boundary(content_type)
            if boundary is None:
                self.send_error(400, "Ungültiger Content-Type")
                self.log_message('400 Bad Request: Ungültiger Content-Type')
                return
            client_ip = self.client_address[0]
            client_upload_dir = os.path.join(UPLOAD_DIR, client_ip)
            os.makedirs(client_upload_dir, exist_ok=True)
            # NOTE: the whole request body is held in memory.
            raw_data = self.rfile.read(content_length)
            found_file = False
            for filename, payload in self._parse_uploaded_files(raw_data, boundary):
                # Keep only the last path component; backslashes are
                # normalised so Windows-style client paths are stripped on
                # POSIX hosts as well.
                safe_name = os.path.basename(filename.replace("\\", "/"))
                if safe_name in ("", ".", ".."):
                    continue
                file_path = os.path.join(client_upload_dir, safe_name)
                with open(file_path, "wb") as f:
                    f.write(payload)
                # Pass the name as an argument: a "%" inside the file name
                # must not be interpreted as a format directive.
                self.log_message("Datei %s erfolgreich hochgeladen.", filename)
                found_file = True
            if not found_file:
                self.send_error(400, "Keine Datei im Upload enthalten")
                self.log_message('400 Bad Request: Keine Datei übermittelt')
                return
            self.send_html_response("success.html")
        except Exception as e:
            self.log_message("Fehler: %s", e)
            self.send_error(500, "Interner Serverfehler")
# -------------------------------------------------------------------
# HTTPS-Server mit selbstsigniertem Zertifikat
# -------------------------------------------------------------------
def generate_self_signed_cert(cert_file, key_file):
    """Create a self-signed certificate and its RSA key and write both as PEM.

    Generates a 2048-bit RSA key and a certificate that is valid for 365 days
    from now, signed with SHA-256, carrying a SubjectAlternativeName of
    "localhost".

    Args:
        cert_file: Path the PEM certificate is written to.
        key_file: Path the unencrypted PEM private key is written to. The
            file is created with owner-only permissions where the platform
            supports them.
    """
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ionivation.com"),
        x509.NameAttribute(NameOID.COMMON_NAME, "localhost.lan"),
    ])
    # Take the timestamp once so both validity bounds derive from the same instant.
    now = datetime.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName("localhost")]),
            critical=False
        )
        .sign(key, hashes.SHA256())
    )
    with open(cert_file, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    # The private key is stored unencrypted, so do not rely on the process
    # umask: create the file with mode 0600. O_BINARY only exists on Windows,
    # where it prevents newline translation on the raw descriptor.
    key_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    key_fd = os.open(key_file, key_flags, 0o600)
    with os.fdopen(key_fd, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))
    # The mode passed to os.open() only applies to newly created files;
    # tighten a file that already existed as well.
    os.chmod(key_file, 0o600)
def get_server_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and the local address chosen for
    that connection is returned. If anything goes wrong while doing so,
    "127.0.0.1" is returned instead. The socket is always closed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"
def create_https_server(port, handler_class=SecureHTTPRequestHandler):
    """Build an HTTPServer on all interfaces whose socket is wrapped in TLS.

    When CERT_FILE or KEY_FILE is missing, a self-signed certificate/key pair
    is generated first.

    Args:
        port: TCP port to listen on.
        handler_class: Request handler class passed to HTTPServer.

    Returns:
        The HTTPServer instance; serve_forever() has not been called yet.
    """
    tls_files_present = os.path.exists(CERT_FILE) and os.path.exists(KEY_FILE)
    if not tls_files_present:
        print("SSL-Zertifikat nicht gefunden. Erstelle selbstsigniertes Zertifikat...")
        generate_self_signed_cert(CERT_FILE, KEY_FILE)
        print("Selbstsigniertes SSL-Zertifikat erstellt.")

    # An empty host string binds to every interface.
    httpd = HTTPServer(('', port), handler_class)

    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    tls_context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
    httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)
    return httpd
# -------------------------------------------------------------------
# Server starten (mit oder ohne GUI)
# -------------------------------------------------------------------
def run_server_nogui(port):
    """Run the HTTPS upload server in the foreground (CLI mode).

    Prints the start-up banner, serves until STRG+C (KeyboardInterrupt) or an
    error occurs, then closes the server socket and shuts logging down.

    Args:
        port: TCP port to listen on.
    """
    httpd = create_https_server(port)
    server_ip = get_server_ip()
    try:
        banner = (
            f"Starte HTTPS-Server auf https://{server_ip}:{port}",
            "Drücke STRG+C, um zu beenden.",
            "warte auf Verbindungen ...",
            f"Öffne im Browser: https://{server_ip}:{port}",
            "Du musst Dich im gleichen Netzwerk befinden (LAN/WLAN)",
        )
        for line in banner:
            print(line)
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nSTRG+C erkannt. Fahre Server herunter...")
    except Exception as e:
        print(f"Fehler aufgetreten: {e}")
        # Same as logging.error(..., exc_info=True): records the traceback.
        logging.exception("Serverfehler")
    finally:
        httpd.server_close()
        logging.shutdown()
        print("Server wurde sauber beendet.")
def run_server_with_gui(port):
    """Run the HTTPS upload server in a background thread and show a Tk window.

    The window shows the server IP, the port and a QR code of the server URL;
    closing the window (or pressing "Beenden") ends the function. When tkinter
    is not installed, the installation hint is printed and the server is
    started in CLI mode instead.

    Args:
        port: TCP port to listen on.
    """
    # Check for tkinter FIRST: the GUI cannot work without it, and importing
    # PIL.ImageTk beforehand could fail before the hint is ever shown.
    try:
        import tkinter as tk
    except ImportError:
        print("⚠ Hinweis: tkinter ist nicht installiert. Für die GUI benötigst du:")
        print(" sudo apt install python3-tk")
        # Previously execution continued and crashed with a NameError on
        # `tk`; fall back to the CLI server so the upload still works.
        run_server_nogui(port)
        return

    import qrcode
    import webbrowser
    from PIL import ImageTk

    httpd = create_https_server(port)
    server_ip = get_server_ip()
    url = f"https://{server_ip}:{port}"

    def server_thread():
        """Serve requests until the server stops, then release its resources."""
        try:
            print(f"Starte HTTPS-Server auf {url}")
            httpd.serve_forever()
        except Exception as ex:
            print(f"Server-Thread-Exception: {ex}")
        finally:
            httpd.server_close()
            logging.shutdown()

    # Daemon thread: it must not keep the process alive once the window is closed.
    threading.Thread(target=server_thread, daemon=True).start()

    root = tk.Tk()
    root.title("pyUpload - Secure File Upload")
    try:
        root.iconbitmap("favicon.ico")
    except Exception as e:
        # The icon is cosmetic; carry on without it.
        print(f"Konnte das Icon nicht setzen: {e}")

    labels = [
        tk.Label(root, text="HTTPS-Upload-Server läuft!", font=("Arial", 14)),
        tk.Label(root, text=f"IP-Adresse: {server_ip}", font=("Arial", 11)),
        tk.Label(root, text=f"Port: {port}", font=("Arial", 11)),
        tk.Label(root, text="Scanne den QR-Code:", font=("Arial", 11))
    ]

    # Accumulate the requested widget sizes to compute the window geometry.
    max_width, total_height = 0, 20
    for label in labels:
        label.pack(pady=5)
        label.update_idletasks()
        max_width = max(max_width, label.winfo_reqwidth())
        total_height += label.winfo_reqheight() + 10

    qr = qrcode.QRCode(version=1, box_size=8, border=2)
    qr.add_data(url)
    qr.make(fit=True)
    img_qr = qr.make_image(fill_color="black", back_color="white")
    # img_tk stays referenced by this local for as long as mainloop() runs.
    img_tk = ImageTk.PhotoImage(img_qr)
    tk.Label(root, image=img_tk).pack()
    total_height += img_tk.height() + 20

    label_copyright = tk.Label(root, text="Adam Skotarczak (C) 2025", font=("Arial", 9), fg="gray")
    label_copyright.pack(pady=5)
    total_height += label_copyright.winfo_reqheight() + 10

    def open_browser(event):
        """Open the project page in the default browser."""
        webbrowser.open("https://www.ionivation.com/pyUpload")

    link_label = tk.Label(root, text="Infos: www.ionivation.com/pyUpload", font=("Arial", 10), fg="blue", cursor="hand2")
    link_label.pack()
    link_label.bind("<Button-1>", open_browser)
    total_height += link_label.winfo_reqheight() + 10

    tk.Button(root, text="Beenden", command=root.destroy, font=("Arial", 10)).pack(pady=10)
    root.geometry(f"{max_width + 40}x{total_height + 50}")
    root.mainloop()
# -------------------------------------------------------------------
# Kommandozeilenparser & Einstiegspunkt
# -------------------------------------------------------------------
def main():
    """Parse the command line and start the server in GUI or CLI mode.

    Options:
        --port / -p: port to listen on (default 4443).
        --nogui / -n: run without GUI and QR code.
    """
    cli = argparse.ArgumentParser(description="Secure file upload server with optional GUI/QR-Code.")
    cli.add_argument("--port", "-p", type=int, default=4443, help="Port, auf dem der Server lauscht (Standard: 4443)")
    cli.add_argument("--nogui", "-n", action="store_true", help="Ohne GUI & QR-Code im reinen CLI-Modus starten")
    options = cli.parse_args()

    print(f"Gestartet mit Port {options.port}, GUI: {not options.nogui}")

    # Pick the runner for the requested mode, then start it.
    start_server = run_server_nogui if options.nogui else run_server_with_gui
    start_server(options.port)


# Script entry point: only run the server when executed directly.
if __name__ == "__main__":
    main()