Skracanie czasu odpowiedzi endpointów w Go – Wykorzystanie kolejek zadań
개요
Kiedy po raz pierwszy uczymy się Go, często zdarza się, że uczymy się poprzez implementację serwera backendowego. Rozważmy przykład tworzenia przykładu, w którym odbieramy strumień plików z RestAPI i przesyłamy go na serwer. Serwer Go net/http domyślnie obsługuje wiele żądań jednocześnie, więc sama jednoczesna wysyłka nie stanowi problemu. Jednakże, jeśli wszystkie operacje po odebraniu strumienia są przetwarzane synchronicznie, odpowiedź endpointu zostanie opóźniona. Przyjrzyjmy się technice zapobiegania takim sytuacjom.
원인
Odbieranie strumienia zazwyczaj zajmuje dużo czasu, a w przypadku dużych plików pojedyncze żądanie może być przetwarzane przez kilka minut. W takiej sytuacji ważne jest, aby jak najszybciej przetworzyć operacje po odebraniu. Ten scenariusz przykładu dotyczy odbierania strumienia, zapisywania go jako pliku tymczasowego, a następnie przesyłania do kontenera. W tym momencie, jeśli część odpowiedzialną za przesyłanie pliku tymczasowego do kontenera obsłużymy za pomocą puli workerów, możemy skrócić opóźnienie odpowiedzi.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //THIS PART
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
W części oznaczonej jako THIS PART, zauważyli Państwo zapewne, że zadanie jest dodawane do kolejki.
Przyjrzyjmy się teraz, jak działa kolejka zadań.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Przykładowa kolejka zadań jest zaimplementowana w prosty sposób. Ta kolejka zadań ma prostą strukturę, która polega na pobieraniu zadań z kanału, w którym są one umieszczone.
Poniżej znajduje się metoda worker dla przesyłania plików do kontenera po ich załadowaniu. Obecna metoda to pętla nieskończona, zapewniająca dobrą responsywność i łatwą implementację, ale można dodać algorytm w zależności od przeznaczenia.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Defer cleanup of the temporary file
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Process task with retries for transient Incus errors
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Najpierw funkcja ta w sposób ciągły wykonuje pętlę, próbując pobrać zadania z kolejki. Następnie, w zakresie ponownych prób, podejmuje próbę etapu przesyłania z pliku tymczasowego do kontenera, a nie ze strumienia do pliku tymczasowego.
이점
Zaletą takiego podejścia jest to, że jeśli przesyłanie strumieniowe jest prawidłowe, można skrócić czas opóźnienia dla kolejnych operacji i zapobiec wyczerpaniu zasobów spowodowanemu jednoczesnymi operacjami na kontenerach. Jak widać w obecnym kodzie, liczba jednoczesnych operacji na kontenerach jest ograniczona liczbą kanałów. W ten sposób przeanalizowaliśmy praktyczny przykład wykorzystania równoległego przetwarzania w Go. Aby zobaczyć więcej przykładów, proszę odwiedzić poniższe linki.Moduł zawierający przykłady Projekt wykorzystujący przykłady Sam projekt zawiera wiele dodatkowych komponentów, więc w celu nauki o workerach wystarczy pokrótce zapoznać się z tym, jak funkcja init workera jest wywoływana w main.go. Moduł zawiera również inne typy workerów, więc proszę się z nimi zapoznać.