GoSuda

Go-kielellä toteutettujen päätepisteiden vasteajan lyhentäminen – Työjonon hyödyntäminen

By Yunjin Lee
views ...

Yhteenveto

Kun Go-kieltä opetellaan ensimmäistä kertaa, se tapahtuu usein toteuttamalla taustajärjestelmäpalvelin. Otetaan esimerkiksi tilanne, jossa luodaan esimerkki, jossa RestAPI:sta tai vastaavasta vastaanotetaan tiedostovirta ja ladataan se palvelimelle. Go-kielen net/http-palvelin käsittelee luonnostaan useita pyyntöjä samanaikaisesti, joten samanaikaisessa lataamisessa ei ole ongelmaa. Kuitenkin, jos kaikki toiminnot virran vastaanottamisen jälkeen käsitellään synkronisesti, päätepisteen vastaus viivästyy. Tarkastellaan tekniikkaa, jolla tällainen tilanne voidaan estää.

Syy

Virran vastaanottaminen vie yleensä pitkän ajan, ja suurten tiedostojen tapauksessa yksittäinen pyyntö voi kestää useita minuutteja. Tällaisessa tilanteessa on tärkeää käsitellä virran vastaanottamisen jälkeiset toiminnot mahdollisimman nopeasti. Tämä esimerkkiskenaario on sellainen, jossa virta vastaanotetaan, tallennetaan väliaikaistiedostoon ja työnnetään sitten konttiin. Tässä tapauksessa, jos väliaikaistiedoston konttiin työntäminen käsitellään Worker Poolilla, vastausviivettä voidaan lyhentää.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

Olette todennäköisesti huomanneet, että osassa, jossa lukee "THIS PART", lisätään tehtävä jonoon.

Katsotaan nyt, miten tehtäväjono toimii.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

Esimerkkinä annettu tehtäväjono on toteutettu yksinkertaisesti. Tällä tehtäväjonolla on yksinkertainen rakenne, jossa se poistaa kanavalta ne tehtävät, jotka ovat jonossa.

Alla on Worker-metodi tiedoston lataamisen jälkeistä konttiin työntämistä varten. Nykyinen metodi on ääretön silmukka hyvän responsiivisuuden ja helpon toteutuksen vuoksi, mutta siihen voidaan lisätä algoritmeja käyttötarkoituksen mukaan.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

Ensinnäkin tämä funktio yrittää jatkuvasti vastaanottaa tehtäviä jonosta pyörittämällä silmukkaa. Sen jälkeen se yrittää latausvaihetta väliaikaistiedostosta konttiin uudelleenyritysten puitteissa (ei siis virrasta väliaikaistiedostoon).

Hyödyt

Tämän käsittelytavan etuna on, että jos virran lataus onnistuu, myöhemmin käsiteltävien tehtävien viivettä voidaan lyhentää, ja samanaikaisista konttitoiminnoista johtuva resurssien ehtyminen voidaan estää. Kuten nykyisestä koodista näkyy, samanaikaisesti suoritettavien konttitoimintojen määrää rajoittaa kanavien lukumäärä. Näin olemme tarkastelleet käytännöllistä esimerkkiä Go-kielen rinnakkaiskäsittelyn hyödyntämisestä. Jos haluatte nähdä lisää esimerkkejä, käykää seuraavissa linkeissä.Moduuli, joka sisältää esimerkkejä Projekti, joka hyödyntää esimerkkejä Itse projekti sisältää monia oheiskomponentteja, joten Workerin oppimisen kannalta riittää, kun katsotte lyhyesti main.go-tiedostosta, miten Workerin init-funktiota kutsutaan. Moduuli sisältää myös muun tyyppisiä Workereita, joten huomioikaa se.