GoSuda

Zkrácení doby odezvy koncového bodu v Go – Využití fronty úloh

By Yunjin Lee
views ...

Přehled

Při prvním učení Go se často setkáváme s implementací backendových serverů. Vezměme si jako příklad vytvoření příkladu, kde se souborový stream přijímá z RestAPI a nahrává na server. Server Go net/http standardně zpracovává více požadavků současně, takže s paralelním nahráváním jako takovým není problém. Pokud by se však všechny operace po přijetí streamu zpracovávaly synchronně, došlo by ke zpoždění odezvy endpointu. Pojďme prozkoumat techniku, jak takové situaci zabránit.

Příčina

Přijímání streamu obvykle trvá dlouho a u velkých souborů může jeden požadavek trvat několik minut. V takových případech je důležité co nejrychleji zpracovat operace po přijetí streamu. Tento příklad scénáře zahrnuje přijetí streamu, jeho uložení do dočasného souboru a následné nahrání do kontejneru. V tomto případě, pokud je část nahrávání dočasného souboru do kontejneru zpracována pomocí worker poolu, lze zkrátit dobu odezvy.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //TATO ČÁST
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Zde, v části označené THIS PART, si jistě všimnete, že se úloha vkládá do fronty.

Nyní se podívejme, jak funguje fronta úloh.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Příklad fronty úloh je implementován jednoduše. Tato fronta úloh má jednoduchou strukturu, která odebírá úlohy z kanálu, do kterého byly vloženy.

Níže je metoda workeru pro nahrání souboru do kontejneru po nahrání souboru. Současná metoda používá nekonečnou smyčku pro dobrou odezvu a snadnou implementaci, ale algoritmus lze přidat podle potřeby.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Tato funkce nejprve nepřetržitě cyklí a pokouší se přijímat úkoly z fronty. Následně se v rámci rozsahu opakovaných pokusů pokouší o fázi nahrávání z dočasného souboru do kontejneru, nikoli ze streamu do dočasného souboru.

Výhody

Výhodou tohoto způsobu zpracování je, že pokud je nahrávání streamu v pořádku, lze zkrátit dobu zpoždění následných operací a zabránit vyčerpání zdrojů způsobenému souběžnými operacemi kontejneru. Jak je vidět v aktuálním kódu, souběžně proveditelné operace kontejneru jsou omezeny počtem kanálů. Takto jsme prozkoumali praktický příklad použití Go pro paralelní zpracování. Pokud byste chtěli vidět více příkladů, navštivte prosím následující odkazy.Modul s příklady Projekt využívající příklady Samotný projekt obsahuje mnoho vedlejších komponent, takže pro pochopení workeru stačí stručně zhlédnout, jak se volá funkce worker init v main.go. Modul obsahuje i jiné typy workerů, prosím, mějte to na paměti.