Riduzione del tempo di risposta dell'endpoint in Go - Utilizzo delle code di lavoro
κ°μ
Goλ₯Ό μ²μ λ°°μΈ λ, λ°±μλ μλ²λ₯Ό ꡬννλ©° λ°°μ°λ κ²½μ°κ° μμ£Ό μμ΅λλ€. μ΄ λ RestAPI λ±μμ νμΌ μ€νΈλ¦Όμ λ°μμ μλ²μ μ λ‘λνλ μμ λ₯Ό λ§λλ κ²½μ°λ₯Ό μλ₯Ό λ€μ΄ λ΄ μλ€. GoμΈμ΄ net/http μλ²λ κΈ°λ³Έμ μΌλ‘ μ¬λ¬ μμ²μ λμμ μ²λ¦¬νλ λμ λ€λ°μ μ λ‘λ μ체μλ λ¬Έμ κ° μμ΅λλ€. νμ§λ§ μ€νΈλ¦Ό μμ μ΄ν λμλ€μ λͺ¨λ λκΈ°μ μΌλ‘ μ²λ¦¬νλ€λ©΄ μλν¬μΈνΈμ μλ΅μ΄ μ§μ°λ©λλ€. μ΄λ¬ν μν©μ λ°©μ§νκΈ° μν κΈ°λ²μ μμ λ΄ μλ€.
μμΈ
μ€νΈλ¦Όμ μμ νλ λ°μλ λμ²΄λ‘ κΈ΄ μκ°μ΄ μμλλ©°, ν° νμΌμ κ²½μ° λ¨μΌ μμ²μ΄ μ λΆλμ μ²λ¦¬λ μ μμ΅λλ€. μ΄λ¬ν κ²½μ° μ‘°κΈμ΄λΌλ μμ μ΄νμ λμμ μ μνκ² μ²λ¦¬νλ κ²μ΄ μ€μν©λλ€. μ΄ μμ μλ리μ€λ μ€νΈλ¦Όμ μμ ν μμνμΌλ‘ μ μ₯, 컨ν μ΄λμ νΈμνλ μλ리μ€μ λλ€. μ΄ λμ 컨ν μ΄λμ μμνμΌμ νΈμνλ λΆλΆμ μ컀 νλ‘ μ²λ¦¬νλ€λ©΄, μλ΅ μ§μ°μ λ¨μΆν μ μμ΅λλ€.
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // Create unique temporary file path on host
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // Create and copy request body to temporary file (synchronous)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // Enqueue task for asynchronous Incus push
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //THIS PART
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // Send immediate 202 Accepted response
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
μ¬κΈ°μμ THIS PARTλΌκ³ μ μ΄λ λΆλΆμ 보면, μμ μ νμ μ½μ νλ€λ κ²μ λμΉμ±μ ¨μ κ²μ λλ€.
μ΄μ μμ νκ° μ΄λ»κ² λμνλμ§ λ³΄λλ‘ ν©μλ€.
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
μμλ‘ μ 곡λ μμ νλ λ¨μνκ² κ΅¬νλμ΄ μμ΅λλ€. μ΄ μμ νλ νμ λ€μ΄κ° μλ μμ λ€μ μ±λμμ λΉΌλ΄λ λ¨μν ꡬ쑰λ₯Ό κ°μ§κ³ μμ΅λλ€.
μλλ νμΌ μ λ‘λ μ΄ν 컨ν μ΄λμ νΈμνκΈ° μν μ컀 λ©μλμ λλ€. νμ¬ λ©μλλ μ’μ λ°μμ±κ³Ό μ¬μ΄ ꡬνμ μν΄ λ¬΄ν 루νμ΄λ μ©λμ λ°λΌ μκ³ λ¦¬μ¦μ μΆκ°νμ¬λ 무방ν©λλ€.
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // Defer cleanup of the temporary file
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // Process task with retries for transient Incus errors
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //separate upload task
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // Example permanent error
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
λ¨Όμ μ΄ ν¨μλ κ³μν΄μ 루νλ₯Ό λλ©΄μ νμ€ν¬λ₯Ό νλ‘λΆν° λ°μμ€κ³ μ μλν©λλ€. μ΄ν, μ¬μλ λ²μ λ΄μμ μ€νΈλ¦Ό-μμνμΌμ΄ μλ, μμνμΌ-컨ν μ΄λλ‘μ μ λ‘λ λ¨κ³λ₯Ό μλν©λλ€.
μ΄μ
μ΄λ¬ν μ²λ¦¬ λ°©μμ μ΄μ μ, μ€νΈλ¦Ό μ λ‘λλ§ μ μμ΄λΌλ©΄μ΄ν μ²λ¦¬λλ μμ μ λν μ§μ° μκ°μ μ€μΌ μ μμΌλ©°, λμλ€λ°μ μΈ μ»¨ν μ΄λ μμ μΌλ‘ μΈν μμ κ³ κ°μ μλ°©ν μ μμ΅λλ€. νμ¬μ μ½λμμ 보μ΄λ―, λμμ μ§ν κ°λ₯ν 컨ν μ΄λ μμ μ μ±λμ μλ‘ μ νλ©λλ€. μ΄μ κ°μ΄ μ€μ©μ μΌλ‘ Goμ λ³λ ¬ μ²λ¦¬λ₯Ό μ¬μ© κ°λ₯ν μμ λ₯Ό μμ보μμ΅λλ€. λ³΄λ€ λ§μ μμλ₯Ό λ³΄κ³ μΆλ€λ©΄, μλμ λ§ν¬λ₯Ό λ°©λ¬Έν΄ μ£ΌμΈμ.μμ λ₯Ό ν¬ν¨ν λͺ¨λ μμ λ₯Ό νμ©ν νλ‘μ νΈ νλ‘μ νΈ μ체μλ λΆμμ μΈ κ΅¬μ±μμκ° λ§μΌλ, μ컀 μ체μ λν νμ΅μ main.goμμ μ΄λ»κ² μ컀 init ν¨μλ₯Ό νΈμΆνλμ§λ§ κ°λ΅ν 보μκ³ λμ΄κ°λ©΄ λ©λλ€. λͺ¨λμλ λ€λ₯Έ ννμ μ컀λ ν¬ν¨νκ³ μμΌλ μ°Έκ³ ν΄ μ£ΌμΈμ.