Zkrácení doby odezvy koncového bodu v Go – využití fronty úloh
Přehled
Při prvním učení Go se často učí implementací backend serveru. Vezměme si jako příklad scénář, kdy je vytvořen příklad pro příjem souborových streamů z RestAPI atd. a jejich nahrávání na server. Server Go net/http standardně zpracovává více požadavků současně, takže s paralelním nahráváním jako takovým není problém. Nicméně, pokud jsou všechny operace po přijetí streamu zpracovávány synchronně, dochází ke zpoždění odezvy koncového bodu. Pojďme prozkoumat techniku, jak takové situaci předejít.
Příčina
Přijímání streamů obvykle trvá dlouho a u velkých souborů může jediný požadavek trvat několik minut. V takovém případě je důležité co nejrychleji zpracovat operace po přijetí. Tento příklad scénáře spočívá v přijetí streamu, jeho uložení jako dočasného souboru a následném vložení do kontejneru. V tomto případě, pokud je část pro vložení dočasného souboru do kontejneru zpracována pomocí worker poolu, lze zkrátit zpoždění odezvy.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //TATO ČÁST
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Všimli jste si, že část označená jako THIS PART (TATO ČÁST) znamená, že se úloha vkládá do fronty.
Nyní se podívejme, jak fronta úloh funguje.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue inicializuje frontu úloh v paměti.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask přidá UploadTask do fronty.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask načte UploadTask z fronty, blokuje, pokud je prázdná.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength vrátí aktuální velikost fronty.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Příklad fronty úloh je jednoduše implementován. Tato fronta úloh má jednoduchou strukturu, která z kanálu vybírá úlohy, které jsou ve frontě.
Níže je metoda worker pro nahrání souboru do kontejneru po nahrání. Současná metoda je nekonečná smyčka pro dobrou odezvu a snadnou implementaci, ale je možné přidat algoritmy podle potřeby.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Odloží vyčištění dočasného souboru
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Zpracuje úlohu s opakovanými pokusy pro přechodné chyby Incus
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //samostatná úloha nahrávání
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Příklad trvalé chyby
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Za prvé, tato funkce nepřetržitě cykluje a pokouší se získat úlohu z fronty. Poté se v rámci rozsahu opakovaných pokusů pokusí o fázi nahrávání z dočasného souboru do kontejneru, nikoli ze streamu do dočasného souboru.
Výhody
Výhodou tohoto způsobu zpracování je, že pokud je nahrávání streamu v pořádku, lze zkrátit dobu zpoždění pro následné zpracovávané úlohy a zabránit vyčerpání zdrojů způsobenému souběžnými operacemi s kontejnery. Jak je vidět v aktuálním kódu, počet souběžných operací s kontejnery je omezen počtem kanálů. Tímto jsme prozkoumali praktický příklad využití paralelního zpracování v Go. Pokud byste chtěli vidět více příkladů, navštivte následující odkazy.Modul včetně příkladů Projekt využívající příklady Samotný projekt obsahuje mnoho vedlejších komponent, takže pro pochopení workeru stačí stručně zhlédnout, jak se v main.go volá inicializační funkce workeru. Modul také obsahuje jiné typy workerů, takže se na ně můžete podívat.