diff options
| author | nsfisis <nsfisis@gmail.com> | 2025-03-04 22:55:01 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2025-03-08 10:12:44 +0900 |
| commit | 1e6df136d8202c8adf65948527f4c3e7583b338c (patch) | |
| tree | 7c82476f6bbbc71d72ab7e71e39559eca197fd95 /backend/taskqueue | |
| parent | 54316868c3bec1ff9b04643dfe6c13cf56bf3246 (diff) | |
| download | phperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.tar.gz phperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.tar.zst phperkaigi-2025-albatross-1e6df136d8202c8adf65948527f4c3e7583b338c.zip | |
websocket to polling
Diffstat (limited to 'backend/taskqueue')
| -rw-r--r-- | backend/taskqueue/processor.go | 165 | ||||
| -rw-r--r-- | backend/taskqueue/processor_wrapper.go | 66 | ||||
| -rw-r--r-- | backend/taskqueue/queue.go | 65 | ||||
| -rw-r--r-- | backend/taskqueue/tasks.go | 162 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 9 |
5 files changed, 28 insertions, 439 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 222c586..0dfaf68 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -6,180 +6,45 @@ import ( "encoding/json" "fmt" "net/http" - - "github.com/nsfisis/phperkaigi-2025-albatross/backend/auth" - "github.com/nsfisis/phperkaigi-2025-albatross/backend/db" ) -type processor struct { - q *db.Queries -} +type processor struct{} -func newProcessor(q *db.Queries) processor { - return processor{ - q: q, - } +func newProcessor() processor { + return processor{} } -func (p *processor) doProcessTaskCreateSubmissionRecord( - ctx context.Context, - payload *TaskPayloadCreateSubmissionRecord, -) (*TaskResultCreateSubmissionRecord, error) { - // TODO: upsert - submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ - GameID: int32(payload.GameID()), - UserID: int32(payload.UserID()), - Code: payload.Code, - CodeSize: int32(payload.CodeSize), - CodeHash: string(payload.CodeHash()), - }) - if err != nil { - return nil, err - } - - return &TaskResultCreateSubmissionRecord{ - TaskPayload: payload, - SubmissionID: int(submissionID), - }, nil +type testrunRequestData struct { + Code string `json:"code"` + Stdin string `json:"stdin"` + MaxDuration int `json:"max_duration_ms"` } -func (p *processor) doProcessTaskCompileSwiftToWasm( - _ context.Context, - payload *TaskPayloadCompileSwiftToWasm, -) (*TaskResultCompileSwiftToWasm, error) { - type swiftcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - CodeHash string `json:"code_hash"` - } - type swiftcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := swiftcRequestData{ - MaxDuration: 10000, - Code: payload.Code, - CodeHash: string(payload.CodeHash()), - } - reqJSON, err := json.Marshal(reqData) - if err != nil { - return nil, fmt.Errorf("json.Marshal failed: %v", err) - } - req, err := http.NewRequest("POST", "http://worker:80/api/swiftc", bytes.NewBuffer(reqJSON)) - if err != nil { - return nil, fmt.Errorf("http.NewRequest failed: %v", err) - } - req.Header.Set("Content-Type", "application/json") - jwt, err := auth.NewAnonymousJWT() - if err != nil { - return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err) - } - req.Header.Set("Authorization", "Bearer "+jwt) - - client := &http.Client{} - res, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("client.Do failed: %v", err) - } - defer res.Body.Close() - - resData := swiftcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return nil, fmt.Errorf("json.Decode failed: %v", err) - } - return &TaskResultCompileSwiftToWasm{ - TaskPayload: payload, - Status: resData.Status, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil -} - -func (p *processor) doProcessTaskCompileWasmToNativeExecutable( - _ context.Context, - payload *TaskPayloadCompileWasmToNativeExecutable, -) (*TaskResultCompileWasmToNativeExecutable, error) { - type wasmcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - CodeHash string `json:"code_hash"` - } - type wasmcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := wasmcRequestData{ - MaxDuration: 10000, - CodeHash: string(payload.CodeHash()), - } - reqJSON, err := json.Marshal(reqData) - if err != nil { - return nil, fmt.Errorf("json.Marshal failed: %v", err) - } - req, err := http.NewRequest("POST", "http://worker:80/api/wasmc", bytes.NewBuffer(reqJSON)) - if err != nil { - return nil, fmt.Errorf("http.NewRequest failed: %v", err) - } - req.Header.Set("Content-Type", "application/json") - jwt, err := auth.NewAnonymousJWT() - if err != nil { - return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err) - } - req.Header.Set("Authorization", "Bearer "+jwt) - - client := &http.Client{} - res, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("client.Do failed: %v", err) - } - defer res.Body.Close() - - resData := wasmcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return nil, fmt.Errorf("json.Decode failed: %v", err) - } - return &TaskResultCompileWasmToNativeExecutable{ - TaskPayload: payload, - Status: resData.Status, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil +type testrunResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` } func (p *processor) doProcessTaskRunTestcase( _ context.Context, payload *TaskPayloadRunTestcase, ) (*TaskResultRunTestcase, error) { - type testrunRequestData struct { - MaxDuration int `json:"max_duration_ms"` - CodeHash string `json:"code_hash"` - Stdin string `json:"stdin"` - } - type testrunResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } reqData := testrunRequestData{ - MaxDuration: 5000, - CodeHash: string(payload.CodeHash()), + Code: payload.Code, Stdin: payload.Stdin, + MaxDuration: 5000, } reqJSON, err := json.Marshal(reqData) if err != nil { return nil, fmt.Errorf("json.Marshal failed: %v", err) } - req, err := http.NewRequest("POST", "http://worker:80/api/testrun", bytes.NewBuffer(reqJSON)) + req, err := http.NewRequest("POST", "http://worker:80/exec", bytes.NewBuffer(reqJSON)) if err != nil { return nil, fmt.Errorf("http.NewRequest failed: %v", err) } req.Header.Set("Content-Type", "application/json") - jwt, err := auth.NewAnonymousJWT() - if err != nil { - return nil, fmt.Errorf("auth.NewAnonymousJWT failed: %v", err) - } - req.Header.Set("Authorization", "Bearer "+jwt) + req.Header.Set("Accept", "application/json") client := &http.Client{} res, err := client.Do(req) diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go index b1fbd16..e6ddef3 100644 --- a/backend/taskqueue/processor_wrapper.go +++ b/backend/taskqueue/processor_wrapper.go @@ -23,72 +23,6 @@ func newProcessorWrapper(impl processor) *processorWrapper { } } -func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileSwiftToWasm - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - p.results <- &TaskResultCompileSwiftToWasm{Err: err} - return err - } - - result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload) - if err != nil { - retryCount, _ := asynq.GetRetryCount(ctx) - maxRetry, _ := asynq.GetMaxRetry(ctx) - isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry - if !isRecoverable { - p.results <- &TaskResultCompileSwiftToWasm{Err: err} - } - return err - } - p.results <- result - return nil -} - -func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileWasmToNativeExecutable - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} - return err - } - - result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload) - if err != nil { - retryCount, _ := asynq.GetRetryCount(ctx) - maxRetry, _ := asynq.GetMaxRetry(ctx) - isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry - if !isRecoverable { - p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} - } - return err - } - p.results <- result - return nil -} - -func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCreateSubmissionRecord - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - p.results <- &TaskResultCreateSubmissionRecord{Err: err} - return err - } - - result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload) - if err != nil { - retryCount, _ := asynq.GetRetryCount(ctx) - maxRetry, _ := asynq.GetMaxRetry(ctx) - isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry - if !isRecoverable { - p.results <- &TaskResultCreateSubmissionRecord{Err: err} - } - return err - } - p.results <- result - return nil -} - func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { var payload TaskPayloadRunTestcase if err := json.Unmarshal(t.Payload(), &payload); err != nil { diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go index 30fe265..b348fca 100644 --- a/backend/taskqueue/queue.go +++ b/backend/taskqueue/queue.go @@ -20,82 +20,21 @@ func (q *Queue) Close() { q.client.Close() } -func (q *Queue) EnqueueTaskCreateSubmissionRecord( - gameID int, - userID int, - code string, - codeSize int, - codeHash MD5HexHash, -) error { - task, err := newTaskCreateSubmissionRecord( - gameID, - userID, - code, - codeSize, - codeHash, - ) - if err != nil { - return err - } - _, err = q.client.Enqueue(task) - return err -} - -func (q *Queue) EnqueueTaskCompileSwiftToWasm( - gameID int, - userID int, - code string, - codeHash MD5HexHash, - submissionID int, -) error { - task, err := newTaskCompileSwiftToWasm( - gameID, - userID, - code, - codeHash, - submissionID, - ) - if err != nil { - return err - } - _, err = q.client.Enqueue(task) - return err -} - -func (q *Queue) EnqueueTaskCompileWasmToNativeExecutable( - gameID int, - userID int, - codeHash MD5HexHash, - submissionID int, -) error { - task, err := newTaskCompileWasmToNativeExecutable( - gameID, - userID, - codeHash, - submissionID, - ) - if err != nil { - return err - } - _, err = q.client.Enqueue(task) - return err -} - func (q *Queue) EnqueueTaskRunTestcase( gameID int, userID int, - codeHash MD5HexHash, submissionID int, testcaseID int, + code string, stdin string, stdout string, ) error { task, err := newTaskRunTestcase( gameID, userID, - codeHash, submissionID, testcaseID, + code, stdin, stdout, ) diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index d5f2993..e595d99 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -8,121 +8,16 @@ import ( type TaskType string -// MD5 hash in hexadecimal format -type MD5HexHash string - const ( - TaskTypeCreateSubmissionRecord TaskType = "create_submission_record" - TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm" - TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable" - TaskTypeRunTestcase TaskType = "run_testcase" + TaskTypeRunTestcase TaskType = "run_testcase" ) -type TaskPayloadBase struct { - GameID int - UserID int - CodeHash MD5HexHash -} - -type TaskPayloadCreateSubmissionRecord struct { - TaskPayloadBase - Code string - CodeSize int -} - -func newTaskCreateSubmissionRecord( - gameID int, - userID int, - code string, - codeSize int, - codeHash MD5HexHash, -) (*asynq.Task, error) { - payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{ - TaskPayloadBase: TaskPayloadBase{ - GameID: gameID, - UserID: userID, - CodeHash: codeHash, - }, - Code: code, - CodeSize: codeSize, - }) - if err != nil { - return nil, err - } - return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil -} - -func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID } -func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID } -func (t *TaskPayloadCreateSubmissionRecord) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash } - -type TaskPayloadCompileSwiftToWasm struct { - TaskPayloadBase - SubmissionID int - Code string -} - -func newTaskCompileSwiftToWasm( - gameID int, - userID int, - code string, - codeHash MD5HexHash, - submissionID int, -) (*asynq.Task, error) { - payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{ - TaskPayloadBase: TaskPayloadBase{ - GameID: gameID, - UserID: userID, - CodeHash: codeHash, - }, - SubmissionID: submissionID, - Code: code, - }) - if err != nil { - return nil, err - } - return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil -} - -func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID } -func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID } -func (t *TaskPayloadCompileSwiftToWasm) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash } - -type TaskPayloadCompileWasmToNativeExecutable struct { - TaskPayloadBase - SubmissionID int -} - -func newTaskCompileWasmToNativeExecutable( - gameID int, - userID int, - codeHash MD5HexHash, - submissionID int, -) (*asynq.Task, error) { - payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{ - TaskPayloadBase: TaskPayloadBase{ - GameID: gameID, - UserID: userID, - CodeHash: codeHash, - }, - SubmissionID: submissionID, - }) - if err != nil { - return nil, err - } - return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil -} - -func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID } -func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID } -func (t *TaskPayloadCompileWasmToNativeExecutable) CodeHash() MD5HexHash { - return t.TaskPayloadBase.CodeHash -} - type TaskPayloadRunTestcase struct { - TaskPayloadBase + GameID int + UserID int SubmissionID int TestcaseID int + Code string Stdin string Stdout string } @@ -130,20 +25,18 @@ type TaskPayloadRunTestcase struct { func newTaskRunTestcase( gameID int, userID int, - codeHash MD5HexHash, submissionID int, testcaseID int, + code string, stdin string, stdout string, ) (*asynq.Task, error) { payload, err := json.Marshal(TaskPayloadRunTestcase{ - TaskPayloadBase: TaskPayloadBase{ - GameID: gameID, - UserID: userID, - CodeHash: codeHash, - }, + GameID: gameID, + UserID: userID, SubmissionID: submissionID, TestcaseID: testcaseID, + Code: code, Stdin: stdin, Stdout: stdout, }) @@ -153,48 +46,11 @@ func newTaskRunTestcase( return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil } -func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID } -func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID } -func (t *TaskPayloadRunTestcase) CodeHash() MD5HexHash { return t.TaskPayloadBase.CodeHash } - type TaskResult interface { Type() TaskType GameID() int } -type TaskResultCreateSubmissionRecord struct { - TaskPayload *TaskPayloadCreateSubmissionRecord - SubmissionID int - Err error -} - -func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord } -func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() } - -type TaskResultCompileSwiftToWasm struct { - TaskPayload *TaskPayloadCompileSwiftToWasm - Status string - Stdout string - Stderr string - Err error -} - -func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm } -func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() } - -type TaskResultCompileWasmToNativeExecutable struct { - TaskPayload *TaskPayloadCompileWasmToNativeExecutable - Status string - Stdout string - Stderr string - Err error -} - -func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType { - return TaskTypeCompileWasmToNativeExecutable -} -func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() } - type TaskResultRunTestcase struct { TaskPayload *TaskPayloadRunTestcase Status string @@ -204,4 +60,4 @@ type TaskResultRunTestcase struct { } func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase } -func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() } +func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID } diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 51387d1..7effba7 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -2,8 +2,6 @@ package taskqueue import ( "github.com/hibiken/asynq" - - "github.com/nsfisis/phperkaigi-2025-albatross/backend/db" ) type WorkerServer struct { @@ -11,14 +9,14 @@ type WorkerServer struct { processor *processorWrapper } -func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { +func NewWorkerServer(redisAddr string) *WorkerServer { server := asynq.NewServer( asynq.RedisClientOpt{ Addr: redisAddr, }, asynq.Config{}, ) - processor := newProcessorWrapper(newProcessor(queries)) + processor := newProcessorWrapper(newProcessor()) return &WorkerServer{ server: server, processor: processor, @@ -28,9 +26,6 @@ func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord) - mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm) - mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable) mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase) return s.server.Run(mux) |
