Reduktion af Endpoint-responstid i Go – Anvendelse af Jobkøer
Oversigt
Når man først lærer Go, sker det ofte, at man lærer det ved at implementere en backend-server. Lad os tage et eksempel, hvor man skaber et eksempel, der modtager en filstrøm fra for eksempel en REST API og uploader den til serveren. Go-sprogets net/http-server håndterer som udgangspunkt flere anmodninger samtidigt, så der er ingen problemer med selve den samtidige upload. Men hvis alle handlinger efter modtagelsen af strømmen behandles synkront, vil endepunktets respons blive forsinket. Lad os undersøge en teknik til at forhindre en sådan situation.
Årsag
Det tager generelt lang tid at modtage en strøm, og for store filer kan en enkelt anmodning behandles i flere minutter. I sådanne tilfælde er det vigtigt at behandle handlinger efter modtagelsen så hurtigt som muligt. Dette eksempel-scenarie er et scenarie, hvor en strøm modtages, gemmes som en midlertidig fil, og derefter pushes til en container. På dette tidspunkt, hvis den del, der pusher den midlertidige fil til containeren, behandles med en worker pool, kan responstiden forkortes.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //THIS PART
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Du har sikkert bemærket, at den del, der er markeret som THIS PART, indsætter opgaven i en kø.
Lad os nu se, hvordan opgavekøen fungerer.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Den medfølgende opgavekø er simpelt implementeret. Denne opgavekø har en simpel struktur, der trækker opgaverne fra kanalen.
Nedenfor er worker-metoden til at pushe filen til containeren efter upload. Den nuværende metode er en uendelig løkke for god responsivitet og nem implementering, men der kan tilføjes algoritmer efter behov.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Defer cleanup of the temporary file
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Process task with retries for transient Incus errors
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Først vil denne funktion gentagne gange forsøge at hente opgaver fra køen. Derefter vil den inden for genforsøgsrammen forsøge upload-trinnet fra den midlertidige fil til containeren, snarere end fra strømmen til den midlertidige fil.
Fordele
Fordelen ved denne behandlingsmetode er, at hvis kun strømuploadet er normalt, kan forsinkelsen for efterfølgende behandlingsopgaver reduceres, og udtømning af ressourcer på grund af samtidige containeropgaver kan forhindres. Som det fremgår af den nuværende kode, er antallet af samtidige containeropgaver begrænset af antallet af kanaler. Vi har nu set et praktisk eksempel på, hvordan Go's parallelle behandling kan bruges. Hvis du ønsker at se flere eksempler, bedes du besøge nedenstående links.Modul inklusive eksempler Projekt, der bruger eksempler Selve projektet indeholder mange yderligere komponenter, så for at lære om selve worker'en behøver du kun kort at se, hvordan worker init-funktionen kaldes i main.go. Modulet inkluderer også andre typer worker'e, så tag det med i betragtning.