Reducering af Endpoint-responstid i Go – Brug af en Jobkø
Oversigt
Når man først lærer Go, er det ofte tilfældet, at man lærer det ved at implementere en backend-server. Lad os som eksempel forestille os et scenarie, hvor man skal oprette et eksempel, der modtager en filstream fra f.eks. RestAPI og uploader den til serveren. Go-sprogets net/http-server behandler som udgangspunkt flere anmodninger samtidigt, så der er ingen problemer med samtidige uploads i sig selv. Men hvis alle operationer efter modtagelse af streamet behandles synkront, vil endpointets respons blive forsinket. Lad os undersøge en teknik til at forhindre en sådan situation.
Årsag
Modtagelse af et stream tager typisk lang tid, og for store filer kan en enkelt anmodning behandles i flere minutter. I et sådant tilfælde er det vigtigt at behandle operationer efter modtagelse så hurtigt som muligt. Dette eksempels scenarie er at modtage et stream, gemme det som en midlertidig fil og derefter pushe det til en container. Hvis den del, der pusher den midlertidige fil til containeren, behandles med en worker pool, kan responstiden forkortes.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //THIS PART
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Du har sikkert bemærket, at den del, der er markeret med THIS PART, indsætter en opgave i køen.
Lad os nu se, hvordan opgavekøen fungerer.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Den medfølgende opgavekø er simpelt implementeret. Denne opgavekø har en simpel struktur, der trækker opgaver fra en kanal, som er i køen.
Nedenfor er worker-metoden til at pushe filer til containeren efter upload. Den nuværende metode er en uendelig løkke for god respons og nem implementering, men algoritmer kan tilføjes efter behov.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Defer cleanup of the temporary file
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Process task with retries for transient Incus errors
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Først vil denne funktion gentagne gange forsøge at hente en opgave fra køen. Derefter vil den inden for genforsøgsintervallet forsøge at uploade fra den midlertidige fil til containeren i stedet for fra stream til den midlertidige fil.
Fordele
Fordelen ved denne behandlingsmetode er, at hvis stream-uploadet er normalt, kan forsinkelsen for de efterfølgende behandlede opgaver reduceres, og udtømning af ressourcer på grund af samtidige containeropgaver kan forhindres. Som det fremgår af den nuværende kode, er antallet af samtidige containeropgaver begrænset af antallet af kanaler. Vi har undersøgt et praktisk eksempel på, hvordan Go's parallelle behandling kan bruges på denne måde. Hvis du ønsker at se flere eksempler, bedes du besøge nedenstående links.Modul inklusive eksempler Projekt, der anvender eksemplerne Selve projektet indeholder mange yderligere komponenter, så for at lære om selve worker'en behøver du kun kort at se, hvordan worker init-funktionen kaldes i main.go. Modulet indeholder også andre typer af workers, så tag venligst det i betragtning.