348 lines
13 KiB
Python
348 lines
13 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
# © 2025 Adam Skotarczak (adam@skotarczak.net)
|
||
|
# Dieses Softwarepaket darf nicht ohne Genehmigung weiterverbreitet werden!
|
||
|
# Version 1.2.2 (05.03.2025)
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import ssl
|
||
|
import logging
|
||
|
import socket
|
||
|
import argparse
|
||
|
import threading
|
||
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
|
from datetime import datetime, timedelta, timezone
|
||
|
|
||
|
from cryptography import x509
|
||
|
from cryptography.x509.oid import NameOID
|
||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||
|
|
||
|
# -------------------------------------------------------------------
|
||
|
# Globale Konfigurationen & Verzeichnisse
|
||
|
# -------------------------------------------------------------------
|
||
|
UPLOAD_DIR = 'upload'
|
||
|
LOG_DIR = 'logs'
|
||
|
CERT_FILE = 'cert.pem'
|
||
|
KEY_FILE = 'key.pem'
|
||
|
CENTRAL_LOG_FILE = os.path.join(LOG_DIR, 'pyupload.log')
|
||
|
|
||
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||
|
os.makedirs(LOG_DIR, exist_ok=True)
|
||
|
|
||
|
logging.basicConfig(
|
||
|
level=logging.INFO,
|
||
|
format="%(asctime)s - %(message)s",
|
||
|
handlers=[logging.FileHandler(CENTRAL_LOG_FILE), logging.StreamHandler()]
|
||
|
)
|
||
|
|
||
|
central_logger = logging.getLogger("central_logger")
|
||
|
client_loggers = {}
|
||
|
|
||
|
# -------------------------------------------------------------------
|
||
|
# HTTP-Request-Handler
|
||
|
# -------------------------------------------------------------------
|
||
|
class SecureHTTPRequestHandler(BaseHTTPRequestHandler):
|
||
|
def log_message(self, format, *args):
|
||
|
"""Sammelt Log-Einträge in Client-spezifische Dateien und die Zentrale-Logdatei."""
|
||
|
client_ip = self.client_address[0]
|
||
|
message = format % args
|
||
|
|
||
|
# Einmalig pro Client-IP einen dedizierten Logger anlegen.
|
||
|
if client_ip not in client_loggers:
|
||
|
logger = logging.getLogger(f'client_{client_ip}')
|
||
|
logger.setLevel(logging.INFO)
|
||
|
client_log_file = os.path.join(LOG_DIR, f"{client_ip}.log")
|
||
|
handler = logging.FileHandler(client_log_file)
|
||
|
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
|
||
|
logger.addHandler(handler)
|
||
|
client_loggers[client_ip] = logger
|
||
|
|
||
|
# In den Client-spezifischen Logger und in den zentralen Logger schreiben.
|
||
|
client_loggers[client_ip].info(message)
|
||
|
central_logger.info(f"{client_ip} - {message}")
|
||
|
|
||
|
def send_html_response(self, filename):
|
||
|
"""Liefert eine HTML-Datei als HTTP-Response zurück."""
|
||
|
try:
|
||
|
with open(filename, "r", encoding="utf-8") as f:
|
||
|
content = f.read()
|
||
|
self.send_response(200)
|
||
|
self.send_header('Content-type', 'text/html')
|
||
|
self.end_headers()
|
||
|
self.wfile.write(content.encode("utf-8"))
|
||
|
except FileNotFoundError:
|
||
|
self.send_error(500, "HTML-Template nicht gefunden")
|
||
|
|
||
|
def do_GET(self):
|
||
|
"""Ausliefern der Upload-Seite (template.html)."""
|
||
|
if self.path == '/':
|
||
|
self.send_html_response("template.html")
|
||
|
else:
|
||
|
self.send_error(404, "Seite nicht gefunden")
|
||
|
self.log_message('404 Not Found: %s', self.path)
|
||
|
|
||
|
def do_POST(self):
|
||
|
"""Behandelt Datei-Uploads (multipart/form-data) direkt als Stream."""
|
||
|
try:
|
||
|
content_type = self.headers.get('Content-Type')
|
||
|
content_length = int(self.headers.get('Content-Length', 0))
|
||
|
|
||
|
if not content_type or 'multipart/form-data' not in content_type:
|
||
|
self.send_error(400, "Ungültiger Content-Type")
|
||
|
self.log_message('400 Bad Request: Ungültiger Content-Type')
|
||
|
return
|
||
|
|
||
|
if content_length == 0:
|
||
|
self.send_error(400, "Leere Anfrage erhalten")
|
||
|
self.log_message('400 Bad Request: Leere Anfrage')
|
||
|
return
|
||
|
|
||
|
client_ip = self.client_address[0]
|
||
|
client_upload_dir = os.path.join(UPLOAD_DIR, client_ip)
|
||
|
os.makedirs(client_upload_dir, exist_ok=True)
|
||
|
|
||
|
boundary = content_type.split("boundary=")[-1].encode()
|
||
|
raw_data = self.rfile.read(content_length)
|
||
|
parts = raw_data.split(b"--" + boundary)
|
||
|
|
||
|
found_file = False
|
||
|
|
||
|
for part in parts:
|
||
|
if b"Content-Disposition" in part:
|
||
|
headers, file_data = part.split(b"\r\n\r\n", 1)
|
||
|
filename_start = headers.find(b'filename="') + 10
|
||
|
filename_end = headers.find(b'"', filename_start)
|
||
|
filename = headers[filename_start:filename_end].decode()
|
||
|
|
||
|
if filename: # Falls tatsächlich ein Dateiname vorhanden ist
|
||
|
file_path = os.path.join(client_upload_dir, os.path.basename(filename))
|
||
|
with open(file_path, "wb") as f:
|
||
|
# Entferne das trailing CRLF oder "--"
|
||
|
f.write(file_data.rstrip(b"\r\n--"))
|
||
|
self.log_message(f"Datei {filename} erfolgreich hochgeladen.")
|
||
|
found_file = True
|
||
|
|
||
|
if not found_file:
|
||
|
self.send_error(400, "Keine Datei im Upload enthalten")
|
||
|
self.log_message('400 Bad Request: Keine Datei übermittelt')
|
||
|
return
|
||
|
|
||
|
# Erfolgsseite senden
|
||
|
self.send_html_response("success.html")
|
||
|
|
||
|
except Exception as e:
|
||
|
self.log_message(f"Fehler: {e}")
|
||
|
self.send_error(500, "Interner Serverfehler")
|
||
|
|
||
|
# -------------------------------------------------------------------
|
||
|
# HTTPS-Server-Setup
|
||
|
# -------------------------------------------------------------------
|
||
|
def generate_self_signed_cert(cert_file, key_file):
|
||
|
"""Erzeugt ein selbstsigniertes SSL-Zertifikat, falls keines vorhanden ist."""
|
||
|
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
||
|
subject = issuer = x509.Name([
|
||
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
|
||
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Berlin"),
|
||
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Berlin"),
|
||
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "ionivation.com"),
|
||
|
x509.NameAttribute(NameOID.COMMON_NAME, "localhost.lan"),
|
||
|
])
|
||
|
cert = (
|
||
|
x509.CertificateBuilder()
|
||
|
.subject_name(subject)
|
||
|
.issuer_name(issuer)
|
||
|
.public_key(key.public_key())
|
||
|
.serial_number(x509.random_serial_number())
|
||
|
.not_valid_before(datetime.now(timezone.utc))
|
||
|
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=365))
|
||
|
.add_extension(
|
||
|
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
|
||
|
critical=False
|
||
|
)
|
||
|
.sign(key, hashes.SHA256())
|
||
|
)
|
||
|
|
||
|
with open(cert_file, "wb") as f:
|
||
|
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||
|
with open(key_file, "wb") as f:
|
||
|
f.write(key.private_bytes(
|
||
|
encoding=serialization.Encoding.PEM,
|
||
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||
|
encryption_algorithm=serialization.NoEncryption()
|
||
|
))
|
||
|
|
||
|
def get_server_ip():
|
||
|
"""Bestimmt die lokale IP-Adresse, um sie z.B. für den QR-Code zu nutzen."""
|
||
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
try:
|
||
|
s.connect(("8.8.8.8", 80))
|
||
|
ip = s.getsockname()[0]
|
||
|
except Exception:
|
||
|
ip = "127.0.0.1"
|
||
|
finally:
|
||
|
s.close()
|
||
|
return ip
|
||
|
|
||
|
def create_https_server(port, handler_class=SecureHTTPRequestHandler):
|
||
|
"""Erzeugt ein HTTPS-Serverobjekt (aber startet ihn noch nicht)."""
|
||
|
if not os.path.exists(CERT_FILE) or not os.path.exists(KEY_FILE):
|
||
|
print("SSL-Zertifikat nicht gefunden. Erstelle selbstsigniertes Zertifikat...")
|
||
|
generate_self_signed_cert(CERT_FILE, KEY_FILE)
|
||
|
print("Selbstsigniertes SSL-Zertifikat erstellt.")
|
||
|
|
||
|
server_address = ('', port)
|
||
|
httpd = HTTPServer(server_address, handler_class)
|
||
|
|
||
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||
|
context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
|
||
|
httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
|
||
|
return httpd
|
||
|
|
||
|
# -------------------------------------------------------------------
|
||
|
# Startfunktionen (GUI / no-GUI)
|
||
|
# -------------------------------------------------------------------
|
||
|
def run_server_nogui(port):
|
||
|
httpd = create_https_server(port)
|
||
|
server_ip = get_server_ip()
|
||
|
|
||
|
try:
|
||
|
print(f"Starte HTTPS-Server auf https://{server_ip}:{port}")
|
||
|
print("Drücke STRG+C, um zu beenden.")
|
||
|
print("warte auf Verbindungen ... \n")
|
||
|
print(f"Öffne im Browser: https://{server_ip}:{port}")
|
||
|
print("Du musst Dich im gleichen Netzwerk befinden (Lan/ Wlan)")
|
||
|
httpd.serve_forever()
|
||
|
|
||
|
except KeyboardInterrupt:
|
||
|
# Nur eine kurze Meldung ausgeben und dann sauber herunterfahren
|
||
|
print("\nSTRG+C erkannt. Fahre Server herunter...")
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f"Fehler aufgetreten: {e}")
|
||
|
logging.error("Serverfehler", exc_info=True)
|
||
|
|
||
|
finally:
|
||
|
httpd.server_close()
|
||
|
logging.shutdown()
|
||
|
print("Server wurde sauber beendet.")
|
||
|
|
||
|
|
||
|
def run_server_with_gui(port):
|
||
|
# Nur für QR-Code und GUI benötigt:
|
||
|
import qrcode
|
||
|
import webbrowser
|
||
|
from PIL import Image, ImageTk
|
||
|
import tkinter as tk
|
||
|
|
||
|
"""Startet den Server in einem Hintergrund-Thread und öffnet eine tkinter-GUI mit QR-Code."""
|
||
|
# 1) Erzeuge den Server (aber noch kein serve_forever).
|
||
|
httpd = create_https_server(port)
|
||
|
server_ip = get_server_ip()
|
||
|
url = f"https://{server_ip}:{port}"
|
||
|
|
||
|
# 2) Hintergrund-Thread starten
|
||
|
def server_thread():
|
||
|
try:
|
||
|
print(f"Starte HTTPS-Server auf {url}")
|
||
|
httpd.serve_forever()
|
||
|
except Exception as ex:
|
||
|
print(f"Server-Thread-Exception: {ex}")
|
||
|
finally:
|
||
|
httpd.server_close()
|
||
|
logging.shutdown()
|
||
|
|
||
|
t = threading.Thread(target=server_thread, daemon=True)
|
||
|
t.start()
|
||
|
|
||
|
# 3) tkinter-GUI aufbauen
|
||
|
root = tk.Tk()
|
||
|
root.title("pyUpload - Secure File Upload")
|
||
|
|
||
|
# Favicon setzen (nur unter Windows direkt mit .ico möglich)
|
||
|
try:
|
||
|
root.iconbitmap("favicon.ico")
|
||
|
except Exception as e:
|
||
|
print(f"Konnte das Icon nicht setzen: {e}")
|
||
|
|
||
|
# Labels erzeugen
|
||
|
labels = [
|
||
|
tk.Label(root, text="HTTPS-Upload-Server läuft!", font=("Arial", 14)),
|
||
|
tk.Label(root, text=f"IP-Adresse: {server_ip}", font=("Arial", 11)),
|
||
|
tk.Label(root, text=f"Port: {port}", font=("Arial", 11)),
|
||
|
tk.Label(root, text="Scanne den QR-Code:", font=("Arial", 11))
|
||
|
]
|
||
|
|
||
|
# Alle Labels packen und größte Breite ermitteln
|
||
|
max_width = 0
|
||
|
total_height = 20 # Grundhöhe als Puffer für Abstände
|
||
|
for label in labels:
|
||
|
label.pack(pady=5)
|
||
|
label.update_idletasks() # Breite und Höhe berechnen
|
||
|
max_width = max(max_width, label.winfo_reqwidth())
|
||
|
total_height += label.winfo_reqheight() + 10 # Höhe sammeln
|
||
|
|
||
|
# QR-Code generieren
|
||
|
url = f"https://{server_ip}:{port}"
|
||
|
qr = qrcode.QRCode(version=1, box_size=8, border=2)
|
||
|
qr.add_data(url)
|
||
|
qr.make(fit=True)
|
||
|
img_qr = qr.make_image(fill_color="black", back_color="white")
|
||
|
|
||
|
# QR-Code als Tkinter-Image einbinden
|
||
|
img_tk = ImageTk.PhotoImage(img_qr)
|
||
|
label_qr = tk.Label(root, image=img_tk)
|
||
|
label_qr.pack()
|
||
|
total_height += img_tk.height() + 20
|
||
|
|
||
|
# Copyright-Vermerk
|
||
|
label_copyright = tk.Label(root, text="Adam Skotarczak (C) 2025", font=("Arial", 9), fg="gray")
|
||
|
label_copyright.pack(pady=5)
|
||
|
total_height += label_copyright.winfo_reqheight() + 10
|
||
|
|
||
|
# Funktion für klickbaren Link
|
||
|
def open_browser(event):
|
||
|
webbrowser.open("https://www.ionivation.com/pyUpload")
|
||
|
|
||
|
# Klickbarer Link unter Copyright
|
||
|
link_label = tk.Label(root, text="Infos: www.ionivation.com/pyUpload", font=("Arial", 10), fg="blue", cursor="hand2")
|
||
|
link_label.pack()
|
||
|
link_label.bind("<Button-1>", open_browser)
|
||
|
total_height += link_label.winfo_reqheight() + 10
|
||
|
|
||
|
# Funktion zum Beenden
|
||
|
def on_quit():
|
||
|
root.destroy()
|
||
|
|
||
|
# Beenden-Button
|
||
|
btn_quit = tk.Button(root, text="Beenden", command=on_quit, font=("Arial", 10))
|
||
|
btn_quit.pack(pady=10)
|
||
|
total_height += btn_quit.winfo_reqheight() + 20
|
||
|
|
||
|
# Endgültige Fenstergröße setzen
|
||
|
root.geometry(f"{max_width + 40}x{total_height + 50}")
|
||
|
|
||
|
# 4) GUI-Loop starten
|
||
|
root.mainloop()
|
||
|
|
||
|
# -------------------------------------------------------------------
|
||
|
# Hauptprogramm mit CLI
|
||
|
# -------------------------------------------------------------------
|
||
|
|
||
|
def main():
|
||
|
parser = argparse.ArgumentParser(description="Secure file upload server with optional GUI/QR-Code.")
|
||
|
parser.add_argument("--port", "-p", type=int, default=4443, help="Port, auf dem der Server lauscht (Standard: 4443)")
|
||
|
parser.add_argument("--nogui", "-n", action="store_true", help="Ohne GUI & QR-Code im reinen CLI-Modus starten")
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
print(f"Gestartet mit Port {args.port}, GUI: {not args.nogui}")
|
||
|
|
||
|
if args.nogui:
|
||
|
run_server_nogui(args.port)
|
||
|
else:
|
||
|
run_server_with_gui(args.port)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|