Намаляване на времето за отговор на крайна точка в Go – Използване на опашки за задачи
Общ преглед
При първоначалното изучаване на Go често се случва това да става чрез имплементиране на бекенд сървър. Нека разгледаме примера със създаване на пример, който получава файлови потоци от RestAPI и ги качва на сървъра. Сървърът net/http на езика Go по подразбиране обработва множество заявки едновременно, така че няма проблем със съвременното качване. Въпреки това, ако всички операции след получаване на потока се обработват синхронно, отговорът на крайната точка ще бъде забавен. Нека разгледаме техника за предотвратяване на подобни ситуации.
Причина
Приемането на поток обикновено отнема дълго време, а при големи файлове една заявка може да се обработва няколко минути. В такива случаи е важно операциите след получаване да се обработват възможно най-бързо. Този примерен сценарий включва получаване на поток, записването му във временен файл и след това изпращането му към контейнер. В този случай, ако частта, която изпраща временния файл към контейнера, се обработва от worker pool, забавянето на отговора може да бъде съкратено.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Създаване на уникален временен път към файла на хоста
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Създаване и копиране на тялото на заявката във временен файл (синхронно)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Поставяне на задача в опашка за асинхронно Incus изпращане
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //ТАЗИ ЧАСТ
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Изпращане на незабавен 202 Accepted отговор
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
Може би сте забелязали, че частта, обозначена като THIS PART, показва, че задачата се вмъква в опашката.
Сега нека видим как работи опашката за задачи.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue инициализира опашката от задачи в паметта.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask добавя UploadTask към опашката.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask извлича UploadTask от опашката, блокирайки, ако е празна.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength връща текущия размер на опашката.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Примерната опашка за задачи е имплементирана просто. Тази опашка за задачи има проста структура, която извлича задачите, намиращи се в опашката, от канала.
По-долу е даден методът worker за изпращане на файл към контейнер след качване. Настоящият метод е безкраен цикъл за добра отзивчивост и лесна имплементация, но може да се добавят алгоритми в зависимост от целта.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Отлагане на почистването на временния файл
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Обработка на задачата с повторни опити за временни Incus грешки
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //отделна задача за качване
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Пример за постоянна грешка
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Първо, тази функция постоянно повтаря цикъл, опитвайки се да получи задачи от опашката. След това, в рамките на обхвата на повторните опити, тя се опитва да изпълни етапа на качване от временен файл към контейнер, а не от поток към временен файл.
Предимства
Предимството на този метод на обработка е, че ако качването на потока е успешно, времето за забавяне на последващите операции може да бъде намалено и изчерпването на ресурси, причинено от едновременни операции с контейнери, може да бъде предотвратено. Както се вижда в настоящия код, едновременно изпълняваните операции с контейнери са ограничени от броя на каналите. Разгледахме пример за практическо използване на паралелната обработка на Go. Ако желаете да видите повече примери, моля, посетете следните връзки.Модул, включващ примери Проект, използващ примери Самият проект съдържа много спомагателни компоненти, така че за да научите за самия worker, е достатъчно да прегледате накратко как се извиква функцията worker init в main.go. Модулът включва и други видове workers, така че, моля, имайте предвид това.