Reducerea timpului de răspuns al endpoint-ului în Go - Utilizarea cozilor de lucru
Prezentare generală
Atunci când se învață Go pentru prima dată, este adesea cazul ca învățarea să se facă prin implementarea unui server backend. Să luăm ca exemplu crearea unui exemplu care primește un stream de fișiere de la un RestAPI și îl încarcă pe server. Serverul net/http din Go procesează în mod implicit mai multe solicitări simultan, astfel încât nu există nicio problemă cu încărcarea concurentă în sine. Cu toate acestea, dacă toate operațiunile după primirea streamului sunt procesate sincron, răspunsul endpoint-ului va fi întârziat. Să examinăm o tehnică pentru a preveni o astfel de situație.
Cauză
Recepția unui stream necesită, în general, un timp îndelungat, iar în cazul fișierelor mari, o singură solicitare poate fi procesată timp de câteva minute. Într-un astfel de caz, este important să se proceseze operațiunile ulterioare recepției cât mai rapid posibil. Acest scenariu exemplu implică recepționarea unui stream, salvarea acestuia ca fișier temporar, apoi împingerea acestuia într-un container. În acest moment, dacă partea de împingere a fișierului temporar în container este gestionată de un worker pool, întârzierea răspunsului poate fi redusă.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Directorul temporar al gazdei
14
15// UploadTask reține datele pentru împingerea asincronă a fișierului.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler procesează încărcările de fișiere. Salvează în fișier temporar, apoi pune în coadă pentru împingere Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Creează o cale unică pentru fișierul temporar pe gazdă
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Creează și copiază corpul cererii în fișierul temporar (sincron)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Pune în coadă sarcina pentru împingerea asincronă Incus
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //ACEASTĂ PARTE
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Trimite un răspuns imediat 202 Accepted
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Ați observat, probabil, că partea marcată cu THIS PART (ACEASTĂ PARTE) indică inserarea unei sarcini în coadă.
Să vedem acum cum funcționează coada de sarcini.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue inițializează coada de sarcini în memorie.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adaugă o UploadTask în coadă.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask recuperează o UploadTask din coadă, blocând dacă este goală.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returnează dimensiunea curentă a cozii.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Coada de sarcini furnizată ca exemplu este implementată simplu. Această coadă de sarcini are o structură simplă care extrage sarcinile din canal.
Mai jos este metoda worker pentru împingerea fișierelor către container după încărcare. Metoda curentă este o buclă infinită pentru o reactivitate bună și o implementare ușoară, dar se poate adăuga un algoritm în funcție de scop.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Amână curățarea fișierului temporar
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Procesează sarcina cu reîncercări pentru erorile tranzitorii Incus
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //sarcină de încărcare separată
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Exemplu de eroare permanentă
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
În primul rând, această funcție rulează continuu într-o buclă pentru a încerca să primească sarcini din coadă. Apoi, în intervalul de reîncercări, încearcă etapa de încărcare de la fișierul temporar la container, și nu de la stream la fișierul temporar.
Avantaje
Avantajul acestei metode de procesare este că, dacă doar încărcarea streamului este normală, timpul de întârziere pentru operațiunile ulterioare poate fi redus, iar epuizarea resurselor cauzată de operațiuni concurente ale containerelor poate fi prevenită. Așa cum se poate observa din codul curent, operațiunile concurente ale containerelor sunt limitate de numărul de canale. Astfel, am examinat un exemplu practic de utilizare a procesării paralele în Go. Dacă doriți să vedeți mai multe exemple, vă rugăm să vizitați linkurile de mai jos.Module care includ exemple Proiect care utilizează exemple Proiectul în sine conține multe componente auxiliare, așa că, pentru a învăța despre worker-ul în sine, este suficient să examinați pe scurt cum este apelată funcția de inițializare a worker-ului în main.go. Modulul include, de asemenea, alte tipuri de worker, așa că vă rugăm să le consultați.