Goにおけるエンドポイント応答時間の短縮—作業キューの活用
概要
Goを初めて学習する際、バックエンドサーバーを実装しながら学ぶことが多いです。この時、RestAPIなどでファイルストリームを受け取り、サーバーにアップロードする例を作成する場合を考えてみましょう。 Go言語のnet/httpサーバーは、基本的に複数のリクエストを同時に処理するため、同時多発的なアップロード自体には問題がありません。しかし、ストリーム受信後の動作をすべて同期的に処理すると、エンドポイントの応答が遅延します。このような状況を防ぐための手法を見ていきましょう。
原因
ストリームの受信には一般的に長い時間がかかり、大きなファイルの場合、単一のリクエストの処理に数分を要することがあります。このような場合、受信後の動作を少しでも迅速に処理することが重要です。この例のシナリオは、ストリームを受信後、一時ファイルとして保存し、コンテナにプッシュするシナリオです。この時、一時ファイルをコンテナにプッシュする部分をWorker Poolで処理すれば、応答遅延を短縮できます。
1package file_upload
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8 "os"
9 "path/filepath"
10 "time"
11)
12
13const uploadTempDir = "/tmp/incus_uploads" // ホストの一時ディレクトリ
14
15// UploadTaskは非同期ファイルプッシュのためのデータを保持します。
16type UploadTask struct {
17 HostTempFilePath string
18 ContainerName string
19 HostFilename string
20 ContainerDestinationPath string
21}
22
23// UploadHandlerはファイルアップロードを処理します。一時ファイルに保存し、その後 Incus プッシュのためにキューに入れます。
24func UploadHandler(wr http.ResponseWriter, req *http.Request) {
25 if req.Method != http.MethodPost {
26 http.Error(wr, "POST method required.", http.StatusMethodNotAllowed)
27 return
28 }
29 originalFilePath := req.Header.Get("X-File-Path")
30 originalFilename := filepath.Base(req.Header.Get("X-Host-Path"))
31 containerName := req.Header.Get("X-Container-Name")
32 if originalFilePath == "" || containerName == "" {
33 http.Error(wr, "Missing X-File-Path or X-Container-Name header.", http.StatusBadRequest)
34 return
35 }
36
37 cleanContainerDestPath := filepath.Clean(originalFilePath)
38 if !filepath.IsAbs(cleanContainerDestPath) {
39 http.Error(wr, "File path must be absolute.", http.StatusBadRequest)
40 return
41 }
42
43 // ホスト上で一意な一時ファイルパスを作成
44 tempFileName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(originalFilePath))
45 hostTempFilePath := filepath.Join(uploadTempDir, tempFileName)
46
47 if err := os.MkdirAll(uploadTempDir, 0755); err != nil {
48 log.Printf("ERROR: Failed to create temp upload directory: %v", err)
49 http.Error(wr, "Server error.", http.StatusInternalServerError)
50 return
51 }
52
53 // リクエストボディを一時ファイルに作成してコピー(同期)
54 outFile, err := os.Create(hostTempFilePath)
55 if err != nil {
56 log.Printf("ERROR: Failed to create temporary file: %v", err)
57 http.Error(wr, "Server error.", http.StatusInternalServerError)
58 return
59 }
60 defer outFile.Close()
61
62 bytesWritten, err := io.Copy(outFile, req.Body)
63 if err != nil {
64 outFile.Close()
65 os.Remove(hostTempFilePath)
66 log.Printf("ERROR: Failed to copy request body to temp file: %v", err)
67 http.Error(wr, "File transfer failed.", http.StatusInternalServerError)
68 return
69 }
70 log.Printf("Upload Info: Received %d bytes, saved to %s.", bytesWritten, hostTempFilePath)
71
72 // 非同期 Incus プッシュのためにタスクをキューに入れる
73 task := UploadTask{
74 HostTempFilePath: hostTempFilePath,
75 ContainerName: containerName,
76 HostFilename : originalFilename,
77 ContainerDestinationPath: cleanContainerDestPath,
78 }
79 EnqueueTask(task) //この部分
80 log.Printf("Upload Info: Task enqueued for %s to %s.", originalFilePath, containerName)
81
82 // 即座に 202 Accepted 応答を送信
83 wr.WriteHeader(http.StatusAccepted)
84 fmt.Fprintf(wr, "File '%s' queued for processing on container '%s'.\n", originalFilename, containerName)
85}
ここでTHIS PARTと記されている部分を見ると、タスクがキューに挿入されることに気づかれたことでしょう。
次に、タスクキューがどのように動作するかを見ていきましょう。
1package file_upload
2
3import (
4 "log"
5 "sync"
6)
7
8var taskQueue chan UploadTask
9var once sync.Once
10
11// InitWorkQueueはインメモリのタスクキューを初期化します。
12func InitWorkQueue() {
13 once.Do(func() {
14 taskQueue = make(chan UploadTask, 100)
15 log.Println("Upload Info: Work queue initialized.")
16 })
17}
18
19// EnqueueTaskはUploadTaskをキューに追加します。
20func EnqueueTask(task UploadTask) {
21 if taskQueue == nil {
22 log.Fatal("ERROR: Task queue not initialized.")
23 }
24 taskQueue <- task
25 log.Printf("Upload Info: Queue: Task enqueued. Size: %d", len(taskQueue))
26}
27
28// DequeueTaskはUploadTaskをキューから取得します。キューが空の場合はブロックします。
29func DequeueTask() UploadTask {
30 if taskQueue == nil {
31 log.Fatal("ERROR: Task queue not initialized.")
32 }
33 task := <-taskQueue
34 log.Printf("Upload Info: Queue: Task dequeued. Size: %d", len(taskQueue))
35 return task
36}
37
38// GetQueueLengthは現在のキューのサイズを返します。
39func GetQueueLength() int {
40 if taskQueue == nil {
41 return 0
42 }
43 return len(taskQueue)
44}
提供されたタスクキューの例は、単純に実装されています。このタスクキューは、キューに入れられたタスクをチャネルから取り出すという単純な構造を持っています。
以下は、ファイルアップロード後にコンテナにプッシュするためのWorkerメソッドです。現在のメソッドは、良好な応答性と簡単な実装のために無限ループですが、用途に応じてアルゴリズムを追加しても問題ありません。
1func StartFilePushWorker() {
2 for {
3 task := DequeueTask()
4 log.Printf("Upload Info: Worker processing task for %s from %s.", task.ContainerName, task.HostFilename)
5
6 // 一時ファイルのクリーンアップを遅延実行
7 defer func(filePath string) {
8 if err := os.Remove(filePath); err != nil {
9 log.Printf("ERROR: Worker: Failed to remove temp file '%s': %v", filePath, err)
10 } else {
11 log.Printf("Upload Info: Worker: Cleaned up temp file: %s", filePath)
12 }
13 }(task.HostTempFilePath)
14
15 // 一時的な Incus エラーに対する再試行を伴うタスク処理
16 for i := 0; i <= MaxRetries; i++ {
17 err := processUploadTask(task) //アップロードタスクを分離
18 if err == nil {
19 log.Printf("SUCCESS: Worker: Task completed for %s.", task.ContainerName)
20 break
21 }
22
23 isTransient := true
24 if err.Error() == "incus: container not found" { // 永続的なエラーの例
25 isTransient = false
26 }
27
28 if isTransient && i < MaxRetries {
29 log.Printf("WARNING: Worker: Task failed for %s (attempt %d/%d): %v. Retrying.",
30 task.ContainerName, i+1, MaxRetries, err)
31 time.Sleep(RetryDelay)
32 } else {
33 log.Printf("ERROR: Worker: Task permanently failed for %s after %d attempts: %v.",
34 task.ContainerName, i+1, err)
35 break
36 }
37 }
38 }
39}
まず、この関数はループを継続して実行し、キューからタスクを受け取ろうと試みます。その後、再試行の範囲内で、ストリームから一時ファイルではなく、一時ファイルからコンテナへのアップロード段階を試行します。
利点
この処理方式の利点は、ストリームアップロードが正常であれば、その後の処理にかかる遅延時間を短縮でき、同時多発的なコンテナ作業によるリソース枯渇を防ぐことができる点です。現在のコードに見られるように、同時に実行可能なコンテナ作業はチャネルの数に制限されます。このように、Goの並行処理を実用的に活用できる例を考察しました。より多くの例をご覧になりたい場合は、以下のリンクをご覧ください。例を含むモジュール 例を活用したプロジェクト プロジェクト自体には多くの付随的な構成要素があるため、Worker自体の学習については、main.goでWorkerのinit関数がどのように呼び出されているかだけを簡潔に見て進めばよいでしょう。モジュールには他の形式のWorkerも含まれているので、参考にしてください。