Намаляване на времето за отговор на крайна точка в Go – Използване на опашки за задачи
Общ преглед
Когато започвате да изучавате Go, често се случва ученето да е свързано с имплементиране на бекенд сървър. Нека разгледаме като пример създаването на пример, който получава файлов поток от REST API и го качва на сървъра. Go езикът net/http сървърът по подразбиране обработва множество заявки едновременно, така че едновременното качване само по себе си не е проблем. Въпреки това, ако всички операции след получаване на потока се обработват синхронно, отговорът на крайната точка ще бъде забавен. Нека разгледаме техника за предотвратяване на тази ситуация.
Причина
Получаването на поток обикновено отнема дълго време, а при големи файлове една заявка може да се обработва няколко минути. В такива случаи е важно операциите след получаване да се обработват бързо. Този примерен сценарий е за получаване на поток, записването му във временен файл и избутването му в контейнер. В този случай, ако частта за избутване на временния файл в контейнера се обработва от Worker Pool, забавянето на отговора може да бъде съкратено.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Временна директория на хоста
14
15// UploadTask съдържа данни за асинхронно избутване на файл.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler обработва качването на файлове. Записва във временен файл, след което поставя в опашка за избутване към Incus.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "Изисква се POST метод.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Липсва хедър X-File-Path или X-Container-Name.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "Пътят до файла трябва да е абсолютен.", http.StatusBadRequest)
40 return
41 }
42
43 // Създаване на уникален временен път за файл на хоста
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ГРЕШКА: Неуспешно създаване на временна директория за качване: %v", err)
49 http.Error(wr, "Грешка на сървъра.", http.StatusInternalServerError)
50 return
51 }
52
53 // Създаване и копиране на тялото на заявката във временен файл (синхронно)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ГРЕШКА: Неуспешно създаване на временен файл: %v", err)
57 http.Error(wr, "Грешка на сървъра.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ГРЕШКА: Неуспешно копиране на тялото на заявката във временен файл: %v", err)
67 http.Error(wr, "Неуспешен трансфер на файл.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Информация за качване: Получени %d байта, запазени в %s.", bytesWritten, hostTempFilePath)
71
72 // Поставяне на задача в опашка за асинхронно Incus избутване
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //ТАЗИ ЧАСТ
80 log.Printf("Информация за качване: Задачата е поставена в опашка за %s към %s.", originalFilePath, containerName)
81
82 // Изпращане на незабавен 202 Accepted отговор
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "Файл '%s' е поставен в опашка за обработка в контейнер '%s'.\n", originalFilename, containerName)
85}
Както можете да забележите, частта, обозначена като THIS PART, показва, че задачата се поставя в опашка.
Сега нека видим как работи опашката за задачи.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue инициализира опашката за задачи в паметта.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Информация за качване: Работната опашка е инициализирана.")
16 })
17}
18
19// EnqueueTask добавя UploadTask към опашката.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ГРЕШКА: Опашката за задачи не е инициализирана.")
23 }
24 taskQueue <- task
25 log.Printf("Информация за качване: Опашка: Задачата е поставена в опашка. Размер: %d", len(taskQueue))
26}
27
28// DequeueTask извлича UploadTask от опашката, блокирайки, ако е празна.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ГРЕШКА: Опашката за задачи не е инициализирана.")
32 }
33 task := <-taskQueue
34 log.Printf("Информация за качване: Опашка: Задачата е извадена от опашката. Размер: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength връща текущия размер на опашката.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Предоставената примерна опашка за задачи е имплементирана просто. Тази опашка за задачи има проста структура, която извлича задачи от канала.
По-долу е методът на работника за избутване на файлове в контейнер след качване. Понастоящем методът е безкраен цикъл за добра реакция и лесна имплементация, но може да се добави алгоритъм в зависимост от предназначението.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Информация за качване: Работникът обработва задача за %s от %s.", task.ContainerName, task.HostFilename)
5
6 // Отлагане на почистването на временния файл
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ГРЕШКА: Работник: Неуспешно премахване на временен файл '%s': %v", filePath, err)
10 } else {
11 log.Printf("Информация за качване: Работник: Временният файл е почистен: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Обработка на задачата с повторни опити за временни Incus грешки
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //отделна задача за качване
18 if err == nil {
19 log.Printf("УСПЕХ: Работник: Задачата е завършена за %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Пример за постоянна грешка
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("ПРЕДУПРЕЖДЕНИЕ: Работник: Задачата не успя за %s (опит %d/%d): %v. Повторен опит.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ГРЕШКА: Работник: Задачата не успя окончателно за %s след %d опита: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Първо, тази функция постоянно се върти в цикъл и се опитва да извлече задачи от опашката. Впоследствие, в рамките на обхвата на повторните опити, тя се опитва да извърши етапа на качване от временен файл към контейнер, а не от поток към временен файл.
Предимства
Предимството на този метод на обработка е, че ако качването на потока е успешно, може да се намали забавянето за последващите задачи и да се предотврати изчерпването на ресурси поради едновременни операции с контейнери. Както се вижда от настоящия код, едновременните операции с контейнери са ограничени от броя на каналите. Разгледахме практически пример за използване на паралелна обработка в Go. Ако искате да видите повече примери, моля, посетете връзките по-долу.Модул, съдържащ примери
Проект, използващ примери
Самият проект съдържа много допълнителни компоненти, така че за да научите повече за самия Worker, просто прегледайте накратко как функцията worker init
се извиква в main.go
. Модулът също така включва други типове работници, така че моля, обърнете внимание на това.