GoSuda

Riduzione del tempo di risposta dell'endpoint in Go - Utilizzo delle code di lavoro

By Yunjin Lee
views ...

κ°œμš”

Goλ₯Ό 처음 배울 λ•Œ, λ°±μ—”λ“œ μ„œλ²„λ₯Ό κ΅¬ν˜„ν•˜λ©° λ°°μš°λŠ” κ²½μš°κ°€ 자주 μžˆμŠ΅λ‹ˆλ‹€. 이 λ•Œ RestAPI λ“±μ—μ„œ 파일 μŠ€νŠΈλ¦Όμ„ λ°›μ•„μ„œ μ„œλ²„μ— μ—…λ‘œλ“œν•˜λŠ” 예제λ₯Ό λ§Œλ“œλŠ” 경우λ₯Ό 예λ₯Ό λ“€μ–΄ λ΄…μ‹œλ‹€. Goμ–Έμ–΄ net/http μ„œλ²„λŠ” 기본적으둜 μ—¬λŸ¬ μš”μ²­μ„ λ™μ‹œμ— μ²˜λ¦¬ν•˜λ‹ˆ λ™μ‹œ λ‹€λ°œμ  μ—…λ‘œλ“œ μžμ²΄μ—λŠ” λ¬Έμ œκ°€ μ—†μŠ΅λ‹ˆλ‹€. ν•˜μ§€λ§Œ 슀트림 μˆ˜μ‹  이후 λ™μž‘λ“€μ„ λͺ¨λ‘ λ™κΈ°μ μœΌλ‘œ μ²˜λ¦¬ν•œλ‹€λ©΄ μ—”λ“œν¬μΈνŠΈμ˜ 응닡이 μ§€μ—°λ©λ‹ˆλ‹€. μ΄λŸ¬ν•œ 상황을 λ°©μ§€ν•˜κΈ° μœ„ν•œ 기법을 μ•Œμ•„ λ΄…μ‹œλ‹€.

원인

μŠ€νŠΈλ¦Όμ„ μˆ˜μ‹ ν•˜λŠ” 데에도 λŒ€μ²΄λ‘œ κΈ΄ μ‹œκ°„μ΄ μ†Œμš”λ˜λ©°, 큰 파일의 경우 단일 μš”μ²­μ΄ 수 λΆ„λ™μ•ˆ 처리될 수 μžˆμŠ΅λ‹ˆλ‹€. μ΄λŸ¬ν•œ 경우 μ‘°κΈˆμ΄λΌλ„ μˆ˜μ‹  μ΄ν›„μ˜ λ™μž‘μ„ μ‹ μ†ν•˜κ²Œ μ²˜λ¦¬ν•˜λŠ” 것이 μ€‘μš”ν•©λ‹ˆλ‹€. 이 예제 μ‹œλ‚˜λ¦¬μ˜€λŠ” μŠ€νŠΈλ¦Όμ„ μˆ˜μ‹  ν›„ μž„μ‹œνŒŒμΌλ‘œ μ €μž₯, μ»¨ν…Œμ΄λ„ˆμ— ν‘Έμ‹œν•˜λŠ” μ‹œλ‚˜λ¦¬μ˜€μž…λ‹ˆλ‹€. 이 λ•Œμ— μ»¨ν…Œμ΄λ„ˆμ— μž„μ‹œνŒŒμΌμ„ ν‘Έμ‹œν•˜λŠ” 뢀뢄을 μ›Œμ»€ ν’€λ‘œ μ²˜λ¦¬ν•œλ‹€λ©΄, 응닡 지연을 단좕할 수 μžˆμŠ΅λ‹ˆλ‹€.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

μ—¬κΈ°μ—μ„œ THIS PART라고 적어둔 뢀뢄을 보면, μž‘μ—…μ„ 큐에 μ‚½μž…ν•œλ‹€λŠ” 것을 λˆˆμΉ˜μ±„μ…¨μ„ κ²ƒμž…λ‹ˆλ‹€.

이제 μž‘μ—… 큐가 μ–΄λ–»κ²Œ λ™μž‘ν•˜λŠ”μ§€ 보도둝 ν•©μ‹œλ‹€.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

μ˜ˆμ‹œλ‘œ 제곡된 μž‘μ—… νλŠ” λ‹¨μˆœν•˜κ²Œ κ΅¬ν˜„λ˜μ–΄ μžˆμŠ΅λ‹ˆλ‹€. 이 μž‘μ—… νλŠ” 큐에 λ“€μ–΄κ°€ μžˆλŠ” μž‘μ—…λ“€μ„ μ±„λ„μ—μ„œ λΉΌλ‚΄λŠ” λ‹¨μˆœν•œ ꡬ쑰λ₯Ό κ°€μ§€κ³  μžˆμŠ΅λ‹ˆλ‹€.

μ•„λž˜λŠ” 파일 μ—…λ‘œλ“œ 이후 μ»¨ν…Œμ΄λ„ˆμ— ν‘Έμ‹œν•˜κΈ° μœ„ν•œ μ›Œμ»€ λ©”μ„œλ“œμž…λ‹ˆλ‹€. ν˜„μž¬ λ©”μ„œλ“œλŠ” 쒋은 λ°˜μ‘μ„±κ³Ό μ‰¬μš΄ κ΅¬ν˜„μ„ μœ„ν•΄ λ¬΄ν•œ λ£¨ν”„μ΄λ‚˜ μš©λ„μ— 따라 μ•Œκ³ λ¦¬μ¦˜μ„ μΆ”κ°€ν•˜μ—¬λ„ λ¬΄λ°©ν•©λ‹ˆλ‹€.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

λ¨Όμ € 이 ν•¨μˆ˜λŠ” κ³„μ†ν•΄μ„œ 루프λ₯Ό λŒλ©΄μ„œ νƒœμŠ€ν¬λ₯Ό νλ‘œλΆ€ν„° λ°›μ•„μ˜€κ³ μž μ‹œλ„ν•©λ‹ˆλ‹€. 이후, μž¬μ‹œλ„ λ²”μœ„ λ‚΄μ—μ„œ 슀트림-μž„μ‹œνŒŒμΌμ΄ μ•„λ‹Œ, μž„μ‹œνŒŒμΌ-μ»¨ν…Œμ΄λ„ˆλ‘œμ˜ μ—…λ‘œλ“œ 단계λ₯Ό μ‹œλ„ν•©λ‹ˆλ‹€.

이점

μ΄λŸ¬ν•œ 처리 λ°©μ‹μ˜ 이점은, 슀트림 μ—…λ‘œλ“œλ§Œ 정상이라면이후 μ²˜λ¦¬λ˜λŠ” μž‘μ—…μ— λŒ€ν•œ μ§€μ—° μ‹œκ°„μ„ 쀄일 수 있으며, λ™μ‹œλ‹€λ°œμ μΈ μ»¨ν…Œμ΄λ„ˆ μž‘μ—…μœΌλ‘œ μΈν•œ μžμ› κ³ κ°ˆμ„ μ˜ˆλ°©ν•  수 μžˆμŠ΅λ‹ˆλ‹€. ν˜„μž¬μ˜ μ½”λ“œμ—μ„œ 보이듯, λ™μ‹œμ— μ§„ν–‰ κ°€λŠ₯ν•œ μ»¨ν…Œμ΄λ„ˆ μž‘μ—…μ€ μ±„λ„μ˜ 수둜 μ œν•œλ©λ‹ˆλ‹€. 이와 같이 μ‹€μš©μ μœΌλ‘œ Go의 병렬 처리λ₯Ό μ‚¬μš© κ°€λŠ₯ν•œ 예제λ₯Ό μ•Œμ•„λ³΄μ•˜μŠ΅λ‹ˆλ‹€. 보닀 λ§Žμ€ μ˜ˆμ‹œλ₯Ό 보고 μ‹Άλ‹€λ©΄, μ•„λž˜μ˜ 링크λ₯Ό λ°©λ¬Έν•΄ μ£Όμ„Έμš”.예제λ₯Ό ν¬ν•¨ν•œ λͺ¨λ“ˆ 예제λ₯Ό ν™œμš©ν•œ ν”„λ‘œμ νŠΈ ν”„λ‘œμ νŠΈ μžμ²΄μ—λŠ” λΆ€μˆ˜μ μΈ κ΅¬μ„±μš”μ†Œκ°€ λ§ŽμœΌλ‹ˆ, μ›Œμ»€ μžμ²΄μ— λŒ€ν•œ ν•™μŠ΅μ€ main.goμ—μ„œ μ–΄λ–»κ²Œ μ›Œμ»€ init ν•¨μˆ˜λ₯Ό ν˜ΈμΆœν•˜λŠ”μ§€λ§Œ κ°„λž΅νžˆ λ³΄μ‹œκ³  λ„˜μ–΄κ°€λ©΄ λ©λ‹ˆλ‹€. λͺ¨λ“ˆμ—λŠ” λ‹€λ₯Έ ν˜•νƒœμ˜ μ›Œμ»€λ„ ν¬ν•¨ν•˜κ³  μžˆμœΌλ‹ˆ μ°Έκ³ ν•΄ μ£Όμ„Έμš”.