diff options
Diffstat (limited to 'backend/taskqueue')
| -rw-r--r-- | backend/taskqueue/processor.go | 253 | ||||
| -rw-r--r-- | backend/taskqueue/tasks.go | 180 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 32 |
3 files changed, 336 insertions, 129 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index b080c46..556bd78 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -13,125 +13,169 @@ import ( "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) -type ExecProcessor struct { +type processor struct { q *db.Queries - results chan TaskExecResult + results chan TaskResult } -func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor { - return &ExecProcessor{ +func newProcessor(q *db.Queries) *processor { + return &processor{ q: q, - results: results, + results: make(chan TaskResult), } } -func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - var payload TaskExecPlayload +func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } // TODO: upsert // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ - GameID: int32(payload.GameID), - UserID: int32(payload.UserID), - Code: payload.Code, - CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + GameID: int32(payload.GameID()), + UserID: int32(payload.UserID()), + Code: payload.Code(), + CodeSize: int32(payload.CodeSize), }) if err != nil { + // TODO: send result return fmt.Errorf("CreateSubmission failed: %v", err) } - { - type swiftcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type swiftcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := swiftcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + p.results <- &TaskResultCreateSubmissionRecord{ + TaskPayload: &payload, + SubmissionID: int(submissionID), + } + return nil +} + +func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := swiftcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("swiftc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileSwiftToWasm{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("swiftc failed: %v", resData.Stderr) } - { - type wasmcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type wasmcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := wasmcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + + // TODO: send result + return nil +} + +func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := wasmcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("wasmc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("wasmc failed: %v", resData.Stderr) + } + + // TODO: send result + return nil +} + +func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) + testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) if err != nil { + // TODO: send result return fmt.Errorf("ListTestcasesByGameID failed: %v", err) } @@ -148,60 +192,65 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } reqData := testrunRequestData{ MaxDuration: 5000, - Code: payload.Code, + Code: payload.Code(), Stdin: testcase.Stdin, } reqJson, err := json.Marshal(reqData) if err != nil { + // TODO: send result return fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) if err != nil { + // TODO: send result return fmt.Errorf("http.Post failed: %v", err) } resData := testrunResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result return fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: resData.Status, + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: resData.Status, } return fmt.Errorf("testrun failed: %v", resData.Stderr) } if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: "wrong_answer", Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: "wrong_answer", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "wrong_answer", } return fmt.Errorf("testrun failed: %v", resData.Stdout) } } - p.results <- TaskExecResult{ - Task: &payload, - Status: "success", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "success", } return nil } diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index 5c518f4..679b6a0 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -6,31 +6,185 @@ import ( "github.com/hibiken/asynq" ) +type TaskType string + const ( - TaskTypeExec = "exec" + TaskTypeCreateSubmissionRecord TaskType = "create_submission_record" + TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm" + TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable" + TaskTypeRunTestcase TaskType = "run_testcase" ) -type TaskExecPlayload struct { +type TaskPayloadBase struct { GameID int UserID int Code string } -func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { - payload, err := json.Marshal(TaskExecPlayload{ - GameID: gameID, - UserID: userID, - Code: code, +type TaskPayloadCreateSubmissionRecord struct { + TaskPayloadBase + CodeSize int +} + +func NewTaskCreateSubmissionRecord( + gameID int, + userID int, + code string, + codeSize int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + CodeSize: codeSize, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil +} + +func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCreateSubmissionRecord) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileSwiftToWasm struct { + TaskPayloadBase + SubmissionID int +} + +func NewTaskCompileSwiftToWasm( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil +} + +func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileSwiftToWasm) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileWasmToNativeExecutable struct { + TaskPayloadBase + SubmissionID int +} + +func NewTaskCompileWasmToNativeExecutable( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil +} + +func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileWasmToNativeExecutable) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadRunTestcase struct { + TaskPayloadBase + SubmissionID int + TestcaseID int +} + +func NewTaskRunTestcase( + gameID int, + userID int, + code string, + submissionID int, + testcaseID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadRunTestcase{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + TestcaseID: testcaseID, }) if err != nil { return nil, err } - return asynq.NewTask(TaskTypeExec, payload), nil + return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil +} + +func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadRunTestcase) Code() string { return t.TaskPayloadBase.Code } + +type TaskResult interface { + Type() TaskType + GameID() int } -type TaskExecResult struct { - Task *TaskExecPlayload - Status string - Stdout string - Stderr string +type TaskResultCreateSubmissionRecord struct { + TaskPayload *TaskPayloadCreateSubmissionRecord + SubmissionID int + Err error } + +func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord } +func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileSwiftToWasm struct { + TaskPayload *TaskPayloadCompileSwiftToWasm + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm } +func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileWasmToNativeExecutable struct { + TaskPayload *TaskPayloadCompileWasmToNativeExecutable + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType { + return TaskTypeCompileWasmToNativeExecutable +} +func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultRunTestcase struct { + TaskPayload *TaskPayloadRunTestcase + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase } +func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() } diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 67803e2..6dc65d8 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -7,31 +7,35 @@ import ( ) type WorkerServer struct { - server *asynq.Server - queries *db.Queries - results chan TaskExecResult + server *asynq.Server + processor *processor } func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { + server := asynq.NewServer( + asynq.RedisClientOpt{ + Addr: redisAddr, + }, + asynq.Config{}, + ) + processor := newProcessor(queries) return &WorkerServer{ - server: asynq.NewServer( - asynq.RedisClientOpt{ - Addr: redisAddr, - }, - asynq.Config{}, - ), - queries: queries, - results: make(chan TaskExecResult), + server: server, + processor: processor, } } func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results)) + + mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord) + mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm) + mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable) + mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase) return s.server.Run(mux) } -func (s *WorkerServer) Results() chan TaskExecResult { - return s.results +func (s *WorkerServer) Results() chan TaskResult { + return s.processor.results } |
