diff options
| author | nsfisis <nsfisis@gmail.com> | 2024-08-07 23:49:10 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2024-08-08 04:02:37 +0900 |
| commit | cfe46bf104dec03c81ca38eb6b6a23b372a271b6 (patch) | |
| tree | 083ecee8ff2576772efc15501594ab0ff1e122be /backend/taskqueue/processor.go | |
| parent | 47d81ffbd3e4fe178d2935325e312cef77276250 (diff) | |
| download | iosdc-japan-2024-albatross-cfe46bf104dec03c81ca38eb6b6a23b372a271b6.tar.gz iosdc-japan-2024-albatross-cfe46bf104dec03c81ca38eb6b6a23b372a271b6.tar.zst iosdc-japan-2024-albatross-cfe46bf104dec03c81ca38eb6b6a23b372a271b6.zip | |
feat(backend): split task into smaller task types
Diffstat (limited to 'backend/taskqueue/processor.go')
| -rw-r--r-- | backend/taskqueue/processor.go | 253 |
1 files changed, 151 insertions, 102 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index b080c46..556bd78 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -13,125 +13,169 @@ import ( "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) -type ExecProcessor struct { +type processor struct { q *db.Queries - results chan TaskExecResult + results chan TaskResult } -func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor { - return &ExecProcessor{ +func newProcessor(q *db.Queries) *processor { + return &processor{ q: q, - results: results, + results: make(chan TaskResult), } } -func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - var payload TaskExecPlayload +func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } // TODO: upsert // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ - GameID: int32(payload.GameID), - UserID: int32(payload.UserID), - Code: payload.Code, - CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + GameID: int32(payload.GameID()), + UserID: int32(payload.UserID()), + Code: payload.Code(), + CodeSize: int32(payload.CodeSize), }) if err != nil { + // TODO: send result return fmt.Errorf("CreateSubmission failed: %v", err) } - { - type swiftcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type swiftcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := swiftcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + p.results <- &TaskResultCreateSubmissionRecord{ + TaskPayload: &payload, + SubmissionID: int(submissionID), + } + return nil +} + +func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := swiftcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("swiftc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileSwiftToWasm{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("swiftc failed: %v", resData.Stderr) } - { - type wasmcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type wasmcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := wasmcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + + // TODO: send result + return nil +} + +func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := wasmcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("wasmc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("wasmc failed: %v", resData.Stderr) + } + + // TODO: send result + return nil +} + +func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) + testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) if err != nil { + // TODO: send result return fmt.Errorf("ListTestcasesByGameID failed: %v", err) } @@ -148,60 +192,65 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } reqData := testrunRequestData{ MaxDuration: 5000, - Code: payload.Code, + Code: payload.Code(), Stdin: testcase.Stdin, } reqJson, err := json.Marshal(reqData) if err != nil { + // TODO: send result return fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) if err != nil { + // TODO: send result return fmt.Errorf("http.Post failed: %v", err) } resData := testrunResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result return fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: resData.Status, + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: resData.Status, } return fmt.Errorf("testrun failed: %v", resData.Stderr) } if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: "wrong_answer", Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: "wrong_answer", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "wrong_answer", } return fmt.Errorf("testrun failed: %v", resData.Stdout) } } - p.results <- TaskExecResult{ - Task: &payload, - Status: "success", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "success", } return nil } |
