pyUpload/dev/pyUpload.py
2025-03-12 22:21:30 +01:00

348 lines
13 KiB
Python

#!/usr/bin/env python3
# © 2025 Adam Skotarczak (adam@skotarczak.net)
# Dieses Softwarepaket darf nicht ohne Genehmigung weiterverbreitet werden!
# Version 1.2.2 (05.03.2025)
import os
import sys
import ssl
import logging
import socket
import argparse
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime, timedelta, timezone
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
# -------------------------------------------------------------------
# Globale Konfigurationen & Verzeichnisse
# -------------------------------------------------------------------
UPLOAD_DIR = 'upload'
LOG_DIR = 'logs'
CERT_FILE = 'cert.pem'
KEY_FILE = 'key.pem'
CENTRAL_LOG_FILE = os.path.join(LOG_DIR, 'pyupload.log')
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(message)s",
handlers=[logging.FileHandler(CENTRAL_LOG_FILE), logging.StreamHandler()]
)
central_logger = logging.getLogger("central_logger")
client_loggers = {}
# -------------------------------------------------------------------
# HTTP-Request-Handler
# -------------------------------------------------------------------
class SecureHTTPRequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""Sammelt Log-Einträge in Client-spezifische Dateien und die Zentrale-Logdatei."""
client_ip = self.client_address[0]
message = format % args
# Einmalig pro Client-IP einen dedizierten Logger anlegen.
if client_ip not in client_loggers:
logger = logging.getLogger(f'client_{client_ip}')
logger.setLevel(logging.INFO)
client_log_file = os.path.join(LOG_DIR, f"{client_ip}.log")
handler = logging.FileHandler(client_log_file)
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
logger.addHandler(handler)
client_loggers[client_ip] = logger
# In den Client-spezifischen Logger und in den zentralen Logger schreiben.
client_loggers[client_ip].info(message)
central_logger.info(f"{client_ip} - {message}")
def send_html_response(self, filename):
"""Liefert eine HTML-Datei als HTTP-Response zurück."""
try:
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(content.encode("utf-8"))
except FileNotFoundError:
self.send_error(500, "HTML-Template nicht gefunden")
def do_GET(self):
"""Ausliefern der Upload-Seite (template.html)."""
if self.path == '/':
self.send_html_response("template.html")
else:
self.send_error(404, "Seite nicht gefunden")
self.log_message('404 Not Found: %s', self.path)
def do_POST(self):
"""Behandelt Datei-Uploads (multipart/form-data) direkt als Stream."""
try:
content_type = self.headers.get('Content-Type')
content_length = int(self.headers.get('Content-Length', 0))
if not content_type or 'multipart/form-data' not in content_type:
self.send_error(400, "Ungültiger Content-Type")
self.log_message('400 Bad Request: Ungültiger Content-Type')
return
if content_length == 0:
self.send_error(400, "Leere Anfrage erhalten")
self.log_message('400 Bad Request: Leere Anfrage')
return
client_ip = self.client_address[0]
client_upload_dir = os.path.join(UPLOAD_DIR, client_ip)
os.makedirs(client_upload_dir, exist_ok=True)
boundary = content_type.split("boundary=")[-1].encode()
raw_data = self.rfile.read(content_length)
parts = raw_data.split(b"--" + boundary)
found_file = False
for part in parts:
if b"Content-Disposition" in part:
headers, file_data = part.split(b"\r\n\r\n", 1)
filename_start = headers.find(b'filename="') + 10
filename_end = headers.find(b'"', filename_start)
filename = headers[filename_start:filename_end].decode()
if filename: # Falls tatsächlich ein Dateiname vorhanden ist
file_path = os.path.join(client_upload_dir, os.path.basename(filename))
with open(file_path, "wb") as f:
# Entferne das trailing CRLF oder "--"
f.write(file_data.rstrip(b"\r\n--"))
self.log_message(f"Datei {filename} erfolgreich hochgeladen.")
found_file = True
if not found_file:
self.send_error(400, "Keine Datei im Upload enthalten")
self.log_message('400 Bad Request: Keine Datei übermittelt')
return
# Erfolgsseite senden
self.send_html_response("success.html")
except Exception as e:
self.log_message(f"Fehler: {e}")
self.send_error(500, "Interner Serverfehler")
# -------------------------------------------------------------------
# HTTPS-Server-Setup
# -------------------------------------------------------------------
def generate_self_signed_cert(cert_file, key_file):
"""Erzeugt ein selbstsigniertes SSL-Zertifikat, falls keines vorhanden ist."""
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ionivation.com"),
x509.NameAttribute(NameOID.COMMON_NAME, "localhost.lan"),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
critical=False
)
.sign(key, hashes.SHA256())
)
with open(cert_file, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
with open(key_file, "wb") as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
def get_server_ip():
"""Bestimmt die lokale IP-Adresse, um sie z.B. für den QR-Code zu nutzen."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
except Exception:
ip = "127.0.0.1"
finally:
s.close()
return ip
def create_https_server(port, handler_class=SecureHTTPRequestHandler):
"""Erzeugt ein HTTPS-Serverobjekt (aber startet ihn noch nicht)."""
if not os.path.exists(CERT_FILE) or not os.path.exists(KEY_FILE):
print("SSL-Zertifikat nicht gefunden. Erstelle selbstsigniertes Zertifikat...")
generate_self_signed_cert(CERT_FILE, KEY_FILE)
print("Selbstsigniertes SSL-Zertifikat erstellt.")
server_address = ('', port)
httpd = HTTPServer(server_address, handler_class)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
return httpd
# -------------------------------------------------------------------
# Startfunktionen (GUI / no-GUI)
# -------------------------------------------------------------------
def run_server_nogui(port):
httpd = create_https_server(port)
server_ip = get_server_ip()
try:
print(f"Starte HTTPS-Server auf https://{server_ip}:{port}")
print("Drücke STRG+C, um zu beenden.")
print("warte auf Verbindungen ... \n")
print(f"Öffne im Browser: https://{server_ip}:{port}")
print("Du musst Dich im gleichen Netzwerk befinden (Lan/ Wlan)")
httpd.serve_forever()
except KeyboardInterrupt:
# Nur eine kurze Meldung ausgeben und dann sauber herunterfahren
print("\nSTRG+C erkannt. Fahre Server herunter...")
except Exception as e:
print(f"Fehler aufgetreten: {e}")
logging.error("Serverfehler", exc_info=True)
finally:
httpd.server_close()
logging.shutdown()
print("Server wurde sauber beendet.")
def run_server_with_gui(port):
# Nur für QR-Code und GUI benötigt:
import qrcode
import webbrowser
from PIL import Image, ImageTk
import tkinter as tk
"""Startet den Server in einem Hintergrund-Thread und öffnet eine tkinter-GUI mit QR-Code."""
# 1) Erzeuge den Server (aber noch kein serve_forever).
httpd = create_https_server(port)
server_ip = get_server_ip()
url = f"https://{server_ip}:{port}"
# 2) Hintergrund-Thread starten
def server_thread():
try:
print(f"Starte HTTPS-Server auf {url}")
httpd.serve_forever()
except Exception as ex:
print(f"Server-Thread-Exception: {ex}")
finally:
httpd.server_close()
logging.shutdown()
t = threading.Thread(target=server_thread, daemon=True)
t.start()
# 3) tkinter-GUI aufbauen
root = tk.Tk()
root.title("pyUpload - Secure File Upload")
# Favicon setzen (nur unter Windows direkt mit .ico möglich)
try:
root.iconbitmap("favicon.ico")
except Exception as e:
print(f"Konnte das Icon nicht setzen: {e}")
# Labels erzeugen
labels = [
tk.Label(root, text="HTTPS-Upload-Server läuft!", font=("Arial", 14)),
tk.Label(root, text=f"IP-Adresse: {server_ip}", font=("Arial", 11)),
tk.Label(root, text=f"Port: {port}", font=("Arial", 11)),
tk.Label(root, text="Scanne den QR-Code:", font=("Arial", 11))
]
# Alle Labels packen und größte Breite ermitteln
max_width = 0
total_height = 20 # Grundhöhe als Puffer für Abstände
for label in labels:
label.pack(pady=5)
label.update_idletasks() # Breite und Höhe berechnen
max_width = max(max_width, label.winfo_reqwidth())
total_height += label.winfo_reqheight() + 10 # Höhe sammeln
# QR-Code generieren
url = f"https://{server_ip}:{port}"
qr = qrcode.QRCode(version=1, box_size=8, border=2)
qr.add_data(url)
qr.make(fit=True)
img_qr = qr.make_image(fill_color="black", back_color="white")
# QR-Code als Tkinter-Image einbinden
img_tk = ImageTk.PhotoImage(img_qr)
label_qr = tk.Label(root, image=img_tk)
label_qr.pack()
total_height += img_tk.height() + 20
# Copyright-Vermerk
label_copyright = tk.Label(root, text="Adam Skotarczak (C) 2025", font=("Arial", 9), fg="gray")
label_copyright.pack(pady=5)
total_height += label_copyright.winfo_reqheight() + 10
# Funktion für klickbaren Link
def open_browser(event):
webbrowser.open("https://www.ionivation.com/pyUpload")
# Klickbarer Link unter Copyright
link_label = tk.Label(root, text="Infos: www.ionivation.com/pyUpload", font=("Arial", 10), fg="blue", cursor="hand2")
link_label.pack()
link_label.bind("<Button-1>", open_browser)
total_height += link_label.winfo_reqheight() + 10
# Funktion zum Beenden
def on_quit():
root.destroy()
# Beenden-Button
btn_quit = tk.Button(root, text="Beenden", command=on_quit, font=("Arial", 10))
btn_quit.pack(pady=10)
total_height += btn_quit.winfo_reqheight() + 20
# Endgültige Fenstergröße setzen
root.geometry(f"{max_width + 40}x{total_height + 50}")
# 4) GUI-Loop starten
root.mainloop()
# -------------------------------------------------------------------
# Hauptprogramm mit CLI
# -------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Secure file upload server with optional GUI/QR-Code.")
parser.add_argument("--port", "-p", type=int, default=4443, help="Port, auf dem der Server lauscht (Standard: 4443)")
parser.add_argument("--nogui", "-n", action="store_true", help="Ohne GUI & QR-Code im reinen CLI-Modus starten")
args = parser.parse_args()
print(f"Gestartet mit Port {args.port}, GUI: {not args.nogui}")
if args.nogui:
run_server_nogui(args.port)
else:
run_server_with_gui(args.port)
if __name__ == "__main__":
main()