Vastausajan lyhentäminen Go-päätepisteissä – Työjonojen hyödyntäminen
Yleiskatsaus
Kun Go-kieltä opiskellaan ensimmäistä kertaa, usein opitaan toteuttamalla backend-palvelimia. Otetaan esimerkiksi tapaus, jossa luodaan esimerkki RestAPI:n kaltaisesta tiedostovirran vastaanottamisesta ja lataamisesta palvelimelle. Go-kielen net/http-palvelin käsittelee luonnostaan useita pyyntöjä samanaikaisesti, joten samanaikaisessa lataamisessa itsessään ei ole ongelmaa. Jos kuitenkin kaikki toiminnot virran vastaanottamisen jälkeen käsitellään synkronisesti, päätepisteen vastaus viivästyy. Tarkastellaan tekniikoita tämän tilanteen estämiseksi.
Syy
Virran vastaanottaminen vie yleensä pitkän ajan, ja suurten tiedostojen tapauksessa yksittäisen pyynnön käsittely voi kestää useita minuutteja. Tällaisessa tilanteessa on tärkeää käsitellä virran vastaanottamisen jälkeiset toiminnot mahdollisimman nopeasti. Tämä esimerkkiskenaario on virran vastaanottaminen, väliaikaiseen tiedostoon tallentaminen ja sen jälkeen konttiin siirtäminen. Tässä tapauksessa, jos väliaikaisen tiedoston siirto konttiin käsitellään worker poolilla, vastausviivettä voidaan lyhentää.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Hostin väliaikainen hakemisto
14
15// UploadTask sisältää tiedot asynkronista tiedostojen siirtoa varten.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler käsittelee tiedostojen latauksia. Tallentaa väliaikaiseen tiedostoon ja jonottaa Incus-siirtoa varten.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST-metodi vaaditaan.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Puuttuva X-File-Path tai X-Container-Name otsake.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "Tiedostopolun on oltava absoluuttinen.", http.StatusBadRequest)
40 return
41 }
42
43 // Luo ainutlaatuinen väliaikainen tiedostopolku isännälle
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Väliaikaisen lataushakemiston luominen epäonnistui: %v", err)
49 http.Error(wr, "Palvelinvirhe.", http.StatusInternalServerError)
50 return
51 }
52
53 // Luo ja kopioi pyynnön runko väliaikaiseen tiedostoon (synkroninen)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Väliaikaisen tiedoston luominen epäonnistui: %v", err)
57 http.Error(wr, "Palvelinvirhe.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Pyynnön rungon kopioiminen väliaikaiseen tiedostoon epäonnistui: %v", err)
67 http.Error(wr, "Tiedostonsiirto epäonnistui.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Vastaanotettu %d tavua, tallennettu kohteeseen %s.", bytesWritten, hostTempFilePath)
71
72 // Jonota tehtävä asynkronista Incus-siirtoa varten
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //TÄMÄ OSA
80 log.Printf("Upload Info: Tehtävä jonotettu kohteelle %s kohteeseen %s.", originalFilePath, containerName)
81
82 // Lähetä välitön 202 Accepted -vastaus
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "Tiedosto '%s' jonotettu käsittelyyn kontissa '%s'.\n", originalFilename, containerName)
85}
Tästä kohdasta, jossa lukee THIS PART, olette ehkä huomanneet, että tehtävä on lisätty jonoon.
Katsotaanpa nyt, miten tehtäväjono toimii.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue alustaa muistissa olevan tehtäväjonon.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Työjono alustettu.")
16 })
17}
18
19// EnqueueTask lisää UploadTaskin jonoon.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Tehtäväjonoa ei ole alustettu.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Jono: Tehtävä jonotettu. Koko: %d", len(taskQueue))
26}
27
28// DequeueTask hakee UploadTaskin jonosta, ja estää, jos tyhjä.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Tehtäväjonoa ei ole alustettu.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Jono: Tehtävä poistettu jonosta. Koko: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength palauttaa jonon nykyisen koon.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
Esimerkiksi annettu tehtäväjono on toteutettu yksinkertaisesti. Tällä tehtäväjonolla on yksinkertainen rakenne, joka poistaa jonossa olevat tehtävät kanavasta.
Alla on työntekijämetodi tiedostojen lataamisen jälkeen niiden siirtämiseksi konttiin. Nykyinen metodi on ääretön silmukka hyvän responsiivisuuden ja helpon toteutuksen vuoksi, mutta siihen voidaan lisätä algoritmeja käyttötarkoituksen mukaan.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Työntekijä käsittelee tehtävää kohteelle %s lähteestä %s.", task.ContainerName, task.HostFilename)
5
6 // Lykkää väliaikaisen tiedoston puhdistusta
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Työntekijä: Väliaikaisen tiedoston '%s' poistaminen epäonnistui: %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Työntekijä: Väliaikainen tiedosto puhdistettu: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Käsittele tehtävä uudelleenyrityksillä transienttien Incus-virheiden varalta
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //erillinen lataustehtävä
18 if err == nil {
19 log.Printf("SUCCESS: Työntekijä: Tehtävä suoritettu kohteelle %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Esimerkki pysyvästä virheestä
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Työntekijä: Tehtävä epäonnistui kohteelle %s (yritys %d/%d): %v. Yritetään uudelleen.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Työntekijä: Tehtävä epäonnistui pysyvästi kohteelle %s %d yrityksen jälkeen: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
Ensinnäkin tämä funktio yrittää jatkuvasti noutaa tehtäviä jonosta silmukassa. Tämän jälkeen se yrittää uudelleenyritysten rajoissa siirtää tiedoston väliaikaisesta tiedostosta konttiin, ei virrasta väliaikaiseen tiedostoon.
Edut
Tämän käsittelytavan etuna on, että jos virran lataus onnistuu, voidaan vähentää myöhemmin käsiteltävien tehtävien viivettä ja estää resurssien ehtyminen samanaikaisten konttitehtävien vuoksi. Kuten nykyisestä koodista näkyy, samanaikaisesti suoritettavien konttitehtävien määrää rajoittaa kanavien määrä. Tässä on esimerkki, miten Go:n rinnakkaiskäsittelyä voidaan käyttää käytännössä. Jos haluat nähdä lisää esimerkkejä, käy alla olevissa linkeissä.Moduuli esimerkkeineen Projekti esimerkkeineen Itse projekti sisältää paljon oheiskomponentteja, joten workerin oppimisen osalta riittää, kun katsot lyhyesti, miten workerin init-funktiota kutsutaan main.go:ssa. Moduuli sisältää myös muita worker-tyyppejä, joten huomioi tämä.