GoSuda

Сокращение времени отклика конечных точек в Go — использование рабочих очередей

By Yunjin Lee
views ...

Обзор

При первоначальном изучении Go часто встречается случай, когда обучение происходит через реализацию бэкэнд-сервера. Рассмотрим пример создания приложения, которое получает файловый поток через RestAPI и загружает его на сервер. Сервер net/http на Go по умолчанию обрабатывает несколько запросов одновременно, поэтому одновременная загрузка сама по себе не является проблемой. Однако, если все действия после получения потока обрабатываются синхронно, ответ конечной точки задерживается. Рассмотрим технику, позволяющую избежать такой ситуации.

Причина

Прием потока обычно занимает много времени, и в случае больших файлов один запрос может обрабатываться в течение нескольких минут. В таких ситуациях крайне важно оперативно обрабатывать операции после приема данных. Данный сценарий примера предусматривает прием потока, сохранение его во временный файл и последующую отправку в контейнер. В этом случае, если часть, отвечающая за отправку временного файла в контейнер, будет обрабатываться пулом воркеров, задержка ответа может быть сокращена.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //ЭТА ЧАСТЬ
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Вы, вероятно, заметили, что в части, помеченной как THIS PART, задача добавляется в очередь.

Теперь давайте посмотрим, как работает очередь задач.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Предоставленная в примере очередь задач реализована просто. Эта очередь задач имеет простую структуру, которая извлекает задачи из канала, находящиеся в очереди.

Ниже приведен метод воркера для отправки файла в контейнер после загрузки. Текущий метод представляет собой бесконечный цикл для обеспечения хорошей отзывчивости и простоты реализации, но при необходимости можно добавить алгоритмы в зависимости от назначения.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Сначала эта функция постоянно циклически пытается получить задачу из очереди. Затем она пытается выполнить этап загрузки из временного файла в контейнер, а не из потока во временный файл, в пределах диапазона повторных попыток.

Преимущества

Преимущество такого подхода заключается в том, что при условии успешной загрузки потока можно сократить время задержки для последующих операций и предотвратить исчерпание ресурсов из-за одновременных операций с контейнерами. Как видно из текущего кода, количество одновременно выполняемых операций с контейнерами ограничено количеством каналов. Таким образом, мы рассмотрели практический пример использования параллельной обработки Go. Если вы хотите увидеть больше примеров, пожалуйста, посетите следующие ссылки.Модуль с примерами Проект, использующий примеры Сам проект содержит много вспомогательных компонентов, поэтому для изучения воркера достаточно кратко ознакомиться с тем, как функция инициализации воркера вызывается в main.go. Модуль также включает другие типы воркеров, поэтому, пожалуйста, обратите на это внимание.