GoSuda

Verkürzung der Endpoint-Antwortzeit in Go – Nutzung von Work Queues

By Yunjin Lee
views ...

Überblick

Wenn man Go erlernt, geschieht dies häufig durch die Implementierung eines Backend-Servers. Nehmen wir als Beispiel das Erstellen einer Demonstration, bei der ein Server Datenströme von einer RestAPI empfängt und diese hochlädt. Der Go-Sprach-Server net/http verarbeitet standardmäßig mehrere Anfragen simultan, sodass es an sich kein Problem mit simultanen Uploads gibt. Wenn jedoch alle Operationen nach dem Empfang des Streams synchron behandelt werden, kommt es zu einer Verzögerung der Endpunkt-Antwort. Wir werden nun Techniken untersuchen, um eine solche Situation zu vermeiden.

Ursache

Auch der Empfang des Streams nimmt typischerweise erhebliche Zeit in Anspruch, wobei die Verarbeitung einer einzelnen Anfrage bei großen Dateien mehrere Minuten dauern kann. In solchen Fällen ist es von Bedeutung, die nachgelagerten Operationen nach dem Empfang so zügig wie möglich zu bearbeiten. Dieses Beispiel-Szenario beschreibt das Speichern des Streams in einer temporären Datei nach dem Empfang und das anschließende Pushen dieser Datei in einen Container. Wenn der Teil des Pushens der temporären Datei in den Container mittels eines Worker Pools gehandhabt wird, lässt sich die Antwortverzögerung reduzieren.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Temporäres Verzeichnis des Hosts
14
15// UploadTask speichert Daten für das asynchrone Incus-Push-Verfahren.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler verarbeitet Datei-Uploads. Speichert in einer temporären Datei und reiht sie dann für den Incus-Push ein.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Erstellt einen eindeutigen temporären Dateipfad auf dem Host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Erstellt und kopiert den Request-Body in die temporäre Datei (synchron)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Reiht die Aufgabe für den asynchronen Incus-Push ein
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DIESER TEIL
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Sendet sofortige 202 Accepted Antwort
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Wenn Sie den mit THIS PART gekennzeichneten Abschnitt betrachten, werden Sie bemerkt haben, dass hier eine Aufgabe in die Warteschlange eingereiht wird.

Nun wollen wir uns ansehen, wie die Aufgabenwarteschlange funktioniert.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialisiert die In-Memory-Aufgabenwarteschlange.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask fügt einen UploadTask zur Warteschlange hinzu.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask ruft einen UploadTask aus der Warteschlange ab, blockiert bei Leerstand.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength gibt die aktuelle Größe der Warteschlange zurück.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Die als Beispiel bereitgestellte Aufgabenwarteschlange ist simpel implementiert. Diese Aufgabenwarteschlange besitzt eine einfache Struktur, die darin besteht, die in der Queue befindlichen Aufgaben aus dem Channel zu entnehmen.

Nachfolgend ist die Worker-Methode zur Durchführung des Pushs in den Container nach dem Datei-Upload aufgeführt. Die aktuelle Methode ist aufgrund der gewünschten guten Reaktionsfähigkeit und der einfachen Implementierung eine Endlosschleife, wobei es zulässig ist, je nach Verwendungszweck Algorithmen hinzuzufügen.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Verschiebe die Bereinigung der temporären Datei auf später
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Bearbeite die Aufgabe mit Wiederholungsversuchen bei transienten Incus-Fehlern
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate Upload-Aufgabe
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Beispiel für einen permanenten Fehler
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Zunächst versucht diese Funktion kontinuierlich in einer Schleife, eine Aufgabe aus der Warteschlange abzurufen. Anschließend wird die Phase des Uploads vom temporären Datei zum Container versucht, und zwar innerhalb des Wiederholungsrahmens und nicht die Phase vom Stream zur temporären Datei.

Vorteile

Der Vorteil dieses Verarbeitungsverfahrens besteht darin, dass die Verzögerungszeit für nachfolgende Operationen reduziert werden kann, sofern der Stream-Upload erfolgreich war, und dass eine Erschöpfung der Ressourcen durch simultane Container-Operationen verhindert werden kann. Wie im aktuellen Code ersichtlich, wird die Anzahl der gleichzeitig ausführbaren Container-Operationen durch die Kapazität des Channels begrenzt. Wir haben somit ein praktisches Beispiel für die Nutzung der Parallelverarbeitung von Go kennengelernt. Falls Sie weitere Beispiele einsehen möchten, besuchen Sie bitte die nachfolgenden Links.Modul inklusive Beispiel Projekt unter Anwendung des Beispiels Das Projekt selbst enthält zahlreiche ergänzende Komponenten; für das Verständnis des Workers selbst genügt es, kurz zu prüfen, wie die Worker-Init-Funktion in der main.go aufgerufen wird. Das Modul beinhaltet auch andere Formen von Workern, was zur Beachtung empfohlen wird.