GoSuda

Verkürzung der Endpunkt-Antwortzeit in Go – Nutzung von Arbeitswarteschlangen

By Yunjin Lee
views ...

Übersicht

Wenn man Go zum ersten Mal lernt, geschieht dies oft durch die Implementierung eines Backend-Servers. Nehmen wir als Beispiel den Fall, in dem man ein Beispiel erstellt, bei dem Dateistreams von einer RestAPI empfangen und auf den Server hochgeladen werden. Der net/http-Server der Go-Sprache verarbeitet standardmäßig mehrere Anfragen gleichzeitig, sodass es kein Problem mit simultanen Uploads gibt. Wenn jedoch alle Operationen nach dem Empfang des Streams synchron verarbeitet werden, verzögert sich die Antwort des Endpunkts. Lassen Sie uns eine Technik zur Vermeidung solcher Situationen untersuchen.

Ursache

Der Empfang eines Streams nimmt in der Regel viel Zeit in Anspruch, und bei großen Dateien kann eine einzelne Anfrage mehrere Minuten dauern. In solchen Fällen ist es wichtig, die Operationen nach dem Empfang so schnell wie möglich zu bearbeiten. Dieses Beispielszenario umfasst das Empfangen eines Streams, das Speichern als temporäre Datei und das Pushen in einen Container. Wenn der Teil des Pushens der temporären Datei in den Container mit einem Worker-Pool verarbeitet wird, kann die Antwortverzögerung verkürzt werden.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Erstelle einen eindeutigen temporären Dateipfad auf dem Host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Erstelle und kopiere den Anfragekörper in eine temporäre Datei (synchron)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Füge die Aufgabe für den asynchronen Incus-Push in die Warteschlange ein
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //DIESER TEIL
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Sende sofort eine 202 Accepted Antwort
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

An der Stelle, die als THIS PART gekennzeichnet ist, haben Sie wahrscheinlich bemerkt, dass die Aufgabe in eine Warteschlange eingefügt wird.

Schauen wir uns nun an, wie die Aufgabenwarteschlange funktioniert.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initialisiert die In-Memory-Aufgabenwarteschlange.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask fügt eine UploadTask zur Warteschlange hinzu.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask ruft eine UploadTask aus der Warteschlange ab und blockiert, wenn diese leer ist.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength gibt die aktuelle Warteschlangengröße zurück.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Die als Beispiel bereitgestellte Aufgabenwarteschlange ist einfach implementiert. Diese Aufgabenwarteschlange hat eine einfache Struktur, die Aufgaben aus dem Kanal entnimmt, die sich in der Warteschlange befinden.

Unten ist die Worker-Methode zum Pushen von Dateien in den Container nach dem Hochladen. Die aktuelle Methode ist für eine gute Reaktivität und einfache Implementierung eine Endlosschleife, aber es ist zulässig, Algorithmen je nach Verwendungszweck hinzuzufügen.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Verschiebe die Bereinigung der temporären Datei
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Verarbeite die Aufgabe mit Wiederholungsversuchen für vorübergehende Incus-Fehler
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) // separate Upload-Aufgabe
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Beispiel für einen permanenten Fehler
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Zuerst versucht diese Funktion, kontinuierlich in einer Schleife Aufgaben aus der Warteschlange abzurufen. Danach versucht sie innerhalb der Wiederholungsgrenzen den Schritt des Uploads von der temporären Datei in den Container und nicht vom Stream in die temporäre Datei.

Vorteile

Der Vorteil dieser Verarbeitungsmethode besteht darin, dass, solange der Stream-Upload erfolgreich ist, die Verzögerungszeit für nachfolgende Verarbeitungsaufgaben reduziert und die Ressourcenerschöpfung durch gleichzeitige Container-Operationen verhindert werden kann. Wie im aktuellen Code ersichtlich, ist die Anzahl der gleichzeitig möglichen Container-Operationen durch die Anzahl der Kanäle begrenzt. So haben wir ein praktisches Beispiel für die Verwendung der Parallelverarbeitung von Go kennengelernt. Wenn Sie weitere Beispiele sehen möchten, besuchen Sie bitte die unten stehenden Links.Modul mit Beispielen Projekt mit Beispielen Das Projekt selbst enthält viele zusätzliche Komponenten. Für das Verständnis des Workers selbst müssen Sie nur kurz sehen, wie die Worker-Init-Funktion in main.go aufgerufen wird. Das Modul enthält auch andere Arten von Workern, beachten Sie dies bitte.