Skrátenie času odozvy koncového bodu v Go – využitie frontu úloh
Prehľad
Pri prvom učení sa Go sa často stáva, že sa učíte implementáciou backend servera. V tomto prípade si vezmime ako príklad vytvorenie príkladu, ktorý nahráva dátový prúd súborov zo služby RestAPI na server. Server Go net/http v zásade spracováva viacero požiadaviek súčasne, takže s paralelným nahrávaním nie je problém. Ak by sa však všetky operácie po prijatí dátového prúdu spracovávali synchrónne, odpoveď koncového bodu by sa oneskorila. Poďme sa pozrieť na techniku na prevenciu takejto situácie.
Príčina
Prijímanie dátového prúdu je zvyčajne časovo náročné a v prípade veľkých súborov môže spracovanie jednej požiadavky trvať niekoľko minút. V takom prípade je dôležité čo najrýchlejšie spracovať operácie po prijatí. Tento príklad scenára zahŕňa prijatie dátového prúdu, jeho uloženie do dočasného súboru a následné vloženie do kontajnera. Ak by sa časť vkladania dočasného súboru do kontajnera spracovala pomocou Worker Pool, mohlo by sa skrátiť oneskorenie odozvy.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //TÁTO ČASŤ
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
V časti označenej THIS PART ste si určite všimli, že úloha sa vkladá do frontu.
Teraz sa pozrime, ako funguje front úloh.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Príklad frontu úloh je implementovaný jednoducho. Tento front úloh má jednoduchú štruktúru, ktorá odoberá úlohy z kanála, ktoré sú vo fronte.
Nižšie je uvedená metóda Worker na vloženie súboru do kontajnera po nahraní. Súčasná metóda je nekonečná slučka pre dobrú odozvu a jednoduchú implementáciu, ale je možné pridať algoritmy podľa potreby.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Defer cleanup of the temporary file
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Process task with retries for transient Incus errors
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Najprv sa táto funkcia nepretržite pokúša získať úlohy z frontu. Následne sa v rámci rozsahu opakovaných pokusov pokúsi o krok nahrávania z dočasného súboru do kontajnera, a nie z dátového prúdu do dočasného súboru.
Výhody
Výhodou tohto spôsobu spracovania je, že ak je nahrávanie dátového prúdu úspešné, môže sa skrátiť čas oneskorenia pre následné operácie a môže sa zabrániť vyčerpaniu zdrojov v dôsledku súbežných operácií s kontajnermi. Ako je vidieť v súčasnom kóde, počet súbežných operácií s kontajnermi je obmedzený počtom kanálov. Takto sme preskúmali praktický príklad použitia súbežného spracovania v Go. Ak chcete vidieť ďalšie príklady, navštívte nasledujúce odkazy.Modul obsahujúci príklady Projekt využívajúci príklady Samotný projekt obsahuje mnoho vedľajších komponentov, takže pre štúdium samotného Worker stačí stručne si pozrieť, ako funkcia init Worker volá v main.go. Modul obsahuje aj iné typy Worker, takže sa na ne pozrite.