GoSuda

Go에서 엔드포인트 응답 시간 단축-작업 큐 활용하기

By Yunjin Lee
views ...

개요

Go를 처음 배울 때, 백엔드 서버를 구현하며 배우는 경우가 자주 있습니다. 이 때 RestAPI 등에서 파일 스트림을 받아서 서버에 업로드하는 예제를 만드는 경우를 예를 들어 봅시다. Go언어 net/http 서버는 기본적으로 여러 요청을 동시에 처리하니 동시 다발적 업로드 자체에는 문제가 없습니다. 하지만 스트림 수신 이후 동작들을 모두 동기적으로 처리한다면 엔드포인트의 응답이 지연됩니다. 이러한 상황을 방지하기 위한 기법을 알아 봅시다.

원인

스트림을 수신하는 데에도 대체로 긴 시간이 소요되며, 큰 파일의 경우 단일 요청이 수 분동안 처리될 수 있습니다. 이러한 경우 조금이라도 수신 이후의 동작을 신속하게 처리하는 것이 중요합니다. 이 예제 시나리오는 스트림을 수신 후 임시파일로 저장, 컨테이너에 푸시하는 시나리오입니다. 이 때에 컨테이너에 임시파일을 푸시하는 부분을 워커 풀로 처리한다면, 응답 지연을 단축할 수 있습니다.

 1package file_upload
 2
 3import (
 4	"fmt"
 5	"io"
 6	"log"
 7	"net/http"
 8	"os"
 9	"path/filepath"
10	"time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // Host temporary directory
14
15// UploadTask holds data for asynchronous file push.
16type UploadTask struct {
17	HostTempFilePath         string
18	ContainerName            string
19    HostFilename             string
20	ContainerDestinationPath string
21}
22
23// UploadHandler processes file uploads. Saves to temp file, then queues for Incus push.
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25	if req.Method != http.MethodPost {
26		http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27		return
28	}
29	originalFilePath := req.Header.Get("X-File-Path")
30    originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31	containerName := req.Header.Get("X-Container-Name")
32	if originalFilePath == "" || containerName == "" {
33		http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34		return
35	}
36
37	cleanContainerDestPath := filepath.Clean(originalFilePath)
38	if !filepath.IsAbs(cleanContainerDestPath) {
39		http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40		return
41	}
42
43	// Create unique temporary file path on host
44	tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45	hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47	if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48		log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49		http.Error(wr, "Server error.", http.StatusInternalServerError)
50		return
51	}
52
53	// Create and copy request body to temporary file (synchronous)
54	outFile, err := os.Create(hostTempFilePath)
55	if err != nil {
56		log.Printf("ERROR: Failed to create temporary file: %v", err)
57		http.Error(wr, "Server error.", http.StatusInternalServerError)
58		return
59	}
60	defer outFile.Close()
61
62	bytesWritten, err := io.Copy(outFile, req.Body)
63	if err != nil {
64		outFile.Close()
65		os.Remove(hostTempFilePath)
66		log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67		http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68		return
69	}
70	log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72	// Enqueue task for asynchronous Incus push
73	task := UploadTask{
74		HostTempFilePath:         hostTempFilePath,
75		ContainerName:            containerName,
76        HostFilename :            originalFilename,
77		ContainerDestinationPath: cleanContainerDestPath,
78	}
79	EnqueueTask(task) //THIS PART
80	log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82	// Send immediate 202 Accepted response
83	wr.WriteHeader(http.StatusAccepted)
84	fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}

여기에서 THIS PART라고 적어둔 부분을 보면, 작업을 큐에 삽입한다는 것을 눈치채셨을 것입니다.

이제 작업 큐가 어떻게 동작하는지 보도록 합시다.

 1package file_upload
 2
 3import (
 4	"log"
 5	"sync"
 6)
 7
 8var taskQueue chan UploadTask
 9var once sync.Once
10
11// InitWorkQueue initializes the in-memory task queue.
12func InitWorkQueue() {
13	once.Do(func() {
14		taskQueue = make(chan UploadTask, 100)
15		log.Println("Upload Info: Work queue initialized.")
16	})
17}
18
19// EnqueueTask adds an UploadTask to the queue.
20func EnqueueTask(task UploadTask) {
21	if taskQueue == nil {
22		log.Fatal("ERROR: Task queue not initialized.")
23	}
24	taskQueue <- task
25	log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTask retrieves an UploadTask from the queue, blocking if empty.
29func DequeueTask() UploadTask {
30	if taskQueue == nil {
31		log.Fatal("ERROR: Task queue not initialized.")
32	}
33	task := <-taskQueue
34	log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35	return task
36}
37
38// GetQueueLength returns current queue size.
39func GetQueueLength() int {
40	if taskQueue == nil {
41		return 0
42	}
43	return len(taskQueue)
44}

예시로 제공된 작업 큐는 단순하게 구현되어 있습니다. 이 작업 큐는 큐에 들어가 있는 작업들을 채널에서 빼내는 단순한 구조를 가지고 있습니다.

아래는 파일 업로드 이후 컨테이너에 푸시하기 위한 워커 메서드입니다. 현재 메서드는 좋은 반응성과 쉬운 구현을 위해 무한 루프이나 용도에 따라 알고리즘을 추가하여도 무방합니다.

 1func StartFilePushWorker() {
 2	for {
 3		task := DequeueTask()
 4		log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
 5
 6		// Defer cleanup of the temporary file
 7		defer func(filePath string) {
 8			if err := os.Remove(filePath); err != nil {
 9				log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10			} else {
11				log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12			}
13		}(task.HostTempFilePath)
14
15		// Process task with retries for transient Incus errors
16		for i := 0; i <= MaxRetries; i++ {
17			err := processUploadTask(task) //separate upload task
18			if err == nil {
19				log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20				break
21			}
22
23			isTransient := true
24			if err.Error() == "incus: container not found" { // Example permanent error
25				isTransient = false
26			}
27
28			if isTransient && i < MaxRetries {
29				log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30					task.ContainerName, i+1, MaxRetries, err)
31				time.Sleep(RetryDelay)
32			} else {
33				log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34					task.ContainerName, i+1, err)
35				break
36			}
37		}
38	}
39}

먼저 이 함수는 계속해서 루프를 돌면서 태스크를 큐로부터 받아오고자 시도합니다. 이후, 재시도 범위 내에서 스트림-임시파일이 아닌, 임시파일-컨테이너로의 업로드 단계를 시도합니다.

이점

이러한 처리 방식의 이점은, 스트림 업로드만 정상이라면이후 처리되는 작업에 대한 지연 시간을 줄일 수 있으며, 동시다발적인 컨테이너 작업으로 인한 자원 고갈을 예방할 수 있습니다. 현재의 코드에서 보이듯, 동시에 진행 가능한 컨테이너 작업은 채널의 수로 제한됩니다. 이와 같이 실용적으로 Go의 병렬 처리를 사용 가능한 예제를 알아보았습니다. 보다 많은 예시를 보고 싶다면, 아래의 링크를 방문해 주세요.예제를 포함한 모듈 예제를 활용한 프로젝트 프로젝트 자체에는 부수적인 구성요소가 많으니, 워커 자체에 대한 학습은 main.go에서 어떻게 워커 init 함수를 호출하는지만 간략히 보시고 넘어가면 됩니다. 모듈에는 다른 형태의 워커도 포함하고 있으니 참고해 주세요.