GoSuda

Zkrácení doby odezvy koncového bodu v Go – využití fronty úloh

By Yunjin Lee
views ...

Přehled

Při prvním učení Go se často učí implementací backend serveru. Vezměme si jako příklad scénář, kdy je vytvořen příklad pro příjem souborových streamů z RestAPI atd. a jejich nahrávání na server. Server Go net/http standardně zpracovává více požadavků současně, takže s paralelním nahráváním jako takovým není problém. Nicméně, pokud jsou všechny operace po přijetí streamu zpracovávány synchronně, dochází ke zpoždění odezvy koncového bodu. Pojďme prozkoumat techniku, jak takové situaci předejít.

Příčina

Přijímání streamů obvykle trvá dlouho a u velkých souborů může jediný požadavek trvat několik minut. V takovém případě je důležité co nejrychleji zpracovat operace po přijetí. Tento příklad scénáře spočívá v přijetí streamu, jeho uložení jako dočasného souboru a následném vložení do kontejneru. V tomto případě, pokud je část pro vložení dočasného souboru do kontejneru zpracována pomocí worker poolu, lze zkrátit zpoždění odezvy.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //TATO ČÁST
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Všimli jste si, že část označená jako THIS PART (TATO ČÁST) znamená, že se úloha vkládá do fronty.

Nyní se podívejme, jak fronta úloh funguje.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue inicializuje frontu úloh v paměti.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask přidá UploadTask do fronty.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask načte UploadTask z fronty, blokuje, pokud je prázdná.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength vrátí aktuální velikost fronty.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Příklad fronty úloh je jednoduše implementován. Tato fronta úloh má jednoduchou strukturu, která z kanálu vybírá úlohy, které jsou ve frontě.

Níže je metoda worker pro nahrání souboru do kontejneru po nahrání. Současná metoda je nekonečná smyčka pro dobrou odezvu a snadnou implementaci, ale je možné přidat algoritmy podle potřeby.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Odloží vyčištění dočasného souboru
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Zpracuje úlohu s opakovanými pokusy pro přechodné chyby Incus
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //samostatná úloha nahrávání
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Příklad trvalé chyby
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Za prvé, tato funkce nepřetržitě cykluje a pokouší se získat úlohu z fronty. Poté se v rámci rozsahu opakovaných pokusů pokusí o fázi nahrávání z dočasného souboru do kontejneru, nikoli ze streamu do dočasného souboru.

Výhody

Výhodou tohoto způsobu zpracování je, že pokud je nahrávání streamu v pořádku, lze zkrátit dobu zpoždění pro následné zpracovávané úlohy a zabránit vyčerpání zdrojů způsobenému souběžnými operacemi s kontejnery. Jak je vidět v aktuálním kódu, počet souběžných operací s kontejnery je omezen počtem kanálů. Tímto jsme prozkoumali praktický příklad využití paralelního zpracování v Go. Pokud byste chtěli vidět více příkladů, navštivte následující odkazy.Modul včetně příkladů Projekt využívající příklady Samotný projekt obsahuje mnoho vedlejších komponent, takže pro pochopení workeru stačí stručně zhlédnout, jak se v main.go volá inicializační funkce workeru. Modul také obsahuje jiné typy workerů, takže se na ně můžete podívat.