Verkürzung der Endpoint-Antwortzeit in Go – Nutzung von Job Queues
개요
Wenn man Go zum ersten Mal lernt, lernt man oft, indem man einen Backend-Server implementiert. Nehmen wir als Beispiel die Erstellung eines Beispiels, bei dem ein Dateistream von einer RestAPI usw. empfangen und auf den Server hochgeladen wird. Der Go-Sprach-net/http-Server verarbeitet standardmäßig mehrere Anfragen gleichzeitig, sodass simultane Uploads an sich kein Problem darstellen. Wenn jedoch alle Operationen nach dem Empfang des Streams synchron verarbeitet werden, verzögert sich die Antwort des Endpunkts. Schauen wir uns eine Technik an, um solche Situationen zu vermeiden.
원인
Der Empfang eines Streams dauert in der Regel lange, und bei großen Dateien kann eine einzelne Anfrage mehrere Minuten dauern. In solchen Fällen ist es wichtig, die Operationen nach dem Empfang so schnell wie möglich zu verarbeiten. Dieses Beispielszenario beschreibt das Empfangen eines Streams, das Speichern als temporäre Datei und das Pushen in einen Container. Wenn das Pushen der temporären Datei in den Container in diesem Fall mit einem Worker-Pool verarbeitet wird, kann die Antwortverzögerung verkürzt werden.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Temporäres Host-Verzeichnis
14
15// UploadTask enthält Daten für den asynchronen Dateipush.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler verarbeitet Datei-Uploads. Speichert in temporärer Datei, reiht dann für Incus-Push ein.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST-Methode erforderlich.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Header X-File-Path oder X-Container-Name fehlt.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "Dateipfad muss absolut sein.", http.StatusBadRequest)
40 return
41 }
42
43 // Eindeutigen temporären Dateipfad auf dem Host erstellen
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Fehler beim Erstellen des temporären Upload-Verzeichnisses: %v", err)
49 http.Error(wr, "Serverfehler.", http.StatusInternalServerError)
50 return
51 }
52
53 // Anforderungsbody in temporäre Datei erstellen und kopieren (synchron)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Fehler beim Erstellen der temporären Datei: %v", err)
57 http.Error(wr, "Serverfehler.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Fehler beim Kopieren des Anforderungsbodys in die temporäre Datei: %v", err)
67 http.Error(wr, "Dateiübertragung fehlgeschlagen.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload-Information: %d Bytes empfangen, gespeichert unter %s.", bytesWritten, hostTempFilePath)
71
72 // Aufgabe für asynchronen Incus-Push in die Warteschlange einreihen
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //DIESER TEIL
80 log.Printf("Upload-Information: Aufgabe für %s zu %s in die Warteschlange eingereiht.", originalFilePath, containerName)
81
82 // Sofortige 202 Accepted Antwort senden
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "Datei '%s' zur Verarbeitung auf Container '%s' in die Warteschlange gestellt.\n", originalFilename, containerName)
85}
An der Stelle, die mit THIS PART gekennzeichnet ist, haben Sie vielleicht bemerkt, dass die Aufgabe in eine Warteschlange eingefügt wird.
Schauen wir uns nun an, wie die Aufgabenwarteschlange funktioniert.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initialisiert die In-Memory-Aufgabenwarteschlange.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload-Information: Arbeitswarteschlange initialisiert.")
16 })
17}
18
19// EnqueueTask fügt eine UploadTask zur Warteschlange hinzu.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("FEHLER: Aufgabenwarteschlange nicht initialisiert.")
23 }
24 taskQueue <- task
25 log.Printf("Upload-Information: Warteschlange: Aufgabe eingereiht. Größe: %d", len(taskQueue))
26}
27
28// DequeueTask ruft eine UploadTask aus der Warteschlange ab und blockiert, wenn leer.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("FEHLER: Aufgabenwarteschlange nicht initialisiert.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload-Information: Warteschlange: Aufgabe entfernt. Größe: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength gibt die aktuelle Warteschlangengröße zurück.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Die als Beispiel bereitgestellte Aufgabenwarteschlange ist einfach implementiert. Diese Aufgabenwarteschlange hat eine einfache Struktur, bei der die in der Warteschlange befindlichen Aufgaben aus dem Channel entnommen werden.
Unten ist die Worker-Methode zum Pushen von Dateien in einen Container nach dem Dateiupload. Die aktuelle Methode ist eine Endlosschleife für gute Reaktionsfähigkeit und einfache Implementierung, aber es ist auch möglich, Algorithmen nach Bedarf hinzuzufügen.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload-Information: Worker verarbeitet Aufgabe für %s von %s.", task.ContainerName, task.HostFilename)
5
6 // Bereinigung der temporären Datei verzögern
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Fehler beim Entfernen der temporären Datei '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload-Information: Worker: Temporäre Datei bereinigt: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Aufgabe mit Wiederholungsversuchen für vorübergehende Incus-Fehler verarbeiten
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate Upload-Aufgabe
18 if err == nil {
19 log.Printf("ERFOLG: Worker: Aufgabe für %s abgeschlossen.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Beispiel für permanenten Fehler
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNUNG: Worker: Aufgabe für %s fehlgeschlagen (Versuch %d/%d): %v. Wiederholung.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("FEHLER: Worker: Aufgabe für %s nach %d Versuchen dauerhaft fehlgeschlagen: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Zuerst versucht diese Funktion, in einer Schleife fortlaufend Aufgaben aus der Warteschlange abzurufen. Danach versucht sie innerhalb des Wiederholungsbereichs den Schritt des Uploads von der temporären Datei zum Container, nicht vom Stream zur temporären Datei.
이점
Der Vorteil dieser Verarbeitungsmethode besteht darin, dass sofern der Stream-Upload normal ist, die Verzögerungszeit für nachfolgende Verarbeitungsvorgänge reduziert und eine Ressourcenerschöpfung durch gleichzeitige Container-Operationen verhindert werden kann. Wie im aktuellen Code ersichtlich, ist die Anzahl der gleichzeitig möglichen Container-Operationen durch die Anzahl der Channels begrenzt. Wir haben ein praktisches Beispiel für die Nutzung der Parallelverarbeitung von Go in dieser Weise kennengelernt. Wenn Sie weitere Beispiele sehen möchten, besuchen Sie bitte die folgenden Links:Modul mit Beispielen Projekt mit Beispielen Das Projekt selbst enthält viele zusätzliche Komponenten, daher müssen Sie für das Verständnis des Workers nur kurz in main.go nachsehen, wie die Worker-Init-Funktion aufgerufen wird. Das Modul enthält auch andere Arten von Workern, bitte beachten Sie dies.